1 #include "EC_Multiple.h"
3 #include "Scheduler_Runtime1.h"
4 #include "Scheduler_Runtime2.h"
5 #include "Scheduler_Runtime_Dynamic.h" /* infos_3 */
7 #include "orbsvcs/Event_Utilities.h"
8 #include "orbsvcs/Event_Service_Constants.h"
9 #include "orbsvcs/Scheduler_Factory.h"
10 #include "orbsvcs/Time_Utilities.h"
11 #include "orbsvcs/RtecEventChannelAdminC.h"
12 #include "orbsvcs/Sched/Config_Scheduler.h"
13 #include "orbsvcs/Event/EC_Event_Channel.h"
14 #include "orbsvcs/Runtime_Scheduler.h"
16 #include "tao/ORB_Core.h"
18 #include "ace/Get_Opt.h"
19 #include "ace/Auto_Ptr.h"
20 #include "ace/Sched_Params.h"
21 #include "ace/OS_NS_errno.h"
22 #include "ace/OS_NS_strings.h"
25 # include <sys/lwp.h> /* for _lwp_self */
28 Test_ECG::Test_ECG (void)
29 : lcl_name_ ("Test_ECG"),
31 scheduling_type_ (Test_ECG::ss_runtime
),
32 consumer_disconnects_ (0),
33 supplier_disconnects_ (0),
39 hp_message_count_ (200),
40 hps_event_a_ (ACE_ES_EVENT_UNDEFINED
),
41 hps_event_b_ (ACE_ES_EVENT_UNDEFINED
+ 1),
42 hpc_event_a_ (ACE_ES_EVENT_UNDEFINED
),
43 hpc_event_b_ (ACE_ES_EVENT_UNDEFINED
+ 1),
47 lp_interval_ (100000),
48 lp_message_count_ (50),
56 ready_cnd_ (ready_mtx_
)
61 print_priority_info (const char *const name
)
63 #if defined (ACE_HAS_PTHREADS) || defined (sun)
64 #if defined (ACE_HAS_PTHREADS)
65 struct sched_param param
;
68 if ((status
= pthread_getschedparam (pthread_self (), &policy
,
72 "%C (%lu|%u); policy is %d, priority is %d\n",
77 policy
, param
.sched_priority
));
80 "%C (%lu|%u); policy is %d, priority is %d\n",
85 policy
, param
.sched_priority
));
88 ACE_DEBUG ((LM_DEBUG
,"pthread_getschedparam failed: %d\n", status
));
91 ACE_UNUSED_ARG (name
);
92 #endif /* ACE_HAS_PTHREADS */
95 // Find what scheduling class the thread's LWP is in.
96 //FUZZ: disable check_for_lack_ACE_OS
97 ACE_Sched_Params
sched_params (ACE_SCHED_OTHER
, 0);
98 //FUZZ: enable check_for_lack_ACE_OS
99 if (ACE_OS::lwp_getparams (sched_params
) == -1)
101 ACE_OS::perror ("ACE_OS::lwp_getparams");
104 else if (sched_params
.policy () == ACE_SCHED_FIFO
||
105 sched_params
.policy () == ACE_SCHED_RR
)
107 // This thread's LWP is in the RT class.
108 ACE_DEBUG ((LM_DEBUG
,
109 "RT class; priority: %d, quantum: %u msec\n",
110 sched_params
.priority (),
111 sched_params
.quantum ().msec ()));
115 ACE_DEBUG ((LM_DEBUG
,
116 "TS class; priority: %d\n",
117 sched_params
.priority ()));
121 ACE_UNUSED_ARG (name
);
122 #endif /* ACE_HAS_PTHREADS || sun */
126 Test_ECG::run (int argc
, ACE_TCHAR
* argv
[])
131 CORBA::ORB_init (argc
, argv
);
133 CORBA::Object_var poa_object
=
134 orb
->resolve_initial_references("RootPOA");
136 if (CORBA::is_nil (poa_object
.in ()))
137 ACE_ERROR_RETURN ((LM_ERROR
,
138 " (%P|%t) Unable to initialize the POA.\n"),
141 PortableServer::POA_var root_poa
=
142 PortableServer::POA::_narrow (poa_object
.in ());
144 PortableServer::POAManager_var poa_manager
=
145 root_poa
->the_POAManager ();
147 poa_manager
->activate ();
149 if (this->parse_args (argc
, argv
))
152 ACE_DEBUG ((LM_DEBUG
,
153 "Execution parameters:\n"
156 " scheduler type = <%d>\n"
157 " consumer disconnects = <%d>\n"
158 " supplier disconnects = <%d>\n"
159 " short circuit EC = <%d>\n"
160 " HP suppliers = <%d>\n"
161 " HP consumers = <%d>\n"
162 " HP workload = <%d> (iterations)\n"
163 " HP interval between events = <%d> (usecs)\n"
164 " HP message count = <%d>\n"
165 " HP supplier Event A = <%d>\n"
166 " HP supplier Event B = <%d>\n"
167 " HP consumer Event A = <%d>\n"
168 " HP consumer Event B = <%d>\n"
169 " LP suppliers = <%d>\n"
170 " LP consumers = <%d>\n"
171 " LP workload = <%d> (iterations)\n"
172 " LP interval between events = <%d> (usecs)\n"
173 " LP message count = <%d>\n"
174 " LP supplier Event A = <%d>\n"
175 " LP supplier Event B = <%d>\n"
176 " LP consumer Event A = <%d>\n"
177 " LP consumer Event B = <%d>\n"
178 " schedule_file = <%s>\n"
179 " pid file name = <%s>\n",
180 this->lcl_name_
.length () ? this->lcl_name_
.c_str () : "nil",
181 this->rmt_name_
.length () ? this->rmt_name_
.c_str () : "nil",
182 this->scheduling_type_
,
183 this->consumer_disconnects_
,
184 this->supplier_disconnects_
,
185 this->short_circuit_
,
191 this->hp_message_count_
,
201 this->lp_message_count_
,
207 this->schedule_file_
?this->schedule_file_
:ACE_TEXT("nil"),
208 this->pid_file_name_
?this->pid_file_name_
:ACE_TEXT("nil")) );
210 print_priority_info ("Test_ECG::run (Main)");
212 if (this->pid_file_name_
!= 0)
214 FILE* pid
= ACE_OS::fopen (this->pid_file_name_
, "w");
217 ACE_OS::fprintf (pid
, "%ld\n",
218 static_cast<long> (ACE_OS::getpid ()));
219 ACE_OS::fclose (pid
);
224 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
);
225 // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
227 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
229 ACE_SCOPE_PROCESS
)) != 0)
231 if (ACE_OS::last_error () == EPERM
)
232 ACE_DEBUG ((LM_DEBUG
,
233 "%s: user is not superuser, "
234 "so remain in time-sharing class\n", argv
[0]));
236 ACE_ERROR ((LM_ERROR
,
237 "%s: ACE_OS::sched_params failed\n", argv
[0]));
240 if (ACE_OS::thr_setprio (min_priority
) == -1)
242 ACE_ERROR ((LM_ERROR
, "(%P|%t) main thr_setprio failed\n"));
245 print_priority_info ("Test_ECG::run (Main after thr_setprio)");
247 CORBA::Object_var naming_obj
=
248 orb
->resolve_initial_references ("NameService");
250 if (CORBA::is_nil (naming_obj
.in ()))
251 ACE_ERROR_RETURN ((LM_ERROR
,
252 " (%P|%t) Unable to get the Naming Service.\n"),
255 CosNaming::NamingContext_var naming_context
=
256 CosNaming::NamingContext::_narrow (naming_obj
.in ());
258 auto_ptr
<POA_RtecScheduler::Scheduler
> scheduler_impl
;
259 RtecScheduler::Scheduler_var scheduler
;
261 switch (this->scheduling_type_
)
264 ACE_ERROR ((LM_WARNING
, "Unknown scheduling type %d\n",
265 this->scheduling_type_
));
267 case Test_ECG::ss_global
:
270 case Test_ECG::ss_local
:
272 auto_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl (new ACE_Config_Scheduler
);
273 scheduler_impl
= auto_scheduler_impl
;
275 if (scheduler_impl
.get () == 0)
277 scheduler
= scheduler_impl
->_this ();
280 case Test_ECG::ss_runtime
:
281 if (ACE_OS::strcmp (this->lcl_name_
.c_str (), "ECM1") == 0)
283 // This setups Scheduler_Factory to use the runtime version
284 ACE_Scheduler_Factory::use_runtime (
285 sizeof (runtime_configs_1
)/sizeof (runtime_configs_1
[0]),
287 sizeof (runtime_infos_1
)/sizeof (runtime_infos_1
[0]),
290 auto_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl
291 (new ACE_Runtime_Scheduler (runtime_configs_1_size
,
293 runtime_infos_1_size
,
295 scheduler_impl
= auto_scheduler_impl
;
297 if (scheduler_impl
.get () == 0)
299 scheduler
= scheduler_impl
->_this ();
301 else if (ACE_OS::strcmp (this->lcl_name_
.c_str (), "ECM2") == 0)
303 // This setups Scheduler_Factory to use the runtime version
304 ACE_Scheduler_Factory::use_runtime (
305 sizeof (runtime_configs_2
)/sizeof (runtime_configs_2
[0]),
307 sizeof (runtime_infos_2
)/sizeof (runtime_infos_2
[0]),
310 auto_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl
311 (new ACE_Runtime_Scheduler (runtime_configs_2_size
,
313 runtime_infos_2_size
,
315 scheduler_impl
= auto_scheduler_impl
;
317 if (scheduler_impl
.get () == 0)
319 scheduler
= scheduler_impl
->_this ();
321 else if (ACE_OS::strcmp (this->lcl_name_
.c_str (), "ECM3") == 0)
323 // This setups Scheduler_Factory to use the runtime version
324 ACE_Scheduler_Factory::use_runtime (
325 sizeof (runtime_configs_3
)/sizeof (runtime_configs_3
[0]),
327 sizeof (runtime_infos_3
)/sizeof (runtime_infos_3
[0]),
330 auto_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl
331 (new ACE_Runtime_Scheduler (runtime_configs_3_size
,
333 runtime_infos_3_size
,
335 scheduler_impl
= auto_scheduler_impl
;
337 if (scheduler_impl
.get () == 0)
339 scheduler
= scheduler_impl
->_this ();
343 ACE_ERROR ((LM_WARNING
,
344 "Unknown name <%C> defaulting to "
345 "config scheduler\n", this->lcl_name_
.c_str ()));
347 auto_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl (new ACE_Config_Scheduler
);
348 scheduler_impl
= auto_scheduler_impl
;
350 if (scheduler_impl
.get () == 0)
352 scheduler
= scheduler_impl
->_this ();
358 // We use this buffer to generate the names of the local
360 const int bufsize
= 512;
363 // Register the scheduler with the naming service.
364 switch (this->scheduling_type_
)
367 case Test_ECG::ss_global
:
370 case Test_ECG::ss_local
:
371 case Test_ECG::ss_runtime
:
373 CORBA::String_var str
=
374 orb
->object_to_string (scheduler
.in ());
375 ACE_DEBUG ((LM_DEBUG
, "The (local) scheduler IOR is <%C>\n",
378 ACE_OS::strcpy (buf
, "ScheduleService@");
379 ACE_OS::strcat (buf
, this->lcl_name_
.c_str ());
381 // Register the servant with the Naming Context....
382 CosNaming::Name
schedule_name (1);
383 schedule_name
.length (1);
384 schedule_name
[0].id
= CORBA::string_dup (buf
);
385 naming_context
->bind (schedule_name
, scheduler
.in ());
387 if (ACE_Scheduler_Factory::use_config (naming_context
.in (),
394 // Create the EventService implementation, but don't start its
396 TAO_EC_Event_Channel_Attributes
attr (root_poa
.in (),
398 TAO_EC_Event_Channel
ec_impl (attr
);
400 // Register Event_Service with the Naming Service.
401 RtecEventChannelAdmin::EventChannel_var ec
=
404 CORBA::String_var str
=
405 orb
->object_to_string (ec
.in ());
408 ACE_DEBUG ((LM_DEBUG
, "The (local) EC IOR is <%C>\n", str
.in ()));
410 ACE_OS::strcpy (buf
, "EventChannel@");
411 ACE_OS::strcat (buf
, this->lcl_name_
.c_str ());
413 CosNaming::Name
channel_name (1);
414 channel_name
.length (1);
415 channel_name
[0].id
= CORBA::string_dup (buf
);
416 naming_context
->bind (channel_name
, ec
.in ());
418 ACE_DEBUG ((LM_DEBUG
, "waiting to start\n"));
420 ACE_Time_Value
tv (15, 0);
422 if (this->rmt_name_
.length () != 0)
427 ACE_DEBUG ((LM_DEBUG
, "starting....\n"));
429 RtecEventChannelAdmin::EventChannel_var local_ec
=
430 this->get_ec (naming_context
.in (),
431 this->lcl_name_
.c_str ());
433 ACE_DEBUG ((LM_DEBUG
, "located local EC\n"));
435 for (int sd
= 0; sd
< this->supplier_disconnects_
; ++sd
)
437 this->connect_suppliers (local_ec
.in ());
438 this->disconnect_suppliers ();
440 ACE_DEBUG ((LM_DEBUG
, "Supplier disconnection %d\n", sd
));
443 this->connect_suppliers (local_ec
.in ());
445 ACE_DEBUG ((LM_DEBUG
, "connected supplier\n"));
447 RtecEventChannelAdmin::Observer_Handle observer_handle
= 0;
448 if (this->rmt_name_
.length () != 0)
453 RtecEventChannelAdmin::EventChannel_var remote_ec
=
454 this->get_ec (naming_context
.in (),
455 this->rmt_name_
.c_str ());
456 ACE_DEBUG ((LM_DEBUG
, "located remote EC\n"));
458 CosNaming::Name
rsch_name (1);
459 rsch_name
.length (1);
460 ACE_OS::strcpy (buf
, "ScheduleService");
461 if (this->scheduling_type_
!= Test_ECG::ss_global
)
463 ACE_OS::strcat (buf
, "@");
464 ACE_OS::strcat (buf
, this->rmt_name_
.c_str ());
466 rsch_name
[0].id
= CORBA::string_dup (buf
);
467 CORBA::Object_var tmpobj
=
468 naming_context
->resolve (rsch_name
);
470 RtecScheduler::Scheduler_var remote_sch
=
471 RtecScheduler::Scheduler::_narrow (tmpobj
.in ());
473 this->connect_ecg (local_ec
.in (),
477 ACE_DEBUG ((LM_DEBUG
, "connected proxy\n"));
482 RtecEventChannelAdmin::Observer_ptr observer
=
484 observer_handle
= ec_impl
.append_observer (observer
);
487 for (int cd
= 0; cd
< this->consumer_disconnects_
; ++cd
)
489 this->connect_consumers (local_ec
.in ());
490 this->disconnect_consumers ();
492 ACE_DEBUG ((LM_DEBUG
, "Consumer disconnection %d\n", cd
));
494 this->connect_consumers (local_ec
.in ());
496 ACE_DEBUG ((LM_DEBUG
, "connected consumer\n"));
498 this->activate_suppliers (local_ec
.in ());
500 ACE_DEBUG ((LM_DEBUG
, "suppliers are active\n"));
502 this->running_suppliers_
= this->hp_suppliers_
+ this->lp_suppliers_
;
504 // Acquire the mutex for the ready mutex, blocking any supplier
505 // that may start after this point.
506 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, ready_mon
, this->ready_mtx_
, 1);
508 this->test_start_
= ACE_OS::gethrtime ();
509 this->ready_cnd_
.broadcast ();
510 ready_mon
.release ();
512 ACE_DEBUG ((LM_DEBUG
, "activate the EC\n"));
514 if (this->rmt_name_
.length () != 0)
516 ec_impl
.remove_observer (observer_handle
);
519 // Create the EC internal threads
522 ACE_DEBUG ((LM_DEBUG
, "running the test\n"));
525 this->test_stop_
= ACE_OS::gethrtime ();
527 ACE_DEBUG ((LM_DEBUG
, "shutdown the EC\n"));
530 this->dump_results ();
532 if (this->schedule_file_
!= 0)
534 RtecScheduler::RT_Info_Set_var infos
;
535 RtecScheduler::Dependency_Set_var deps
;
536 RtecScheduler::Config_Info_Set_var configs
;
537 RtecScheduler::Scheduling_Anomaly_Set_var anomalies
;
539 ACE_Scheduler_Factory::server ()->compute_scheduling
540 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
,
542 ACE_Sched_Params::priority_max (ACE_SCHED_FIFO
,
544 infos
.out (), deps
.out (),
545 configs
.out (), anomalies
.out ());
547 ACE_Scheduler_Factory::dump_schedule (infos
.in (),
551 this->schedule_file_
);
554 naming_context
->unbind (channel_name
);
556 if (this->rmt_name_
.length () != 0)
559 this->ecg_
.shutdown ();
562 this->disconnect_consumers ();
563 this->disconnect_suppliers ();
565 ACE_DEBUG ((LM_DEBUG
, "shutdown grace period\n"));
569 catch (const CORBA::SystemException
& sys_ex
)
571 sys_ex
._tao_print_exception ("SYS_EX");
573 catch (const CORBA::Exception
& ex
)
575 ex
._tao_print_exception ("NON SYS EX");
580 RtecEventChannelAdmin::EventChannel_ptr
581 Test_ECG::get_ec (CosNaming::NamingContext_ptr naming_context
,
582 const char* process_name
)
584 const int bufsize
= 512;
586 ACE_OS::strcpy (buf
, "EventChannel@");
587 ACE_OS::strcat (buf
, process_name
);
589 CosNaming::Name
channel_name (1);
590 channel_name
.length (1);
591 channel_name
[0].id
= CORBA::string_dup (buf
);
593 CORBA::Object_var ec_ptr
=
594 naming_context
->resolve (channel_name
);
595 if (CORBA::is_nil (ec_ptr
.in ()))
596 return RtecEventChannelAdmin::EventChannel::_nil ();
598 return RtecEventChannelAdmin::EventChannel::_narrow (ec_ptr
.in ());
602 Test_ECG::disconnect_suppliers (void)
604 for (int i
= 0; i
< this->hp_suppliers_
+ this->lp_suppliers_
; ++i
)
606 this->suppliers_
[i
]->close ();
611 Test_ECG::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec
)
614 for (i
= 0; i
< this->hp_suppliers_
; ++i
)
616 // Limit the number of messages sent by each supplier
617 int mc
= this->hp_message_count_
/ this->hp_suppliers_
;
622 ACE_OS::sprintf (buf
, "hp_supplier_%02d@%s", i
, this->lcl_name_
.c_str ());
624 ACE_NEW (this->suppliers_
[i
],
625 Test_Supplier (this, this->suppliers_
+ i
));
627 this->suppliers_
[i
]->open (buf
,
631 this->hp_interval_
* 10,
635 for (; i
< this->hp_suppliers_
+ this->lp_suppliers_
; ++i
)
637 // Limit the number of messages sent by each supplier
638 int mc
= this->lp_message_count_
/ this->lp_suppliers_
;
643 ACE_OS::sprintf (buf
, "lp_supplier_%02d@%s",
644 i
- this->hp_suppliers_
, this->lcl_name_
.c_str ());
646 ACE_NEW (this->suppliers_
[i
],
647 Test_Supplier (this, this->suppliers_
+ i
));
649 this->suppliers_
[i
]->open (buf
,
653 this->lp_interval_
* 10,
659 Test_ECG::disconnect_consumers (void)
661 for (int i
= 0; i
< this->hp_consumers_
+ this->lp_consumers_
; ++i
)
663 this->consumers_
[i
]->close ();
668 Test_ECG::activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec
)
673 for (i
= 0; i
< this->hp_suppliers_
; ++i
)
675 // Limit the number of messages sent by each supplier
676 int mc
= this->hp_message_count_
/ this->hp_suppliers_
;
681 ACE_OS::sprintf (buf
, "hp_supplier_%02d@%s", i
, this->lcl_name_
.c_str ());
683 this->suppliers_
[i
]->activate (buf
,
684 this->hp_interval_
* 10,
688 for (; i
< this->hp_suppliers_
+ this->lp_suppliers_
; ++i
)
690 // Limit the number of messages sent by each supplier
691 int mc
= this->lp_message_count_
/ this->lp_suppliers_
;
696 ACE_OS::sprintf (buf
, "lp_supplier_%02d@%s",
697 i
- this->hp_suppliers_
, this->lcl_name_
.c_str ());
699 this->suppliers_
[i
]->activate (buf
,
700 this->lp_interval_
* 10,
704 catch (const CORBA::Exception
&)
711 Test_ECG::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec
)
714 for (i
= 0; i
< this->hp_consumers_
; ++i
)
717 ACE_OS::sprintf (buf
, "hp_consumer_%02d@%s", i
, this->lcl_name_
.c_str ());
719 ACE_NEW (this->consumers_
[i
],
720 Test_Consumer (this, this->consumers_
+ i
));
722 this->consumers_
[i
]->open (buf
,
726 this->stats_
[i
].total_time_
= 0;
727 this->stats_
[i
].lcl_count_
= 0;
728 this->stats_
[i
].rmt_count_
= 0;
731 for (; i
< this->hp_consumers_
+ this->lp_consumers_
; ++i
)
734 ACE_OS::sprintf (buf
, "lp_consumer_%02d@%s",
735 i
- this->hp_consumers_
, this->lcl_name_
.c_str ());
737 ACE_NEW (this->consumers_
[i
],
738 Test_Consumer (this, this->consumers_
+ i
));
740 this->consumers_
[i
]->open (buf
,
744 this->stats_
[i
].total_time_
= 0;
745 this->stats_
[i
].lcl_count_
= 0;
746 this->stats_
[i
].rmt_count_
= 0;
748 this->running_consumers_
= this->hp_consumers_
+ this->lp_consumers_
;
752 Test_ECG::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec
,
753 RtecEventChannelAdmin::EventChannel_ptr remote_ec
,
754 RtecScheduler::Scheduler_ptr remote_sch
)
756 RtecScheduler::Scheduler_ptr local_sch
=
757 ACE_Scheduler_Factory::server ();
760 const int bufsize
= 512;
761 char ecg_name
[bufsize
];
762 ACE_OS::strcpy (ecg_name
, "ecg_");
763 ACE_OS::strcat (ecg_name
, this->lcl_name_
.c_str ());
765 // We could use the same name on the local and remote scheduler,
766 // but that fails when using a global scheduler.
768 ACE_OS::strcpy (rmt
, ecg_name
);
769 ACE_OS::strcat (rmt
, "@");
770 ACE_OS::strcat (rmt
, this->rmt_name_
.c_str ());
772 // We could use the same name on the local and remote scheduler,
773 // but that fails when using a global scheduler.
775 ACE_OS::strcpy (lcl
, ecg_name
);
776 ACE_OS::strcat (lcl
, "@");
777 ACE_OS::strcat (lcl
, this->lcl_name_
.c_str ());
779 this->ecg_
.init (remote_ec
, local_ec
, remote_sch
, local_sch
,
784 Test_ECG::push_supplier (void * /* cookie */,
785 RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer
,
786 const RtecEventComm::EventSet
&events
)
788 this->wait_until_ready ();
789 // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events sent by supplier\n"));
790 // @@ TODO we could keep somekind of stats here...
791 if (!this->short_circuit_
)
793 consumer
->push (events
);
798 for (; i
< this->hp_consumers_
; ++i
)
800 this->consumers_
[i
]->push (events
);
802 for (; i
< this->hp_consumers_
+ this->lp_consumers_
; ++i
)
804 this->consumers_
[i
]->push (events
);
810 Test_ECG::push_consumer (void *consumer_cookie
,
811 ACE_hrtime_t arrival
,
812 const RtecEventComm::EventSet
&events
)
815 (reinterpret_cast<Test_Consumer
**> (consumer_cookie
)
818 // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID));
820 if (events
.length () == 0)
822 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
826 // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
829 const int bufsize
= 128;
831 ACE_OS::sprintf (buf
, "Consumer %d receives event in thread: ", ID
);
832 print_priority_info (buf
);
835 for (u_int i
= 0; i
< events
.length (); ++i
)
837 const RtecEventComm::Event
& e
= events
[i
];
839 if (e
.header
.type
== ACE_ES_EVENT_SHUTDOWN
)
841 this->shutdown_consumer (ID
);
846 ORBSVCS_Time::TimeT_to_hrtime (s
, e
.header
.creation_time
);
847 ACE_hrtime_t nsec
= arrival
- s
;
848 if (this->local_source (e
.header
.source
))
850 int& count
= this->stats_
[ID
].lcl_count_
;
852 this->stats_
[ID
].lcl_latency_
[count
] = nsec
;
853 int workload
= this->hp_workload_
;
854 int interval
= this->hp_interval_
;
855 if (ID
>= this->hp_consumers_
)
857 workload
= this->lp_workload_
;
858 interval
= this->lp_interval_
;
861 for (int j
= 0; j
< workload
; ++j
)
863 // Eat a little CPU so the Utilization test can measure the
865 /* takes about 40.2 usecs on a 167 MHz Ultra2 */
867 ACE::is_prime (n
, 2, n
/ 2);
869 // Increment the elapsed time on this consumer.
870 ACE_hrtime_t now
= ACE_OS::gethrtime ();
871 this->stats_
[ID
].total_time_
+= now
- arrival
;
872 this->stats_
[ID
].end_
[count
] = now
;
874 // We estimate our laxity based on the event creation
875 // time... it may not be very precise, but will do; other
876 // strategies include:
877 // + Keep track of the "current frame", then then deadline
878 // is the end of the frame.
879 // + Use the start of the test to keep the current frame.
880 // + Use the last execution.
882 CORBA::ULong tmp
= ACE_U64_TO_U32 (s
- now
);
883 this->stats_
[ID
].laxity_
[count
] = 1 + tmp
/1000.0F
/interval
;
888 int& count
= this->stats_
[ID
].rmt_count_
;
889 this->stats_
[ID
].rmt_latency_
[count
] = nsec
;
896 Test_ECG::wait_until_ready (void)
898 ACE_GUARD (TAO_SYNCH_MUTEX
, ready_mon
, this->ready_mtx_
);
899 while (!this->ready_
)
900 this->ready_cnd_
.wait ();
904 Test_ECG::shutdown_supplier (void* /* supplier_cookie */,
905 RtecEventComm::PushConsumer_ptr consumer
)
908 this->running_suppliers_
--;
909 if (this->running_suppliers_
!= 0)
912 // We propagate a shutdown event through the system...
913 //FUZZ: disable check_for_lack_ACE_OS
914 RtecEventComm::EventSet
shutdown (1);
915 //FUZZ: enable check_for_lack_ACE_OS
917 RtecEventComm::Event
& s
= shutdown
[0];
922 ACE_hrtime_t t
= ACE_OS::gethrtime ();
923 ORBSVCS_Time::hrtime_to_TimeT (s
.header
.creation_time
, t
);
924 s
.header
.type
= ACE_ES_EVENT_SHUTDOWN
;
925 consumer
->push (shutdown
);
929 Test_ECG::shutdown_consumer (int id
)
931 ACE_DEBUG ((LM_DEBUG
, "Shutdown consumer %d\n", id
));
932 this->running_consumers_
--;
934 if (this->running_consumers_
== 0)
936 if (TAO_ORB_Core_instance ()->orb () == 0)
938 ACE_ERROR ((LM_ERROR
,
939 "(%P|%t) Test_ECG::shutdown_consumer, "
940 "ORB instance is 0\n"));
944 TAO_ORB_Core_instance ()->orb ()->shutdown ();
950 Test_ECG::shutdown (void)
952 ACE_DEBUG ((LM_DEBUG
, "Shutting down the multiple EC test\n"));
954 if (this->rmt_name_
.length () != 0)
956 this->ecg_
.shutdown ();
959 TAO_ORB_Core_instance ()->orb ()->shutdown ();
964 Test_ECG::dump_results (void)
966 const int bufsize
= 512;
967 ACE_TCHAR buf
[bufsize
];
970 for (i
= 0; i
< this->hp_consumers_
; ++i
)
972 ACE_OS::sprintf (buf
, ACE_TEXT("HP%02d"), i
);
973 this->dump_results (buf
, this->stats_
[i
]);
975 for (i
= 0; i
< this->lp_consumers_
; ++i
)
977 ACE_OS::sprintf (buf
, ACE_TEXT("LP%02d"), i
);
978 this->dump_results (buf
, this->stats_
[i
+ this->hp_consumers_
]);
980 CORBA::ULong tmp
= ACE_U64_TO_U32 (this->test_stop_
- this->test_start_
);
981 double usec
= tmp
/ 1000.0;
982 ACE_DEBUG ((LM_DEBUG
, "Time[TOTAL]: %.3f\n", usec
));
986 Test_ECG::dump_results (const ACE_TCHAR
* name
, Stats
& stats
)
988 // @@ We are reporting the information without specifics about
989 double usec
= ACE_U64_TO_U32 (stats
.total_time_
) / 1000.0;
990 ACE_DEBUG ((LM_DEBUG
, "Time[LCL,%s]: %.3f\n", name
, usec
));
992 for (i
= 1; i
< stats
.lcl_count_
- 1; ++i
)
994 usec
= ACE_U64_TO_U32 (stats
.lcl_latency_
[i
]) / 1000.0;
995 ACE_DEBUG ((LM_DEBUG
, "Latency[LCL,%s]: %.3f\n", name
, usec
));
997 double percent
= stats
.laxity_
[i
] * 100.0;
998 ACE_DEBUG ((LM_DEBUG
, "Laxity[LCL,%s]: %.3f\n", name
, percent
));
1000 usec
= ACE_U64_TO_U32 (stats
.end_
[i
] - this->test_start_
) / 1000.0;
1001 ACE_DEBUG ((LM_DEBUG
, "Completion[LCL,%s]: %.3f\n", name
, usec
));
1003 for (i
= 1; i
< stats
.rmt_count_
- 1; ++i
)
1005 double usec
= ACE_U64_TO_U32 (stats
.rmt_latency_
[i
]) / 1000.0;
1006 ACE_DEBUG ((LM_DEBUG
, "Latency[RMT,%s]: %.3f\n", name
, usec
));
1011 Test_ECG::local_source (RtecEventComm::EventSourceID id
) const
1013 for (int i
= 0; i
< this->hp_suppliers_
+ this->lp_suppliers_
; ++i
)
1015 if (this->suppliers_
[i
]->supplier_id () == id
)
1022 Test_ECG::parse_args (int argc
, ACE_TCHAR
*argv
[])
1024 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("l:r:s:i:xh:w:p:d:"));
1027 while ((opt
= get_opt ()) != EOF
)
1032 this->lcl_name_
= ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ());
1036 this->rmt_name_
= ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ());
1040 if (ACE_OS::strcasecmp (get_opt
.opt_arg (), ACE_TEXT("global")) == 0)
1042 this->scheduling_type_
= Test_ECG::ss_global
;
1044 else if (ACE_OS::strcasecmp (get_opt
.opt_arg (), ACE_TEXT("local")) == 0)
1046 this->scheduling_type_
= Test_ECG::ss_local
;
1048 else if (ACE_OS::strcasecmp (get_opt
.opt_arg (), ACE_TEXT("runtime")) == 0)
1050 this->scheduling_type_
= Test_ECG::ss_runtime
;
1054 ACE_DEBUG ((LM_DEBUG
,
1055 "Unknown scheduling type <%s> "
1056 "defaulting to local\n",
1057 get_opt
.opt_arg ()));
1058 this->scheduling_type_
= Test_ECG::ss_local
;
1063 this->short_circuit_
= 1;
1069 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
1070 this->consumer_disconnects_
= ACE_OS::atoi (arg
);
1071 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1072 this->supplier_disconnects_
= ACE_OS::atoi (arg
);
1079 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
1081 this->hp_suppliers_
= ACE_OS::atoi (arg
);
1082 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1083 this->hp_consumers_
= ACE_OS::atoi (arg
);
1084 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1085 this->hp_workload_
= ACE_OS::atoi (arg
);
1086 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1087 this->hp_interval_
= ACE_OS::atoi (arg
);
1088 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1089 this->hp_message_count_
= ACE_OS::atoi (arg
);
1090 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1091 this->hps_event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1092 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1093 this->hps_event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1094 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1095 this->hpc_event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1096 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1097 this->hpc_event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1104 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
1106 this->lp_suppliers_
= ACE_OS::atoi (arg
);
1107 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1108 this->lp_consumers_
= ACE_OS::atoi (arg
);
1109 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1110 this->lp_workload_
= ACE_OS::atoi (arg
);
1111 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1112 this->lp_interval_
= ACE_OS::atoi (arg
);
1113 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1114 this->lp_message_count_
= ACE_OS::atoi (arg
);
1115 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1116 this->lps_event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1117 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1118 this->lps_event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1119 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1120 this->lpc_event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1121 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1122 this->lpc_event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1127 this->pid_file_name_
= get_opt
.opt_arg ();
1130 this->schedule_file_
= get_opt
.opt_arg ();
1135 ACE_DEBUG ((LM_DEBUG
,
1140 "-s <global|local|runtime> "
1141 "-i <consumer disc.,supplier disc.> "
1142 "-x (short circuit EC) "
1143 "-h <high priority args> "
1144 "-w <low priority args> "
1145 "-p <pid file name> "
1146 "-d <schedule file name> "
1153 if (this->hp_message_count_
< 0
1154 || this->hp_message_count_
>= Test_ECG::MAX_EVENTS
)
1156 ACE_DEBUG ((LM_DEBUG
,
1157 "%s: HP event count (%d) is out of range, "
1158 "reset to default (%d)\n",
1159 argv
[0], this->lp_message_count_
,
1161 this->hp_message_count_
= 160;
1164 if (this->lp_message_count_
< 0
1165 || this->lp_message_count_
>= Test_ECG::MAX_EVENTS
)
1167 ACE_DEBUG ((LM_DEBUG
,
1168 "%s: LP event count (%d) is out of range, "
1169 "reset to default (%d)\n",
1170 argv
[0], this->lp_message_count_
,
1172 this->lp_message_count_
= 4;
1175 if (this->hp_consumers_
<= 0
1176 || this->lp_consumers_
< 0
1177 || this->hp_consumers_
+ this->lp_consumers_
>= Test_ECG::MAX_CONSUMERS
1178 || this->hp_suppliers_
<= 0
1179 || this->lp_suppliers_
< 0
1180 || this->hp_suppliers_
+ this->lp_suppliers_
>= Test_ECG::MAX_SUPPLIERS
)
1182 ACE_ERROR_RETURN ((LM_DEBUG
,
1183 "%s: number of consumers (low: %d, high: %d) or "
1184 "suppliers (low: %d, high: %d) out of range\n",
1186 lp_consumers_
, hp_consumers_
,
1187 lp_suppliers_
, lp_suppliers_
), -1);
1193 Test_Supplier::Test_Supplier (Test_ECG
*test
,
1202 Test_Supplier::open (const char* name
,
1206 const RtecScheduler::Period_t
& rate
,
1207 RtecEventChannelAdmin::EventChannel_ptr ec
)
1209 this->event_a_
= event_a
;
1210 this->event_b_
= event_b
;
1211 this->message_count_
= message_count
;
1213 RtecScheduler::Scheduler_ptr server
=
1214 ACE_Scheduler_Factory::server ();
1216 RtecScheduler::handle_t rt_info
=
1217 server
->create (name
);
1219 // The execution times are set to reasonable values, but
1220 // actually they are changed on the real execution, i.e. we
1221 // lie to the scheduler to obtain right priorities; but we
1222 // don't care if the set is schedulable.
1223 ACE_Time_Value
tv (0, 2000);
1224 TimeBase::TimeT time
;
1225 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
1226 ACE_DEBUG ((LM_DEBUG
, "register supplier \"%C\"\n", name
));
1227 server
->set (rt_info
,
1228 RtecScheduler::VERY_HIGH_CRITICALITY
,
1231 RtecScheduler::VERY_LOW_IMPORTANCE
,
1234 RtecScheduler::OPERATION
);
1236 this->supplier_id_
= ACE::crc32 (name
);
1237 ACE_DEBUG ((LM_DEBUG
, "ID for <%C> is %04.4x\n", name
,
1238 this->supplier_id_
));
1240 ACE_SupplierQOS_Factory qos
;
1241 qos
.insert (this->supplier_id_
,
1244 qos
.insert (this->supplier_id_
,
1247 qos
.insert (this->supplier_id_
,
1248 ACE_ES_EVENT_SHUTDOWN
,
1251 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin
=
1252 ec
->for_suppliers ();
1254 this->consumer_proxy_
=
1255 supplier_admin
->obtain_push_consumer ();
1257 RtecEventComm::PushSupplier_var objref
= this->_this ();
1259 this->consumer_proxy_
->connect_push_supplier (objref
.in (),
1260 qos
.get_SupplierQOS ());
1264 Test_Supplier::close (void)
1266 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
1269 RtecEventChannelAdmin::ProxyPushConsumer_var proxy
=
1270 this->consumer_proxy_
._retn ();
1271 proxy
->disconnect_push_consumer ();
1275 Test_Supplier::activate (const char* name
,
1276 const RtecScheduler::Period_t
& rate
,
1277 RtecEventChannelAdmin::EventChannel_ptr ec
)
1279 RtecScheduler::Scheduler_ptr server
=
1280 ACE_Scheduler_Factory::server ();
1282 const int bufsize
= 512;
1284 ACE_OS::strcpy (buf
, "consumer_");
1285 ACE_OS::strcat (buf
, name
);
1286 RtecScheduler::handle_t rt_info
=
1287 server
->create (buf
);
1289 // The execution times are set to reasonable values, but
1290 // actually they are changed on the real execution, i.e. we
1291 // lie to the scheduler to obtain right priorities; but we
1292 // don't care if the set is schedulable.
1293 ACE_Time_Value
tv (0, 2000);
1294 TimeBase::TimeT time
;
1295 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
1296 ACE_DEBUG ((LM_DEBUG
, "activate \"%C\"\n", buf
));
1297 server
->set (rt_info
,
1298 RtecScheduler::VERY_HIGH_CRITICALITY
,
1301 RtecScheduler::VERY_LOW_IMPORTANCE
,
1304 RtecScheduler::OPERATION
);
1306 // Also connect our consumer for timeout events from the EC.
1307 int interval
= rate
/ 10;
1308 ACE_Time_Value
tv_timeout (interval
/ ACE_ONE_SECOND_IN_USECS
,
1309 interval
% ACE_ONE_SECOND_IN_USECS
);
1310 TimeBase::TimeT timeout
;
1311 ORBSVCS_Time::Time_Value_to_TimeT (timeout
, tv_timeout
);
1313 ACE_ConsumerQOS_Factory consumer_qos
;
1314 consumer_qos
.start_disjunction_group ();
1315 consumer_qos
.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT
,
1319 // = Connect as a consumer.
1320 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
1321 ec
->for_consumers ();
1323 this->supplier_proxy_
=
1324 consumer_admin
->obtain_push_supplier ();
1326 RtecEventComm::PushConsumer_var cref
=
1327 this->consumer_
._this ();
1329 this->supplier_proxy_
->connect_push_consumer (
1331 consumer_qos
.get_ConsumerQOS ());
1335 Test_Supplier::push (const RtecEventComm::EventSet
& events
)
1338 const int bufsize
= 128;
1340 ACE_OS::sprintf (buf
, "Supplier %d receives event in thread: ",
1341 this->supplier_id_
);
1342 print_priority_info (buf
);
1345 if (events
.length () == 0 || this->message_count_
< 0)
1347 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
1351 RtecEventComm::EventSet
sent (events
.length ());
1352 sent
.length (events
.length ());
1354 for (u_int i
= 0; i
< events
.length (); ++i
)
1356 const RtecEventComm::Event
& e
= events
[i
];
1357 if (e
.header
.type
!= ACE_ES_EVENT_INTERVAL_TIMEOUT
)
1360 // ACE_DEBUG ((LM_DEBUG, "Test_Supplier - timeout (%t)\n"));
1362 RtecEventComm::Event
& s
= sent
[i
];
1363 s
.header
.source
= this->supplier_id_
;
1366 ACE_hrtime_t t
= ACE_OS::gethrtime ();
1367 ORBSVCS_Time::hrtime_to_TimeT (s
.header
.creation_time
, t
);
1369 this->message_count_
--;
1371 if (this->message_count_
< 0)
1373 this->test_
->shutdown_supplier (this->cookie_
,
1374 this->consumer_proxy_
.in ());
1376 if (this->message_count_
% 2 == 0)
1378 // Generate an A event...
1379 s
.header
.type
= this->event_a_
;
1383 s
.header
.type
= this->event_b_
;
1386 this->test_
->push_supplier (this->cookie_
,
1387 this->consumer_proxy_
.in (),
1392 Test_Supplier::disconnect_push_supplier (void)
1394 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
1397 this->supplier_proxy_
->disconnect_push_supplier ();
1401 Test_Supplier::disconnect_push_consumer (void)
1405 int Test_Supplier::supplier_id (void) const
1407 return this->supplier_id_
;
1410 Test_Consumer::Test_Consumer (Test_ECG
*test
,
1418 Test_Consumer::open (const char* name
,
1419 int event_a
, int event_b
,
1420 RtecEventChannelAdmin::EventChannel_ptr ec
)
1422 RtecScheduler::Scheduler_ptr server
=
1423 ACE_Scheduler_Factory::server ();
1425 RtecScheduler::handle_t rt_info
=
1426 server
->create (name
);
1428 // The worst case execution time is far less than 2
1429 // milliseconds, but that is a safe estimate....
1430 ACE_Time_Value
tv (0, 2000);
1431 TimeBase::TimeT time
;
1432 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
1433 ACE_DEBUG ((LM_DEBUG
, "register consumer \"%C\"\n", name
));
1434 server
->set (rt_info
,
1435 RtecScheduler::VERY_HIGH_CRITICALITY
,
1438 RtecScheduler::VERY_LOW_IMPORTANCE
,
1441 RtecScheduler::OPERATION
);
1443 ACE_ConsumerQOS_Factory qos
;
1444 qos
.start_disjunction_group ();
1445 qos
.insert_type (ACE_ES_EVENT_SHUTDOWN
, rt_info
);
1446 qos
.insert_type (event_a
, rt_info
);
1447 qos
.insert_type (event_b
, rt_info
);
1449 // = Connect as a consumer.
1450 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
1451 ec
->for_consumers ();
1453 this->supplier_proxy_
=
1454 consumer_admin
->obtain_push_supplier ();
1456 RtecEventComm::PushConsumer_var objref
= this->_this ();
1458 this->supplier_proxy_
->connect_push_consumer (objref
.in (),
1459 qos
.get_ConsumerQOS ());
1463 Test_Consumer::close (void)
1465 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
1468 RtecEventChannelAdmin::ProxyPushSupplier_var proxy
=
1469 this->supplier_proxy_
._retn ();
1470 proxy
->disconnect_push_supplier ();
1474 Test_Consumer::push (const RtecEventComm::EventSet
& events
)
1476 ACE_hrtime_t arrival
= ACE_OS::gethrtime ();
1477 this->test_
->push_consumer (this->cookie_
, arrival
, events
);
1481 Test_Consumer::disconnect_push_consumer (void)
1486 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
1490 // Dynamically allocate the Test_ECG instance so that we don't have
1491 // to worry about running out of stack space if it's large.
1492 ACE_NEW_RETURN (test
,
1496 const int status
= test
->run (argc
, argv
);