Merge pull request #2303 from jwillemsen/jwi-803
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Throughput / ECT_Throughput.cpp
blob cc1bc13d83727f7a070e9d33b1a40c88c3d5f7c0
1 #include "ECT_Throughput.h"
3 #include "orbsvcs/Event_Utilities.h"
4 #include "orbsvcs/Event_Service_Constants.h"
5 #include "orbsvcs/Scheduler_Factory.h"
6 #include "orbsvcs/Time_Utilities.h"
7 #include "orbsvcs/Sched/Config_Scheduler.h"
8 #include "orbsvcs/Runtime_Scheduler.h"
9 #include "orbsvcs/Event/EC_Event_Channel.h"
10 #include "orbsvcs/Event/EC_Default_Factory.h"
12 #include "tao/Timeprobe.h"
13 #include "tao/debug.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Sched_Params.h"
17 #include "ace/High_Res_Timer.h"
18 #include "ace/OS_NS_strings.h"
19 #include "ace/OS_NS_errno.h"
20 #include "ace/OS_NS_unistd.h"
21 #include <memory>
23 int
24 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
26 TAO_EC_Default_Factory::init_svcs ();
28 ECT_Throughput driver;
29 return driver.run (argc, argv);
32 // ****************************************************************
34 ECT_Throughput::ECT_Throughput ()
35 : n_consumers_ (1),
36 n_suppliers_ (1),
37 burst_count_ (10),
38 burst_size_ (100),
39 event_size_ (128),
40 burst_pause_ (100),
41 consumer_type_start_ (ACE_ES_EVENT_UNDEFINED),
42 consumer_type_count_ (1),
43 consumer_type_shift_ (0),
44 supplier_type_start_ (ACE_ES_EVENT_UNDEFINED),
45 supplier_type_count_ (1),
46 supplier_type_shift_ (0),
47 pid_file_name_ (0),
48 active_count_ (0),
49 ec_concurrency_hwm_ (1),
50 thr_create_flags_ (THR_NEW_LWP|THR_BOUND|THR_SCHED_FIFO)
54 ECT_Throughput::~ECT_Throughput ()
58 int
59 ECT_Throughput::run (int argc, ACE_TCHAR* argv[])
61 try
63 // Calibrate the high resolution timer *before* starting the
64 // test.
65 ACE_High_Res_Timer::calibrate ();
67 this->orb_ =
68 CORBA::ORB_init (argc, argv);
70 CORBA::Object_var poa_object =
71 this->orb_->resolve_initial_references("RootPOA");
73 if (CORBA::is_nil (poa_object.in ()))
74 ACE_ERROR_RETURN ((LM_ERROR,
75 " (%P|%t) Unable to initialize the POA.\n"),
76 1);
78 PortableServer::POA_var root_poa =
79 PortableServer::POA::_narrow (poa_object.in ());
81 PortableServer::POAManager_var poa_manager =
82 root_poa->the_POAManager ();
84 poa_manager->activate ();
86 if (this->parse_args (argc, argv))
87 return 1;
89 if (TAO_debug_level > 0)
91 ACE_DEBUG ((LM_DEBUG,
92 "Execution parameters:\n"
93 " consumers = <%d>\n"
94 " suppliers = <%d>\n"
95 " burst count = <%d>\n"
96 " burst size = <%d>\n"
97 " event size = <%d>\n"
98 " burst pause = <%d>\n"
99 " consumer type start = <%d>\n"
100 " consumer type count = <%d>\n"
101 " consumer type shift = <%d>\n"
102 " supplier type start = <%d>\n"
103 " supplier type count = <%d>\n"
104 " supplier type shift = <%d>\n"
105 " pid file name = <%s>\n"
106 " concurrency HWM = <%d>\n",
108 this->n_consumers_,
109 this->n_suppliers_,
110 this->burst_count_,
111 this->burst_size_,
112 this->event_size_,
113 this->burst_pause_,
114 this->consumer_type_start_,
115 this->consumer_type_count_,
116 this->consumer_type_shift_,
117 this->supplier_type_start_,
118 this->supplier_type_count_,
119 this->supplier_type_shift_,
121 this->pid_file_name_?this->pid_file_name_:ACE_TEXT("nil"),
122 this->ec_concurrency_hwm_));
125 if (this->pid_file_name_ != 0)
127 FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
128 if (pid != 0)
130 ACE_OS::fprintf (pid, "%ld\n",
131 static_cast<long> (ACE_OS::getpid ()));
132 ACE_OS::fclose (pid);
136 int priority =
137 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
138 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
139 priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, priority);
141 // Enable FIFO scheduling
142 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
143 priority,
144 ACE_SCOPE_PROCESS)) != 0)
146 if (ACE_OS::last_error () == EPERM)
148 ACE_DEBUG ((LM_DEBUG,
149 "%s: user is not superuser, "
150 "so remain in time-sharing class\n", argv[0]));
151 this->thr_create_flags_ = THR_NEW_LWP;
153 else
154 ACE_ERROR ((LM_ERROR,
155 "%s: ACE_OS::sched_params failed\n", argv[0]));
158 if (ACE_OS::thr_setprio (priority) == -1)
160 ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed,"
161 "no real-time features\n"));
164 #if 1
165 ACE_Config_Scheduler scheduler_impl;
166 #else
167 #include "ECT_Scheduler_Info.h"
168 ACE_Runtime_Scheduler scheduler_impl (
169 runtime_configs_size,
170 runtime_configs,
171 runtime_infos_size,
172 runtime_infos);
173 #endif
174 RtecScheduler::Scheduler_var scheduler =
175 scheduler_impl._this ();
177 #if 0
178 CORBA::Object_var naming_obj =
179 this->orb_->resolve_initial_references ("NameService");
181 if (CORBA::is_nil (naming_obj.in ()))
182 ACE_ERROR_RETURN ((LM_ERROR,
183 " (%P|%t) Unable to get the Naming Service.\n"),
186 CosNaming::NamingContext_var naming_context =
187 CosNaming::NamingContext::_narrow (naming_obj.in ());
189 // This is the name we (potentially) register the Scheduling
190 // Service in the Naming Service.
191 CosNaming::Name schedule_name (1);
192 schedule_name.length (1);
193 schedule_name[0].id = CORBA::string_dup ("ScheduleService");
195 CORBA::String_var str =
196 this->orb_->object_to_string (scheduler.in ());
197 ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n",
198 str.in ()));
200 // Register the servant with the Naming Context....
201 naming_context->rebind (schedule_name, scheduler.in ());
203 ACE_Scheduler_Factory::use_config (naming_context.in ());
204 #endif /* 0 */
206 TAO_EC_Event_Channel_Attributes attr (root_poa.in (),
207 root_poa.in ());
209 TAO_EC_Event_Channel *ec =
210 new TAO_EC_Event_Channel (attr);
212 ec->activate ();
214 std::unique_ptr<POA_RtecEventChannelAdmin::EventChannel> ec_impl (ec);
216 RtecEventChannelAdmin::EventChannel_var channel =
217 ec_impl->_this ();
219 this->connect_consumers (scheduler.in (),
220 channel.in ());
222 ACE_DEBUG ((LM_DEBUG, "connected consumer(s)\n"));
224 this->connect_suppliers (scheduler.in (),
225 channel.in ());
227 ACE_DEBUG ((LM_DEBUG, "connected supplier(s)\n"));
229 this->activate_suppliers ();
231 ACE_DEBUG ((LM_DEBUG, "suppliers are active\n"));
233 // Wait for the supplier threads...
234 if (ACE_Thread_Manager::instance ()->wait () == -1)
236 ACE_ERROR ((LM_ERROR, "Thread_Manager wait failed\n"));
237 return 1;
240 ACE_DEBUG ((LM_DEBUG, "suppliers finished\n"));
242 this->dump_results ();
244 this->disconnect_consumers ();
246 ACE_DEBUG ((LM_DEBUG, "consumers disconnected\n"));
248 this->disconnect_suppliers ();
250 ACE_DEBUG ((LM_DEBUG, "suppliers disconnected\n"));
252 channel->destroy ();
254 ACE_DEBUG ((LM_DEBUG, "channel destroyed\n"));
257 // Deactivate the EC
258 PortableServer::POA_var poa =
259 ec_impl->_default_POA ();
260 PortableServer::ObjectId_var id =
261 poa->servant_to_id (ec_impl.get ());
262 poa->deactivate_object (id.in ());
264 ACE_DEBUG ((LM_DEBUG, "EC deactivated\n"));
268 // Deactivate the Scheduler
269 PortableServer::POA_var poa =
270 scheduler_impl._default_POA ();
271 PortableServer::ObjectId_var id =
272 poa->servant_to_id (&scheduler_impl);
273 poa->deactivate_object (id.in ());
275 ACE_DEBUG ((LM_DEBUG, "scheduler deactivated\n"));
278 catch (const CORBA::Exception& ex)
280 ex._tao_print_exception ("ECT_Throughput::run");
282 catch (...)
284 ACE_ERROR ((LM_ERROR, "non-corba exception raised\n"));
287 return 0;
290 void
291 ECT_Throughput::shutdown_consumer (void*)
293 // int ID =
294 // (reinterpret_cast<Test_Consumer**> (consumer_cookie)
295 // - this->consumers_);
297 // ACE_DEBUG ((LM_DEBUG, "(%t) events received by consumer %d\n", ID));
299 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
300 this->active_count_--;
301 if (this->active_count_ <= 0)
303 ACE_DEBUG ((LM_DEBUG,
304 "(%t) shutting down the ORB\n"));
305 // Not needed: this->orb_->shutdown (false);
309 void
310 ECT_Throughput::connect_consumers
311 (RtecScheduler::Scheduler_ptr scheduler,
312 RtecEventChannelAdmin::EventChannel_ptr channel)
315 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
316 this->active_count_ = this->n_consumers_;
318 for (int i = 0; i < this->n_consumers_; ++i)
320 char buf[BUFSIZ];
321 ACE_OS::sprintf (buf, "consumer_%02d", i);
323 ACE_NEW (this->consumers_[i],
324 Test_Consumer (this,
325 this->consumers_ + i,
326 this->n_suppliers_));
328 int start = this->consumer_type_start_
329 + i * this->consumer_type_shift_;
331 this->consumers_[i]->connect (scheduler,
332 buf,
333 start,
334 this->consumer_type_count_,
335 channel);
339 void
340 ECT_Throughput::connect_suppliers
341 (RtecScheduler::Scheduler_ptr scheduler,
342 RtecEventChannelAdmin::EventChannel_ptr channel)
344 for (int i = 0; i < this->n_suppliers_; ++i)
346 char buf[BUFSIZ];
347 ACE_OS::sprintf (buf, "supplier_%02d", i);
349 ACE_NEW (this->suppliers_[i], Test_Supplier (this));
351 int start = this->supplier_type_start_ + i*this->supplier_type_shift_;
352 this->suppliers_[i]->connect (scheduler,
353 buf,
354 this->burst_count_,
355 this->burst_size_,
356 this->event_size_,
357 this->burst_pause_,
358 start,
359 this->supplier_type_count_,
360 channel);
364 void
365 ECT_Throughput::activate_suppliers ()
367 int priority =
368 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
369 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
371 for (int i = 0; i < this->n_suppliers_; ++i)
373 if (this->suppliers_[i]->activate (this->thr_create_flags_,
374 1, 0, priority) == -1)
376 ACE_ERROR ((LM_ERROR,
377 "Cannot activate thread for supplier %d\n",
378 i));
383 void
384 ECT_Throughput::disconnect_suppliers ()
386 for (int i = 0; i < this->n_suppliers_; ++i)
388 this->suppliers_[i]->disconnect ();
392 void
393 ECT_Throughput::disconnect_consumers ()
395 for (int i = 0; i < this->n_consumers_; ++i)
397 this->consumers_[i]->disconnect ();
401 void
402 ECT_Throughput::dump_results ()
404 ACE_High_Res_Timer::global_scale_factor_type gsf =
405 ACE_High_Res_Timer::global_scale_factor ();
407 ACE_Throughput_Stats consumers;
408 for (int j = 0; j < this->n_consumers_; ++j)
410 ACE_TCHAR buf[BUFSIZ];
411 ACE_OS::sprintf (buf, ACE_TEXT("consumer_%02d"), j);
413 this->consumers_[j]->dump_results (buf, gsf);
414 this->consumers_[j]->accumulate (consumers);
416 consumers.dump_results (ACE_TEXT("ECT_Consumer/totals"), gsf);
418 ACE_Throughput_Stats suppliers;
419 for (int i = 0; i < this->n_suppliers_; ++i)
421 ACE_TCHAR buf[BUFSIZ];
422 ACE_OS::sprintf (buf, ACE_TEXT("supplier_%02d"), i);
424 this->suppliers_[i]->dump_results (buf, gsf);
425 this->suppliers_[i]->accumulate (suppliers);
427 suppliers.dump_results (ACE_TEXT("ECT_Supplier/totals"), gsf);
431 ECT_Throughput::parse_args (int argc, ACE_TCHAR *argv [])
433 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("dc:s:u:n:t:b:h:l:p:w:"));
434 int opt;
436 while ((opt = get_opt ()) != EOF)
438 switch (opt)
440 case 'c':
441 this->n_consumers_ = ACE_OS::atoi (get_opt.opt_arg ());
442 break;
444 case 's':
445 this->n_suppliers_ = ACE_OS::atoi (get_opt.opt_arg ());
446 break;
448 case 'u':
449 this->burst_count_ = ACE_OS::atoi (get_opt.opt_arg ());
450 break;
452 case 'n':
453 this->burst_size_ = ACE_OS::atoi (get_opt.opt_arg ());
454 break;
456 case 'b':
457 this->event_size_ = ACE_OS::atoi (get_opt.opt_arg ());
458 break;
460 case 't':
461 this->burst_pause_ = ACE_OS::atoi (get_opt.opt_arg ());
462 break;
464 case 'h':
466 char* aux;
467 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
469 this->consumer_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
470 arg = ACE_OS::strtok_r (0, ",", &aux);
471 this->consumer_type_count_ = ACE_OS::atoi (arg);
472 arg = ACE_OS::strtok_r (0, ",", &aux);
473 this->consumer_type_shift_ = ACE_OS::atoi (arg);
475 break;
477 case 'l':
479 char* aux;
480 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
482 this->supplier_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
483 arg = ACE_OS::strtok_r (0, ",", &aux);
484 this->supplier_type_count_ = ACE_OS::atoi (arg);
485 arg = ACE_OS::strtok_r (0, ",", &aux);
486 this->supplier_type_shift_ = ACE_OS::atoi (arg);
488 break;
490 case 'p':
491 this->pid_file_name_ = get_opt.opt_arg ();
492 break;
494 case 'w':
495 this->ec_concurrency_hwm_ = ACE_OS::atoi (get_opt.opt_arg ());
496 break;
498 case '?':
499 default:
500 ACE_DEBUG ((LM_DEBUG,
501 "Usage: %s "
502 "[ORB options] "
503 "-r -d -x "
504 "-c <n_consumers> "
505 "-s <n_suppliers> "
506 "-u <burst count> "
507 "-n <burst size> "
508 "-b <event payload size> "
509 "-t <burst pause (usecs)> "
510 "-h <consumer_start,consumer_count,consumer_shift> "
511 "-l <supplier_start,supplier_count,supplier_shift> "
512 "-p <pid file name> "
513 "-w <concurrency HWM> "
514 "-r "
515 "\n",
516 argv[0]));
517 return -1;
521 if (this->burst_count_ <= 0)
523 ACE_DEBUG ((LM_DEBUG,
524 "%s: burst count (%d) is out of range, "
525 "reset to default (%d)\n",
526 argv[0], this->burst_count_,
527 100));
528 this->burst_count_ = 100;
531 if (this->burst_size_ <= 0)
533 ACE_DEBUG ((LM_DEBUG,
534 "%s: burst size (%d) is out of range, "
535 "reset to default (%d)\n",
536 argv[0], this->burst_size_,
537 10));
538 this->burst_size_ = 10;
541 if (this->event_size_ < 0)
543 ACE_DEBUG ((LM_DEBUG,
544 "%s: event size (%d) is out of range, "
545 "reseting to default (%d)\n",
546 argv[0], this->event_size_,
547 128));
548 this->event_size_ = 128;
551 if (this->n_consumers_ < 0
552 || this->n_consumers_ >= ECT_Throughput::MAX_CONSUMERS)
554 this->n_consumers_ = 1;
555 ACE_ERROR_RETURN ((LM_ERROR,
556 "%s: number of consumers or "
557 "suppliers out of range, "
558 "reset to default (%d)\n",
559 argv[0], 1), -1);
562 if (this->n_suppliers_ < 0
563 || this->n_suppliers_ >= ECT_Throughput::MAX_SUPPLIERS)
565 this->n_suppliers_ = 1;
566 ACE_ERROR_RETURN ((LM_ERROR,
567 "%s: number of suppliers out of range, "
568 "reset to default (%d)\n",
569 argv[0], 1), -1);
572 if (this->n_suppliers_ == 0 && this->n_consumers_ == 0)
574 this->n_suppliers_ = 1;
575 this->n_consumers_ = 1;
576 ACE_ERROR_RETURN ((LM_ERROR,
577 "%s: no suppliers or consumers, "
578 "reset to default (%d of each)\n",
579 argv[0], 1), -1);
582 if (this->ec_concurrency_hwm_ <= 0)
584 this->ec_concurrency_hwm_ = 1;
585 ACE_ERROR_RETURN ((LM_ERROR,
586 "%s: invalid concurrency HWM, "
587 "reset to default (%d)\n",
588 argv[0], 1), -1);
591 return 0;