1 #include "ECT_Throughput.h"
3 #include "orbsvcs/Event_Utilities.h"
4 #include "orbsvcs/Event_Service_Constants.h"
5 #include "orbsvcs/Scheduler_Factory.h"
6 #include "orbsvcs/Time_Utilities.h"
7 #include "orbsvcs/Sched/Config_Scheduler.h"
8 #include "orbsvcs/Runtime_Scheduler.h"
9 #include "orbsvcs/Event/EC_Event_Channel.h"
10 #include "orbsvcs/Event/EC_Default_Factory.h"
12 #include "tao/Timeprobe.h"
13 #include "tao/debug.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Sched_Params.h"
17 #include "ace/High_Res_Timer.h"
18 #include "ace/OS_NS_strings.h"
19 #include "ace/OS_NS_errno.h"
20 #include "ace/OS_NS_unistd.h"
24 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
26 TAO_EC_Default_Factory::init_svcs ();
28 ECT_Throughput driver
;
29 return driver
.run (argc
, argv
);
32 // ****************************************************************
34 ECT_Throughput::ECT_Throughput ()
41 consumer_type_start_ (ACE_ES_EVENT_UNDEFINED
),
42 consumer_type_count_ (1),
43 consumer_type_shift_ (0),
44 supplier_type_start_ (ACE_ES_EVENT_UNDEFINED
),
45 supplier_type_count_ (1),
46 supplier_type_shift_ (0),
49 ec_concurrency_hwm_ (1),
50 thr_create_flags_ (THR_NEW_LWP
|THR_BOUND
|THR_SCHED_FIFO
)
54 ECT_Throughput::~ECT_Throughput ()
59 ECT_Throughput::run (int argc
, ACE_TCHAR
* argv
[])
63 // Calibrate the high resolution timer *before* starting the
65 ACE_High_Res_Timer::calibrate ();
68 CORBA::ORB_init (argc
, argv
);
70 CORBA::Object_var poa_object
=
71 this->orb_
->resolve_initial_references("RootPOA");
73 if (CORBA::is_nil (poa_object
.in ()))
74 ACE_ERROR_RETURN ((LM_ERROR
,
75 " (%P|%t) Unable to initialize the POA.\n"),
78 PortableServer::POA_var root_poa
=
79 PortableServer::POA::_narrow (poa_object
.in ());
81 PortableServer::POAManager_var poa_manager
=
82 root_poa
->the_POAManager ();
84 poa_manager
->activate ();
86 if (this->parse_args (argc
, argv
))
89 if (TAO_debug_level
> 0)
92 "Execution parameters:\n"
95 " burst count = <%d>\n"
96 " burst size = <%d>\n"
97 " event size = <%d>\n"
98 " burst pause = <%d>\n"
99 " consumer type start = <%d>\n"
100 " consumer type count = <%d>\n"
101 " consumer type shift = <%d>\n"
102 " supplier type start = <%d>\n"
103 " supplier type count = <%d>\n"
104 " supplier type shift = <%d>\n"
105 " pid file name = <%s>\n"
106 " concurrency HWM = <%d>\n",
114 this->consumer_type_start_
,
115 this->consumer_type_count_
,
116 this->consumer_type_shift_
,
117 this->supplier_type_start_
,
118 this->supplier_type_count_
,
119 this->supplier_type_shift_
,
121 this->pid_file_name_
?this->pid_file_name_
:ACE_TEXT("nil"),
122 this->ec_concurrency_hwm_
));
125 if (this->pid_file_name_
!= 0)
127 FILE* pid
= ACE_OS::fopen (this->pid_file_name_
, "w");
130 ACE_OS::fprintf (pid
, "%ld\n",
131 static_cast<long> (ACE_OS::getpid ()));
132 ACE_OS::fclose (pid
);
137 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
)
138 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO
)) / 2;
139 priority
= ACE_Sched_Params::next_priority (ACE_SCHED_FIFO
, priority
);
141 // Enable FIFO scheduling
142 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
144 ACE_SCOPE_PROCESS
)) != 0)
146 if (ACE_OS::last_error () == EPERM
)
148 ACE_DEBUG ((LM_DEBUG
,
149 "%s: user is not superuser, "
150 "so remain in time-sharing class\n", argv
[0]));
151 this->thr_create_flags_
= THR_NEW_LWP
;
154 ACE_ERROR ((LM_ERROR
,
155 "%s: ACE_OS::sched_params failed\n", argv
[0]));
158 if (ACE_OS::thr_setprio (priority
) == -1)
160 ACE_ERROR ((LM_ERROR
, "(%P|%t) main thr_setprio failed,"
161 "no real-time features\n"));
165 ACE_Config_Scheduler scheduler_impl
;
167 #include "ECT_Scheduler_Info.h"
168 ACE_Runtime_Scheduler
scheduler_impl (
169 runtime_configs_size
,
174 RtecScheduler::Scheduler_var scheduler
=
175 scheduler_impl
._this ();
178 CORBA::Object_var naming_obj
=
179 this->orb_
->resolve_initial_references ("NameService");
181 if (CORBA::is_nil (naming_obj
.in ()))
182 ACE_ERROR_RETURN ((LM_ERROR
,
183 " (%P|%t) Unable to get the Naming Service.\n"),
186 CosNaming::NamingContext_var naming_context
=
187 CosNaming::NamingContext::_narrow (naming_obj
.in ());
189 // This is the name we (potentially) register the Scheduling
190 // Service in the Naming Service.
191 CosNaming::Name
schedule_name (1);
192 schedule_name
.length (1);
193 schedule_name
[0].id
= CORBA::string_dup ("ScheduleService");
195 CORBA::String_var str
=
196 this->orb_
->object_to_string (scheduler
.in ());
197 ACE_DEBUG ((LM_DEBUG
, "The (local) scheduler IOR is <%s>\n",
200 // Register the servant with the Naming Context....
201 naming_context
->rebind (schedule_name
, scheduler
.in ());
203 ACE_Scheduler_Factory::use_config (naming_context
.in ());
206 TAO_EC_Event_Channel_Attributes
attr (root_poa
.in (),
209 TAO_EC_Event_Channel
*ec
=
210 new TAO_EC_Event_Channel (attr
);
214 std::unique_ptr
<POA_RtecEventChannelAdmin::EventChannel
> ec_impl (ec
);
216 RtecEventChannelAdmin::EventChannel_var channel
=
219 this->connect_consumers (scheduler
.in (),
222 ACE_DEBUG ((LM_DEBUG
, "connected consumer(s)\n"));
224 this->connect_suppliers (scheduler
.in (),
227 ACE_DEBUG ((LM_DEBUG
, "connected supplier(s)\n"));
229 this->activate_suppliers ();
231 ACE_DEBUG ((LM_DEBUG
, "suppliers are active\n"));
233 // Wait for the supplier threads...
234 if (ACE_Thread_Manager::instance ()->wait () == -1)
236 ACE_ERROR ((LM_ERROR
, "Thread_Manager wait failed\n"));
240 ACE_DEBUG ((LM_DEBUG
, "suppliers finished\n"));
242 this->dump_results ();
244 this->disconnect_consumers ();
246 ACE_DEBUG ((LM_DEBUG
, "consumers disconnected\n"));
248 this->disconnect_suppliers ();
250 ACE_DEBUG ((LM_DEBUG
, "suppliers disconnected\n"));
254 ACE_DEBUG ((LM_DEBUG
, "channel destroyed\n"));
258 PortableServer::POA_var poa
=
259 ec_impl
->_default_POA ();
260 PortableServer::ObjectId_var id
=
261 poa
->servant_to_id (ec_impl
.get ());
262 poa
->deactivate_object (id
.in ());
264 ACE_DEBUG ((LM_DEBUG
, "EC deactivated\n"));
268 // Deactivate the Scheduler
269 PortableServer::POA_var poa
=
270 scheduler_impl
._default_POA ();
271 PortableServer::ObjectId_var id
=
272 poa
->servant_to_id (&scheduler_impl
);
273 poa
->deactivate_object (id
.in ());
275 ACE_DEBUG ((LM_DEBUG
, "scheduler deactivated\n"));
278 catch (const CORBA::Exception
& ex
)
280 ex
._tao_print_exception ("ECT_Throughput::run");
284 ACE_ERROR ((LM_ERROR
, "non-corba exception raised\n"));
291 ECT_Throughput::shutdown_consumer (void*)
294 // (reinterpret_cast<Test_Consumer**> (consumer_cookie)
295 // - this->consumers_);
297 // ACE_DEBUG ((LM_DEBUG, "(%t) events received by consumer %d\n", ID));
299 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
300 this->active_count_
--;
301 if (this->active_count_
<= 0)
303 ACE_DEBUG ((LM_DEBUG
,
304 "(%t) shutting down the ORB\n"));
305 // Not needed: this->orb_->shutdown (false);
310 ECT_Throughput::connect_consumers
311 (RtecScheduler::Scheduler_ptr scheduler
,
312 RtecEventChannelAdmin::EventChannel_ptr channel
)
315 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
316 this->active_count_
= this->n_consumers_
;
318 for (int i
= 0; i
< this->n_consumers_
; ++i
)
321 ACE_OS::sprintf (buf
, "consumer_%02d", i
);
323 ACE_NEW (this->consumers_
[i
],
325 this->consumers_
+ i
,
326 this->n_suppliers_
));
328 int start
= this->consumer_type_start_
329 + i
* this->consumer_type_shift_
;
331 this->consumers_
[i
]->connect (scheduler
,
334 this->consumer_type_count_
,
340 ECT_Throughput::connect_suppliers
341 (RtecScheduler::Scheduler_ptr scheduler
,
342 RtecEventChannelAdmin::EventChannel_ptr channel
)
344 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
347 ACE_OS::sprintf (buf
, "supplier_%02d", i
);
349 ACE_NEW (this->suppliers_
[i
], Test_Supplier (this));
351 int start
= this->supplier_type_start_
+ i
*this->supplier_type_shift_
;
352 this->suppliers_
[i
]->connect (scheduler
,
359 this->supplier_type_count_
,
365 ECT_Throughput::activate_suppliers ()
368 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
)
369 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO
)) / 2;
371 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
373 if (this->suppliers_
[i
]->activate (this->thr_create_flags_
,
374 1, 0, priority
) == -1)
376 ACE_ERROR ((LM_ERROR
,
377 "Cannot activate thread for supplier %d\n",
384 ECT_Throughput::disconnect_suppliers ()
386 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
388 this->suppliers_
[i
]->disconnect ();
393 ECT_Throughput::disconnect_consumers ()
395 for (int i
= 0; i
< this->n_consumers_
; ++i
)
397 this->consumers_
[i
]->disconnect ();
402 ECT_Throughput::dump_results ()
404 ACE_High_Res_Timer::global_scale_factor_type gsf
=
405 ACE_High_Res_Timer::global_scale_factor ();
407 ACE_Throughput_Stats consumers
;
408 for (int j
= 0; j
< this->n_consumers_
; ++j
)
410 ACE_TCHAR buf
[BUFSIZ
];
411 ACE_OS::sprintf (buf
, ACE_TEXT("consumer_%02d"), j
);
413 this->consumers_
[j
]->dump_results (buf
, gsf
);
414 this->consumers_
[j
]->accumulate (consumers
);
416 consumers
.dump_results (ACE_TEXT("ECT_Consumer/totals"), gsf
);
418 ACE_Throughput_Stats suppliers
;
419 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
421 ACE_TCHAR buf
[BUFSIZ
];
422 ACE_OS::sprintf (buf
, ACE_TEXT("supplier_%02d"), i
);
424 this->suppliers_
[i
]->dump_results (buf
, gsf
);
425 this->suppliers_
[i
]->accumulate (suppliers
);
427 suppliers
.dump_results (ACE_TEXT("ECT_Supplier/totals"), gsf
);
431 ECT_Throughput::parse_args (int argc
, ACE_TCHAR
*argv
[])
433 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("dc:s:u:n:t:b:h:l:p:w:"));
436 while ((opt
= get_opt ()) != EOF
)
441 this->n_consumers_
= ACE_OS::atoi (get_opt
.opt_arg ());
445 this->n_suppliers_
= ACE_OS::atoi (get_opt
.opt_arg ());
449 this->burst_count_
= ACE_OS::atoi (get_opt
.opt_arg ());
453 this->burst_size_
= ACE_OS::atoi (get_opt
.opt_arg ());
457 this->event_size_
= ACE_OS::atoi (get_opt
.opt_arg ());
461 this->burst_pause_
= ACE_OS::atoi (get_opt
.opt_arg ());
467 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
469 this->consumer_type_start_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
470 arg
= ACE_OS::strtok_r (0, ",", &aux
);
471 this->consumer_type_count_
= ACE_OS::atoi (arg
);
472 arg
= ACE_OS::strtok_r (0, ",", &aux
);
473 this->consumer_type_shift_
= ACE_OS::atoi (arg
);
480 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
482 this->supplier_type_start_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
483 arg
= ACE_OS::strtok_r (0, ",", &aux
);
484 this->supplier_type_count_
= ACE_OS::atoi (arg
);
485 arg
= ACE_OS::strtok_r (0, ",", &aux
);
486 this->supplier_type_shift_
= ACE_OS::atoi (arg
);
491 this->pid_file_name_
= get_opt
.opt_arg ();
495 this->ec_concurrency_hwm_
= ACE_OS::atoi (get_opt
.opt_arg ());
500 ACE_DEBUG ((LM_DEBUG
,
508 "-b <event payload size> "
509 "-t <burst pause (usecs)> "
510 "-h <consumer_start,consumer_count,consumer_shift> "
511 "-l <supplier_start,supplier_count,supplier_shift> "
512 "-p <pid file name> "
513 "-w <concurrency HWM> "
521 if (this->burst_count_
<= 0)
523 ACE_DEBUG ((LM_DEBUG
,
524 "%s: burst count (%d) is out of range, "
525 "reset to default (%d)\n",
526 argv
[0], this->burst_count_
,
528 this->burst_count_
= 100;
531 if (this->burst_size_
<= 0)
533 ACE_DEBUG ((LM_DEBUG
,
534 "%s: burst size (%d) is out of range, "
535 "reset to default (%d)\n",
536 argv
[0], this->burst_size_
,
538 this->burst_size_
= 10;
541 if (this->event_size_
< 0)
543 ACE_DEBUG ((LM_DEBUG
,
544 "%s: event size (%d) is out of range, "
545 "reseting to default (%d)\n",
546 argv
[0], this->event_size_
,
548 this->event_size_
= 128;
551 if (this->n_consumers_
< 0
552 || this->n_consumers_
>= ECT_Throughput::MAX_CONSUMERS
)
554 this->n_consumers_
= 1;
555 ACE_ERROR_RETURN ((LM_ERROR
,
556 "%s: number of consumers or "
557 "suppliers out of range, "
558 "reset to default (%d)\n",
562 if (this->n_suppliers_
< 0
563 || this->n_suppliers_
>= ECT_Throughput::MAX_SUPPLIERS
)
565 this->n_suppliers_
= 1;
566 ACE_ERROR_RETURN ((LM_ERROR
,
567 "%s: number of suppliers out of range, "
568 "reset to default (%d)\n",
572 if (this->n_suppliers_
== 0 && this->n_consumers_
== 0)
574 this->n_suppliers_
= 1;
575 this->n_consumers_
= 1;
576 ACE_ERROR_RETURN ((LM_ERROR
,
577 "%s: no suppliers or consumers, "
578 "reset to default (%d of each)\n",
582 if (this->ec_concurrency_hwm_
<= 0)
584 this->ec_concurrency_hwm_
= 1;
585 ACE_ERROR_RETURN ((LM_ERROR
,
586 "%s: invalid concurrency HWM, "
587 "reset to default (%d)\n",