Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / TAO / orbsvcs / tests / Notify / performance-tests / RedGreen / RedGreen_Test.cpp
blob224aa3988d9defc7a09bb82f3cb5c46236d1f448
1 // -*- C++ -*-
3 #include "RedGreen_Test.h"
4 #include "ace/Arg_Shifter.h"
5 #include "ace/Get_Opt.h"
6 #include "ace/OS_NS_unistd.h"
7 #include "orbsvcs/Time_Utilities.h"
8 #include "tao/debug.h"
10 #define NOTIFY_FACTORY_NAME "NotifyEventChannelFactory"
11 #define NAMING_SERVICE_NAME "NameService"
13 #define DOMAIN_GREEN "DOMAIN_GREEN"
14 #define DOMAIN_RED "DOMAIN_RED"
16 #define TYPE_GREEN "TYPE_GREEN"
17 #define TYPE_RED "TYPE_RED"
19 ACE_Atomic_Op <TAO_SYNCH_MUTEX, int> g_result_count = 0;
20 ACE_hrtime_t g_throughput_start_;
22 static bool consumer_is_done = false;
24 int
25 RedGreen_Test::parse_args (int argc,
26 ACE_TCHAR *argv[])
28 ACE_Arg_Shifter arg_shifter (argc, argv);
30 const ACE_TCHAR *current_arg = 0;
32 while (arg_shifter.is_anything_left ())
34 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-burst_size"))))
36 this->burst_size_ = ACE_OS::atoi (current_arg);
37 ACE_DEBUG ((LM_DEBUG,
38 "Burst size = %d\n",
39 burst_size_));
40 // The number of events to send/receive.
41 arg_shifter.consume_arg ();
43 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT ("-?")) == 0)
45 ACE_DEBUG((LM_DEBUG,
46 "usage: %s "
47 "-burst_size [count]\n",
48 argv[0],
49 argv[0]));
51 arg_shifter.consume_arg ();
53 return -1;
55 else
57 arg_shifter.ignore_arg ();
61 return 0;
64 RedGreen_Test::RedGreen_Test ()
65 : burst_size_ (10),
66 nthreads_ (2)
68 // No-Op.
69 ifgop_ = CosNotifyChannelAdmin::OR_OP;
72 void
73 RedGreen_Test::destroy_ec ()
75 if (!CORBA::is_nil (ec_.in ()))
77 // Even though we still have a reference, there's no guarantee
78 // the EC is still around. So, trap exceptions.
79 try
81 this->ec_->destroy ();
83 catch (const CORBA::COMM_FAILURE&)
85 // Silently swallow this b/c this could mean the EC is gone
86 // or that the network is hosed. Either way, we're not waiting
87 // around to figure out the problem. Report the incident to the
88 // log and be done with it.
89 ACE_DEBUG ((LM_INFO,
90 "INFO: Got a COMM_FAILURE exception while trying to\n"
91 " invoke `destroy()' on the Event Channel in the\n"
92 " RedGreen destroy_ec. This is likely not a problem.\n"));
94 catch (const CORBA::Exception& ex)
96 ex._tao_print_exception ("in RedGreen destructor; swallowing.\n");
99 this->ec_ = CosNotifyChannelAdmin::EventChannel::_nil ();
104 void
105 RedGreen_Test::init (int argc,
106 ACE_TCHAR *argv [])
108 this->init_ORB (argc,
109 argv);
110 this->resolve_naming_service ();
111 this->resolve_Notify_factory ();
112 this->create_EC ();
113 this->create_supplieradmin ();
114 this->create_consumeradmin ();
115 this->create_consumers ();
116 this->create_suppliers ();
119 void
120 RedGreen_Test::run ()
124 this->send_events ();
126 while (! consumer_is_done)
128 ACE_Time_Value tv(0, 10 * 1000);
129 this->orb_->run(tv);
132 this->destroy_ec ();
133 this->orb_->shutdown (false);
135 catch (const CORBA::Exception& ex)
137 ex._tao_print_exception ("Supplier:");
138 throw;
141 worker_.thr_mgr ()->wait ();
144 void
145 RedGreen_Test::done ()
147 dump_results ();
148 worker_.done ();
151 void
152 RedGreen_Test::init_ORB (int argc, ACE_TCHAR *argv [])
154 this->orb_ = CORBA::ORB_init (argc, argv);
156 CORBA::Object_ptr poa_object =
157 this->orb_->resolve_initial_references("RootPOA");
159 if (CORBA::is_nil (poa_object))
161 ACE_ERROR ((LM_ERROR,
162 " (%P|%t) Unable to initialize the POA.\n"));
163 return;
165 this->root_poa_ =
166 PortableServer::POA::_narrow (poa_object);
168 PortableServer::POAManager_var poa_manager =
169 root_poa_->the_POAManager ();
171 poa_manager->activate ();
173 worker_.orb (this->orb_.in ());
175 if (worker_.activate (THR_NEW_LWP | THR_JOINABLE,
176 this->nthreads_) != 0)
178 ACE_ERROR ((LM_ERROR,
179 "Cannot activate client threads\n"));
183 void
184 RedGreen_Test::resolve_naming_service ()
186 CORBA::Object_var naming_obj =
187 this->orb_->resolve_initial_references (NAMING_SERVICE_NAME);
189 // Need to check return value for errors.
190 if (CORBA::is_nil (naming_obj.in ()))
192 throw CORBA::UNKNOWN ();
195 this->naming_context_ =
196 CosNaming::NamingContext::_narrow (naming_obj.in ());
199 void
200 RedGreen_Test::resolve_Notify_factory ()
202 CosNaming::Name name (1);
203 name.length (1);
204 name[0].id = CORBA::string_dup (NOTIFY_FACTORY_NAME);
206 CORBA::Object_var obj =
207 this->naming_context_->resolve (name);
209 this->notify_factory_ =
210 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in ());
213 void
214 RedGreen_Test::create_EC ()
216 CosNotifyChannelAdmin::ChannelID id;
218 this->ec_ = notify_factory_->create_channel (this->initial_qos_,
219 this->initial_admin_,
220 id);
222 ACE_ASSERT (!CORBA::is_nil (ec_.in ()));
225 void
226 RedGreen_Test::create_supplieradmin ()
228 CosNotifyChannelAdmin::AdminID adminid;
230 supplier_admin_ =
231 ec_->new_for_suppliers (this->ifgop_,
232 adminid);
234 ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));
237 void
238 RedGreen_Test::create_consumeradmin ()
240 CosNotifyChannelAdmin::AdminID adminid;
242 consumer_admin_ =
243 ec_->new_for_consumers (this->ifgop_,
244 adminid);
246 ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ()));
249 void
250 RedGreen_Test::create_consumers ()
252 ACE_NEW (this->normal_consumer_,
253 RedGreen_Test_StructuredPushConsumer (this));
254 this->normal_consumer_->connect (this->consumer_admin_.in ());
256 ACE_NEW (this->slow_consumer_,
257 SlowConsumer (this));
258 this->slow_consumer_->connect (this->consumer_admin_.in ());
261 void
262 RedGreen_Test::create_suppliers ()
264 ACE_NEW (this->supplier_,
265 RedGreen_Test_StructuredPushSupplier ());
266 this->supplier_->connect (this->supplier_admin_.in ());
269 void
270 RedGreen_Test::send_events ()
272 // Setup the Consumer 1 to receive
273 //event_type : "DOMAIN_GREEN", "DOMAIN_GREEN".
274 CosNotification::EventTypeSeq added_1(1);
275 added_1.length (1);
276 CosNotification::EventTypeSeq removed_1 (0);
277 removed_1.length (0);
279 added_1[0].domain_name = CORBA::string_dup (DOMAIN_GREEN);
280 added_1[0].type_name = CORBA::string_dup (TYPE_GREEN);
282 this->normal_consumer_->get_proxy_supplier ()->subscription_change (added_1, removed_1);
284 // Setup the Consumer 2 to receive event_type : "DOMAIN_RED", "TYPE_RED"
285 CosNotification::EventTypeSeq added_2(1);
286 added_2.length (1);
287 CosNotification::EventTypeSeq removed_2 (0);
288 removed_2.length (0);
290 added_2[0].domain_name = CORBA::string_dup (DOMAIN_RED);
291 added_2[0].type_name = CORBA::string_dup (TYPE_RED);
293 this->slow_consumer_->get_proxy_supplier ()->subscription_change (added_2, removed_2);
295 // Create the events - one of each type
297 // Event 2
298 CosNotification::StructuredEvent green_event;
299 green_event.header.fixed_header.event_type.domain_name =
300 CORBA::string_dup(DOMAIN_GREEN);
301 green_event.header.fixed_header.event_type.type_name =
302 CORBA::string_dup(TYPE_GREEN);
303 green_event.header.fixed_header.event_name = CORBA::string_dup ("");
304 green_event.header.variable_header.length (0); // put nothing here
305 green_event.filterable_data.length (0);
306 green_event.remainder_of_body <<= (CORBA::Long)10;
308 // event 3
309 CosNotification::StructuredEvent red_event;
310 red_event.header.fixed_header.event_type.domain_name =
311 CORBA::string_dup(DOMAIN_RED);
312 red_event.header.fixed_header.event_type.type_name =
313 CORBA::string_dup(TYPE_RED);
314 red_event.header.fixed_header.event_name = CORBA::string_dup("");
315 red_event.header.variable_header.length (0); // put nothing here
316 red_event.filterable_data.length (0);
317 red_event.remainder_of_body <<= (CORBA::Long)10;
319 g_throughput_start_ = ACE_OS::gethrtime ();
321 // let supplier 1 send all these events
322 for (int i = 0; i < this->burst_size_; ++i)
324 this->supplier_->send_event (red_event);
326 this->supplier_->send_event (green_event);
331 void
332 RedGreen_Test::dump_results ()
334 ACE_Throughput_Stats throughput;
335 ACE_High_Res_Timer::global_scale_factor_type gsf =
336 ACE_High_Res_Timer::global_scale_factor ();
337 ACE_TCHAR buf[BUFSIZ];
339 ACE_OS::sprintf (buf,
340 ACE_TEXT ("Normal Consumer [%02d]"),
342 normal_consumer_->dump_stats (buf,
343 gsf);
344 normal_consumer_->accumulate_into (throughput);
346 ACE_OS::sprintf (buf,
347 ACE_TEXT ("Slow Consumer [%02d]"),
349 slow_consumer_->dump_stats (buf,
350 gsf);
351 slow_consumer_->accumulate_into (throughput);
353 ACE_DEBUG ((LM_DEBUG,
354 "\n"));
356 ACE_Throughput_Stats suppliers;
358 ACE_OS::sprintf (buf,
359 ACE_TEXT ("Supplier [%02d]"),
362 this->supplier_->dump_stats (buf, gsf);
363 this->supplier_->accumulate_into (suppliers);
365 ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
366 throughput.dump_results (ACE_TEXT("Notify_Consumer/totals"), gsf);
368 ACE_DEBUG ((LM_DEBUG, "\n"));
369 suppliers.dump_results (ACE_TEXT("Notify_Supplier/totals"), gsf);
372 // *****************************************************************
374 RedGreen_Test_StructuredPushConsumer::RedGreen_Test_StructuredPushConsumer (
375 RedGreen_Test* RedGreen_Test
377 : RedGreen_Test_ (RedGreen_Test),
378 push_count_ (0)
382 RedGreen_Test_StructuredPushConsumer::~RedGreen_Test_StructuredPushConsumer (
383 void
388 void
389 RedGreen_Test_StructuredPushConsumer::accumulate_into (
390 ACE_Throughput_Stats &throughput) const
392 throughput.accumulate (this->throughput_);
395 void
396 RedGreen_Test_StructuredPushConsumer::dump_stats (
397 const ACE_TCHAR* msg,
398 ACE_High_Res_Timer::global_scale_factor_type gsf)
400 this->throughput_.dump_results (msg, gsf);
403 void
404 RedGreen_Test_StructuredPushConsumer::connect (
405 CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
408 // Activate the consumer with the default_POA_.
409 CosNotifyComm::StructuredPushConsumer_var objref =
410 this->_this ();
412 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier =
413 consumer_admin->obtain_notification_push_supplier (
414 CosNotifyChannelAdmin::STRUCTURED_EVENT,
415 proxy_supplier_id_);
417 ACE_ASSERT (!CORBA::is_nil (proxysupplier.in ()));
419 // narrow
420 this->proxy_supplier_ =
421 CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (
422 proxysupplier.in ());
424 ACE_ASSERT (!CORBA::is_nil (proxy_supplier_.in ()));
426 proxy_supplier_->connect_structured_push_consumer (objref.in ());
429 void
430 RedGreen_Test_StructuredPushConsumer::disconnect ()
432 this->proxy_supplier_->
433 disconnect_structured_push_supplier ();
436 void
437 RedGreen_Test_StructuredPushConsumer::offer_change (
438 const CosNotification::EventTypeSeq & /*added*/,
439 const CosNotification::EventTypeSeq & /*removed*/
442 // No-Op.
445 void
446 RedGreen_Test_StructuredPushConsumer::push_structured_event (
447 const CosNotification::StructuredEvent & notification
450 ACE_GUARD (TAO_SYNCH_MUTEX,
451 ace_mon,
452 this->lock_);
453 const char* domain_name =
454 notification.header.fixed_header.event_type.domain_name;
456 const char* type_name =
457 notification.header.fixed_header.event_type.type_name;
459 if (TAO_debug_level)
460 ACE_DEBUG ((LM_DEBUG,
461 "Consumer %d event, domain = %s, type = %s\n",
462 this->proxy_supplier_id_,
463 domain_name,
464 type_name));
466 TimeBase::TimeT latency_base_recorded;
467 ACE_hrtime_t latency_base;
469 notification.filterable_data[0].value >>= latency_base_recorded;
471 ORBSVCS_Time::TimeT_to_hrtime (latency_base,
472 latency_base_recorded);
474 ++this->push_count_;
476 // Grab timestamp again.
477 ACE_hrtime_t now = ACE_OS::gethrtime ();
479 // Record statistics.
480 this->throughput_.sample (now - g_throughput_start_,
481 now - latency_base);
484 if (++g_result_count == 2*RedGreen_Test_->burst_size_)
486 RedGreen_Test_->done ();
490 void
491 RedGreen_Test_StructuredPushConsumer::disconnect_structured_push_consumer (
494 // No-Op.
497 CosNotifyChannelAdmin::StructuredProxyPushSupplier_ptr
498 RedGreen_Test_StructuredPushConsumer::get_proxy_supplier ()
500 return this->proxy_supplier_.in ();
503 // *****************************************************************
505 SlowConsumer::SlowConsumer (RedGreen_Test* RedGreen_Test)
506 : RedGreen_Test_StructuredPushConsumer (RedGreen_Test)
510 void
511 SlowConsumer::push_structured_event (
512 const CosNotification::StructuredEvent & notification
515 // Slow it down ...
516 ACE_OS::sleep (1);
518 RedGreen_Test_StructuredPushConsumer::push_structured_event (notification);
521 // *****************************************************************
523 RedGreen_Test_StructuredPushSupplier::RedGreen_Test_StructuredPushSupplier ()
527 RedGreen_Test_StructuredPushSupplier::~RedGreen_Test_StructuredPushSupplier ()
531 void
532 RedGreen_Test_StructuredPushSupplier::accumulate_into (
533 ACE_Throughput_Stats &throughput) const
535 throughput.accumulate (this->throughput_);
538 void
539 RedGreen_Test_StructuredPushSupplier::dump_stats (
540 const ACE_TCHAR* msg,
541 ACE_High_Res_Timer::global_scale_factor_type gsf)
543 this->throughput_.dump_results (msg, gsf);
546 void
547 RedGreen_Test_StructuredPushSupplier::connect (
548 CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin
551 CosNotifyComm::StructuredPushSupplier_var objref =
552 this->_this ();
554 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer =
555 supplier_admin->obtain_notification_push_consumer (
556 CosNotifyChannelAdmin::STRUCTURED_EVENT,
557 proxy_consumer_id_);
559 ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ()));
561 // narrow
562 this->proxy_consumer_ =
563 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (
564 proxyconsumer.in ());
566 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));
568 proxy_consumer_->connect_structured_push_supplier (objref.in ());
571 void
572 RedGreen_Test_StructuredPushSupplier::disconnect ()
574 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
576 this->proxy_consumer_->disconnect_structured_push_consumer ();
579 void
580 RedGreen_Test_StructuredPushSupplier::subscription_change (
581 const CosNotification::EventTypeSeq & /*added*/,
582 const CosNotification::EventTypeSeq &) /*removed */
584 //No-Op.
587 void
588 RedGreen_Test_StructuredPushSupplier::send_event (
589 CosNotification::StructuredEvent& event)
591 event.filterable_data.length (1);
592 event.filterable_data[0].name = CORBA::string_dup("latency_base");
594 // Record current time.
595 ACE_hrtime_t start = ACE_OS::gethrtime ();
597 TimeBase::TimeT latency_base;
598 ORBSVCS_Time::hrtime_to_TimeT (latency_base,
599 start);
600 // any
601 event.filterable_data[0].value <<= latency_base;
602 proxy_consumer_->push_structured_event (event);
604 ACE_hrtime_t end = ACE_OS::gethrtime ();
606 this->throughput_.sample (end - g_throughput_start_,
607 end - start);
610 void
611 RedGreen_Test_StructuredPushSupplier::disconnect_structured_push_supplier ()
613 // No-Op.
616 //*****************************************************************
618 Worker::Worker ()
622 void
623 Worker::orb (CORBA::ORB_ptr orb)
625 orb_ = CORBA::ORB::_duplicate (orb);
628 void
629 Worker::done ()
631 consumer_is_done = true;
635 Worker::svc ()
639 this->orb_->run ();
641 catch (const CORBA::Exception& ex)
643 ex._tao_print_exception ("Consumer:");
646 return 0;