1 #include "EC_Multiple.h"
3 #include "Scheduler_Runtime1.h"
4 #include "Scheduler_Runtime2.h"
5 #include "Scheduler_Runtime_Dynamic.h" /* infos_3 */
7 #include "orbsvcs/Event_Utilities.h"
8 #include "orbsvcs/Event_Service_Constants.h"
9 #include "orbsvcs/Scheduler_Factory.h"
10 #include "orbsvcs/Time_Utilities.h"
11 #include "orbsvcs/RtecEventChannelAdminC.h"
12 #include "orbsvcs/Sched/Config_Scheduler.h"
13 #include "orbsvcs/Event/EC_Event_Channel.h"
14 #include "orbsvcs/Runtime_Scheduler.h"
16 #include "tao/ORB_Core.h"
18 #include "ace/Get_Opt.h"
19 #include "ace/Sched_Params.h"
20 #include "ace/OS_NS_errno.h"
21 #include "ace/OS_NS_strings.h"
25 : lcl_name_ ("Test_ECG"),
27 scheduling_type_ (Test_ECG::ss_runtime
),
28 consumer_disconnects_ (0),
29 supplier_disconnects_ (0),
35 hp_message_count_ (200),
36 hps_event_a_ (ACE_ES_EVENT_UNDEFINED
),
37 hps_event_b_ (ACE_ES_EVENT_UNDEFINED
+ 1),
38 hpc_event_a_ (ACE_ES_EVENT_UNDEFINED
),
39 hpc_event_b_ (ACE_ES_EVENT_UNDEFINED
+ 1),
43 lp_interval_ (100000),
44 lp_message_count_ (50),
52 ready_cnd_ (ready_mtx_
)
57 print_priority_info (const char *const name
)
59 #if defined (ACE_HAS_PTHREADS)
60 #if defined (ACE_HAS_PTHREADS)
61 struct sched_param param
;
64 if ((status
= pthread_getschedparam (pthread_self (), &policy
,
67 "%C (%lu|%u); policy is %d, priority is %d\n",
72 policy
, param
.sched_priority
));
74 ACE_DEBUG ((LM_DEBUG
,"pthread_getschedparam failed: %d\n", status
));
77 ACE_UNUSED_ARG (name
);
78 #endif /* ACE_HAS_PTHREADS */
81 // Find what scheduling class the thread's LWP is in.
82 //FUZZ: disable check_for_lack_ACE_OS
83 ACE_Sched_Params
sched_params (ACE_SCHED_OTHER
, 0);
84 //FUZZ: enable check_for_lack_ACE_OS
85 if (ACE_OS::lwp_getparams (sched_params
) == -1)
87 ACE_OS::perror ("ACE_OS::lwp_getparams");
90 else if (sched_params
.policy () == ACE_SCHED_FIFO
||
91 sched_params
.policy () == ACE_SCHED_RR
)
93 // This thread's LWP is in the RT class.
95 "RT class; priority: %d, quantum: %u msec\n",
96 sched_params
.priority (),
97 sched_params
.quantum ().msec ()));
101 ACE_DEBUG ((LM_DEBUG
,
102 "TS class; priority: %d\n",
103 sched_params
.priority ()));
107 ACE_UNUSED_ARG (name
);
108 #endif /* ACE_HAS_PTHREADS */
112 Test_ECG::run (int argc
, ACE_TCHAR
* argv
[])
117 CORBA::ORB_init (argc
, argv
);
119 CORBA::Object_var poa_object
=
120 orb
->resolve_initial_references("RootPOA");
122 if (CORBA::is_nil (poa_object
.in ()))
123 ACE_ERROR_RETURN ((LM_ERROR
,
124 " (%P|%t) Unable to initialize the POA.\n"),
127 PortableServer::POA_var root_poa
=
128 PortableServer::POA::_narrow (poa_object
.in ());
130 PortableServer::POAManager_var poa_manager
=
131 root_poa
->the_POAManager ();
133 poa_manager
->activate ();
135 if (this->parse_args (argc
, argv
))
138 ACE_DEBUG ((LM_DEBUG
,
139 "Execution parameters:\n"
142 " scheduler type = <%d>\n"
143 " consumer disconnects = <%d>\n"
144 " supplier disconnects = <%d>\n"
145 " short circuit EC = <%d>\n"
146 " HP suppliers = <%d>\n"
147 " HP consumers = <%d>\n"
148 " HP workload = <%d> (iterations)\n"
149 " HP interval between events = <%d> (usecs)\n"
150 " HP message count = <%d>\n"
151 " HP supplier Event A = <%d>\n"
152 " HP supplier Event B = <%d>\n"
153 " HP consumer Event A = <%d>\n"
154 " HP consumer Event B = <%d>\n"
155 " LP suppliers = <%d>\n"
156 " LP consumers = <%d>\n"
157 " LP workload = <%d> (iterations)\n"
158 " LP interval between events = <%d> (usecs)\n"
159 " LP message count = <%d>\n"
160 " LP supplier Event A = <%d>\n"
161 " LP supplier Event B = <%d>\n"
162 " LP consumer Event A = <%d>\n"
163 " LP consumer Event B = <%d>\n"
164 " schedule_file = <%s>\n"
165 " pid file name = <%s>\n",
166 this->lcl_name_
.length () ? this->lcl_name_
.c_str () : "nil",
167 this->rmt_name_
.length () ? this->rmt_name_
.c_str () : "nil",
168 this->scheduling_type_
,
169 this->consumer_disconnects_
,
170 this->supplier_disconnects_
,
171 this->short_circuit_
,
177 this->hp_message_count_
,
187 this->lp_message_count_
,
193 this->schedule_file_
?this->schedule_file_
:ACE_TEXT("nil"),
194 this->pid_file_name_
?this->pid_file_name_
:ACE_TEXT("nil")));
196 print_priority_info ("Test_ECG::run (Main)");
198 if (this->pid_file_name_
!= 0)
200 FILE* pid
= ACE_OS::fopen (this->pid_file_name_
, "w");
203 ACE_OS::fprintf (pid
, "%ld\n",
204 static_cast<long> (ACE_OS::getpid ()));
205 ACE_OS::fclose (pid
);
210 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
);
211 // Enable FIFO scheduling
213 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
215 ACE_SCOPE_PROCESS
)) != 0)
217 if (ACE_OS::last_error () == EPERM
)
218 ACE_DEBUG ((LM_DEBUG
,
219 "%s: user is not superuser, "
220 "so remain in time-sharing class\n", argv
[0]));
222 ACE_ERROR ((LM_ERROR
,
223 "%s: ACE_OS::sched_params failed\n", argv
[0]));
226 if (ACE_OS::thr_setprio (min_priority
) == -1)
228 ACE_ERROR ((LM_ERROR
, "(%P|%t) main thr_setprio failed\n"));
231 print_priority_info ("Test_ECG::run (Main after thr_setprio)");
233 CORBA::Object_var naming_obj
=
234 orb
->resolve_initial_references ("NameService");
236 if (CORBA::is_nil (naming_obj
.in ()))
237 ACE_ERROR_RETURN ((LM_ERROR
,
238 " (%P|%t) Unable to get the Naming Service.\n"),
241 CosNaming::NamingContext_var naming_context
=
242 CosNaming::NamingContext::_narrow (naming_obj
.in ());
244 std::unique_ptr
<POA_RtecScheduler::Scheduler
> scheduler_impl
;
245 RtecScheduler::Scheduler_var scheduler
;
247 switch (this->scheduling_type_
)
250 ACE_ERROR ((LM_WARNING
, "Unknown scheduling type %d\n",
251 this->scheduling_type_
));
253 case Test_ECG::ss_global
:
256 case Test_ECG::ss_local
:
258 std::unique_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl (new ACE_Config_Scheduler
);
259 scheduler_impl
= std::move(auto_scheduler_impl
);
261 if (scheduler_impl
.get () == 0)
263 scheduler
= scheduler_impl
->_this ();
266 case Test_ECG::ss_runtime
:
267 if (ACE_OS::strcmp (this->lcl_name_
.c_str (), "ECM1") == 0)
269 // This sets up Scheduler_Factory to use the runtime version
270 ACE_Scheduler_Factory::use_runtime (
271 sizeof (runtime_configs_1
)/sizeof (runtime_configs_1
[0]),
273 sizeof (runtime_infos_1
)/sizeof (runtime_infos_1
[0]),
276 std::unique_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl
277 (new ACE_Runtime_Scheduler (runtime_configs_1_size
,
279 runtime_infos_1_size
,
281 scheduler_impl
= std::move(auto_scheduler_impl
);
283 if (scheduler_impl
.get () == 0)
285 scheduler
= scheduler_impl
->_this ();
287 else if (ACE_OS::strcmp (this->lcl_name_
.c_str (), "ECM2") == 0)
289 // This sets up Scheduler_Factory to use the runtime version
290 ACE_Scheduler_Factory::use_runtime (
291 sizeof (runtime_configs_2
)/sizeof (runtime_configs_2
[0]),
293 sizeof (runtime_infos_2
)/sizeof (runtime_infos_2
[0]),
296 std::unique_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl
297 (new ACE_Runtime_Scheduler (runtime_configs_2_size
,
299 runtime_infos_2_size
,
301 scheduler_impl
= std::move(auto_scheduler_impl
);
303 if (scheduler_impl
.get () == 0)
305 scheduler
= scheduler_impl
->_this ();
307 else if (ACE_OS::strcmp (this->lcl_name_
.c_str (), "ECM3") == 0)
309 // This sets up Scheduler_Factory to use the runtime version
310 ACE_Scheduler_Factory::use_runtime (
311 sizeof (runtime_configs_3
)/sizeof (runtime_configs_3
[0]),
313 sizeof (runtime_infos_3
)/sizeof (runtime_infos_3
[0]),
316 std::unique_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl
317 (new ACE_Runtime_Scheduler (runtime_configs_3_size
,
319 runtime_infos_3_size
,
321 scheduler_impl
= std::move(auto_scheduler_impl
);
323 if (scheduler_impl
.get () == 0)
325 scheduler
= scheduler_impl
->_this ();
329 ACE_ERROR ((LM_WARNING
,
330 "Unknown name <%C> defaulting to "
331 "config scheduler\n", this->lcl_name_
.c_str ()));
333 std::unique_ptr
<POA_RtecScheduler::Scheduler
> auto_scheduler_impl (new ACE_Config_Scheduler
);
334 scheduler_impl
= std::move(auto_scheduler_impl
);
336 if (scheduler_impl
.get () == 0)
338 scheduler
= scheduler_impl
->_this ();
343 // We use this buffer to generate the names of the local
345 const int bufsize
= 512;
348 // Register the scheduler with the naming service.
349 switch (this->scheduling_type_
)
352 case Test_ECG::ss_global
:
355 case Test_ECG::ss_local
:
356 case Test_ECG::ss_runtime
:
358 CORBA::String_var str
=
359 orb
->object_to_string (scheduler
.in ());
360 ACE_DEBUG ((LM_DEBUG
, "The (local) scheduler IOR is <%C>\n",
363 ACE_OS::strcpy (buf
, "ScheduleService@");
364 ACE_OS::strcat (buf
, this->lcl_name_
.c_str ());
366 // Register the servant with the Naming Context....
367 CosNaming::Name
schedule_name (1);
368 schedule_name
.length (1);
369 schedule_name
[0].id
= CORBA::string_dup (buf
);
370 naming_context
->bind (schedule_name
, scheduler
.in ());
372 if (ACE_Scheduler_Factory::use_config (naming_context
.in (),
379 // Create the EventService implementation, but don't start its
381 TAO_EC_Event_Channel_Attributes
attr (root_poa
.in (),
383 TAO_EC_Event_Channel
ec_impl (attr
);
385 // Register Event_Service with the Naming Service.
386 RtecEventChannelAdmin::EventChannel_var ec
=
389 CORBA::String_var str
=
390 orb
->object_to_string (ec
.in ());
393 ACE_DEBUG ((LM_DEBUG
, "The (local) EC IOR is <%C>\n", str
.in ()));
395 ACE_OS::strcpy (buf
, "EventChannel@");
396 ACE_OS::strcat (buf
, this->lcl_name_
.c_str ());
398 CosNaming::Name
channel_name (1);
399 channel_name
.length (1);
400 channel_name
[0].id
= CORBA::string_dup (buf
);
401 naming_context
->bind (channel_name
, ec
.in ());
403 ACE_DEBUG ((LM_DEBUG
, "waiting to start\n"));
405 ACE_Time_Value
tv (15, 0);
407 if (this->rmt_name_
.length () != 0)
412 ACE_DEBUG ((LM_DEBUG
, "starting....\n"));
414 RtecEventChannelAdmin::EventChannel_var local_ec
=
415 this->get_ec (naming_context
.in (),
416 this->lcl_name_
.c_str ());
418 ACE_DEBUG ((LM_DEBUG
, "located local EC\n"));
420 for (int sd
= 0; sd
< this->supplier_disconnects_
; ++sd
)
422 this->connect_suppliers (local_ec
.in ());
423 this->disconnect_suppliers ();
425 ACE_DEBUG ((LM_DEBUG
, "Supplier disconnection %d\n", sd
));
428 this->connect_suppliers (local_ec
.in ());
430 ACE_DEBUG ((LM_DEBUG
, "connected supplier\n"));
432 RtecEventChannelAdmin::Observer_Handle observer_handle
= 0;
433 if (this->rmt_name_
.length () != 0)
438 RtecEventChannelAdmin::EventChannel_var remote_ec
=
439 this->get_ec (naming_context
.in (),
440 this->rmt_name_
.c_str ());
441 ACE_DEBUG ((LM_DEBUG
, "located remote EC\n"));
443 CosNaming::Name
rsch_name (1);
444 rsch_name
.length (1);
445 ACE_OS::strcpy (buf
, "ScheduleService");
446 if (this->scheduling_type_
!= Test_ECG::ss_global
)
448 ACE_OS::strcat (buf
, "@");
449 ACE_OS::strcat (buf
, this->rmt_name_
.c_str ());
451 rsch_name
[0].id
= CORBA::string_dup (buf
);
452 CORBA::Object_var tmpobj
=
453 naming_context
->resolve (rsch_name
);
455 RtecScheduler::Scheduler_var remote_sch
=
456 RtecScheduler::Scheduler::_narrow (tmpobj
.in ());
458 this->connect_ecg (local_ec
.in (),
462 ACE_DEBUG ((LM_DEBUG
, "connected proxy\n"));
467 RtecEventChannelAdmin::Observer_ptr observer
=
469 observer_handle
= ec_impl
.append_observer (observer
);
472 for (int cd
= 0; cd
< this->consumer_disconnects_
; ++cd
)
474 this->connect_consumers (local_ec
.in ());
475 this->disconnect_consumers ();
477 ACE_DEBUG ((LM_DEBUG
, "Consumer disconnection %d\n", cd
));
479 this->connect_consumers (local_ec
.in ());
481 ACE_DEBUG ((LM_DEBUG
, "connected consumer\n"));
483 this->activate_suppliers (local_ec
.in ());
485 ACE_DEBUG ((LM_DEBUG
, "suppliers are active\n"));
487 this->running_suppliers_
= this->hp_suppliers_
+ this->lp_suppliers_
;
489 // Acquire the ready mutex, blocking any supplier
490 // that may start after this point.
491 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, ready_mon
, this->ready_mtx_
, 1);
493 this->test_start_
= ACE_OS::gethrtime ();
494 this->ready_cnd_
.broadcast ();
495 ready_mon
.release ();
497 ACE_DEBUG ((LM_DEBUG
, "activate the EC\n"));
499 if (this->rmt_name_
.length () != 0)
501 ec_impl
.remove_observer (observer_handle
);
504 // Create the EC internal threads
507 ACE_DEBUG ((LM_DEBUG
, "running the test\n"));
510 this->test_stop_
= ACE_OS::gethrtime ();
512 ACE_DEBUG ((LM_DEBUG
, "shutdown the EC\n"));
515 this->dump_results ();
517 if (this->schedule_file_
!= 0)
519 RtecScheduler::RT_Info_Set_var infos
;
520 RtecScheduler::Dependency_Set_var deps
;
521 RtecScheduler::Config_Info_Set_var configs
;
522 RtecScheduler::Scheduling_Anomaly_Set_var anomalies
;
524 ACE_Scheduler_Factory::server ()->compute_scheduling
525 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
,
527 ACE_Sched_Params::priority_max (ACE_SCHED_FIFO
,
529 infos
.out (), deps
.out (),
530 configs
.out (), anomalies
.out ());
532 ACE_Scheduler_Factory::dump_schedule (infos
.in (),
536 this->schedule_file_
);
539 naming_context
->unbind (channel_name
);
541 if (this->rmt_name_
.length () != 0)
544 this->ecg_
.shutdown ();
547 this->disconnect_consumers ();
548 this->disconnect_suppliers ();
550 ACE_DEBUG ((LM_DEBUG
, "shutdown grace period\n"));
554 catch (const CORBA::SystemException
& sys_ex
)
556 sys_ex
._tao_print_exception ("SYS_EX");
558 catch (const CORBA::Exception
& ex
)
560 ex
._tao_print_exception ("NON SYS EX");
565 RtecEventChannelAdmin::EventChannel_ptr
566 Test_ECG::get_ec (CosNaming::NamingContext_ptr naming_context
,
567 const char* process_name
)
569 const int bufsize
= 512;
571 ACE_OS::strcpy (buf
, "EventChannel@");
572 ACE_OS::strcat (buf
, process_name
);
574 CosNaming::Name
channel_name (1);
575 channel_name
.length (1);
576 channel_name
[0].id
= CORBA::string_dup (buf
);
578 CORBA::Object_var ec_ptr
=
579 naming_context
->resolve (channel_name
);
580 if (CORBA::is_nil (ec_ptr
.in ()))
581 return RtecEventChannelAdmin::EventChannel::_nil ();
583 return RtecEventChannelAdmin::EventChannel::_narrow (ec_ptr
.in ());
587 Test_ECG::disconnect_suppliers ()
589 for (int i
= 0; i
< this->hp_suppliers_
+ this->lp_suppliers_
; ++i
)
591 this->suppliers_
[i
]->close ();
596 Test_ECG::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec
)
599 for (i
= 0; i
< this->hp_suppliers_
; ++i
)
601 // Limit the number of messages sent by each supplier
602 int mc
= this->hp_message_count_
/ this->hp_suppliers_
;
607 ACE_OS::sprintf (buf
, "hp_supplier_%02d@%s", i
, this->lcl_name_
.c_str ());
609 ACE_NEW (this->suppliers_
[i
],
610 Test_Supplier (this, this->suppliers_
+ i
));
612 this->suppliers_
[i
]->open (buf
,
616 this->hp_interval_
* 10,
620 for (; i
< this->hp_suppliers_
+ this->lp_suppliers_
; ++i
)
622 // Limit the number of messages sent by each supplier
623 int mc
= this->lp_message_count_
/ this->lp_suppliers_
;
628 ACE_OS::sprintf (buf
, "lp_supplier_%02d@%s",
629 i
- this->hp_suppliers_
, this->lcl_name_
.c_str ());
631 ACE_NEW (this->suppliers_
[i
],
632 Test_Supplier (this, this->suppliers_
+ i
));
634 this->suppliers_
[i
]->open (buf
,
638 this->lp_interval_
* 10,
644 Test_ECG::disconnect_consumers ()
646 for (int i
= 0; i
< this->hp_consumers_
+ this->lp_consumers_
; ++i
)
648 this->consumers_
[i
]->close ();
653 Test_ECG::activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec
)
658 for (i
= 0; i
< this->hp_suppliers_
; ++i
)
660 // Limit the number of messages sent by each supplier
661 int mc
= this->hp_message_count_
/ this->hp_suppliers_
;
666 ACE_OS::sprintf (buf
, "hp_supplier_%02d@%s", i
, this->lcl_name_
.c_str ());
668 this->suppliers_
[i
]->activate (buf
,
669 this->hp_interval_
* 10,
673 for (; i
< this->hp_suppliers_
+ this->lp_suppliers_
; ++i
)
675 // Limit the number of messages sent by each supplier
676 int mc
= this->lp_message_count_
/ this->lp_suppliers_
;
681 ACE_OS::sprintf (buf
, "lp_supplier_%02d@%s",
682 i
- this->hp_suppliers_
, this->lcl_name_
.c_str ());
684 this->suppliers_
[i
]->activate (buf
,
685 this->lp_interval_
* 10,
689 catch (const CORBA::Exception
&)
696 Test_ECG::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec
)
699 for (i
= 0; i
< this->hp_consumers_
; ++i
)
702 ACE_OS::sprintf (buf
, "hp_consumer_%02d@%s", i
, this->lcl_name_
.c_str ());
704 ACE_NEW (this->consumers_
[i
],
705 Test_Consumer (this, this->consumers_
+ i
));
707 this->consumers_
[i
]->open (buf
,
711 this->stats_
[i
].total_time_
= 0;
712 this->stats_
[i
].lcl_count_
= 0;
713 this->stats_
[i
].rmt_count_
= 0;
716 for (; i
< this->hp_consumers_
+ this->lp_consumers_
; ++i
)
719 ACE_OS::sprintf (buf
, "lp_consumer_%02d@%s",
720 i
- this->hp_consumers_
, this->lcl_name_
.c_str ());
722 ACE_NEW (this->consumers_
[i
],
723 Test_Consumer (this, this->consumers_
+ i
));
725 this->consumers_
[i
]->open (buf
,
729 this->stats_
[i
].total_time_
= 0;
730 this->stats_
[i
].lcl_count_
= 0;
731 this->stats_
[i
].rmt_count_
= 0;
733 this->running_consumers_
= this->hp_consumers_
+ this->lp_consumers_
;
737 Test_ECG::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec
,
738 RtecEventChannelAdmin::EventChannel_ptr remote_ec
,
739 RtecScheduler::Scheduler_ptr remote_sch
)
741 RtecScheduler::Scheduler_ptr local_sch
=
742 ACE_Scheduler_Factory::server ();
745 const int bufsize
= 512;
746 char ecg_name
[bufsize
];
747 ACE_OS::strcpy (ecg_name
, "ecg_");
748 ACE_OS::strcat (ecg_name
, this->lcl_name_
.c_str ());
750 // We could use the same name on the local and remote scheduler,
751 // but that fails when using a global scheduler.
753 ACE_OS::strcpy (rmt
, ecg_name
);
754 ACE_OS::strcat (rmt
, "@");
755 ACE_OS::strcat (rmt
, this->rmt_name_
.c_str ());
757 // We could use the same name on the local and remote scheduler,
758 // but that fails when using a global scheduler.
760 ACE_OS::strcpy (lcl
, ecg_name
);
761 ACE_OS::strcat (lcl
, "@");
762 ACE_OS::strcat (lcl
, this->lcl_name_
.c_str ());
764 this->ecg_
.init (remote_ec
, local_ec
, remote_sch
, local_sch
,
769 Test_ECG::push_supplier (void * /* cookie */,
770 RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer
,
771 const RtecEventComm::EventSet
&events
)
773 this->wait_until_ready ();
774 // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events sent by supplier\n"));
775 // @@ TODO we could keep some kind of stats here...
776 if (!this->short_circuit_
)
778 consumer
->push (events
);
783 for (; i
< this->hp_consumers_
; ++i
)
785 this->consumers_
[i
]->push (events
);
787 for (; i
< this->hp_consumers_
+ this->lp_consumers_
; ++i
)
789 this->consumers_
[i
]->push (events
);
795 Test_ECG::push_consumer (void *consumer_cookie
,
796 ACE_hrtime_t arrival
,
797 const RtecEventComm::EventSet
&events
)
800 (reinterpret_cast<Test_Consumer
**> (consumer_cookie
)
803 // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID));
805 if (events
.length () == 0)
807 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
811 // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
814 const int bufsize
= 128;
816 ACE_OS::sprintf (buf
, "Consumer %d receives event in thread: ", ID
);
817 print_priority_info (buf
);
820 for (u_int i
= 0; i
< events
.length (); ++i
)
822 const RtecEventComm::Event
& e
= events
[i
];
824 if (e
.header
.type
== ACE_ES_EVENT_SHUTDOWN
)
826 this->shutdown_consumer (ID
);
831 ORBSVCS_Time::TimeT_to_hrtime (s
, e
.header
.creation_time
);
832 ACE_hrtime_t nsec
= arrival
- s
;
833 if (this->local_source (e
.header
.source
))
835 int& count
= this->stats_
[ID
].lcl_count_
;
837 this->stats_
[ID
].lcl_latency_
[count
] = nsec
;
838 int workload
= this->hp_workload_
;
839 int interval
= this->hp_interval_
;
840 if (ID
>= this->hp_consumers_
)
842 workload
= this->lp_workload_
;
843 interval
= this->lp_interval_
;
846 for (int j
= 0; j
< workload
; ++j
)
848 // Eat a little CPU so the Utilization test can measure the
850 /* takes about 40.2 usecs on a 167 MHz Ultra2 */
852 ACE::is_prime (n
, 2, n
/ 2);
854 // Increment the elapsed time on this consumer.
855 ACE_hrtime_t now
= ACE_OS::gethrtime ();
856 this->stats_
[ID
].total_time_
+= now
- arrival
;
857 this->stats_
[ID
].end_
[count
] = now
;
859 // We estimate our laxity based on the event creation
860 // time... it may not be very precise, but will do; other
861 // strategies include:
862 // + Keep track of the "current frame", then the deadline
863 // is the end of the frame.
864 // + Use the start of the test to keep the current frame.
865 // + Use the last execution.
867 CORBA::ULong tmp
= ACE_U64_TO_U32 (s
- now
);
868 this->stats_
[ID
].laxity_
[count
] = 1 + tmp
/1000.0F
/interval
;
873 int& count
= this->stats_
[ID
].rmt_count_
;
874 this->stats_
[ID
].rmt_latency_
[count
] = nsec
;
881 Test_ECG::wait_until_ready ()
883 ACE_GUARD (TAO_SYNCH_MUTEX
, ready_mon
, this->ready_mtx_
);
884 while (!this->ready_
)
885 this->ready_cnd_
.wait ();
889 Test_ECG::shutdown_supplier (void* /* supplier_cookie */,
890 RtecEventComm::PushConsumer_ptr consumer
)
892 this->running_suppliers_
--;
893 if (this->running_suppliers_
!= 0)
896 // We propagate a shutdown event through the system...
897 //FUZZ: disable check_for_lack_ACE_OS
898 RtecEventComm::EventSet
shutdown (1);
899 //FUZZ: enable check_for_lack_ACE_OS
901 RtecEventComm::Event
& s
= shutdown
[0];
906 ACE_hrtime_t t
= ACE_OS::gethrtime ();
907 ORBSVCS_Time::hrtime_to_TimeT (s
.header
.creation_time
, t
);
908 s
.header
.type
= ACE_ES_EVENT_SHUTDOWN
;
909 consumer
->push (shutdown
);
913 Test_ECG::shutdown_consumer (int id
)
915 ACE_DEBUG ((LM_DEBUG
, "Shutdown consumer %d\n", id
));
916 this->running_consumers_
--;
918 if (this->running_consumers_
== 0)
920 if (TAO_ORB_Core_instance ()->orb () == 0)
922 ACE_ERROR ((LM_ERROR
,
923 "(%P|%t) Test_ECG::shutdown_consumer, "
924 "ORB instance is 0\n"));
928 TAO_ORB_Core_instance ()->orb ()->shutdown ();
934 Test_ECG::shutdown ()
936 ACE_DEBUG ((LM_DEBUG
, "Shutting down the multiple EC test\n"));
938 if (this->rmt_name_
.length () != 0)
940 this->ecg_
.shutdown ();
943 TAO_ORB_Core_instance ()->orb ()->shutdown ();
948 Test_ECG::dump_results ()
950 const int bufsize
= 512;
951 ACE_TCHAR buf
[bufsize
];
954 for (i
= 0; i
< this->hp_consumers_
; ++i
)
956 ACE_OS::sprintf (buf
, ACE_TEXT("HP%02d"), i
);
957 this->dump_results (buf
, this->stats_
[i
]);
959 for (i
= 0; i
< this->lp_consumers_
; ++i
)
961 ACE_OS::sprintf (buf
, ACE_TEXT("LP%02d"), i
);
962 this->dump_results (buf
, this->stats_
[i
+ this->hp_consumers_
]);
964 CORBA::ULong tmp
= ACE_U64_TO_U32 (this->test_stop_
- this->test_start_
);
965 double usec
= tmp
/ 1000.0;
966 ACE_DEBUG ((LM_DEBUG
, "Time[TOTAL]: %.3f\n", usec
));
970 Test_ECG::dump_results (const ACE_TCHAR
* name
, Stats
& stats
)
972 // @@ We are reporting the information without specifics about
973 double usec
= ACE_U64_TO_U32 (stats
.total_time_
) / 1000.0;
974 ACE_DEBUG ((LM_DEBUG
, "Time[LCL,%s]: %.3f\n", name
, usec
));
976 for (i
= 1; i
< stats
.lcl_count_
- 1; ++i
)
978 usec
= ACE_U64_TO_U32 (stats
.lcl_latency_
[i
]) / 1000.0;
979 ACE_DEBUG ((LM_DEBUG
, "Latency[LCL,%s]: %.3f\n", name
, usec
));
981 double percent
= stats
.laxity_
[i
] * 100.0;
982 ACE_DEBUG ((LM_DEBUG
, "Laxity[LCL,%s]: %.3f\n", name
, percent
));
984 usec
= ACE_U64_TO_U32 (stats
.end_
[i
] - this->test_start_
) / 1000.0;
985 ACE_DEBUG ((LM_DEBUG
, "Completion[LCL,%s]: %.3f\n", name
, usec
));
987 for (i
= 1; i
< stats
.rmt_count_
- 1; ++i
)
989 double usec
= ACE_U64_TO_U32 (stats
.rmt_latency_
[i
]) / 1000.0;
990 ACE_DEBUG ((LM_DEBUG
, "Latency[RMT,%s]: %.3f\n", name
, usec
));
995 Test_ECG::local_source (RtecEventComm::EventSourceID id
) const
997 for (int i
= 0; i
< this->hp_suppliers_
+ this->lp_suppliers_
; ++i
)
999 if (this->suppliers_
[i
]->supplier_id () == id
)
1006 Test_ECG::parse_args (int argc
, ACE_TCHAR
*argv
[])
1008 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("l:r:s:i:xh:w:p:d:"));
1011 while ((opt
= get_opt ()) != EOF
)
1016 this->lcl_name_
= ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ());
1020 this->rmt_name_
= ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ());
1024 if (ACE_OS::strcasecmp (get_opt
.opt_arg (), ACE_TEXT("global")) == 0)
1026 this->scheduling_type_
= Test_ECG::ss_global
;
1028 else if (ACE_OS::strcasecmp (get_opt
.opt_arg (), ACE_TEXT("local")) == 0)
1030 this->scheduling_type_
= Test_ECG::ss_local
;
1032 else if (ACE_OS::strcasecmp (get_opt
.opt_arg (), ACE_TEXT("runtime")) == 0)
1034 this->scheduling_type_
= Test_ECG::ss_runtime
;
1038 ACE_DEBUG ((LM_DEBUG
,
1039 "Unknown scheduling type <%s> "
1040 "defaulting to local\n",
1041 get_opt
.opt_arg ()));
1042 this->scheduling_type_
= Test_ECG::ss_local
;
1047 this->short_circuit_
= 1;
1053 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
1054 this->consumer_disconnects_
= ACE_OS::atoi (arg
);
1055 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1056 this->supplier_disconnects_
= ACE_OS::atoi (arg
);
1063 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
1065 this->hp_suppliers_
= ACE_OS::atoi (arg
);
1066 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1067 this->hp_consumers_
= ACE_OS::atoi (arg
);
1068 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1069 this->hp_workload_
= ACE_OS::atoi (arg
);
1070 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1071 this->hp_interval_
= ACE_OS::atoi (arg
);
1072 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1073 this->hp_message_count_
= ACE_OS::atoi (arg
);
1074 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1075 this->hps_event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1076 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1077 this->hps_event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1078 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1079 this->hpc_event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1080 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1081 this->hpc_event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1088 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
1090 this->lp_suppliers_
= ACE_OS::atoi (arg
);
1091 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1092 this->lp_consumers_
= ACE_OS::atoi (arg
);
1093 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1094 this->lp_workload_
= ACE_OS::atoi (arg
);
1095 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1096 this->lp_interval_
= ACE_OS::atoi (arg
);
1097 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1098 this->lp_message_count_
= ACE_OS::atoi (arg
);
1099 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1100 this->lps_event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1101 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1102 this->lps_event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1103 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1104 this->lpc_event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1105 arg
= ACE_OS::strtok_r (0, ",", &aux
);
1106 this->lpc_event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
1111 this->pid_file_name_
= get_opt
.opt_arg ();
1114 this->schedule_file_
= get_opt
.opt_arg ();
1119 ACE_DEBUG ((LM_DEBUG
,
1124 "-s <global|local|runtime> "
1125 "-i <consumer disc.,supplier disc.> "
1126 "-x (short circuit EC) "
1127 "-h <high priority args> "
1128 "-w <low priority args> "
1129 "-p <pid file name> "
1130 "-d <schedule file name> "
1137 if (this->hp_message_count_
< 0
1138 || this->hp_message_count_
>= Test_ECG::MAX_EVENTS
)
1140 ACE_DEBUG ((LM_DEBUG
,
1141 "%s: HP event count (%d) is out of range, "
1142 "reset to default (%d)\n",
1143 argv
[0], this->lp_message_count_
,
1145 this->hp_message_count_
= 160;
1148 if (this->lp_message_count_
< 0
1149 || this->lp_message_count_
>= Test_ECG::MAX_EVENTS
)
1151 ACE_DEBUG ((LM_DEBUG
,
1152 "%s: LP event count (%d) is out of range, "
1153 "reset to default (%d)\n",
1154 argv
[0], this->lp_message_count_
,
1156 this->lp_message_count_
= 4;
1159 if (this->hp_consumers_
<= 0
1160 || this->lp_consumers_
< 0
1161 || this->hp_consumers_
+ this->lp_consumers_
>= Test_ECG::MAX_CONSUMERS
1162 || this->hp_suppliers_
<= 0
1163 || this->lp_suppliers_
< 0
1164 || this->hp_suppliers_
+ this->lp_suppliers_
>= Test_ECG::MAX_SUPPLIERS
)
1166 ACE_ERROR_RETURN ((LM_DEBUG
,
1167 "%s: number of consumers (low: %d, high: %d) or "
1168 "suppliers (low: %d, high: %d) out of range\n",
1170 lp_consumers_
, hp_consumers_
,
1171 lp_suppliers_
, lp_suppliers_
), -1);
1177 Test_Supplier::Test_Supplier (Test_ECG
*test
,
1186 Test_Supplier::open (const char* name
,
1190 const RtecScheduler::Period_t
& rate
,
1191 RtecEventChannelAdmin::EventChannel_ptr ec
)
1193 this->event_a_
= event_a
;
1194 this->event_b_
= event_b
;
1195 this->message_count_
= message_count
;
1197 RtecScheduler::Scheduler_ptr server
=
1198 ACE_Scheduler_Factory::server ();
1200 RtecScheduler::handle_t rt_info
=
1201 server
->create (name
);
1203 // The execution times are set to reasonable values, but
1204 // actually they are changed on the real execution, i.e. we
1205 // lie to the scheduler to obtain right priorities; but we
1206 // don't care if the set is schedulable.
1207 ACE_Time_Value
tv (0, 2000);
1208 TimeBase::TimeT time
;
1209 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
1210 ACE_DEBUG ((LM_DEBUG
, "register supplier \"%C\"\n", name
));
1211 server
->set (rt_info
,
1212 RtecScheduler::VERY_HIGH_CRITICALITY
,
1215 RtecScheduler::VERY_LOW_IMPORTANCE
,
1218 RtecScheduler::OPERATION
);
1220 this->supplier_id_
= ACE::crc32 (name
);
1221 ACE_DEBUG ((LM_DEBUG
, "ID for <%C> is %04.4x\n", name
,
1222 this->supplier_id_
));
1224 ACE_SupplierQOS_Factory qos
;
1225 qos
.insert (this->supplier_id_
,
1228 qos
.insert (this->supplier_id_
,
1231 qos
.insert (this->supplier_id_
,
1232 ACE_ES_EVENT_SHUTDOWN
,
1235 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin
=
1236 ec
->for_suppliers ();
1238 this->consumer_proxy_
=
1239 supplier_admin
->obtain_push_consumer ();
1241 RtecEventComm::PushSupplier_var objref
= this->_this ();
1243 this->consumer_proxy_
->connect_push_supplier (objref
.in (),
1244 qos
.get_SupplierQOS ());
1248 Test_Supplier::close ()
1250 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
1253 RtecEventChannelAdmin::ProxyPushConsumer_var proxy
=
1254 this->consumer_proxy_
._retn ();
1255 proxy
->disconnect_push_consumer ();
1259 Test_Supplier::activate (const char* name
,
1260 const RtecScheduler::Period_t
& rate
,
1261 RtecEventChannelAdmin::EventChannel_ptr ec
)
1263 RtecScheduler::Scheduler_ptr server
=
1264 ACE_Scheduler_Factory::server ();
1266 const int bufsize
= 512;
1268 ACE_OS::strcpy (buf
, "consumer_");
1269 ACE_OS::strcat (buf
, name
);
1270 RtecScheduler::handle_t rt_info
=
1271 server
->create (buf
);
1273 // The execution times are set to reasonable values, but
1274 // actually they are changed on the real execution, i.e. we
1275 // lie to the scheduler to obtain right priorities; but we
1276 // don't care if the set is schedulable.
1277 ACE_Time_Value
tv (0, 2000);
1278 TimeBase::TimeT time
;
1279 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
1280 ACE_DEBUG ((LM_DEBUG
, "activate \"%C\"\n", buf
));
1281 server
->set (rt_info
,
1282 RtecScheduler::VERY_HIGH_CRITICALITY
,
1285 RtecScheduler::VERY_LOW_IMPORTANCE
,
1288 RtecScheduler::OPERATION
);
1290 // Also connect our consumer for timeout events from the EC.
1291 int interval
= rate
/ 10;
1292 ACE_Time_Value
tv_timeout (interval
/ ACE_ONE_SECOND_IN_USECS
,
1293 interval
% ACE_ONE_SECOND_IN_USECS
);
1294 TimeBase::TimeT timeout
;
1295 ORBSVCS_Time::Time_Value_to_TimeT (timeout
, tv_timeout
);
1297 ACE_ConsumerQOS_Factory consumer_qos
;
1298 consumer_qos
.start_disjunction_group ();
1299 consumer_qos
.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT
,
1303 // = Connect as a consumer.
1304 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
1305 ec
->for_consumers ();
1307 this->supplier_proxy_
=
1308 consumer_admin
->obtain_push_supplier ();
1310 RtecEventComm::PushConsumer_var cref
=
1311 this->consumer_
._this ();
1313 this->supplier_proxy_
->connect_push_consumer (
1315 consumer_qos
.get_ConsumerQOS ());
1319 Test_Supplier::push (const RtecEventComm::EventSet
& events
)
1322 const int bufsize
= 128;
1324 ACE_OS::sprintf (buf
, "Supplier %d receives event in thread: ",
1325 this->supplier_id_
);
1326 print_priority_info (buf
);
1329 if (events
.length () == 0 || this->message_count_
< 0)
1331 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
1335 RtecEventComm::EventSet
sent (events
.length ());
1336 sent
.length (events
.length ());
1338 for (u_int i
= 0; i
< events
.length (); ++i
)
1340 const RtecEventComm::Event
& e
= events
[i
];
1341 if (e
.header
.type
!= ACE_ES_EVENT_INTERVAL_TIMEOUT
)
1344 // ACE_DEBUG ((LM_DEBUG, "Test_Supplier - timeout (%t)\n"));
1346 RtecEventComm::Event
& s
= sent
[i
];
1347 s
.header
.source
= this->supplier_id_
;
1350 ACE_hrtime_t t
= ACE_OS::gethrtime ();
1351 ORBSVCS_Time::hrtime_to_TimeT (s
.header
.creation_time
, t
);
1353 this->message_count_
--;
1355 if (this->message_count_
< 0)
1357 this->test_
->shutdown_supplier (this->cookie_
,
1358 this->consumer_proxy_
.in ());
1360 if (this->message_count_
% 2 == 0)
1362 // Generate an A event...
1363 s
.header
.type
= this->event_a_
;
1367 s
.header
.type
= this->event_b_
;
1370 this->test_
->push_supplier (this->cookie_
,
1371 this->consumer_proxy_
.in (),
1376 Test_Supplier::disconnect_push_supplier ()
1378 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
1381 this->supplier_proxy_
->disconnect_push_supplier ();
1385 Test_Supplier::disconnect_push_consumer ()
1389 int Test_Supplier::supplier_id () const
1391 return this->supplier_id_
;
1394 Test_Consumer::Test_Consumer (Test_ECG
*test
,
1402 Test_Consumer::open (const char* name
,
1403 int event_a
, int event_b
,
1404 RtecEventChannelAdmin::EventChannel_ptr ec
)
1406 RtecScheduler::Scheduler_ptr server
=
1407 ACE_Scheduler_Factory::server ();
1409 RtecScheduler::handle_t rt_info
=
1410 server
->create (name
);
1412 // The worst case execution time is far less than 2
1413 // milliseconds, but that is a safe estimate....
1414 ACE_Time_Value
tv (0, 2000);
1415 TimeBase::TimeT time
;
1416 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
1417 ACE_DEBUG ((LM_DEBUG
, "register consumer \"%C\"\n", name
));
1418 server
->set (rt_info
,
1419 RtecScheduler::VERY_HIGH_CRITICALITY
,
1422 RtecScheduler::VERY_LOW_IMPORTANCE
,
1425 RtecScheduler::OPERATION
);
1427 ACE_ConsumerQOS_Factory qos
;
1428 qos
.start_disjunction_group ();
1429 qos
.insert_type (ACE_ES_EVENT_SHUTDOWN
, rt_info
);
1430 qos
.insert_type (event_a
, rt_info
);
1431 qos
.insert_type (event_b
, rt_info
);
1433 // = Connect as a consumer.
1434 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
1435 ec
->for_consumers ();
1437 this->supplier_proxy_
=
1438 consumer_admin
->obtain_push_supplier ();
1440 RtecEventComm::PushConsumer_var objref
= this->_this ();
1442 this->supplier_proxy_
->connect_push_consumer (objref
.in (),
1443 qos
.get_ConsumerQOS ());
1447 Test_Consumer::close ()
1449 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
1452 RtecEventChannelAdmin::ProxyPushSupplier_var proxy
=
1453 this->supplier_proxy_
._retn ();
1454 proxy
->disconnect_push_supplier ();
1458 Test_Consumer::push (const RtecEventComm::EventSet
& events
)
1460 ACE_hrtime_t arrival
= ACE_OS::gethrtime ();
1461 this->test_
->push_consumer (this->cookie_
, arrival
, events
);
1465 Test_Consumer::disconnect_push_consumer ()
1470 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
1474 // Dynamically allocate the Test_ECG instance so that we don't have
1475 // to worry about running out of stack space if it's large.
1476 ACE_NEW_RETURN (test
,
1480 const int status
= test
->run (argc
, argv
);