1 #include "Throughput.h"
3 #include "ace/Arg_Shifter.h"
4 #include "ace/Get_Opt.h"
6 #include "ace/Dynamic_Service.h"
7 #include "tao/Strategies/advanced_resource.h"
8 #include "tao/Messaging/Messaging.h"
9 #include "orbsvcs/Notify/Service.h"
10 #include "orbsvcs/Time_Utilities.h"
12 /***************************************************************************/
// Constructor (fragment): stores the Notify_Throughput pointer passed in
// as test_client_.  The remaining member initializers and the body are
// not visible in this extract.
14 Throughput_StructuredPushConsumer::Throughput_StructuredPushConsumer (
15 Notify_Throughput
*test_client
17 : test_client_ (test_client
),
// Folds this consumer's statistics (throughput_) into the caller-supplied
// accumulator via ACE_Throughput_Stats::accumulate().  Const member: the
// consumer's own statistics are only read.
23 Throughput_StructuredPushConsumer::accumulate_into (
24 ACE_Throughput_Stats
&throughput
) const
26 throughput
.accumulate (this->throughput_
);
// Prints this consumer's statistics by forwarding msg and gsf to
// throughput_.dump_results().
// NOTE(review): the declaration of the msg parameter is not visible in
// this extract; only gsf (the high-resolution timer scale factor type)
// can be seen in the signature.
30 Throughput_StructuredPushConsumer::dump_stats (
32 ACE_High_Res_Timer::global_scale_factor_type gsf
)
34 this->throughput_
.dump_results (msg
, gsf
);
// Callback invoked for each structured event delivered to this consumer.
// Extracts the payload and the timestamp carried in filterable_data[0],
// records a throughput sample under lock_, and calls
// test_client_->peer_done() once push_count_ equals the client's
// perconsumer_count_.
// NOTE(review): several lines of this function (the declaration of msg,
// the conditions/braces around the log statements, and wherever
// push_count_ is incremented) are not visible in this extract.
38 Throughput_StructuredPushConsumer::push_structured_event (
39 const CosNotification::StructuredEvent
& notification
// Try to extract the event body into msg; ok holds the result of the
// extraction operator.
44 CORBA::Boolean ok
= (notification
.remainder_of_body
>>= msg
);
// NOTE(review): the condition guarding this message is not visible;
// presumably it is logged only when ok is false.
47 ACE_DEBUG ((LM_DEBUG
, "(%t) Error extracting message body\n"));
// Throughput_base_recorded receives the TimeBase::TimeT value stored in
// the first filterable-data property; Throughput_base is the same
// instant converted to ACE_hrtime_t.
49 TimeBase::TimeT Throughput_base_recorded
;
50 ACE_hrtime_t Throughput_base
;
// NOTE(review): filterable_data[0] is read without a visible length
// check and the result of this >>= is not tested; if the extraction
// fails Throughput_base_recorded would be used uninitialized - confirm.
52 notification
.filterable_data
[0].value
>>= Throughput_base_recorded
;
54 ORBSVCS_Time::TimeT_to_hrtime (Throughput_base
,
55 Throughput_base_recorded
);
// Guard the counters/statistics below with lock_ (guard object ace_mon).
57 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
// While push_count_ is still 0, remember when measurement started.
59 if (this->push_count_
== 0)
61 this->throughput_start_
= ACE_OS::gethrtime ();
66 // Grab timestamp again.
67 ACE_hrtime_t now
= ACE_OS::gethrtime ();
// Sample: first argument is the time elapsed since throughput_start_,
// second is the time elapsed since the timestamp carried in the event.
70 this->throughput_
.sample (now
- this->throughput_start_
,
71 now
- Throughput_base
);
// Progress message when push_count_ is a multiple of 1000 (the logging
// call surrounding the format string is only partially visible).
73 if (this->push_count_
%1000 == 0)
76 "(%P)(%t) event count = %d\n",
// Once the expected number of events has been counted, report this peer
// as finished to the test client.
80 if (push_count_
== test_client_
->perconsumer_count_
)
83 "(%t)expected count reached\n"));
84 test_client_
->peer_done ();
88 /***************************************************************************/
// Constructor (fragment): stores the Notify_Throughput pointer passed in
// as test_client_.  The remaining member initializers and the body are
// not visible in this extract.
90 Throughput_StructuredPushSupplier::Throughput_StructuredPushSupplier (
91 Notify_Throughput
* test_client
93 :test_client_ (test_client
),
// Destructor.  NOTE(review): its body is not visible in this extract.
98 Throughput_StructuredPushSupplier::~Throughput_StructuredPushSupplier ()
// Folds this supplier's statistics (throughput_) into the caller-supplied
// accumulator via ACE_Throughput_Stats::accumulate().  Const member: the
// supplier's own statistics are only read.
103 Throughput_StructuredPushSupplier::accumulate_into (
104 ACE_Throughput_Stats
&throughput
) const
106 throughput
.accumulate (this->throughput_
);
// Prints this supplier's statistics.
//   msg - label passed through to dump_results().
//   gsf - high-resolution timer scale factor passed through to
//         dump_results().
110 Throughput_StructuredPushSupplier::dump_stats (
111 const ACE_TCHAR
* msg
,
112 ACE_High_Res_Timer::global_scale_factor_type gsf
)
114 this->throughput_
.dump_results (msg
, gsf
);
// Supplier thread body.  Builds one StructuredEvent, then pushes it
// burst_count_ x burst_size_ times through proxy_, stamping each push
// with the current time and recording a throughput sample per push.
// Finally reports completion via test_client_->peer_done().
// NOTE(review): some lines (braces, the pause between bursts, the second
// argument of sample(), the return statement) are not visible in this
// extract.
118 Throughput_StructuredPushSupplier::svc ()
120 // Initialize a time value to pace the test.
// tv = 0 seconds + burst_pause_ microseconds (the usage text in
// parse_args documents -burst_pause as "time(uS)").
121 ACE_Time_Value
tv (0, test_client_
->burst_pause_
);
// The event is built once and reused for every push; only the timestamp
// property value changes per iteration.
124 CosNotification::StructuredEvent event
;
// Fixed header: domain and type are both "*", event name is "myevent".
131 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("*");
133 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup("*");
135 event
.header
.fixed_header
.event_name
= CORBA::string_dup("myevent");
137 // OptionalHeaderFields
139 // sequence<Property>: string name, any value
140 CosNotification::PropertySeq
& qos
= event
.header
.variable_header
;
141 qos
.length (0); // put nothing here
143 // FilterableEventBody
145 // sequence<Property>: string name, any value
// Exactly one filterable property, named "Throughput_base"; its value is
// filled in with the send timestamp inside the loop below.
146 event
.filterable_data
.length (1);
147 event
.filterable_data
[0].name
= CORBA::string_dup("Throughput_base");
// The body carries the test client's payload_ buffer.
// NOTE(review): presumably payload_ is a char*, in which case it is
// inserted as a string and must be NUL-terminated - confirm against the
// declaration and wherever the buffer is filled.
149 event
.remainder_of_body
<<= test_client_
->payload_
;
// Start of the measurement interval for this supplier.
152 this->throughput_start_
= ACE_OS::gethrtime ();
// Outer loop: one iteration per burst.
154 for (int i
= 0; i
< test_client_
->burst_count_
; ++i
)
// Inner loop: one iteration per event within the burst.
156 for (int j
= 0; j
< test_client_
->burst_size_
; ++j
)
158 // Record current time.
159 ACE_hrtime_t start
= ACE_OS::gethrtime ();
// Convert the timestamp to TimeBase::TimeT and store it in the event's
// first filterable property before pushing.
160 TimeBase::TimeT Throughput_base
;
161 ORBSVCS_Time::hrtime_to_TimeT (Throughput_base
,
164 event
.filterable_data
[0].value
<<= Throughput_base
;
166 this->proxy_
->push_structured_event (event
);
// Sample after the push returns; the first argument is the time elapsed
// since throughput_start_ (the second argument is not visible here).
168 ACE_hrtime_t end
= ACE_OS::gethrtime ();
169 this->throughput_
.sample (end
- this->throughput_start_
,
// All bursts sent: log and tell the test client this peer is finished.
176 ACE_DEBUG ((LM_DEBUG
, "(%P) (%t) Supplier done\n"));
177 test_client_
->peer_done ();
181 /***************************************************************************/
// Default constructor: sets the test's default configuration.  Visible
// defaults: collocated_ec_ = 0, burst_pause_ = 10000, payload_size_ = 1024.
// NOTE(review): other initializers and the body are not visible in this
// extract.
182 Notify_Throughput::Notify_Throughput ()
183 : collocated_ec_ (0),
185 burst_pause_ (10000),
187 payload_size_ (1024),
// NOTE(review): this initializer reads burst_size_, burst_count_ and
// supplier_count_.  Members are initialized in declaration order, so this
// is only well-defined if those three are declared before
// perconsumer_count_ in the class - confirm against the header.  Also, in
// the visible lines of parse_args only -ExpectedCount updates
// perconsumer_count_; changing the burst/supplier options does not
// recompute it.
191 perconsumer_count_ (burst_size_
*burst_count_
*supplier_count_
),
// Same declaration-order caveat applies; parse_args recomputes this value
// after option parsing.
195 peer_done_count_ (consumer_count_
+ supplier_count_
),
// Destructor: asks the ORB to shut down, passing false for the
// wait-for-completion argument.
// NOTE(review): any further cleanup (e.g. of payload_, consumers_,
// suppliers_) is not visible in this extract.
200 Notify_Throughput::~Notify_Throughput ()
202 this->orb_
->shutdown (false);
// Sets up the test: initializes the ORB through the base class, installs
// a SYNC_WITH_TARGET sync-scope policy override when CORBA Messaging is
// available, starts the ORB worker threads, creates the supplier and
// consumer admins on ec_, and allocates, initializes and connects all
// consumers and suppliers.
// NOTE(review): several lines (return statements, braces, the #else, the
// assignments that receive some call results, the declaration of i) are
// not visible in this extract.
208 Notify_Throughput::init (int argc
, ACE_TCHAR
* argv
[])
210 // Initialize base class.
211 Notify_Test_Client::init_ORB (argc
,
214 #if (TAO_HAS_CORBA_MESSAGING == 1)
// Obtain the ORB-level policy manager.
215 CORBA::Object_var manager_object
=
216 orb_
->resolve_initial_references ("ORBPolicyManager");
218 CORBA::PolicyManager_var policy_manager
=
219 CORBA::PolicyManager::_narrow (manager_object
.in ());
// Build a one-element policy list holding a sync-scope policy whose value
// is SYNC_WITH_TARGET, and install it as an ORB-wide override.
221 CORBA::Any sync_scope
;
222 sync_scope
<<= Messaging::SYNC_WITH_TARGET
;
224 CORBA::PolicyList
policy_list (1);
225 policy_list
.length (1);
227 orb_
->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE
,
229 policy_manager
->set_policy_overrides (policy_list
,
230 CORBA::SET_OVERRIDE
);
// NOTE(review): the #else line is not visible in this extract; this
// message presumably belongs to the branch compiled when CORBA Messaging
// is disabled.
232 ACE_DEBUG ((LM_DEBUG
,
233 "CORBA Messaging disabled in this configuration,"
234 " test may not be optimally configured\n"));
235 #endif /* TAO_HAS_CORBA_MESSAGING == 1 */
// Hand the ORB to the worker task and start nthreads_ joinable threads.
237 worker_
.orb (this->orb_
.in ());
239 if (worker_
.activate (THR_NEW_LWP
| THR_JOINABLE
,
240 this->nthreads_
) != 0)
242 ACE_ERROR ((LM_ERROR
,
243 "Cannot activate client threads\n"));
246 // Create all participants ...
// adminid receives the admin id from the new_for_* calls below; it is
// reused for both calls.
249 CosNotifyChannelAdmin::AdminID adminid
;
// Supplier admin, created with the ifgop_ filter-group operator.
252 ec_
->new_for_suppliers (this->ifgop_
, adminid
);
// NOTE(review): ACE_ASSERT is typically compiled out when ACE_NDEBUG is
// defined, so a nil admin would go undetected in such builds - confirm
// that is acceptable for this test.
254 ACE_ASSERT (!CORBA::is_nil (supplier_admin_
.in ()));
// Consumer admin, created the same way.
257 ec_
->new_for_consumers (this->ifgop_
, adminid
);
259 ACE_ASSERT (!CORBA::is_nil (consumer_admin_
.in ()));
// Allocate the arrays of consumer/supplier pointers, sized by the
// configured counts.
261 ACE_NEW_RETURN (consumers_
,
262 Throughput_StructuredPushConsumer
*[this->consumer_count_
],
264 ACE_NEW_RETURN (suppliers_
,
265 Throughput_StructuredPushSupplier
*[this->supplier_count_
],
// Create each consumer, initialize it with the root POA and connect it to
// the consumer admin.
272 for (i
= 0; i
< this->consumer_count_
; ++i
)
274 ACE_NEW_RETURN (consumers_
[i
],
275 Throughput_StructuredPushConsumer (this),
277 consumers_
[i
]->init (root_poa_
.in ());
279 consumers_
[i
]->connect (this->consumer_admin_
.in ());
// Create each supplier; init is invoked with an explicit
// TAO_Notify_Tests_StructuredPushSupplier:: qualifier, then the supplier
// is connected to the supplier admin.
282 for (i
= 0; i
< this->supplier_count_
; ++i
)
284 ACE_NEW_RETURN (suppliers_
[i
],
285 Throughput_StructuredPushSupplier (this),
287 suppliers_
[i
]->TAO_Notify_Tests_StructuredPushSupplier::init (
289 suppliers_
[i
]->connect (this->supplier_admin_
.in ());
// Parses the test's command-line options with ACE_Arg_Shifter and stores
// them in the corresponding members.  Arguments that match no option are
// skipped with ignore_arg().  After parsing, peer_done_count_ is
// recomputed as consumer_count_ + supplier_count_.
// NOTE(review): numeric options are converted with ACE_OS::atoi and used
// without range checks in the visible lines (zero or negative values are
// accepted).  Return statements and braces are not visible in this
// extract.
296 Notify_Throughput::parse_args(int argc
, ACE_TCHAR
*argv
[])
298 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
// current_arg points at the value of the option matched in each branch.
300 const ACE_TCHAR
* current_arg
= 0;
301 while (arg_shifter
.is_anything_left ())
// -collocated_ec: flag without a value; create_EC checks for == 1.
303 if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-collocated_ec")) == 0)
305 this->collocated_ec_
= 1;
306 arg_shifter
.consume_arg ();
308 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-consumers"))))
310 this->consumer_count_
= ACE_OS::atoi (current_arg
);
311 // Number of consumers; init() sizes the consumers_ array with it.
312 arg_shifter
.consume_arg ();
314 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-suppliers"))))
316 this->supplier_count_
= ACE_OS::atoi (current_arg
);
317 // Number of suppliers; init() sizes the suppliers_ array with it.
318 arg_shifter
.consume_arg ();
320 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-burst_size"))))
322 this->burst_size_
= ACE_OS::atoi (current_arg
);
323 // Number of events each supplier pushes per burst (inner loop bound in svc).
324 arg_shifter
.consume_arg ();
// -burst_count: number of bursts per supplier (outer loop bound in svc).
326 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-burst_count"))))
328 this->burst_count_
= ACE_OS::atoi (current_arg
);
330 arg_shifter
.consume_arg ();
// -burst_pause: pause value in microseconds (see the usage text below).
332 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-burst_pause"))))
334 this->burst_pause_
= ACE_OS::atoi (current_arg
);
336 arg_shifter
.consume_arg ();
// -payload: sets payload_size_ and allocates a new payload_ buffer of
// that many chars.
// NOTE(review): in the visible lines any buffer previously held by
// payload_ is not released, and the new buffer is not filled or
// NUL-terminated here although the supplier's svc() inserts payload_ into
// the event body - confirm it is initialized elsewhere.
338 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-payload"))))
340 this->payload_size_
= ACE_OS::atoi (current_arg
);
341 ACE_NEW_RETURN (this->payload_
,
342 char [this->payload_size_
],
345 arg_shifter
.consume_arg ();
// -EC: name of an existing event channel; create_EC resolves it through
// the naming service when the name is non-empty.
347 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-EC"))))
349 this->ec_name_
= ACE_TEXT_ALWAYS_CHAR(current_arg
);
351 arg_shifter
.consume_arg ();
// -ExpectedCount: overrides the number of events each consumer waits for.
353 else if (0 != (current_arg
=
354 arg_shifter
.get_the_parameter (ACE_TEXT("-ExpectedCount"))))
356 this->perconsumer_count_
= ACE_OS::atoi (current_arg
);
358 arg_shifter
.consume_arg ();
// -?: usage text (the surrounding logging call is only partially visible).
360 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-?")) == 0)
365 "-consumers [count], "
366 "-suppliers [count], "
367 "-burst_size [size], "
368 "-burst_count [count], "
369 "-burst_pause [time(uS)], "
372 "-ExpectedCount [count]\n",
375 arg_shifter
.consume_arg ();
// Anything else is left in place for other parsers.
381 arg_shifter
.ignore_arg ();
// Consumer/supplier counts may have changed, so recompute how many peers
// must report done before run_test stops waiting.
385 peer_done_count_
= consumer_count_
+ supplier_count_
;
// Obtains the event channel used by the test.  With collocated_ec_ == 1
// the notify factory is created in-process from the TAO_Notify_Service
// dynamic service; the naming service and notify factory are resolved by
// the two resolve_* calls below.  If ec_name_ is non-empty the channel is
// looked up by that name in the naming context; a channel is also created
// via notify_factory_->create_channel().
// NOTE(review): the else lines/braces separating these alternatives are
// not visible in this extract, so the exact branch structure should be
// confirmed against the full file.
390 Notify_Throughput::create_EC ()
392 if (this->collocated_ec_
== 1)
// Look up the notification service object registered under
// TAO_NOTIFICATION_SERVICE_NAME.
394 TAO_Notify_Service
* notify_service
= ACE_Dynamic_Service
<TAO_Notify_Service
>::instance (TAO_NOTIFICATION_SERVICE_NAME
);
396 if (notify_service
== 0)
398 ACE_DEBUG ((LM_DEBUG
, "Service not found! check conf. file\n"));
402 // Activate the factory
403 this->notify_factory_
=
404 notify_service
->create (this->root_poa_
.in ());
406 ACE_ASSERT (!CORBA::is_nil (this->notify_factory_
.in ()));
// Remote case (presumably the else branch): resolve the naming service
// and the notify factory through the base-class helpers.
410 this->resolve_naming_service ();
411 this->resolve_Notify_factory ();
414 // A channel name was specified, use that to resolve the service.
415 if (this->ec_name_
.length () != 0)
417 CosNaming::Name
name (1);
419 name
[0].id
= CORBA::string_dup (ec_name_
.c_str ());
421 CORBA::Object_var obj
=
422 this->naming_context_
->resolve (name
);
// NOTE(review): the variable receiving the narrowed reference is not
// visible in this extract.
425 CosNotifyChannelAdmin::EventChannel::_narrow (obj
.in ());
// No name given (presumably the else branch): create a fresh channel;
// id receives the new channel's id.
429 CosNotifyChannelAdmin::ChannelID id
;
431 ec_
= notify_factory_
->create_channel (initial_qos_
,
436 ACE_ASSERT (!CORBA::is_nil (ec_
.in ()));
// Runs the measurement: logs the configuration, starts one thread per
// supplier, waits under lock_ until peer_done_count_ reaches 0, destroys
// the channel when no external channel name was given, and finally sets
// worker_.done_ so the ORB worker loop can stop.
// NOTE(review): some lines (the remaining log arguments, the wait inside
// the while loop, braces) are not visible in this extract.
440 Notify_Throughput::run_test ()
442 ACE_DEBUG ((LM_DEBUG
, "collocated_ec_ %d ,"
447 "consumer_count_ %d, "
448 "supplier_count_ %d "
449 "expected count %d\n",
457 perconsumer_count_
));
459 for (int i
= 0; i
< this->supplier_count_
; ++i
)
// NOTE(review): the object this init call is made on is on a line that is
// not visible here; init() above already calls the same qualified init on
// each supplier - confirm the second call is intended.
462 TAO_Notify_Tests_StructuredPushSupplier::init (root_poa_
.in ());
// Start the supplier's thread (a new joinable LWP).
464 if (suppliers_
[i
]->ACE_Task_Base::activate (THR_NEW_LWP
| THR_JOINABLE
) != 0)
466 ACE_ERROR ((LM_ERROR
,
467 "Cannot activate client threads\n"));
471 // Wait till we're signalled done.
473 ACE_DEBUG ((LM_DEBUG
, "(%t)Waiting for shutdown signal in main..\n"));
474 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, lock_
);
// peer_done() decrements peer_done_count_ and broadcasts on condition_
// when it reaches 0; the loop body (presumably a wait on condition_) is
// not visible here.
476 while (this->peer_done_count_
!= 0)
482 if (this->ec_name_
.length () == 0) // we are not using a global EC
485 this->ec_
->destroy ();
488 // Signal the workers.
489 this->worker_
.done_
= 1;
// Called by each consumer and supplier when it has finished.  Under
// lock_, decrements peer_done_count_; when the count reaches 0 it logs
// and broadcasts on condition_ (run_test loops until the count is 0).
493 Notify_Throughput::peer_done ()
495 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, lock_
);
497 if (--this->peer_done_count_
== 0)
499 ACE_DEBUG ((LM_DEBUG
, "calling shutdown\n"));
500 condition_
.broadcast ();
// Prints per-consumer and per-supplier statistics, then the accumulated
// totals for each group.
505 Notify_Throughput::dump_results ()
// Accumulator for all consumers.
507 ACE_Throughput_Stats throughput
;
// Scale factor passed to every dump call.
508 ACE_High_Res_Timer::global_scale_factor_type gsf
=
509 ACE_High_Res_Timer::global_scale_factor ();
// Scratch buffer for the "Consumer [NN]" / "Supplier [NN]" labels.
510 ACE_TCHAR buf
[BUFSIZ
];
512 for (int j
= 0; j
< this->consumer_count_
; ++j
)
514 ACE_OS::sprintf (buf
, ACE_TEXT("Consumer [%02d]"), j
);
516 this->consumers_
[j
]->dump_stats (buf
, gsf
);
517 this->consumers_
[j
]->accumulate_into (throughput
);
520 ACE_DEBUG ((LM_DEBUG
, "\n"));
// Accumulator for all suppliers.
522 ACE_Throughput_Stats suppliers
;
524 for (int i
= 0; i
< this->supplier_count_
; ++i
)
526 ACE_OS::sprintf (buf
, ACE_TEXT("Supplier [%02d]"), i
);
528 this->suppliers_
[i
]->dump_stats (buf
, gsf
);
529 this->suppliers_
[i
]->accumulate_into (suppliers
);
// Group totals: consumers first, then suppliers.
532 ACE_DEBUG ((LM_DEBUG
, "\nTotals:\n"));
533 throughput
.dump_results (ACE_TEXT("Notify_Consumer/totals"), gsf
);
535 ACE_DEBUG ((LM_DEBUG
, "\n"));
536 suppliers
.dump_results (ACE_TEXT("Notify_Supplier/totals"), gsf
);
539 /***************************************************************************/
// Program entry point: calibrates the high-resolution timer, parses the
// options, initializes the test client, waits for all threads managed by
// the ACE thread manager, and prints the results.  CORBA exceptions are
// caught and printed.
// NOTE(review): the try line, the call that runs the test, the return
// statements and braces are not visible in this extract.
542 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
544 ACE_High_Res_Timer::calibrate ();
546 Notify_Throughput events
;
// parse_args reports failure with -1 (the action taken is not visible).
548 if (events
.parse_args (argc
, argv
) == -1)
555 events
.init (argc
, argv
); //Init the Client
559 ACE_DEBUG ((LM_DEBUG
, "Waiting for threads to exit...\n"));
560 ACE_Thread_Manager::instance ()->wait ();
561 events
.dump_results();
563 ACE_DEBUG ((LM_DEBUG
, "ending main...\n"));
565 catch (const CORBA::Exception
& se
)
567 se
._tao_print_exception ("Error: ");
575 // ****************************************************************
// Stores a duplicated reference to the given ORB in orb_; the caller
// keeps its own reference.
583 Worker::orb (CORBA::ORB_ptr orb
)
585 orb_
= CORBA::ORB::_duplicate (orb
);
// Fragment of a Worker member function whose header is not visible in
// this extract (it uses this->orb_ and this->done_).  It runs the ORB
// event loop with the time value tv(5) per call and repeats until done_
// is set (run_test sets worker_.done_ = 1), then logs that the thread is
// done.
591 ACE_Time_Value
tv(5);
595 this->orb_
->run (tv
);
596 }while (!this->done_
);
598 ACE_DEBUG ((LM_DEBUG
, "(%P) (%t) done\n"));