Merge pull request #1710 from likema/cfg-assign-not-null-str
[ACE_TAO.git] / TAO / orbsvcs / tests / Notify / performance-tests / Throughput / Throughput.cpp
blob8705f8d4586d5732f02d38a036be3f291150046a
1 #include "Throughput.h"
3 #include "ace/Arg_Shifter.h"
4 #include "ace/Get_Opt.h"
5 #include "ace/Synch.h"
6 #include "ace/Dynamic_Service.h"
7 #include "tao/Strategies/advanced_resource.h"
8 #include "tao/Messaging/Messaging.h"
9 #include "orbsvcs/Notify/Service.h"
10 #include "orbsvcs/Time_Utilities.h"
12 /***************************************************************************/
14 Throughput_StructuredPushConsumer::Throughput_StructuredPushConsumer (
15 Notify_Throughput *test_client
17 : test_client_ (test_client),
18 push_count_ (0)
22 void
23 Throughput_StructuredPushConsumer::accumulate_into (
24 ACE_Throughput_Stats &throughput) const
26 throughput.accumulate (this->throughput_);
29 void
30 Throughput_StructuredPushConsumer::dump_stats (
31 const ACE_TCHAR* msg,
32 ACE_High_Res_Timer::global_scale_factor_type gsf)
34 this->throughput_.dump_results (msg, gsf);
37 void
38 Throughput_StructuredPushConsumer::push_structured_event (
39 const CosNotification::StructuredEvent & notification
42 // Extract payload.
43 const char* msg;
44 CORBA::Boolean ok = (notification.remainder_of_body >>= msg);
46 if (!ok)
47 ACE_DEBUG ((LM_DEBUG, "(%t) Error extracting message body\n"));
49 TimeBase::TimeT Throughput_base_recorded;
50 ACE_hrtime_t Throughput_base;
52 notification.filterable_data[0].value >>= Throughput_base_recorded;
54 ORBSVCS_Time::TimeT_to_hrtime (Throughput_base,
55 Throughput_base_recorded);
57 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
59 if (this->push_count_ == 0)
61 this->throughput_start_ = ACE_OS::gethrtime ();
64 ++this->push_count_;
66 // Grab timestamp again.
67 ACE_hrtime_t now = ACE_OS::gethrtime ();
69 // Record statistics.
70 this->throughput_.sample (now - this->throughput_start_,
71 now - Throughput_base);
73 if (this->push_count_%1000 == 0)
75 ACE_DEBUG ((LM_DEBUG,
76 "(%P)(%t) event count = %d\n",
77 this->push_count_));
80 if (push_count_ == test_client_->perconsumer_count_)
82 ACE_DEBUG ((LM_DEBUG,
83 "(%t)expected count reached\n"));
84 test_client_->peer_done ();
88 /***************************************************************************/
90 Throughput_StructuredPushSupplier::Throughput_StructuredPushSupplier (
91 Notify_Throughput* test_client
93 :test_client_ (test_client),
94 push_count_ (0)
98 Throughput_StructuredPushSupplier::~Throughput_StructuredPushSupplier ()
102 void
103 Throughput_StructuredPushSupplier::accumulate_into (
104 ACE_Throughput_Stats &throughput) const
106 throughput.accumulate (this->throughput_);
109 void
110 Throughput_StructuredPushSupplier::dump_stats (
111 const ACE_TCHAR* msg,
112 ACE_High_Res_Timer::global_scale_factor_type gsf)
114 this->throughput_.dump_results (msg, gsf);
118 Throughput_StructuredPushSupplier::svc ()
120 // Initialize a time value to pace the test.
121 ACE_Time_Value tv (0, test_client_->burst_pause_);
123 // Operations.
124 CosNotification::StructuredEvent event;
126 // EventHeader
128 // FixedEventHeader
129 // EventType
130 // string
131 event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
132 // string
133 event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
134 // string
135 event.header.fixed_header.event_name = CORBA::string_dup("myevent");
137 // OptionalHeaderFields
138 // PropertySeq
139 // sequence<Property>: string name, any value
140 CosNotification::PropertySeq& qos = event.header.variable_header;
141 qos.length (0); // put nothing here
143 // FilterableEventBody
144 // PropertySeq
145 // sequence<Property>: string name, any value
146 event.filterable_data.length (1);
147 event.filterable_data[0].name = CORBA::string_dup("Throughput_base");
149 event.remainder_of_body <<= test_client_->payload_;
152 this->throughput_start_ = ACE_OS::gethrtime ();
154 for (int i = 0; i < test_client_->burst_count_; ++i)
156 for (int j = 0; j < test_client_->burst_size_; ++j)
158 // Record current time.
159 ACE_hrtime_t start = ACE_OS::gethrtime ();
160 TimeBase::TimeT Throughput_base;
161 ORBSVCS_Time::hrtime_to_TimeT (Throughput_base,
162 start);
163 // Any.
164 event.filterable_data[0].value <<= Throughput_base;
166 this->proxy_->push_structured_event (event);
168 ACE_hrtime_t end = ACE_OS::gethrtime ();
169 this->throughput_.sample (end - this->throughput_start_,
170 end - start);
173 ACE_OS::sleep (tv);
176 ACE_DEBUG ((LM_DEBUG, "(%P) (%t) Supplier done\n"));
177 test_client_->peer_done ();
178 return 0;
181 /***************************************************************************/
182 Notify_Throughput::Notify_Throughput ()
183 : collocated_ec_ (0),
184 burst_count_ (1),
185 burst_pause_ (10000),
186 burst_size_ (1000),
187 payload_size_ (1024),
188 payload_ (0),
189 consumer_count_ (1),
190 supplier_count_ (1),
191 perconsumer_count_ (burst_size_*burst_count_*supplier_count_),
192 consumers_ (0),
193 suppliers_ (0),
194 nthreads_ (1),
195 peer_done_count_ (consumer_count_ + supplier_count_),
196 condition_ (lock_)
200 Notify_Throughput::~Notify_Throughput ()
202 this->orb_->shutdown (false);
204 delete[] payload_;
208 Notify_Throughput::init (int argc, ACE_TCHAR* argv [])
210 // Initialize base class.
211 Notify_Test_Client::init_ORB (argc,
212 argv);
214 #if (TAO_HAS_CORBA_MESSAGING == 1)
215 CORBA::Object_var manager_object =
216 orb_->resolve_initial_references ("ORBPolicyManager");
218 CORBA::PolicyManager_var policy_manager =
219 CORBA::PolicyManager::_narrow (manager_object.in ());
221 CORBA::Any sync_scope;
222 sync_scope <<= Messaging::SYNC_WITH_TARGET;
224 CORBA::PolicyList policy_list (1);
225 policy_list.length (1);
226 policy_list[0] =
227 orb_->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
228 sync_scope);
229 policy_manager->set_policy_overrides (policy_list,
230 CORBA::SET_OVERRIDE);
231 #else
232 ACE_DEBUG ((LM_DEBUG,
233 "CORBA Messaging disabled in this configuration,"
234 " test may not be optimally configured\n"));
235 #endif /* TAO_HAS_MESSAGING */
237 worker_.orb (this->orb_.in ());
239 if (worker_.activate (THR_NEW_LWP | THR_JOINABLE,
240 this->nthreads_) != 0)
242 ACE_ERROR ((LM_ERROR,
243 "Cannot activate client threads\n"));
246 // Create all participents ...
247 this->create_EC ();
249 CosNotifyChannelAdmin::AdminID adminid;
251 supplier_admin_ =
252 ec_->new_for_suppliers (this->ifgop_, adminid);
254 ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));
256 consumer_admin_ =
257 ec_->new_for_consumers (this->ifgop_, adminid);
259 ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ()));
261 ACE_NEW_RETURN (consumers_,
262 Throughput_StructuredPushConsumer*[this->consumer_count_],
263 -1);
264 ACE_NEW_RETURN (suppliers_,
265 Throughput_StructuredPushSupplier*[this->supplier_count_],
266 -1);
268 // ----
270 int i = 0;
272 for (i = 0; i < this->consumer_count_; ++i)
274 ACE_NEW_RETURN (consumers_[i],
275 Throughput_StructuredPushConsumer (this),
276 -1);
277 consumers_[i]->init (root_poa_.in ());
279 consumers_[i]->connect (this->consumer_admin_.in ());
282 for (i = 0; i < this->supplier_count_; ++i)
284 ACE_NEW_RETURN (suppliers_[i],
285 Throughput_StructuredPushSupplier (this),
286 -1);
287 suppliers_[i]->TAO_Notify_Tests_StructuredPushSupplier::init (
288 root_poa_.in ());
289 suppliers_[i]->connect (this->supplier_admin_.in ());
292 return 0;
296 Notify_Throughput::parse_args(int argc, ACE_TCHAR *argv[])
298 ACE_Arg_Shifter arg_shifter (argc, argv);
300 const ACE_TCHAR* current_arg = 0;
301 while (arg_shifter.is_anything_left ())
303 if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-collocated_ec")) == 0)
305 this->collocated_ec_ = 1;
306 arg_shifter.consume_arg ();
308 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-consumers"))))
310 this->consumer_count_ = ACE_OS::atoi (current_arg);
311 // The number of events to send/receive.
312 arg_shifter.consume_arg ();
314 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-suppliers"))))
316 this->supplier_count_ = ACE_OS::atoi (current_arg);
317 // The number of events to send/receive.
318 arg_shifter.consume_arg ();
320 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-burst_size"))))
322 this->burst_size_ = ACE_OS::atoi (current_arg);
323 // The number of events to send/receive.
324 arg_shifter.consume_arg ();
326 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-burst_count"))))
328 this->burst_count_ = ACE_OS::atoi (current_arg);
330 arg_shifter.consume_arg ();
332 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-burst_pause"))))
334 this->burst_pause_ = ACE_OS::atoi (current_arg);
336 arg_shifter.consume_arg ();
338 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-payload"))))
340 this->payload_size_ = ACE_OS::atoi (current_arg);
341 ACE_NEW_RETURN (this->payload_,
342 char [this->payload_size_],
343 -1);
345 arg_shifter.consume_arg ();
347 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-EC"))))
349 this->ec_name_ = ACE_TEXT_ALWAYS_CHAR(current_arg);
351 arg_shifter.consume_arg ();
353 else if (0 != (current_arg =
354 arg_shifter.get_the_parameter (ACE_TEXT("-ExpectedCount"))))
356 this->perconsumer_count_ = ACE_OS::atoi (current_arg);
358 arg_shifter.consume_arg ();
360 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-?")) == 0)
362 ACE_DEBUG((LM_DEBUG,
363 "usage: %s "
364 "-collocated_ec, "
365 "-consumers [count], "
366 "-suppliers [count], "
367 "-burst_size [size], "
368 "-burst_count [count], "
369 "-burst_pause [time(uS)], "
370 "-payload [size]"
371 "-EC [Channel Name]"
372 "-ExpectedCount [count]\n",
373 argv[0], argv[0]));
375 arg_shifter.consume_arg ();
377 return -1;
379 else
381 arg_shifter.ignore_arg ();
384 // Recalculate.
385 peer_done_count_ = consumer_count_ + supplier_count_;
386 return 0;
389 void
390 Notify_Throughput::create_EC ()
392 if (this->collocated_ec_ == 1)
394 TAO_Notify_Service* notify_service = ACE_Dynamic_Service<TAO_Notify_Service>::instance (TAO_NOTIFICATION_SERVICE_NAME);
396 if (notify_service == 0)
398 ACE_DEBUG ((LM_DEBUG, "Service not found! check conf. file\n"));
399 return;
402 // Activate the factory
403 this->notify_factory_ =
404 notify_service->create (this->root_poa_.in ());
406 ACE_ASSERT (!CORBA::is_nil (this->notify_factory_.in ()));
408 else
410 this->resolve_naming_service ();
411 this->resolve_Notify_factory ();
414 // A channel name was specified, use that to resolve the service.
415 if (this->ec_name_.length () != 0)
417 CosNaming::Name name (1);
418 name.length (1);
419 name[0].id = CORBA::string_dup (ec_name_.c_str ());
421 CORBA::Object_var obj =
422 this->naming_context_->resolve (name);
424 this->ec_ =
425 CosNotifyChannelAdmin::EventChannel::_narrow (obj.in ());
427 else
429 CosNotifyChannelAdmin::ChannelID id;
431 ec_ = notify_factory_->create_channel (initial_qos_,
432 initial_admin_,
433 id);
436 ACE_ASSERT (!CORBA::is_nil (ec_.in ()));
439 void
440 Notify_Throughput::run_test ()
442 ACE_DEBUG ((LM_DEBUG, "collocated_ec_ %d ,"
443 "burst_count_ %d, "
444 "burst_pause_ %d, "
445 "burst_size_ %d, "
446 "payload_size_ %d, "
447 "consumer_count_ %d, "
448 "supplier_count_ %d "
449 "expected count %d\n",
450 collocated_ec_,
451 burst_count_ ,
452 burst_pause_ ,
453 burst_size_ ,
454 payload_size_,
455 consumer_count_ ,
456 supplier_count_ ,
457 perconsumer_count_));
459 for (int i = 0; i < this->supplier_count_; ++i)
461 suppliers_[i]->
462 TAO_Notify_Tests_StructuredPushSupplier::init (root_poa_.in ());
464 if (suppliers_[i]->ACE_Task_Base::activate (THR_NEW_LWP | THR_JOINABLE) != 0)
466 ACE_ERROR ((LM_ERROR,
467 "Cannot activate client threads\n"));
471 // Wait till we're signalled done.
473 ACE_DEBUG ((LM_DEBUG, "(%t)Waiting for shutdown signal in main..\n"));
474 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, lock_);
476 while (this->peer_done_count_ != 0)
478 condition_.wait ();
482 if (this->ec_name_.length () == 0) // we are not using a global EC
484 // Destroy the ec.
485 this->ec_->destroy ();
488 // Signal the workers.
489 this->worker_.done_ = 1;
492 void
493 Notify_Throughput::peer_done ()
495 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, lock_);
497 if (--this->peer_done_count_ == 0)
499 ACE_DEBUG ((LM_DEBUG, "calling shutdown\n"));
500 condition_.broadcast ();
504 void
505 Notify_Throughput::dump_results ()
507 ACE_Throughput_Stats throughput;
508 ACE_High_Res_Timer::global_scale_factor_type gsf =
509 ACE_High_Res_Timer::global_scale_factor ();
510 ACE_TCHAR buf[BUFSIZ];
512 for (int j = 0; j < this->consumer_count_; ++j)
514 ACE_OS::sprintf (buf, ACE_TEXT("Consumer [%02d]"), j);
516 this->consumers_[j]->dump_stats (buf, gsf);
517 this->consumers_[j]->accumulate_into (throughput);
520 ACE_DEBUG ((LM_DEBUG, "\n"));
522 ACE_Throughput_Stats suppliers;
524 for (int i = 0; i < this->supplier_count_; ++i)
526 ACE_OS::sprintf (buf, ACE_TEXT("Supplier [%02d]"), i);
528 this->suppliers_[i]->dump_stats (buf, gsf);
529 this->suppliers_[i]->accumulate_into (suppliers);
532 ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
533 throughput.dump_results (ACE_TEXT("Notify_Consumer/totals"), gsf);
535 ACE_DEBUG ((LM_DEBUG, "\n"));
536 suppliers.dump_results (ACE_TEXT("Notify_Supplier/totals"), gsf);
539 /***************************************************************************/
542 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
544 ACE_High_Res_Timer::calibrate ();
546 Notify_Throughput events;
548 if (events.parse_args (argc, argv) == -1)
550 return 1;
555 events.init (argc, argv); //Init the Client
557 events.run_test ();
559 ACE_DEBUG ((LM_DEBUG, "Waiting for threads to exit...\n"));
560 ACE_Thread_Manager::instance ()->wait ();
561 events.dump_results();
563 ACE_DEBUG ((LM_DEBUG, "ending main...\n"));
565 catch (const CORBA::Exception& se)
567 se._tao_print_exception ("Error: ");
568 return 1;
571 return 0;
575 // ****************************************************************
577 Worker::Worker ()
578 :done_ (0)
582 void
583 Worker::orb (CORBA::ORB_ptr orb)
585 orb_ = CORBA::ORB::_duplicate (orb);
589 Worker::svc ()
591 ACE_Time_Value tv(5);
595 this->orb_->run (tv);
596 }while (!this->done_);
598 ACE_DEBUG ((LM_DEBUG, "(%P) (%t) done\n"));
600 return 0;