Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Throughput / ECT_Throughput.cpp
blob315d6b03e94147d98daf900ee19ea5b85af1fc23
1 #include "ECT_Throughput.h"
3 #include "orbsvcs/Event_Utilities.h"
4 #include "orbsvcs/Event_Service_Constants.h"
5 #include "orbsvcs/Scheduler_Factory.h"
6 #include "orbsvcs/Time_Utilities.h"
7 #include "orbsvcs/Sched/Config_Scheduler.h"
8 #include "orbsvcs/Runtime_Scheduler.h"
9 #include "orbsvcs/Event/EC_Event_Channel.h"
10 #include "orbsvcs/Event/EC_Default_Factory.h"
12 #include "tao/Timeprobe.h"
13 #include "tao/debug.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Auto_Ptr.h"
17 #include "ace/Sched_Params.h"
18 #include "ace/High_Res_Timer.h"
19 #include "ace/OS_NS_strings.h"
20 #include "ace/OS_NS_errno.h"
21 #include "ace/OS_NS_unistd.h"
23 int
24 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
26 TAO_EC_Default_Factory::init_svcs ();
28 ECT_Throughput driver;
29 return driver.run (argc, argv);
32 // ****************************************************************
34 ECT_Throughput::ECT_Throughput (void)
35 : n_consumers_ (1),
36 n_suppliers_ (1),
37 burst_count_ (10),
38 burst_size_ (100),
39 event_size_ (128),
40 burst_pause_ (100),
41 consumer_type_start_ (ACE_ES_EVENT_UNDEFINED),
42 consumer_type_count_ (1),
43 consumer_type_shift_ (0),
44 supplier_type_start_ (ACE_ES_EVENT_UNDEFINED),
45 supplier_type_count_ (1),
46 supplier_type_shift_ (0),
47 pid_file_name_ (0),
48 active_count_ (0),
49 ec_concurrency_hwm_ (1),
50 thr_create_flags_ (THR_NEW_LWP|THR_BOUND|THR_SCHED_FIFO)
54 ECT_Throughput::~ECT_Throughput (void)
58 int
59 ECT_Throughput::run (int argc, ACE_TCHAR* argv[])
61 try
63 // Calibrate the high resolution timer *before* starting the
64 // test.
65 ACE_High_Res_Timer::calibrate ();
67 this->orb_ =
68 CORBA::ORB_init (argc, argv);
70 CORBA::Object_var poa_object =
71 this->orb_->resolve_initial_references("RootPOA");
73 if (CORBA::is_nil (poa_object.in ()))
74 ACE_ERROR_RETURN ((LM_ERROR,
75 " (%P|%t) Unable to initialize the POA.\n"),
76 1);
78 PortableServer::POA_var root_poa =
79 PortableServer::POA::_narrow (poa_object.in ());
81 PortableServer::POAManager_var poa_manager =
82 root_poa->the_POAManager ();
84 poa_manager->activate ();
86 if (this->parse_args (argc, argv))
87 return 1;
89 if (TAO_debug_level > 0)
91 ACE_DEBUG ((LM_DEBUG,
92 "Execution parameters:\n"
93 " consumers = <%d>\n"
94 " suppliers = <%d>\n"
95 " burst count = <%d>\n"
96 " burst size = <%d>\n"
97 " event size = <%d>\n"
98 " burst pause = <%d>\n"
99 " consumer type start = <%d>\n"
100 " consumer type count = <%d>\n"
101 " consumer type shift = <%d>\n"
102 " supplier type start = <%d>\n"
103 " supplier type count = <%d>\n"
104 " supplier type shift = <%d>\n"
105 " pid file name = <%s>\n"
106 " concurrency HWM = <%d>\n",
108 this->n_consumers_,
109 this->n_suppliers_,
110 this->burst_count_,
111 this->burst_size_,
112 this->event_size_,
113 this->burst_pause_,
114 this->consumer_type_start_,
115 this->consumer_type_count_,
116 this->consumer_type_shift_,
117 this->supplier_type_start_,
118 this->supplier_type_count_,
119 this->supplier_type_shift_,
121 this->pid_file_name_?this->pid_file_name_:ACE_TEXT("nil"),
122 this->ec_concurrency_hwm_
123 ) );
126 if (this->pid_file_name_ != 0)
128 FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
129 if (pid != 0)
131 ACE_OS::fprintf (pid, "%ld\n",
132 static_cast<long> (ACE_OS::getpid ()));
133 ACE_OS::fclose (pid);
137 int priority =
138 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
139 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
140 priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO,
141 priority);
142 // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
144 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
145 priority,
146 ACE_SCOPE_PROCESS)) != 0)
148 if (ACE_OS::last_error () == EPERM)
150 ACE_DEBUG ((LM_DEBUG,
151 "%s: user is not superuser, "
152 "so remain in time-sharing class\n", argv[0]));
153 this->thr_create_flags_ = THR_NEW_LWP;
155 else
156 ACE_ERROR ((LM_ERROR,
157 "%s: ACE_OS::sched_params failed\n", argv[0]));
160 if (ACE_OS::thr_setprio (priority) == -1)
162 ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed,"
163 "no real-time features\n"));
166 #if 1
167 ACE_Config_Scheduler scheduler_impl;
168 #else
169 #include "ECT_Scheduler_Info.h"
170 ACE_Runtime_Scheduler scheduler_impl (
171 runtime_configs_size,
172 runtime_configs,
173 runtime_infos_size,
174 runtime_infos);
175 #endif
176 RtecScheduler::Scheduler_var scheduler =
177 scheduler_impl._this ();
179 #if 0
180 CORBA::Object_var naming_obj =
181 this->orb_->resolve_initial_references ("NameService");
183 if (CORBA::is_nil (naming_obj.in ()))
184 ACE_ERROR_RETURN ((LM_ERROR,
185 " (%P|%t) Unable to get the Naming Service.\n"),
188 CosNaming::NamingContext_var naming_context =
189 CosNaming::NamingContext::_narrow (naming_obj.in ());
191 // This is the name we (potentially) register the Scheduling
192 // Service in the Naming Service.
193 CosNaming::Name schedule_name (1);
194 schedule_name.length (1);
195 schedule_name[0].id = CORBA::string_dup ("ScheduleService");
197 CORBA::String_var str =
198 this->orb_->object_to_string (scheduler.in ());
199 ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n",
200 str.in ()));
202 // Register the servant with the Naming Context....
203 naming_context->rebind (schedule_name, scheduler.in ());
205 ACE_Scheduler_Factory::use_config (naming_context.in ());
206 #endif /* 0 */
208 auto_ptr<POA_RtecEventChannelAdmin::EventChannel> ec_impl;
210 TAO_EC_Event_Channel_Attributes attr (root_poa.in (),
211 root_poa.in ());
213 TAO_EC_Event_Channel *ec =
214 new TAO_EC_Event_Channel (attr);
216 ec->activate ();
218 auto_ptr<POA_RtecEventChannelAdmin::EventChannel> auto_ec_impl (ec);
219 ec_impl = auto_ec_impl;
221 RtecEventChannelAdmin::EventChannel_var channel =
222 ec_impl->_this ();
224 this->connect_consumers (scheduler.in (),
225 channel.in ());
227 ACE_DEBUG ((LM_DEBUG, "connected consumer(s)\n"));
229 this->connect_suppliers (scheduler.in (),
230 channel.in ());
232 ACE_DEBUG ((LM_DEBUG, "connected supplier(s)\n"));
234 this->activate_suppliers ();
236 ACE_DEBUG ((LM_DEBUG, "suppliers are active\n"));
238 // Wait for the supplier threads...
239 if (ACE_Thread_Manager::instance ()->wait () == -1)
241 ACE_ERROR ((LM_ERROR, "Thread_Manager wait failed\n"));
242 return 1;
245 ACE_DEBUG ((LM_DEBUG, "suppliers finished\n"));
247 this->dump_results ();
249 this->disconnect_consumers ();
251 ACE_DEBUG ((LM_DEBUG, "consumers disconnected\n"));
253 this->disconnect_suppliers ();
255 ACE_DEBUG ((LM_DEBUG, "suppliers disconnected\n"));
257 channel->destroy ();
259 ACE_DEBUG ((LM_DEBUG, "channel destroyed\n"));
262 // Deactivate the EC
263 PortableServer::POA_var poa =
264 ec_impl->_default_POA ();
265 PortableServer::ObjectId_var id =
266 poa->servant_to_id (ec_impl.get ());
267 poa->deactivate_object (id.in ());
269 ACE_DEBUG ((LM_DEBUG, "EC deactivated\n"));
273 // Deactivate the Scheduler
274 PortableServer::POA_var poa =
275 scheduler_impl._default_POA ();
276 PortableServer::ObjectId_var id =
277 poa->servant_to_id (&scheduler_impl);
278 poa->deactivate_object (id.in ());
280 ACE_DEBUG ((LM_DEBUG, "scheduler deactivated\n"));
283 catch (const CORBA::Exception& ex)
285 ex._tao_print_exception ("ECT_Throughput::run");
287 catch (...)
289 ACE_ERROR ((LM_ERROR, "non-corba exception raised\n"));
292 return 0;
295 void
296 ECT_Throughput::shutdown_consumer (void*)
298 // int ID =
299 // (reinterpret_cast<Test_Consumer**> (consumer_cookie)
300 // - this->consumers_);
302 // ACE_DEBUG ((LM_DEBUG, "(%t) events received by consumer %d\n", ID));
304 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
305 this->active_count_--;
306 if (this->active_count_ <= 0)
308 ACE_DEBUG ((LM_DEBUG,
309 "(%t) shutting down the ORB\n"));
310 // Not needed: this->orb_->shutdown (0);
314 void
315 ECT_Throughput::connect_consumers
316 (RtecScheduler::Scheduler_ptr scheduler,
317 RtecEventChannelAdmin::EventChannel_ptr channel)
320 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
321 this->active_count_ = this->n_consumers_;
323 for (int i = 0; i < this->n_consumers_; ++i)
325 char buf[BUFSIZ];
326 ACE_OS::sprintf (buf, "consumer_%02d", i);
328 ACE_NEW (this->consumers_[i],
329 Test_Consumer (this,
330 this->consumers_ + i,
331 this->n_suppliers_));
333 int start = this->consumer_type_start_
334 + i * this->consumer_type_shift_;
336 this->consumers_[i]->connect (scheduler,
337 buf,
338 start,
339 this->consumer_type_count_,
340 channel);
344 void
345 ECT_Throughput::connect_suppliers
346 (RtecScheduler::Scheduler_ptr scheduler,
347 RtecEventChannelAdmin::EventChannel_ptr channel)
349 for (int i = 0; i < this->n_suppliers_; ++i)
351 char buf[BUFSIZ];
352 ACE_OS::sprintf (buf, "supplier_%02d", i);
354 ACE_NEW (this->suppliers_[i], Test_Supplier (this));
356 int start = this->supplier_type_start_ + i*this->supplier_type_shift_;
357 this->suppliers_[i]->connect (scheduler,
358 buf,
359 this->burst_count_,
360 this->burst_size_,
361 this->event_size_,
362 this->burst_pause_,
363 start,
364 this->supplier_type_count_,
365 channel);
369 void
370 ECT_Throughput::activate_suppliers (void)
372 int priority =
373 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
374 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
376 for (int i = 0; i < this->n_suppliers_; ++i)
378 if (this->suppliers_[i]->activate (this->thr_create_flags_,
379 1, 0, priority) == -1)
381 ACE_ERROR ((LM_ERROR,
382 "Cannot activate thread for supplier %d\n",
383 i));
388 void
389 ECT_Throughput::disconnect_suppliers (void)
391 for (int i = 0; i < this->n_suppliers_; ++i)
393 this->suppliers_[i]->disconnect ();
397 void
398 ECT_Throughput::disconnect_consumers (void)
400 for (int i = 0; i < this->n_consumers_; ++i)
402 this->consumers_[i]->disconnect ();
406 void
407 ECT_Throughput::dump_results (void)
409 ACE_High_Res_Timer::global_scale_factor_type gsf =
410 ACE_High_Res_Timer::global_scale_factor ();
412 ACE_Throughput_Stats consumers;
413 for (int j = 0; j < this->n_consumers_; ++j)
415 ACE_TCHAR buf[BUFSIZ];
416 ACE_OS::sprintf (buf, ACE_TEXT("consumer_%02d"), j);
418 this->consumers_[j]->dump_results (buf, gsf);
419 this->consumers_[j]->accumulate (consumers);
421 consumers.dump_results (ACE_TEXT("ECT_Consumer/totals"), gsf);
423 ACE_Throughput_Stats suppliers;
424 for (int i = 0; i < this->n_suppliers_; ++i)
426 ACE_TCHAR buf[BUFSIZ];
427 ACE_OS::sprintf (buf, ACE_TEXT("supplier_%02d"), i);
429 this->suppliers_[i]->dump_results (buf, gsf);
430 this->suppliers_[i]->accumulate (suppliers);
432 suppliers.dump_results (ACE_TEXT("ECT_Supplier/totals"), gsf);
436 ECT_Throughput::parse_args (int argc, ACE_TCHAR *argv [])
438 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("dc:s:u:n:t:b:h:l:p:w:"));
439 int opt;
441 while ((opt = get_opt ()) != EOF)
443 switch (opt)
445 case 'c':
446 this->n_consumers_ = ACE_OS::atoi (get_opt.opt_arg ());
447 break;
449 case 's':
450 this->n_suppliers_ = ACE_OS::atoi (get_opt.opt_arg ());
451 break;
453 case 'u':
454 this->burst_count_ = ACE_OS::atoi (get_opt.opt_arg ());
455 break;
457 case 'n':
458 this->burst_size_ = ACE_OS::atoi (get_opt.opt_arg ());
459 break;
461 case 'b':
462 this->event_size_ = ACE_OS::atoi (get_opt.opt_arg ());
463 break;
465 case 't':
466 this->burst_pause_ = ACE_OS::atoi (get_opt.opt_arg ());
467 break;
469 case 'h':
471 char* aux;
472 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
474 this->consumer_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
475 arg = ACE_OS::strtok_r (0, ",", &aux);
476 this->consumer_type_count_ = ACE_OS::atoi (arg);
477 arg = ACE_OS::strtok_r (0, ",", &aux);
478 this->consumer_type_shift_ = ACE_OS::atoi (arg);
480 break;
482 case 'l':
484 char* aux;
485 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
487 this->supplier_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
488 arg = ACE_OS::strtok_r (0, ",", &aux);
489 this->supplier_type_count_ = ACE_OS::atoi (arg);
490 arg = ACE_OS::strtok_r (0, ",", &aux);
491 this->supplier_type_shift_ = ACE_OS::atoi (arg);
493 break;
495 case 'p':
496 this->pid_file_name_ = get_opt.opt_arg ();
497 break;
499 case 'w':
500 this->ec_concurrency_hwm_ = ACE_OS::atoi (get_opt.opt_arg ());
501 break;
503 case '?':
504 default:
505 ACE_DEBUG ((LM_DEBUG,
506 "Usage: %s "
507 "[ORB options] "
508 "-r -d -x "
509 "-c <n_consumers> "
510 "-s <n_suppliers> "
511 "-u <burst count> "
512 "-n <burst size> "
513 "-b <event payload size> "
514 "-t <burst pause (usecs)> "
515 "-h <consumer_start,consumer_count,consumer_shift> "
516 "-l <supplier_start,supplier_count,supplier_shift> "
517 "-p <pid file name> "
518 "-w <concurrency HWM> "
519 "-r "
520 "\n",
521 argv[0]));
522 return -1;
526 if (this->burst_count_ <= 0)
528 ACE_DEBUG ((LM_DEBUG,
529 "%s: burst count (%d) is out of range, "
530 "reset to default (%d)\n",
531 argv[0], this->burst_count_,
532 100));
533 this->burst_count_ = 100;
536 if (this->burst_size_ <= 0)
538 ACE_DEBUG ((LM_DEBUG,
539 "%s: burst size (%d) is out of range, "
540 "reset to default (%d)\n",
541 argv[0], this->burst_size_,
542 10));
543 this->burst_size_ = 10;
546 if (this->event_size_ < 0)
548 ACE_DEBUG ((LM_DEBUG,
549 "%s: event size (%d) is out of range, "
550 "reseting to default (%d)\n",
551 argv[0], this->event_size_,
552 128));
553 this->event_size_ = 128;
556 if (this->n_consumers_ < 0
557 || this->n_consumers_ >= ECT_Throughput::MAX_CONSUMERS)
559 this->n_consumers_ = 1;
560 ACE_ERROR_RETURN ((LM_ERROR,
561 "%s: number of consumers or "
562 "suppliers out of range, "
563 "reset to default (%d)\n",
564 argv[0], 1), -1);
567 if (this->n_suppliers_ < 0
568 || this->n_suppliers_ >= ECT_Throughput::MAX_SUPPLIERS)
570 this->n_suppliers_ = 1;
571 ACE_ERROR_RETURN ((LM_ERROR,
572 "%s: number of suppliers out of range, "
573 "reset to default (%d)\n",
574 argv[0], 1), -1);
577 if (this->n_suppliers_ == 0 && this->n_consumers_ == 0)
579 this->n_suppliers_ = 1;
580 this->n_consumers_ = 1;
581 ACE_ERROR_RETURN ((LM_ERROR,
582 "%s: no suppliers or consumers, "
583 "reset to default (%d of each)\n",
584 argv[0], 1), -1);
587 if (this->ec_concurrency_hwm_ <= 0)
589 this->ec_concurrency_hwm_ = 1;
590 ACE_ERROR_RETURN ((LM_ERROR,
591 "%s: invalid concurrency HWM, "
592 "reset to default (%d)\n",
593 argv[0], 1), -1);
596 return 0;