1 #include "ECT_Throughput.h"
3 #include "orbsvcs/Event_Utilities.h"
4 #include "orbsvcs/Event_Service_Constants.h"
5 #include "orbsvcs/Scheduler_Factory.h"
6 #include "orbsvcs/Time_Utilities.h"
7 #include "orbsvcs/Sched/Config_Scheduler.h"
8 #include "orbsvcs/Runtime_Scheduler.h"
9 #include "orbsvcs/Event/EC_Event_Channel.h"
10 #include "orbsvcs/Event/EC_Default_Factory.h"
12 #include "tao/Timeprobe.h"
13 #include "tao/debug.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Auto_Ptr.h"
17 #include "ace/Sched_Params.h"
18 #include "ace/High_Res_Timer.h"
19 #include "ace/OS_NS_strings.h"
20 #include "ace/OS_NS_errno.h"
21 #include "ace/OS_NS_unistd.h"
24 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
26 TAO_EC_Default_Factory::init_svcs ();
28 ECT_Throughput driver
;
29 return driver
.run (argc
, argv
);
32 // ****************************************************************
34 ECT_Throughput::ECT_Throughput (void)
41 consumer_type_start_ (ACE_ES_EVENT_UNDEFINED
),
42 consumer_type_count_ (1),
43 consumer_type_shift_ (0),
44 supplier_type_start_ (ACE_ES_EVENT_UNDEFINED
),
45 supplier_type_count_ (1),
46 supplier_type_shift_ (0),
49 ec_concurrency_hwm_ (1),
50 thr_create_flags_ (THR_NEW_LWP
|THR_BOUND
|THR_SCHED_FIFO
)
54 ECT_Throughput::~ECT_Throughput (void)
59 ECT_Throughput::run (int argc
, ACE_TCHAR
* argv
[])
63 // Calibrate the high resolution timer *before* starting the
65 ACE_High_Res_Timer::calibrate ();
68 CORBA::ORB_init (argc
, argv
);
70 CORBA::Object_var poa_object
=
71 this->orb_
->resolve_initial_references("RootPOA");
73 if (CORBA::is_nil (poa_object
.in ()))
74 ACE_ERROR_RETURN ((LM_ERROR
,
75 " (%P|%t) Unable to initialize the POA.\n"),
78 PortableServer::POA_var root_poa
=
79 PortableServer::POA::_narrow (poa_object
.in ());
81 PortableServer::POAManager_var poa_manager
=
82 root_poa
->the_POAManager ();
84 poa_manager
->activate ();
86 if (this->parse_args (argc
, argv
))
89 if (TAO_debug_level
> 0)
92 "Execution parameters:\n"
95 " burst count = <%d>\n"
96 " burst size = <%d>\n"
97 " event size = <%d>\n"
98 " burst pause = <%d>\n"
99 " consumer type start = <%d>\n"
100 " consumer type count = <%d>\n"
101 " consumer type shift = <%d>\n"
102 " supplier type start = <%d>\n"
103 " supplier type count = <%d>\n"
104 " supplier type shift = <%d>\n"
105 " pid file name = <%s>\n"
106 " concurrency HWM = <%d>\n",
114 this->consumer_type_start_
,
115 this->consumer_type_count_
,
116 this->consumer_type_shift_
,
117 this->supplier_type_start_
,
118 this->supplier_type_count_
,
119 this->supplier_type_shift_
,
121 this->pid_file_name_
?this->pid_file_name_
:ACE_TEXT("nil"),
122 this->ec_concurrency_hwm_
126 if (this->pid_file_name_
!= 0)
128 FILE* pid
= ACE_OS::fopen (this->pid_file_name_
, "w");
131 ACE_OS::fprintf (pid
, "%ld\n",
132 static_cast<long> (ACE_OS::getpid ()));
133 ACE_OS::fclose (pid
);
138 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
)
139 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO
)) / 2;
140 priority
= ACE_Sched_Params::next_priority (ACE_SCHED_FIFO
,
142 // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
144 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
146 ACE_SCOPE_PROCESS
)) != 0)
148 if (ACE_OS::last_error () == EPERM
)
150 ACE_DEBUG ((LM_DEBUG
,
151 "%s: user is not superuser, "
152 "so remain in time-sharing class\n", argv
[0]));
153 this->thr_create_flags_
= THR_NEW_LWP
;
156 ACE_ERROR ((LM_ERROR
,
157 "%s: ACE_OS::sched_params failed\n", argv
[0]));
160 if (ACE_OS::thr_setprio (priority
) == -1)
162 ACE_ERROR ((LM_ERROR
, "(%P|%t) main thr_setprio failed,"
163 "no real-time features\n"));
167 ACE_Config_Scheduler scheduler_impl
;
169 #include "ECT_Scheduler_Info.h"
170 ACE_Runtime_Scheduler
scheduler_impl (
171 runtime_configs_size
,
176 RtecScheduler::Scheduler_var scheduler
=
177 scheduler_impl
._this ();
180 CORBA::Object_var naming_obj
=
181 this->orb_
->resolve_initial_references ("NameService");
183 if (CORBA::is_nil (naming_obj
.in ()))
184 ACE_ERROR_RETURN ((LM_ERROR
,
185 " (%P|%t) Unable to get the Naming Service.\n"),
188 CosNaming::NamingContext_var naming_context
=
189 CosNaming::NamingContext::_narrow (naming_obj
.in ());
191 // This is the name we (potentially) register the Scheduling
192 // Service in the Naming Service.
193 CosNaming::Name
schedule_name (1);
194 schedule_name
.length (1);
195 schedule_name
[0].id
= CORBA::string_dup ("ScheduleService");
197 CORBA::String_var str
=
198 this->orb_
->object_to_string (scheduler
.in ());
199 ACE_DEBUG ((LM_DEBUG
, "The (local) scheduler IOR is <%s>\n",
202 // Register the servant with the Naming Context....
203 naming_context
->rebind (schedule_name
, scheduler
.in ());
205 ACE_Scheduler_Factory::use_config (naming_context
.in ());
208 auto_ptr
<POA_RtecEventChannelAdmin::EventChannel
> ec_impl
;
210 TAO_EC_Event_Channel_Attributes
attr (root_poa
.in (),
213 TAO_EC_Event_Channel
*ec
=
214 new TAO_EC_Event_Channel (attr
);
218 auto_ptr
<POA_RtecEventChannelAdmin::EventChannel
> auto_ec_impl (ec
);
219 ec_impl
= auto_ec_impl
;
221 RtecEventChannelAdmin::EventChannel_var channel
=
224 this->connect_consumers (scheduler
.in (),
227 ACE_DEBUG ((LM_DEBUG
, "connected consumer(s)\n"));
229 this->connect_suppliers (scheduler
.in (),
232 ACE_DEBUG ((LM_DEBUG
, "connected supplier(s)\n"));
234 this->activate_suppliers ();
236 ACE_DEBUG ((LM_DEBUG
, "suppliers are active\n"));
238 // Wait for the supplier threads...
239 if (ACE_Thread_Manager::instance ()->wait () == -1)
241 ACE_ERROR ((LM_ERROR
, "Thread_Manager wait failed\n"));
245 ACE_DEBUG ((LM_DEBUG
, "suppliers finished\n"));
247 this->dump_results ();
249 this->disconnect_consumers ();
251 ACE_DEBUG ((LM_DEBUG
, "consumers disconnected\n"));
253 this->disconnect_suppliers ();
255 ACE_DEBUG ((LM_DEBUG
, "suppliers disconnected\n"));
259 ACE_DEBUG ((LM_DEBUG
, "channel destroyed\n"));
263 PortableServer::POA_var poa
=
264 ec_impl
->_default_POA ();
265 PortableServer::ObjectId_var id
=
266 poa
->servant_to_id (ec_impl
.get ());
267 poa
->deactivate_object (id
.in ());
269 ACE_DEBUG ((LM_DEBUG
, "EC deactivated\n"));
273 // Deactivate the Scheduler
274 PortableServer::POA_var poa
=
275 scheduler_impl
._default_POA ();
276 PortableServer::ObjectId_var id
=
277 poa
->servant_to_id (&scheduler_impl
);
278 poa
->deactivate_object (id
.in ());
280 ACE_DEBUG ((LM_DEBUG
, "scheduler deactivated\n"));
283 catch (const CORBA::Exception
& ex
)
285 ex
._tao_print_exception ("ECT_Throughput::run");
289 ACE_ERROR ((LM_ERROR
, "non-corba exception raised\n"));
296 ECT_Throughput::shutdown_consumer (void*)
299 // (reinterpret_cast<Test_Consumer**> (consumer_cookie)
300 // - this->consumers_);
302 // ACE_DEBUG ((LM_DEBUG, "(%t) events received by consumer %d\n", ID));
304 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
305 this->active_count_
--;
306 if (this->active_count_
<= 0)
308 ACE_DEBUG ((LM_DEBUG
,
309 "(%t) shutting down the ORB\n"));
310 // Not needed: this->orb_->shutdown (0);
315 ECT_Throughput::connect_consumers
316 (RtecScheduler::Scheduler_ptr scheduler
,
317 RtecEventChannelAdmin::EventChannel_ptr channel
)
320 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
321 this->active_count_
= this->n_consumers_
;
323 for (int i
= 0; i
< this->n_consumers_
; ++i
)
326 ACE_OS::sprintf (buf
, "consumer_%02d", i
);
328 ACE_NEW (this->consumers_
[i
],
330 this->consumers_
+ i
,
331 this->n_suppliers_
));
333 int start
= this->consumer_type_start_
334 + i
* this->consumer_type_shift_
;
336 this->consumers_
[i
]->connect (scheduler
,
339 this->consumer_type_count_
,
345 ECT_Throughput::connect_suppliers
346 (RtecScheduler::Scheduler_ptr scheduler
,
347 RtecEventChannelAdmin::EventChannel_ptr channel
)
349 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
352 ACE_OS::sprintf (buf
, "supplier_%02d", i
);
354 ACE_NEW (this->suppliers_
[i
], Test_Supplier (this));
356 int start
= this->supplier_type_start_
+ i
*this->supplier_type_shift_
;
357 this->suppliers_
[i
]->connect (scheduler
,
364 this->supplier_type_count_
,
370 ECT_Throughput::activate_suppliers (void)
373 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
)
374 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO
)) / 2;
376 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
378 if (this->suppliers_
[i
]->activate (this->thr_create_flags_
,
379 1, 0, priority
) == -1)
381 ACE_ERROR ((LM_ERROR
,
382 "Cannot activate thread for supplier %d\n",
389 ECT_Throughput::disconnect_suppliers (void)
391 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
393 this->suppliers_
[i
]->disconnect ();
398 ECT_Throughput::disconnect_consumers (void)
400 for (int i
= 0; i
< this->n_consumers_
; ++i
)
402 this->consumers_
[i
]->disconnect ();
407 ECT_Throughput::dump_results (void)
409 ACE_High_Res_Timer::global_scale_factor_type gsf
=
410 ACE_High_Res_Timer::global_scale_factor ();
412 ACE_Throughput_Stats consumers
;
413 for (int j
= 0; j
< this->n_consumers_
; ++j
)
415 ACE_TCHAR buf
[BUFSIZ
];
416 ACE_OS::sprintf (buf
, ACE_TEXT("consumer_%02d"), j
);
418 this->consumers_
[j
]->dump_results (buf
, gsf
);
419 this->consumers_
[j
]->accumulate (consumers
);
421 consumers
.dump_results (ACE_TEXT("ECT_Consumer/totals"), gsf
);
423 ACE_Throughput_Stats suppliers
;
424 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
426 ACE_TCHAR buf
[BUFSIZ
];
427 ACE_OS::sprintf (buf
, ACE_TEXT("supplier_%02d"), i
);
429 this->suppliers_
[i
]->dump_results (buf
, gsf
);
430 this->suppliers_
[i
]->accumulate (suppliers
);
432 suppliers
.dump_results (ACE_TEXT("ECT_Supplier/totals"), gsf
);
436 ECT_Throughput::parse_args (int argc
, ACE_TCHAR
*argv
[])
438 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("dc:s:u:n:t:b:h:l:p:w:"));
441 while ((opt
= get_opt ()) != EOF
)
446 this->n_consumers_
= ACE_OS::atoi (get_opt
.opt_arg ());
450 this->n_suppliers_
= ACE_OS::atoi (get_opt
.opt_arg ());
454 this->burst_count_
= ACE_OS::atoi (get_opt
.opt_arg ());
458 this->burst_size_
= ACE_OS::atoi (get_opt
.opt_arg ());
462 this->event_size_
= ACE_OS::atoi (get_opt
.opt_arg ());
466 this->burst_pause_
= ACE_OS::atoi (get_opt
.opt_arg ());
472 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
474 this->consumer_type_start_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
475 arg
= ACE_OS::strtok_r (0, ",", &aux
);
476 this->consumer_type_count_
= ACE_OS::atoi (arg
);
477 arg
= ACE_OS::strtok_r (0, ",", &aux
);
478 this->consumer_type_shift_
= ACE_OS::atoi (arg
);
485 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
487 this->supplier_type_start_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
488 arg
= ACE_OS::strtok_r (0, ",", &aux
);
489 this->supplier_type_count_
= ACE_OS::atoi (arg
);
490 arg
= ACE_OS::strtok_r (0, ",", &aux
);
491 this->supplier_type_shift_
= ACE_OS::atoi (arg
);
496 this->pid_file_name_
= get_opt
.opt_arg ();
500 this->ec_concurrency_hwm_
= ACE_OS::atoi (get_opt
.opt_arg ());
505 ACE_DEBUG ((LM_DEBUG
,
513 "-b <event payload size> "
514 "-t <burst pause (usecs)> "
515 "-h <consumer_start,consumer_count,consumer_shift> "
516 "-l <supplier_start,supplier_count,supplier_shift> "
517 "-p <pid file name> "
518 "-w <concurrency HWM> "
526 if (this->burst_count_
<= 0)
528 ACE_DEBUG ((LM_DEBUG
,
529 "%s: burst count (%d) is out of range, "
530 "reset to default (%d)\n",
531 argv
[0], this->burst_count_
,
533 this->burst_count_
= 100;
536 if (this->burst_size_
<= 0)
538 ACE_DEBUG ((LM_DEBUG
,
539 "%s: burst size (%d) is out of range, "
540 "reset to default (%d)\n",
541 argv
[0], this->burst_size_
,
543 this->burst_size_
= 10;
546 if (this->event_size_
< 0)
548 ACE_DEBUG ((LM_DEBUG
,
549 "%s: event size (%d) is out of range, "
550 "reseting to default (%d)\n",
551 argv
[0], this->event_size_
,
553 this->event_size_
= 128;
556 if (this->n_consumers_
< 0
557 || this->n_consumers_
>= ECT_Throughput::MAX_CONSUMERS
)
559 this->n_consumers_
= 1;
560 ACE_ERROR_RETURN ((LM_ERROR
,
561 "%s: number of consumers or "
562 "suppliers out of range, "
563 "reset to default (%d)\n",
567 if (this->n_suppliers_
< 0
568 || this->n_suppliers_
>= ECT_Throughput::MAX_SUPPLIERS
)
570 this->n_suppliers_
= 1;
571 ACE_ERROR_RETURN ((LM_ERROR
,
572 "%s: number of suppliers out of range, "
573 "reset to default (%d)\n",
577 if (this->n_suppliers_
== 0 && this->n_consumers_
== 0)
579 this->n_suppliers_
= 1;
580 this->n_consumers_
= 1;
581 ACE_ERROR_RETURN ((LM_ERROR
,
582 "%s: no suppliers or consumers, "
583 "reset to default (%d of each)\n",
587 if (this->ec_concurrency_hwm_
<= 0)
589 this->ec_concurrency_hwm_
= 1;
590 ACE_ERROR_RETURN ((LM_ERROR
,
591 "%s: invalid concurrency HWM, "
592 "reset to default (%d)\n",