Merge pull request #2317 from jwillemsen/jwi-deleteop
[ACE_TAO.git] / TAO / performance-tests / Protocols / sender.cpp
bloba39c1ad51bb17466682315c7589e7dd7212ef2d8
1 #include "ace/Get_Opt.h"
2 #include "ace/High_Res_Timer.h"
3 #include "ace/Stats.h"
4 #include "ace/Throughput_Stats.h"
5 #include "ace/Sample_History.h"
6 #include "ace/OS_NS_unistd.h"
7 #include "ace/OS_NS_string.h"
8 #include "ace/OS_NS_stdlib.h"
9 #include "ace/OS_NS_time.h"
10 #include "tao/RTCORBA/RTCORBA.h"
11 #include "tao/RTCORBA/RT_Policy_i.h"
12 #include "tao/RTCORBA/Network_Priority_Mapping_Manager.h"
13 #include "tao/RTCORBA/Network_Priority_Mapping.h"
14 #include "tao/ORB_Constants.h"
15 #include "tao/Policy_ManagerC.h"
16 #include "Custom_Network_Priority_Mapping.h"
17 #include "Custom_Network_Priority_Mapping.cpp"
18 #include "tao/debug.h"
19 #include "testC.h"
21 // Types of tests supported.
22 enum Test_Type
24 PACED,
25 THROUGHPUT,
26 LATENCY
29 static const ACE_TCHAR *ior = ACE_TEXT ("file://distributor.ior");
30 static int shutdown_server = 0;
31 static CORBA::ULong iterations = 5;
32 static CORBA::ULong invocation_rate = 5;
33 static int count_missed_end_deadlines = 0;
34 static ACE_High_Res_Timer::global_scale_factor_type gsf = 0;
35 static int do_dump_history = 0;
36 static int print_missed_invocations = 0;
37 static CORBA::ULong message_size = 0;
38 static const ACE_TCHAR *test_protocol = ACE_TEXT ("IIOP");
39 static int print_statistics = 1;
40 static int number_of_connection_attempts = 20;
41 static int enable_diffserv_code_points = 0;
42 static RTCORBA::Priority corba_priority = RTCORBA::minPriority;
43 static Test_Type test_type = PACED;
45 static int
46 parse_args (int argc, ACE_TCHAR **argv)
48 ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("a:b:c:d:e:i:k:m:p:r:s:t:x:"));
49 int c;
51 while ((c = get_opts ()) != -1)
52 switch (c)
54 case 'a':
55 test_type = static_cast<Test_Type> (ACE_OS::atoi (get_opts.opt_arg ()));
56 break;
58 case 'b':
59 enable_diffserv_code_points = ACE_OS::atoi (get_opts.opt_arg ());
60 break;
62 case 'c':
63 corba_priority = ACE_OS::atoi (get_opts.opt_arg ());
64 break;
66 case 'd':
67 do_dump_history = ACE_OS::atoi (get_opts.opt_arg ());
68 break;
70 case 'e':
71 count_missed_end_deadlines = ACE_OS::atoi (get_opts.opt_arg ());
72 break;
74 case 'i':
75 iterations = ACE_OS::atoi (get_opts.opt_arg ());
76 break;
78 case 'k':
79 ior = get_opts.opt_arg ();
80 break;
82 case 'm':
83 print_missed_invocations = ACE_OS::atoi (get_opts.opt_arg ());
84 break;
86 case 'p':
87 test_protocol = get_opts.opt_arg ();
88 break;
90 case 'r':
91 invocation_rate = ACE_OS::atoi (get_opts.opt_arg ());
92 break;
94 case 's':
95 message_size = ACE_OS::atoi (get_opts.opt_arg ());
96 break;
98 case 't':
99 print_statistics = ACE_OS::atoi (get_opts.opt_arg ());
100 break;
102 case 'x':
103 shutdown_server = ACE_OS::atoi (get_opts.opt_arg ());
104 break;
106 default:
108 const ACE_TCHAR *test = 0;
109 switch (test_type)
111 case PACED:
112 test = ACE_TEXT ("PACED");
113 break;
114 case THROUGHPUT:
115 test = ACE_TEXT ("THROUGHPUT");
116 break;
117 case LATENCY:
118 test = ACE_TEXT ("LATENCY");
119 break;
122 ACE_ERROR_RETURN ((LM_ERROR,
123 "usage: %s\n"
124 "\t-a <test type> (defaults to %s [valid values are PACED(%d), THROUGHPUT(%d), and LATENCY(%d))\n"
125 "\t-b <enable diffserv code points> (defaults to %d)\n"
126 "\t-c <corba priority> (defaults to %d)\n"
127 "\t-d <show history> (defaults to %d)\n"
128 "\t-e <count missed end deadlines> (defaults to %d)\n"
129 "\t-h <help: shows options menu>\n"
130 "\t-i <iterations> (defaults to %d)\n"
131 "\t-k <ior> (defaults to %s)\n"
132 "\t-m <print missed invocations for paced workers> (defaults to %d)\n"
133 "\t-p <test protocol> (defaults to %s [valid values are IIOP, DIOP, and SCIOP])\n"
134 "\t-r <invocation rate> (defaults to %d)\n"
135 "\t-s <message size> (defaults to %d)\n"
136 "\t-t <print stats> (defaults to %d)\n"
137 "\t-x <shutdown server> (defaults to %d)\n"
138 "\n",
139 argv[0],
140 test, PACED, THROUGHPUT, LATENCY,
141 enable_diffserv_code_points,
142 corba_priority,
143 do_dump_history,
144 count_missed_end_deadlines,
145 iterations,
146 ior,
147 print_missed_invocations,
148 test_protocol,
149 invocation_rate,
150 message_size,
151 print_statistics,
152 shutdown_server),
153 -1);
157 return 0;
160 double
161 to_seconds (ACE_UINT64 hrtime,
162 ACE_High_Res_Timer::global_scale_factor_type sf)
164 double seconds = static_cast<double> (
165 ACE_UINT64_DBLCAST_ADAPTER (hrtime / sf));
166 seconds /= ACE_HR_SCALE_CONVERSION;
168 return seconds;
171 ACE_UINT64
172 to_hrtime (double seconds,
173 ACE_High_Res_Timer::global_scale_factor_type sf)
175 return ACE_UINT64 (seconds * sf * ACE_HR_SCALE_CONVERSION);
178 class Worker
180 public:
181 Worker (CORBA::ORB_ptr orb,
182 RTCORBA::RTORB_ptr rtorb,
183 CORBA::PolicyManager_ptr policy_manager,
184 test_ptr test);
186 void run ();
188 void print_stats ();
190 void setup ();
192 private:
193 ACE_hrtime_t deadline_for_current_call (CORBA::ULong i);
194 void missed_start_deadline (CORBA::ULong invocation);
195 void missed_end_deadline (CORBA::ULong invocation);
197 RTCORBA::RTORB_var rtorb_;
198 CORBA::PolicyManager_var policy_manager_;
199 test_var test_;
200 ACE_Sample_History history_;
201 ACE_hrtime_t interval_between_calls_;
202 ACE_hrtime_t test_start_;
203 ACE_hrtime_t test_end_;
204 CORBA::ULong missed_start_deadlines_;
205 CORBA::ULong missed_end_deadlines_;
207 typedef ACE_Array_Base<CORBA::ULong> Missed_Invocations;
208 Missed_Invocations missed_start_invocations_;
209 Missed_Invocations missed_end_invocations_;
211 CORBA::PolicyList base_protocol_policy_;
212 CORBA::PolicyList test_protocol_policy_;
214 CORBA::Long session_id_;
217 Worker::Worker (CORBA::ORB_ptr orb,
218 RTCORBA::RTORB_ptr rtorb,
219 CORBA::PolicyManager_ptr policy_manager,
220 test_ptr test)
221 : rtorb_ (RTCORBA::RTORB::_duplicate (rtorb)),
222 policy_manager_ (CORBA::PolicyManager::_duplicate (policy_manager)),
223 test_ (test::_duplicate (test)),
224 history_ (iterations),
225 interval_between_calls_ (),
226 missed_start_deadlines_ (0),
227 missed_end_deadlines_ (0),
228 missed_start_invocations_ (iterations),
229 missed_end_invocations_ (iterations)
231 // Each sender will have a random session id. This helps in
232 // identifying late packets arriving at the server.
233 ACE_OS::srand ((unsigned) ACE_OS::time (0));
234 this->session_id_ = ACE_OS::rand ();
236 // Interval is inverse of rate.
237 this->interval_between_calls_ =
238 to_hrtime (1 / double (invocation_rate), gsf);
240 // Base protocol is used for setting up and tearing down the test.
241 this->base_protocol_policy_.length (1);
243 // Test protocol is the one being tested.
244 this->test_protocol_policy_.length (1);
246 RTCORBA::ProtocolProperties_var base_transport_protocol_properties =
247 TAO_Protocol_Properties_Factory::create_transport_protocol_property (IOP::TAG_INTERNET_IOP,
248 orb->orb_core ());
250 RTCORBA::TCPProtocolProperties_var tcp_base_transport_protocol_properties =
251 RTCORBA::TCPProtocolProperties::_narrow (base_transport_protocol_properties.in ());
253 tcp_base_transport_protocol_properties->enable_network_priority (enable_diffserv_code_points);
255 RTCORBA::ProtocolList protocols;
256 protocols.length (1);
257 protocols[0].transport_protocol_properties =
258 base_transport_protocol_properties;
259 protocols[0].orb_protocol_properties =
260 RTCORBA::ProtocolProperties::_nil ();
262 // IIOP is always used for the base protocol.
263 protocols[0].protocol_type = IOP::TAG_INTERNET_IOP;
265 // User decides the test protocol.
266 this->base_protocol_policy_[0] =
267 this->rtorb_->create_client_protocol_policy (protocols);
269 if (ACE_OS::strcmp (test_protocol, ACE_TEXT ("DIOP")) == 0)
271 if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, "test protocol is DIOP\n"));
272 protocols[0].protocol_type = TAO_TAG_DIOP_PROFILE;
274 else if (ACE_OS::strcmp (test_protocol, ACE_TEXT ("SCIOP")) == 0)
276 if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, "test protocol is SCIOP\n"));
277 protocols[0].protocol_type = TAO_TAG_SCIOP_PROFILE;
279 else
281 if (TAO_debug_level) ACE_DEBUG ((LM_DEBUG, "test protocol is IIOP\n"));
282 protocols[0].protocol_type = IOP::TAG_INTERNET_IOP;
285 RTCORBA::ProtocolProperties_var test_transport_protocol_properties =
286 TAO_Protocol_Properties_Factory::create_transport_protocol_property (protocols[0].protocol_type,
287 orb->orb_core ());
289 if (protocols[0].protocol_type == TAO_TAG_DIOP_PROFILE)
291 RTCORBA::UserDatagramProtocolProperties_var udp_test_transport_protocol_properties =
292 RTCORBA::UserDatagramProtocolProperties::_narrow (test_transport_protocol_properties.in ());
294 udp_test_transport_protocol_properties->enable_network_priority (enable_diffserv_code_points);
296 else if (protocols[0].protocol_type == TAO_TAG_SCIOP_PROFILE)
298 RTCORBA::StreamControlProtocolProperties_var sctp_test_transport_protocol_properties =
299 RTCORBA::StreamControlProtocolProperties::_narrow (test_transport_protocol_properties.in ());
301 sctp_test_transport_protocol_properties->enable_network_priority (enable_diffserv_code_points);
303 else if (protocols[0].protocol_type == IOP::TAG_INTERNET_IOP)
305 RTCORBA::TCPProtocolProperties_var tcp_test_transport_protocol_properties =
306 RTCORBA::TCPProtocolProperties::_narrow (test_transport_protocol_properties.in ());
308 tcp_test_transport_protocol_properties->enable_network_priority (enable_diffserv_code_points);
311 protocols[0].transport_protocol_properties =
312 test_transport_protocol_properties;
314 this->test_protocol_policy_[0] =
315 this->rtorb_->create_client_protocol_policy (protocols);
318 void
319 Worker::print_stats ()
321 CORBA::ULong missed_total_deadlines =
322 this->missed_start_deadlines_ + this->missed_end_deadlines_;
324 CORBA::ULong made_total_deadlines =
325 iterations - missed_total_deadlines;
327 ACE_DEBUG ((LM_DEBUG,
328 "\n************ Statistics ************\n\n"));
331 // Senders-side stats for PACED invocations are not too relevant
332 // since we are doing one way calls.
334 if (test_type == PACED)
336 ACE_DEBUG ((LM_DEBUG,
337 "Rate = %d/sec; Iterations = %d; ",
338 invocation_rate,
339 iterations));
341 if (count_missed_end_deadlines)
342 ACE_DEBUG ((LM_DEBUG,
343 "Deadlines made/missed[start,end]/%% = %d/%d[%d,%d]/%.2f%%; Effective Rate = %.2f\n",
344 made_total_deadlines,
345 missed_total_deadlines,
346 this->missed_start_deadlines_,
347 this->missed_end_deadlines_,
348 made_total_deadlines * 100 / (double) iterations,
349 made_total_deadlines / to_seconds (this->test_end_ - this->test_start_, gsf)));
350 else
351 ACE_DEBUG ((LM_DEBUG,
352 "Deadlines made/missed/%% = %d/%d/%.2f%%; Effective Rate = %.2f\n",
353 made_total_deadlines,
354 missed_total_deadlines,
355 made_total_deadlines * 100 / (double) iterations,
356 made_total_deadlines / to_seconds (this->test_end_ - this->test_start_, gsf)));
358 if (print_missed_invocations)
360 ACE_DEBUG ((LM_DEBUG, "\nMissed start invocations are:\n"));
362 for (CORBA::ULong j = 0;
363 j < this->missed_start_deadlines_;
364 ++j)
366 ACE_DEBUG ((LM_DEBUG,
367 "%d ",
368 this->missed_start_invocations_[j]));
371 ACE_DEBUG ((LM_DEBUG, "\n"));
373 if (count_missed_end_deadlines)
375 ACE_DEBUG ((LM_DEBUG, "\nMissed end invocations are:\n"));
377 for (CORBA::ULong j = 0;
378 j < this->missed_end_deadlines_;
379 ++j)
381 ACE_DEBUG ((LM_DEBUG,
382 "%d ",
383 this->missed_end_invocations_[j]));
386 ACE_DEBUG ((LM_DEBUG, "\n"));
391 // Individual calls are relevant for the PACED and LATENCY tests.
392 if (test_type == PACED ||
393 test_type == LATENCY)
395 if (do_dump_history)
397 this->history_.dump_samples (ACE_TEXT("HISTORY"), gsf);
400 ACE_Basic_Stats stats;
401 this->history_.collect_basic_stats (stats);
402 stats.dump_results (ACE_TEXT("Total"), gsf);
404 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Total"), gsf,
405 this->test_end_ - this->test_start_,
406 iterations);
408 else
410 ACE_hrtime_t elapsed_time =
411 this->test_end_ - this->test_start_;
413 double seconds =
414 to_seconds (elapsed_time, gsf);
416 ACE_hrtime_t bits = iterations;
417 bits *= message_size * 8;
419 ACE_DEBUG ((LM_DEBUG,
420 "%Q bits sent in %5.1f seconds at a rate of %5.2f Mbps\n",
421 bits,
422 seconds,
423 ACE_UINT64_DBLCAST_ADAPTER (bits) / seconds / 1000 / 1000));
427 ACE_hrtime_t
428 Worker::deadline_for_current_call (CORBA::ULong i)
430 ACE_hrtime_t deadline_for_current_call =
431 this->interval_between_calls_;
433 deadline_for_current_call *= (i + 1);
435 deadline_for_current_call += this->test_start_;
437 return deadline_for_current_call;
440 void
441 Worker::missed_start_deadline (CORBA::ULong invocation)
443 this->missed_start_invocations_[this->missed_start_deadlines_++] =
444 invocation;
447 void
448 Worker::missed_end_deadline (CORBA::ULong invocation)
450 if (count_missed_end_deadlines)
451 this->missed_end_invocations_[this->missed_end_deadlines_++] =
452 invocation;
455 void
456 Worker::setup ()
458 // Make sure we have a connection to the server using the test
459 // protocol.
460 this->policy_manager_->set_policy_overrides (this->test_protocol_policy_,
461 CORBA::SET_OVERRIDE);
463 // Since the network maybe unavailable temporarily, make sure to try
464 // for a few times before giving up.
465 for (int j = 0;;)
467 test_protocol_setup:
471 // Send a message to ensure that the connection is setup.
472 this->test_->oneway_sync ();
474 goto test_protocol_success;
476 catch (const CORBA::TRANSIENT&)
478 ++j;
480 if (j < number_of_connection_attempts)
482 ACE_OS::sleep (1);
483 goto test_protocol_setup;
487 ACE_ERROR ((LM_ERROR,
488 "Cannot setup test protocol\n"));
490 ACE_OS::exit (-1);
493 test_protocol_success:
495 // Use IIOP for setting up the test since the test protocol maybe
496 // unreliable.
497 this->policy_manager_->set_policy_overrides (this->base_protocol_policy_,
498 CORBA::SET_OVERRIDE);
500 // Since the network maybe unavailable temporarily, make sure to try
501 // for a few times before giving up.
502 for (int k = 0;;)
504 base_protocol_setup:
508 // Let the server know what to expect..
509 this->test_->start_test (this->session_id_,
510 ACE_TEXT_ALWAYS_CHAR (test_protocol),
511 invocation_rate,
512 message_size,
513 iterations);
515 goto base_protocol_success;
517 catch (const CORBA::TRANSIENT&)
519 ACE_OS::sleep (1);
521 if (k < number_of_connection_attempts)
523 ACE_OS::sleep (1);
524 goto base_protocol_setup;
528 ACE_ERROR ((LM_ERROR,
529 "Cannot setup base protocol\n"));
531 ACE_OS::exit (-1);
534 base_protocol_success:
536 return;
539 void
540 Worker::run ()
542 // Select the test protocol for these invocation.
543 this->policy_manager_->set_policy_overrides (this->test_protocol_policy_,
544 CORBA::SET_OVERRIDE);
546 // Payload.
547 ::test::octets_var payload (new ::test::octets);
548 payload->length (message_size);
550 CORBA::Octet *buffer =
551 payload->get_buffer ();
553 // Not necessary but good for debugging.
554 ACE_OS::memset (buffer,
556 message_size * sizeof (CORBA::Octet));
558 // Record the start time of the test.
559 this->test_start_ =
560 ACE_OS::gethrtime ();
562 // Test with several iterations.
563 for (CORBA::ULong i = 0;
564 i < iterations;
565 ++i)
567 ACE_hrtime_t time_before_call = 0;
568 ACE_hrtime_t deadline_for_current_call = 0;
570 // For PACED and LATENCY, each sender call is individually
571 // noted.
572 if (test_type == PACED ||
573 test_type == LATENCY)
575 time_before_call =
576 ACE_OS::gethrtime ();
578 // Pacing code.
579 if (test_type == PACED)
581 deadline_for_current_call =
582 this->deadline_for_current_call (i);
584 if (time_before_call > deadline_for_current_call)
586 this->missed_start_deadline (i);
587 continue;
592 // Use oneways for PACING and THROUGHPUT.
593 if (test_type == PACED ||
594 test_type == THROUGHPUT)
596 this->test_->oneway_method (this->session_id_,
598 payload.in ());
600 else
602 // Use twoway calls for LATENCY.
603 this->test_->twoway_method (this->session_id_,
605 payload.inout ());
608 // For PACED and LATENCY, each sender call is individually
609 // noted.
610 if (test_type == PACED ||
611 test_type == LATENCY)
613 ACE_hrtime_t time_after_call =
614 ACE_OS::gethrtime ();
616 if (test_type == LATENCY)
617 this->history_.sample ((time_after_call - time_before_call) / 2);
618 else
619 this->history_.sample (time_after_call - time_before_call);
621 if (test_type == PACED)
623 if (time_after_call > deadline_for_current_call)
625 this->missed_end_deadline (i);
626 continue;
629 ACE_hrtime_t sleep_time =
630 deadline_for_current_call - time_after_call;
632 ACE_OS::sleep (ACE_Time_Value (0,
633 long (to_seconds (sleep_time, gsf) *
634 ACE_ONE_SECOND_IN_USECS)));
639 // This call is used to ensure that all the THROUGHPUT related data
640 // has reached the server.
641 if (test_type == THROUGHPUT &&
642 ACE_OS::strcmp (test_protocol, ACE_TEXT ("DIOP")) != 0)
644 this->test_->twoway_sync ();
647 // Record end time for the test.
648 this->test_end_ = ACE_OS::gethrtime ();
650 // Use IIOP to indicate end of test to server.
651 this->policy_manager_->set_policy_overrides (this->base_protocol_policy_,
652 CORBA::SET_OVERRIDE);
654 // Tell server that the test is over.
655 this->test_->end_test ();
659 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
661 gsf = ACE_High_Res_Timer::global_scale_factor ();
665 CORBA::ORB_var orb =
666 CORBA::ORB_init (argc, argv);
668 CORBA::Object_var object =
669 orb->resolve_initial_references ("RTORB");
671 RTCORBA::RTORB_var rtorb =
672 RTCORBA::RTORB::_narrow (object.in ());
674 object =
675 orb->resolve_initial_references ("ORBPolicyManager");
677 CORBA::PolicyManager_var policy_manager =
678 CORBA::PolicyManager::_narrow (object.in ());
680 int parse_args_result =
681 parse_args (argc, argv);
682 if (parse_args_result != 0)
683 return parse_args_result;
685 // Resolve the Network priority Mapping Manager
686 object =
687 orb->resolve_initial_references ("NetworkPriorityMappingManager");
689 RTCORBA::NetworkPriorityMappingManager_var mapping_manager =
690 RTCORBA::NetworkPriorityMappingManager::_narrow (object.in ());
692 // Initialize the custom priority mapping
693 Custom_Network_Priority_Mapping *cnpm = 0;
694 ACE_NEW_RETURN (cnpm,
695 Custom_Network_Priority_Mapping,
696 -1);
698 // Set the desired corba priority on the network mapping manager
699 cnpm->corba_priority (corba_priority);
701 // Load the custom network priority mapping object in the
702 // network priority mapping manager. The user can thus add his
703 // own priority mapping.
704 mapping_manager->mapping (cnpm);
706 object =
707 orb->string_to_object (ior);
709 test_var test =
710 test::_narrow (object.in ());
712 Worker worker (orb.in (),
713 rtorb.in (),
714 policy_manager.in (),
715 test.in ());
717 worker.setup ();
719 worker.run ();
721 if (print_statistics)
722 worker.print_stats ();
724 if (shutdown_server)
726 test->shutdown ();
728 ACE_OS::sleep(1);
730 catch (const CORBA::Exception& ex)
732 ex._tao_print_exception ("Error!");
733 return -1;
736 return 0;