3 #include "RedGreen_Test.h"
4 #include "ace/Arg_Shifter.h"
5 #include "ace/Get_Opt.h"
6 #include "ace/OS_NS_unistd.h"
7 #include "orbsvcs/Time_Utilities.h"
// NOTIFY_FACTORY_NAME is the id looked up in the naming context by
// resolve_Notify_factory(); NAMING_SERVICE_NAME is the initial-reference
// name passed to resolve_initial_references() by resolve_naming_service().
10 #define NOTIFY_FACTORY_NAME "NotifyEventChannelFactory"
11 #define NAMING_SERVICE_NAME "NameService"
// Domain names used both for the consumers' subscriptions and for the
// events built in send_events().
13 #define DOMAIN_GREEN "DOMAIN_GREEN"
14 #define DOMAIN_RED "DOMAIN_RED"
// Type names paired with the domains above.
16 #define TYPE_GREEN "TYPE_GREEN"
17 #define TYPE_RED "TYPE_RED"
// Number of events received so far.  Incremented by
// RedGreen_Test_StructuredPushConsumer::push_structured_event(), which
// compares it against 2 * burst_size_.
19 ACE_Atomic_Op
<TAO_SYNCH_MUTEX
, int> g_result_count
= 0;
// Timestamp taken by RedGreen_Test::send_events() right before its send
// loop; later timestamps are measured relative to it when sampling
// throughput.
20 ACE_hrtime_t g_throughput_start_
;
// Completion flag polled by RedGreen_Test::run().
// NOTE(review): this is a plain bool.  If it is set from a different thread
// than the one polling it, the access is unsynchronized -- confirm and
// consider an atomic type.
22 static bool consumer_is_done
= false;
// Walks the command line with an ACE_Arg_Shifter.
//  * "-burst_size <n>": stores ACE_OS::atoi(<n>) in burst_size_ and
//    consumes the argument.
//  * "-?": consumes the argument; the visible text of this branch is the
//    "-burst_size [count]" usage line.
//  * ignore_arg() is the remaining visible action -- presumably taken for
//    unrecognised arguments; confirm.
// NOTE(review): atoi reports no errors, so a non-numeric value would
// silently yield 0.
25 RedGreen_Test::parse_args (int argc
,
28 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
30 const ACE_TCHAR
*current_arg
= 0;
32 while (arg_shifter
.is_anything_left ())
// get_the_parameter() yields the value following the flag, or 0 when the
// current argument is not "-burst_size".
34 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-burst_size"))))
36 this->burst_size_
= ACE_OS::atoi (current_arg
);
40 // The number of events to send/receive.
41 arg_shifter
.consume_arg ();
43 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT ("-?")) == 0)
47 "-burst_size [count]\n",
51 arg_shifter
.consume_arg ();
57 arg_shifter
.ignore_arg ();
// Constructor.  Sets ifgop_ to OR_OP; ifgop_ is the value later handed to
// new_for_suppliers() and new_for_consumers() when the admins are created.
64 RedGreen_Test::RedGreen_Test ()
69 ifgop_
= CosNotifyChannelAdmin::OR_OP
;
// Destroys the event channel if ec_ is non-nil, then resets ec_ to nil.
// CORBA::COMM_FAILURE is logged as informational; any other
// CORBA::Exception is printed via _tao_print_exception().
// NOTE(review): the text printed for the generic exception says
// "RedGreen destructor" although this function is destroy_ec -- confirm
// whether the message should be updated.
73 RedGreen_Test::destroy_ec ()
75 if (!CORBA::is_nil (ec_
.in ()))
77 // Even though we still have a reference, there's no guarantee
78 // the EC is still around. So, trap exceptions.
81 this->ec_
->destroy ();
83 catch (const CORBA::COMM_FAILURE
&)
85 // Silently swallow this b/c this could mean the EC is gone
86 // or that the network is hosed. Either way, we're not waiting
87 // around to figure out the problem. Report the incident to the
88 // log and be done with it.
90 "INFO: Got a COMM_FAILURE exception while trying to\n"
91 " invoke `destroy()' on the Event Channel in the\n"
92 " RedGreen destroy_ec. This is likely not a problem.\n"));
94 catch (const CORBA::Exception
& ex
)
96 ex
._tao_print_exception ("in RedGreen destructor; swallowing.\n");
// Drop the reference so the channel is not used again after this point.
99 this->ec_
= CosNotifyChannelAdmin::EventChannel::_nil ();
// Sets the test up by running the visible steps in this order: ORB
// initialisation, naming-service lookup, notify-factory lookup, supplier
// admin, consumer admin, consumers, suppliers.
// NOTE(review): create_EC() is not among the visible calls, yet the admin
// creation steps dereference ec_ -- confirm it is invoked before
// create_supplieradmin().
105 RedGreen_Test::init (int argc
,
108 this->init_ORB (argc
,
110 this->resolve_naming_service ();
111 this->resolve_Notify_factory ();
113 this->create_supplieradmin ();
114 this->create_consumeradmin ();
115 this->create_consumers ();
116 this->create_suppliers ();
// Drives the test: sends the events, loops until the file-level flag
// consumer_is_done becomes true, shuts the ORB down and finally waits for
// the worker's threads.  A CORBA::Exception is printed with the prefix
// "Supplier:".
120 RedGreen_Test::run ()
124 this->send_events ();
126 while (! consumer_is_done
)
// Interval of 0 s and 10 * 1000 us built for each pass of the wait loop.
128 ACE_Time_Value
tv(0, 10 * 1000);
// shutdown(false): the wait_for_completion argument is false.
133 this->orb_
->shutdown (false);
135 catch (const CORBA::Exception
& ex
)
137 ex
._tao_print_exception ("Supplier:");
// Wait on the thread manager that owns the worker's threads.
141 worker_
.thr_mgr ()->wait ();
145 RedGreen_Test::done ()
// Initialises the ORB from the command line, resolves "RootPOA", narrows it
// to a PortableServer::POA, activates the POA manager obtained from
// root_poa_, hands the ORB to worker_ and activates worker_ with nthreads_
// threads.  Failures to obtain the POA or to activate the worker are logged
// with ACE_ERROR.
// NOTE(review): poa_object is a raw CORBA::Object_ptr holding the result of
// resolve_initial_references(), and no release is visible here; a
// CORBA::Object_var (as used in resolve_naming_service) would avoid leaking
// the reference -- confirm.
152 RedGreen_Test::init_ORB (int argc
, ACE_TCHAR
*argv
[])
154 this->orb_
= CORBA::ORB_init (argc
, argv
);
156 CORBA::Object_ptr poa_object
=
157 this->orb_
->resolve_initial_references("RootPOA");
159 if (CORBA::is_nil (poa_object
))
161 ACE_ERROR ((LM_ERROR
,
162 " (%P|%t) Unable to initialize the POA.\n"));
166 PortableServer::POA::_narrow (poa_object
);
168 PortableServer::POAManager_var poa_manager
=
169 root_poa_
->the_POAManager ();
171 poa_manager
->activate ();
// The worker receives the ORB before its threads are started.
173 worker_
.orb (this->orb_
.in ());
// A non-zero return from activate() is treated as failure.
175 if (worker_
.activate (THR_NEW_LWP
| THR_JOINABLE
,
176 this->nthreads_
) != 0)
178 ACE_ERROR ((LM_ERROR
,
179 "Cannot activate client threads\n"));
// Resolves the NAMING_SERVICE_NAME initial reference and narrows it into
// naming_context_.
// Throws CORBA::UNKNOWN when the resolved reference is nil.
// NOTE(review): the result of _narrow() is not checked for nil in the
// visible lines.
184 RedGreen_Test::resolve_naming_service ()
186 CORBA::Object_var naming_obj
=
187 this->orb_
->resolve_initial_references (NAMING_SERVICE_NAME
);
189 // Need to check return value for errors.
190 if (CORBA::is_nil (naming_obj
.in ()))
192 throw CORBA::UNKNOWN ();
195 this->naming_context_
=
196 CosNaming::NamingContext::_narrow (naming_obj
.in ());
// Looks up NOTIFY_FACTORY_NAME in naming_context_ and narrows the result
// into notify_factory_.
// NOTE(review): name is constructed with (1) and element 0 is assigned; no
// length(1) call is visible here -- confirm the sequence length is set
// before indexing.
200 RedGreen_Test::resolve_Notify_factory ()
202 CosNaming::Name
name (1);
204 name
[0].id
= CORBA::string_dup (NOTIFY_FACTORY_NAME
);
206 CORBA::Object_var obj
=
207 this->naming_context_
->resolve (name
);
209 this->notify_factory_
=
210 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj
.in ());
// Asks notify_factory_ to create an event channel using initial_qos_ and
// initial_admin_, stores it in ec_ and asserts that ec_ is non-nil.
// NOTE(review): ACE_ASSERT is typically compiled out in release builds, in
// which case a nil channel would go unnoticed -- confirm.
214 RedGreen_Test::create_EC ()
216 CosNotifyChannelAdmin::ChannelID id
;
218 this->ec_
= notify_factory_
->create_channel (this->initial_qos_
,
219 this->initial_admin_
,
222 ACE_ASSERT (!CORBA::is_nil (ec_
.in ()));
// Requests a supplier admin from ec_ via new_for_suppliers(), passing
// ifgop_, and asserts that supplier_admin_ is non-nil afterwards.
226 RedGreen_Test::create_supplieradmin ()
228 CosNotifyChannelAdmin::AdminID adminid
;
231 ec_
->new_for_suppliers (this->ifgop_
,
234 ACE_ASSERT (!CORBA::is_nil (supplier_admin_
.in ()));
// Requests a consumer admin from ec_ via new_for_consumers(), passing
// ifgop_, and asserts that consumer_admin_ is non-nil afterwards.
238 RedGreen_Test::create_consumeradmin ()
240 CosNotifyChannelAdmin::AdminID adminid
;
243 ec_
->new_for_consumers (this->ifgop_
,
246 ACE_ASSERT (!CORBA::is_nil (consumer_admin_
.in ()));
// Allocates the two consumers with ACE_NEW and connects each of them to
// consumer_admin_: normal_consumer_ is a
// RedGreen_Test_StructuredPushConsumer, slow_consumer_ is a SlowConsumer.
// Both receive this test object as constructor argument.
250 RedGreen_Test::create_consumers ()
252 ACE_NEW (this->normal_consumer_
,
253 RedGreen_Test_StructuredPushConsumer (this));
254 this->normal_consumer_
->connect (this->consumer_admin_
.in ());
256 ACE_NEW (this->slow_consumer_
,
257 SlowConsumer (this));
258 this->slow_consumer_
->connect (this->consumer_admin_
.in ());
// Allocates the single supplier with ACE_NEW and connects it to
// supplier_admin_.
262 RedGreen_Test::create_suppliers ()
264 ACE_NEW (this->supplier_
,
265 RedGreen_Test_StructuredPushSupplier ());
266 this->supplier_
->connect (this->supplier_admin_
.in ());
// Subscribes normal_consumer_ to (DOMAIN_GREEN, TYPE_GREEN) and
// slow_consumer_ to (DOMAIN_RED, TYPE_RED), builds one green and one red
// structured event, records the start timestamp in g_throughput_start_ and
// then sends through supplier_ inside a loop that runs burst_size_ times.
// NOTE(review): added_1 / added_2 are constructed with (1) and element 0 is
// assigned; no length(1) call is visible here -- confirm the length is set
// before indexing.
270 RedGreen_Test::send_events ()
272 // Setup the Consumer 1 (normal_consumer_) to receive
273 // event_type : "DOMAIN_GREEN", "TYPE_GREEN".
274 CosNotification::EventTypeSeq
added_1(1);
276 CosNotification::EventTypeSeq
removed_1 (0);
277 removed_1
.length (0);
279 added_1
[0].domain_name
= CORBA::string_dup (DOMAIN_GREEN
);
280 added_1
[0].type_name
= CORBA::string_dup (TYPE_GREEN
);
282 this->normal_consumer_
->get_proxy_supplier ()->subscription_change (added_1
, removed_1
);
284 // Setup the Consumer 2 (slow_consumer_) to receive event_type : "DOMAIN_RED", "TYPE_RED"
285 CosNotification::EventTypeSeq
added_2(1);
287 CosNotification::EventTypeSeq
removed_2 (0);
288 removed_2
.length (0);
290 added_2
[0].domain_name
= CORBA::string_dup (DOMAIN_RED
);
291 added_2
[0].type_name
= CORBA::string_dup (TYPE_RED
);
293 this->slow_consumer_
->get_proxy_supplier ()->subscription_change (added_2
, removed_2
);
295 // Create the events - one of each type
// Green event: empty event name, no variable header, no filterable data,
// and the Long value 10 as body.
298 CosNotification::StructuredEvent green_event
;
299 green_event
.header
.fixed_header
.event_type
.domain_name
=
300 CORBA::string_dup(DOMAIN_GREEN
);
301 green_event
.header
.fixed_header
.event_type
.type_name
=
302 CORBA::string_dup(TYPE_GREEN
);
303 green_event
.header
.fixed_header
.event_name
= CORBA::string_dup ("");
304 green_event
.header
.variable_header
.length (0); // put nothing here
305 green_event
.filterable_data
.length (0);
306 green_event
.remainder_of_body
<<= (CORBA::Long
)10;
// Red event: identical layout, with the red domain and type.
309 CosNotification::StructuredEvent red_event
;
310 red_event
.header
.fixed_header
.event_type
.domain_name
=
311 CORBA::string_dup(DOMAIN_RED
);
312 red_event
.header
.fixed_header
.event_type
.type_name
=
313 CORBA::string_dup(TYPE_RED
);
314 red_event
.header
.fixed_header
.event_name
= CORBA::string_dup("");
315 red_event
.header
.variable_header
.length (0); // put nothing here
316 red_event
.filterable_data
.length (0);
317 red_event
.remainder_of_body
<<= (CORBA::Long
)10;
// Reference point for every throughput sample taken by the supplier and
// the consumers.
319 g_throughput_start_
= ACE_OS::gethrtime ();
321 // let supplier 1 send all these events
322 for (int i
= 0; i
< this->burst_size_
; ++i
)
324 this->supplier_
->send_event (red_event
);
326 this->supplier_
->send_event (green_event
);
// Prints the statistics: each consumer and the supplier dump their own
// stats under a label formatted into buf, and their figures are merged
// into two ACE_Throughput_Stats totals ("throughput" for the consumers,
// "suppliers" for the supplier), which are dumped at the end.
// NOTE(review): ACE_OS::sprintf writes into a fixed BUFSIZ buffer without a
// length bound; the labels here are short, but a bounded variant would be
// safer -- confirm.
332 RedGreen_Test::dump_results ()
334 ACE_Throughput_Stats throughput
;
// Scale factor passed on to every dump_stats / dump_results call below.
335 ACE_High_Res_Timer::global_scale_factor_type gsf
=
336 ACE_High_Res_Timer::global_scale_factor ();
337 ACE_TCHAR buf
[BUFSIZ
];
339 ACE_OS::sprintf (buf
,
340 ACE_TEXT ("Normal Consumer [%02d]"),
342 normal_consumer_
->dump_stats (buf
,
344 normal_consumer_
->accumulate_into (throughput
);
346 ACE_OS::sprintf (buf
,
347 ACE_TEXT ("Slow Consumer [%02d]"),
349 slow_consumer_
->dump_stats (buf
,
351 slow_consumer_
->accumulate_into (throughput
);
353 ACE_DEBUG ((LM_DEBUG
,
356 ACE_Throughput_Stats suppliers
;
358 ACE_OS::sprintf (buf
,
359 ACE_TEXT ("Supplier [%02d]"),
362 this->supplier_
->dump_stats (buf
, gsf
);
363 this->supplier_
->accumulate_into (suppliers
);
365 ACE_DEBUG ((LM_DEBUG
, "\nTotals:\n"));
366 throughput
.dump_results (ACE_TEXT("Notify_Consumer/totals"), gsf
);
368 ACE_DEBUG ((LM_DEBUG
, "\n"));
369 suppliers
.dump_results (ACE_TEXT("Notify_Supplier/totals"), gsf
);
372 // *****************************************************************
// Constructor: stores the owning test object in the RedGreen_Test_ member.
// NOTE(review): the parameter is named RedGreen_Test, the same as its class,
// which hides the type name inside this constructor -- consider renaming.
374 RedGreen_Test_StructuredPushConsumer::RedGreen_Test_StructuredPushConsumer (
375 RedGreen_Test
* RedGreen_Test
377 : RedGreen_Test_ (RedGreen_Test
),
382 RedGreen_Test_StructuredPushConsumer::~RedGreen_Test_StructuredPushConsumer (
// Adds this consumer's throughput_ statistics to the caller-supplied
// accumulator.  Declared const: this object is not modified.
389 RedGreen_Test_StructuredPushConsumer::accumulate_into (
390 ACE_Throughput_Stats
&throughput
) const
392 throughput
.accumulate (this->throughput_
);
// Dumps this consumer's throughput_ statistics.
//   msg - label forwarded to dump_results()
//   gsf - timer scale factor forwarded to dump_results()
396 RedGreen_Test_StructuredPushConsumer::dump_stats (
397 const ACE_TCHAR
* msg
,
398 ACE_High_Res_Timer::global_scale_factor_type gsf
)
400 this->throughput_
.dump_results (msg
, gsf
);
// Connects this consumer to the channel through consumer_admin: obtains a
// STRUCTURED_EVENT push supplier proxy, narrows it to
// StructuredProxyPushSupplier (kept in proxy_supplier_) and registers
// objref with it via connect_structured_push_consumer().
// Both the generic proxy and the narrowed proxy are asserted non-nil.
404 RedGreen_Test_StructuredPushConsumer::connect (
405 CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
408 // Activate the consumer with the default_POA_.
409 CosNotifyComm::StructuredPushConsumer_var objref
=
412 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier
=
413 consumer_admin
->obtain_notification_push_supplier (
414 CosNotifyChannelAdmin::STRUCTURED_EVENT
,
417 ACE_ASSERT (!CORBA::is_nil (proxysupplier
.in ()));
420 this->proxy_supplier_
=
421 CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (
422 proxysupplier
.in ());
424 ACE_ASSERT (!CORBA::is_nil (proxy_supplier_
.in ()));
426 proxy_supplier_
->connect_structured_push_consumer (objref
.in ());
// Tells the proxy supplier to disconnect.
// NOTE(review): unlike RedGreen_Test_StructuredPushSupplier::disconnect(),
// no nil check on the proxy is visible before the call.
430 RedGreen_Test_StructuredPushConsumer::disconnect ()
432 this->proxy_supplier_
->
433 disconnect_structured_push_supplier ();
// Offer-change notification.  Both sequence parameters are left unnamed, so
// neither the added nor the removed event types are used.
437 RedGreen_Test_StructuredPushConsumer::offer_change (
438 const CosNotification::EventTypeSeq
& /*added*/,
439 const CosNotification::EventTypeSeq
& /*removed*/
// Handles one incoming event under an ACE_GUARD on a TAO_SYNCH_MUTEX:
// logs the event's domain and type, extracts a TimeBase::TimeT from the
// first filterable-data entry, records a throughput sample relative to
// g_throughput_start_, and calls RedGreen_Test_->done() once
// g_result_count reaches 2 * burst_size_.
// NOTE(review): filterable_data[0] is accessed with no visible check that
// the sequence is non-empty, and the boolean result of the >>= extraction
// is not checked, so latency_base_recorded may be read uninitialised when
// the event does not carry that entry -- confirm.
446 RedGreen_Test_StructuredPushConsumer::push_structured_event (
447 const CosNotification::StructuredEvent
& notification
450 ACE_GUARD (TAO_SYNCH_MUTEX
,
// These pointers refer into the notification; no copy is made.
453 const char* domain_name
=
454 notification
.header
.fixed_header
.event_type
.domain_name
;
456 const char* type_name
=
457 notification
.header
.fixed_header
.event_type
.type_name
;
460 ACE_DEBUG ((LM_DEBUG
,
461 "Consumer %d event, domain = %s, type = %s\n",
462 this->proxy_supplier_id_
,
466 TimeBase::TimeT latency_base_recorded
;
467 ACE_hrtime_t latency_base
;
// The supplier's send_event() stores this value in entry 0 under the name
// "latency_base".
469 notification
.filterable_data
[0].value
>>= latency_base_recorded
;
471 ORBSVCS_Time::TimeT_to_hrtime (latency_base
,
472 latency_base_recorded
);
476 // Grab timestamp again.
477 ACE_hrtime_t now
= ACE_OS::gethrtime ();
479 // Record statistics.
480 this->throughput_
.sample (now
- g_throughput_start_
,
// The counter is shared by all consumers; the factor 2 matches the two
// events (red and green) sent per burst iteration in send_events().
484 if (++g_result_count
== 2*RedGreen_Test_
->burst_size_
)
486 RedGreen_Test_
->done ();
491 RedGreen_Test_StructuredPushConsumer::disconnect_structured_push_consumer (
// Returns the proxy supplier held in proxy_supplier_ via .in(), i.e.
// without duplicating the reference; the caller must not release it.
497 CosNotifyChannelAdmin::StructuredProxyPushSupplier_ptr
498 RedGreen_Test_StructuredPushConsumer::get_proxy_supplier ()
500 return this->proxy_supplier_
.in ();
503 // *****************************************************************
// Constructor: forwards the owning test object to the
// RedGreen_Test_StructuredPushConsumer base class.
505 SlowConsumer::SlowConsumer (RedGreen_Test
* RedGreen_Test
)
506 : RedGreen_Test_StructuredPushConsumer (RedGreen_Test
)
// Event handler of the slow consumer; the visible statement forwards the
// notification to the base-class push_structured_event().
511 SlowConsumer::push_structured_event (
512 const CosNotification::StructuredEvent
& notification
518 RedGreen_Test_StructuredPushConsumer::push_structured_event (notification
);
521 // *****************************************************************
523 RedGreen_Test_StructuredPushSupplier::RedGreen_Test_StructuredPushSupplier ()
527 RedGreen_Test_StructuredPushSupplier::~RedGreen_Test_StructuredPushSupplier ()
// Adds this supplier's throughput_ statistics to the caller-supplied
// accumulator.  Declared const: this object is not modified.
532 RedGreen_Test_StructuredPushSupplier::accumulate_into (
533 ACE_Throughput_Stats
&throughput
) const
535 throughput
.accumulate (this->throughput_
);
// Dumps this supplier's throughput_ statistics.
//   msg - label forwarded to dump_results()
//   gsf - timer scale factor forwarded to dump_results()
539 RedGreen_Test_StructuredPushSupplier::dump_stats (
540 const ACE_TCHAR
* msg
,
541 ACE_High_Res_Timer::global_scale_factor_type gsf
)
543 this->throughput_
.dump_results (msg
, gsf
);
// Connects this supplier to the channel through supplier_admin: obtains a
// STRUCTURED_EVENT push consumer proxy, narrows it to
// StructuredProxyPushConsumer (kept in proxy_consumer_) and registers
// objref with it via connect_structured_push_supplier().
// Both the generic proxy and the narrowed proxy are asserted non-nil.
547 RedGreen_Test_StructuredPushSupplier::connect (
548 CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin
551 CosNotifyComm::StructuredPushSupplier_var objref
=
554 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer
=
555 supplier_admin
->obtain_notification_push_consumer (
556 CosNotifyChannelAdmin::STRUCTURED_EVENT
,
559 ACE_ASSERT (!CORBA::is_nil (proxyconsumer
.in ()));
562 this->proxy_consumer_
=
563 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (
564 proxyconsumer
.in ());
566 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_
.in ()));
568 proxy_consumer_
->connect_structured_push_supplier (objref
.in ());
// Asserts that proxy_consumer_ is non-nil, then tells the proxy consumer to
// disconnect.
572 RedGreen_Test_StructuredPushSupplier::disconnect ()
574 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
576 this->proxy_consumer_
->disconnect_structured_push_consumer ();
// Subscription-change notification.  Both sequence parameters are left
// unnamed, so neither the added nor the removed event types are used.
580 RedGreen_Test_StructuredPushSupplier::subscription_change (
581 const CosNotification::EventTypeSeq
& /*added*/,
582 const CosNotification::EventTypeSeq
&) /*removed */
// Stamps the event and pushes it to proxy_consumer_.
// The event is taken by non-const reference and is modified:
// filterable_data is resized to exactly one entry named "latency_base",
// whose value is a TimeBase::TimeT.  After the push, a throughput sample
// relative to g_throughput_start_ is recorded.
588 RedGreen_Test_StructuredPushSupplier::send_event (
589 CosNotification::StructuredEvent
& event
)
// length(1) replaces whatever length the caller had set.
591 event
.filterable_data
.length (1);
592 event
.filterable_data
[0].name
= CORBA::string_dup("latency_base");
594 // Record current time.
595 ACE_hrtime_t start
= ACE_OS::gethrtime ();
597 TimeBase::TimeT latency_base
;
598 ORBSVCS_Time::hrtime_to_TimeT (latency_base
,
// The consumer's push_structured_event() reads this entry back.
601 event
.filterable_data
[0].value
<<= latency_base
;
602 proxy_consumer_
->push_structured_event (event
);
// Timestamp taken after the push call has returned.
604 ACE_hrtime_t end
= ACE_OS::gethrtime ();
606 this->throughput_
.sample (end
- g_throughput_start_
,
611 RedGreen_Test_StructuredPushSupplier::disconnect_structured_push_supplier ()
616 //*****************************************************************
// Stores a duplicated reference to the given ORB in orb_; the caller keeps
// ownership of the reference it passed in.
623 Worker::orb (CORBA::ORB_ptr orb
)
625 orb_
= CORBA::ORB::_duplicate (orb
);
631 consumer_is_done
= true;
641 catch (const CORBA::Exception
& ex
)
643 ex
._tao_print_exception ("Consumer:");