Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Multiple / EC_Multiple.cpp
blobe9481bcd9515fb23cc7ea9aa5295e5892d434480
1 #include "EC_Multiple.h"
3 #include "Scheduler_Runtime1.h"
4 #include "Scheduler_Runtime2.h"
5 #include "Scheduler_Runtime_Dynamic.h" /* infos_3 */
7 #include "orbsvcs/Event_Utilities.h"
8 #include "orbsvcs/Event_Service_Constants.h"
9 #include "orbsvcs/Scheduler_Factory.h"
10 #include "orbsvcs/Time_Utilities.h"
11 #include "orbsvcs/RtecEventChannelAdminC.h"
12 #include "orbsvcs/Sched/Config_Scheduler.h"
13 #include "orbsvcs/Event/EC_Event_Channel.h"
14 #include "orbsvcs/Runtime_Scheduler.h"
16 #include "tao/ORB_Core.h"
18 #include "ace/Get_Opt.h"
19 #include "ace/Sched_Params.h"
20 #include "ace/OS_NS_errno.h"
21 #include "ace/OS_NS_strings.h"
22 #include <memory>
24 Test_ECG::Test_ECG ()
25 : lcl_name_ ("Test_ECG"),
26 rmt_name_ (""),
27 scheduling_type_ (Test_ECG::ss_runtime),
28 consumer_disconnects_ (0),
29 supplier_disconnects_ (0),
30 short_circuit_ (0),
31 hp_suppliers_ (1),
32 hp_consumers_ (1),
33 hp_workload_ (10),
34 hp_interval_ (25000),
35 hp_message_count_ (200),
36 hps_event_a_ (ACE_ES_EVENT_UNDEFINED),
37 hps_event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
38 hpc_event_a_ (ACE_ES_EVENT_UNDEFINED),
39 hpc_event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
40 lp_suppliers_ (0),
41 lp_consumers_ (0),
42 lp_workload_ (0),
43 lp_interval_ (100000),
44 lp_message_count_ (50),
45 lps_event_a_ (0),
46 lps_event_b_ (0),
47 lpc_event_a_ (0),
48 lpc_event_b_ (0),
49 schedule_file_ (0),
50 pid_file_name_ (0),
51 ready_ (0),
52 ready_cnd_ (ready_mtx_)
56 void
57 print_priority_info (const char *const name)
59 #if defined (ACE_HAS_PTHREADS)
60 #if defined (ACE_HAS_PTHREADS)
61 struct sched_param param;
62 int policy, status;
64 if ((status = pthread_getschedparam (pthread_self (), &policy,
65 &param)) == 0) {
66 ACE_DEBUG ((LM_DEBUG,
67 "%C (%lu|%u); policy is %d, priority is %d\n",
68 name,
69 ACE_OS::getpid (),
71 pthread_self (),
72 policy, param.sched_priority ));
73 } else {
74 ACE_DEBUG ((LM_DEBUG,"pthread_getschedparam failed: %d\n", status));
76 #else
77 ACE_UNUSED_ARG (name);
78 #endif /* ACE_HAS_PTHREADS */
80 #ifdef sun
81 // Find what scheduling class the thread's LWP is in.
82 //FUZZ: disable check_for_lack_ACE_OS
83 ACE_Sched_Params sched_params (ACE_SCHED_OTHER, 0);
84 //FUZZ: enable check_for_lack_ACE_OS
85 if (ACE_OS::lwp_getparams (sched_params) == -1)
87 ACE_OS::perror ("ACE_OS::lwp_getparams");
88 return;
90 else if (sched_params.policy () == ACE_SCHED_FIFO ||
91 sched_params.policy () == ACE_SCHED_RR)
93 // This thread's LWP is in the RT class.
94 ACE_DEBUG ((LM_DEBUG,
95 "RT class; priority: %d, quantum: %u msec\n",
96 sched_params.priority (),
97 sched_params.quantum ().msec ()));
99 else
101 ACE_DEBUG ((LM_DEBUG,
102 "TS class; priority: %d\n",
103 sched_params.priority ()));
105 #endif /* sun */
106 #else
107 ACE_UNUSED_ARG (name);
108 #endif /* ACE_HAS_PTHREADS */
// Drive the whole test from ORB start-up to tear-down.
//
// Visible steps, in order: initialise the ORB and activate the RootPOA's
// manager, parse the command line, log the effective parameters,
// optionally write the pid file, try to switch to FIFO scheduling, create
// the scheduler servant selected by scheduling_type_, bind the scheduler
// and the local event channel in the Naming Service, connect suppliers,
// the gateway (only when rmt_name_ is set) and consumers, release the
// suppliers through ready_cnd_, run the ORB until it is shut down, then
// dump the results and disconnect everything.
//
// Returns 1 when parse_args () reports an error, -1 when a scheduler
// servant pointer is null or use_config () fails, and 0 at the end --
// including after either CORBA exception handler below has printed the
// exception.
112 Test_ECG::run (int argc, ACE_TCHAR* argv[])
116 CORBA::ORB_var orb =
117 CORBA::ORB_init (argc, argv);
119 CORBA::Object_var poa_object =
120 orb->resolve_initial_references("RootPOA");
122 if (CORBA::is_nil (poa_object.in ()))
123 ACE_ERROR_RETURN ((LM_ERROR,
124 " (%P|%t) Unable to initialize the POA.\n"),
127 PortableServer::POA_var root_poa =
128 PortableServer::POA::_narrow (poa_object.in ());
130 PortableServer::POAManager_var poa_manager =
131 root_poa->the_POAManager ();
133 poa_manager->activate ();
// The test's own options are parsed after ORB_init () has been given
// argc/argv.
135 if (this->parse_args (argc, argv))
136 return 1;
138 ACE_DEBUG ((LM_DEBUG,
139 "Execution parameters:\n"
140 " lcl name = <%C>\n"
141 " rmt name = <%C>\n"
142 " scheduler type = <%d>\n"
143 " consumer disconnects = <%d>\n"
144 " supplier disconnects = <%d>\n"
145 " short circuit EC = <%d>\n"
146 " HP suppliers = <%d>\n"
147 " HP consumers = <%d>\n"
148 " HP workload = <%d> (iterations)\n"
149 " HP interval between events = <%d> (usecs)\n"
150 " HP message count = <%d>\n"
151 " HP supplier Event A = <%d>\n"
152 " HP supplier Event B = <%d>\n"
153 " HP consumer Event A = <%d>\n"
154 " HP consumer Event B = <%d>\n"
155 " LP suppliers = <%d>\n"
156 " LP consumers = <%d>\n"
157 " LP workload = <%d> (iterations)\n"
158 " LP interval between events = <%d> (usecs)\n"
159 " LP message count = <%d>\n"
160 " LP supplier Event A = <%d>\n"
161 " LP supplier Event B = <%d>\n"
162 " LP consumer Event A = <%d>\n"
163 " LP consumer Event B = <%d>\n"
164 " schedule_file = <%s>\n"
165 " pid file name = <%s>\n",
166 this->lcl_name_.length () ? this->lcl_name_.c_str () : "nil",
167 this->rmt_name_.length () ? this->rmt_name_.c_str () : "nil",
168 this->scheduling_type_,
169 this->consumer_disconnects_,
170 this->supplier_disconnects_,
171 this->short_circuit_,
173 this->hp_suppliers_,
174 this->hp_consumers_,
175 this->hp_workload_,
176 this->hp_interval_,
177 this->hp_message_count_,
178 this->hps_event_a_,
179 this->hps_event_b_,
180 this->hpc_event_a_,
181 this->hpc_event_b_,
183 this->lp_suppliers_,
184 this->lp_consumers_,
185 this->lp_workload_,
186 this->lp_interval_,
187 this->lp_message_count_,
188 this->lps_event_a_,
189 this->lps_event_b_,
190 this->lpc_event_a_,
191 this->lpc_event_b_,
193 this->schedule_file_?this->schedule_file_:ACE_TEXT("nil"),
194 this->pid_file_name_?this->pid_file_name_:ACE_TEXT("nil")));
196 print_priority_info ("Test_ECG::run (Main)");
// Write this process's id to the pid file when one was requested; a
// failed fopen () is silently ignored.
198 if (this->pid_file_name_ != 0)
200 FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
201 if (pid != 0)
203 ACE_OS::fprintf (pid, "%ld\n",
204 static_cast<long> (ACE_OS::getpid ()));
205 ACE_OS::fclose (pid);
209 int min_priority =
210 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO);
211 // Enable FIFO scheduling
213 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
214 min_priority,
215 ACE_SCOPE_PROCESS)) != 0)
// Failing to switch scheduling class is only logged; the test continues.
217 if (ACE_OS::last_error () == EPERM)
218 ACE_DEBUG ((LM_DEBUG,
219 "%s: user is not superuser, "
220 "so remain in time-sharing class\n", argv[0]));
221 else
222 ACE_ERROR ((LM_ERROR,
223 "%s: ACE_OS::sched_params failed\n", argv[0]));
226 if (ACE_OS::thr_setprio (min_priority) == -1)
228 ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed\n"));
231 print_priority_info ("Test_ECG::run (Main after thr_setprio)");
233 CORBA::Object_var naming_obj =
234 orb->resolve_initial_references ("NameService");
236 if (CORBA::is_nil (naming_obj.in ()))
237 ACE_ERROR_RETURN ((LM_ERROR,
238 " (%P|%t) Unable to get the Naming Service.\n"),
241 CosNaming::NamingContext_var naming_context =
242 CosNaming::NamingContext::_narrow (naming_obj.in ());
// scheduler_impl owns the scheduler servant for the rest of this function;
// scheduler holds the object reference obtained from it via _this ().
244 std::unique_ptr<POA_RtecScheduler::Scheduler> scheduler_impl;
245 RtecScheduler::Scheduler_var scheduler;
247 switch (this->scheduling_type_)
249 default:
250 ACE_ERROR ((LM_WARNING, "Unknown scheduling type %d\n",
251 this->scheduling_type_));
252 ACE_FALLTHROUGH;
// No local scheduler servant is created for the global scheduling type.
253 case Test_ECG::ss_global:
254 break;
256 case Test_ECG::ss_local:
258 std::unique_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl (new ACE_Config_Scheduler);
259 scheduler_impl = std::move(auto_scheduler_impl);
261 if (scheduler_impl.get () == 0)
262 return -1;
263 scheduler = scheduler_impl->_this ();
264 break;
// The runtime tables are selected by the local name: ECM1, ECM2 or ECM3.
// Any other name falls back to a config scheduler.
266 case Test_ECG::ss_runtime:
267 if (ACE_OS::strcmp (this->lcl_name_.c_str (), "ECM1") == 0)
269 // This setups Scheduler_Factory to use the runtime version
270 ACE_Scheduler_Factory::use_runtime (
271 sizeof (runtime_configs_1)/sizeof (runtime_configs_1[0]),
272 runtime_configs_1,
273 sizeof (runtime_infos_1)/sizeof (runtime_infos_1[0]),
274 runtime_infos_1);
276 std::unique_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl
277 (new ACE_Runtime_Scheduler (runtime_configs_1_size,
278 runtime_configs_1,
279 runtime_infos_1_size,
280 runtime_infos_1));
281 scheduler_impl = std::move(auto_scheduler_impl);
283 if (scheduler_impl.get () == 0)
284 return -1;
285 scheduler = scheduler_impl->_this ();
287 else if (ACE_OS::strcmp (this->lcl_name_.c_str (), "ECM2") == 0)
289 // This setups Scheduler_Factory to use the runtime version
290 ACE_Scheduler_Factory::use_runtime (
291 sizeof (runtime_configs_2)/sizeof (runtime_configs_2[0]),
292 runtime_configs_2,
293 sizeof (runtime_infos_2)/sizeof (runtime_infos_2[0]),
294 runtime_infos_2);
296 std::unique_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl
297 (new ACE_Runtime_Scheduler (runtime_configs_2_size,
298 runtime_configs_2,
299 runtime_infos_2_size,
300 runtime_infos_2));
301 scheduler_impl = std::move(auto_scheduler_impl);
303 if (scheduler_impl.get () == 0)
304 return -1;
305 scheduler = scheduler_impl->_this ();
307 else if (ACE_OS::strcmp (this->lcl_name_.c_str (), "ECM3") == 0)
309 // This setups Scheduler_Factory to use the runtime version
310 ACE_Scheduler_Factory::use_runtime (
311 sizeof (runtime_configs_3)/sizeof (runtime_configs_3[0]),
312 runtime_configs_3,
313 sizeof (runtime_infos_3)/sizeof (runtime_infos_3[0]),
314 runtime_infos_3);
316 std::unique_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl
317 (new ACE_Runtime_Scheduler (runtime_configs_3_size,
318 runtime_configs_3,
319 runtime_infos_3_size,
320 runtime_infos_3));
321 scheduler_impl = std::move(auto_scheduler_impl);
323 if (scheduler_impl.get () == 0)
324 return -1;
325 scheduler = scheduler_impl->_this ();
327 else
329 ACE_ERROR ((LM_WARNING,
330 "Unknown name <%C> defaulting to "
331 "config scheduler\n", this->lcl_name_.c_str ()));
333 std::unique_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl (new ACE_Config_Scheduler);
334 scheduler_impl = std::move(auto_scheduler_impl);
336 if (scheduler_impl.get () == 0)
337 return -1;
338 scheduler = scheduler_impl->_this ();
340 break;
343 // We use this buffer to generate the names of the local
344 // services.
// NOTE(review): the names are assembled with strcpy/strcat and no length
// check, so a local or remote name longer than the buffer would overrun it.
345 const int bufsize = 512;
346 char buf[bufsize];
348 // Register the scheduler with the naming service.
349 switch (this->scheduling_type_)
351 default:
352 case Test_ECG::ss_global:
353 break;
355 case Test_ECG::ss_local:
356 case Test_ECG::ss_runtime:
358 CORBA::String_var str =
359 orb->object_to_string (scheduler.in ());
360 ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%C>\n",
361 str.in ()));
363 ACE_OS::strcpy (buf, "ScheduleService@");
364 ACE_OS::strcat (buf, this->lcl_name_.c_str ());
366 // Register the servant with the Naming Context....
367 CosNaming::Name schedule_name (1);
368 schedule_name.length (1);
369 schedule_name[0].id = CORBA::string_dup (buf);
370 naming_context->bind (schedule_name, scheduler.in ());
372 if (ACE_Scheduler_Factory::use_config (naming_context.in (),
373 buf) == -1)
374 return -1;
376 break;
379 // Create the EventService implementation, but don't start its
380 // internal threads.
381 TAO_EC_Event_Channel_Attributes attr (root_poa.in (),
382 root_poa.in ());
383 TAO_EC_Event_Channel ec_impl (attr);
385 // Register Event_Service with the Naming Service.
386 RtecEventChannelAdmin::EventChannel_var ec =
387 ec_impl._this ();
389 CORBA::String_var str =
390 orb->object_to_string (ec.in ());
392 ACE_OS::sleep (5);
393 ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%C>\n", str.in ()));
// The local channel is bound as "EventChannel@<lcl_name_>", the same name
// get_ec () builds when it looks a channel up.
395 ACE_OS::strcpy (buf, "EventChannel@");
396 ACE_OS::strcat (buf, this->lcl_name_.c_str ());
398 CosNaming::Name channel_name (1);
399 channel_name.length (1);
400 channel_name[0].id = CORBA::string_dup (buf);
401 naming_context->bind (channel_name, ec.in ());
403 ACE_DEBUG ((LM_DEBUG, "waiting to start\n"));
405 ACE_Time_Value tv (15, 0);
// Only when a remote name was given: process ORB events for up to the
// 15 seconds held in tv before going on.
407 if (this->rmt_name_.length () != 0)
409 orb->run (&tv);
412 ACE_DEBUG ((LM_DEBUG, "starting....\n"));
414 RtecEventChannelAdmin::EventChannel_var local_ec =
415 this->get_ec (naming_context.in (),
416 this->lcl_name_.c_str ());
418 ACE_DEBUG ((LM_DEBUG, "located local EC\n"));
// Exercise supplier_disconnects_ connect/disconnect cycles before the
// connection that is kept for the test.
420 for (int sd = 0; sd < this->supplier_disconnects_; ++sd)
422 this->connect_suppliers (local_ec.in ());
423 this->disconnect_suppliers ();
424 ACE_OS::sleep (5);
425 ACE_DEBUG ((LM_DEBUG, "Supplier disconnection %d\n", sd));
428 this->connect_suppliers (local_ec.in ());
430 ACE_DEBUG ((LM_DEBUG, "connected supplier\n"));
432 RtecEventChannelAdmin::Observer_Handle observer_handle = 0;
// Gateway set-up, only when a remote name was given: locate the remote
// event channel and scheduler, connect the gateway, and register it as an
// observer of the local channel.
433 if (this->rmt_name_.length () != 0)
435 tv.set (5, 0);
436 orb->run (&tv);
438 RtecEventChannelAdmin::EventChannel_var remote_ec =
439 this->get_ec (naming_context.in (),
440 this->rmt_name_.c_str ());
441 ACE_DEBUG ((LM_DEBUG, "located remote EC\n"));
443 CosNaming::Name rsch_name (1);
444 rsch_name.length (1);
// With a global scheduler the plain name "ScheduleService" is resolved;
// otherwise "@<rmt_name_>" is appended.
445 ACE_OS::strcpy (buf, "ScheduleService");
446 if (this->scheduling_type_ != Test_ECG::ss_global)
448 ACE_OS::strcat (buf, "@");
449 ACE_OS::strcat (buf, this->rmt_name_.c_str ());
451 rsch_name[0].id = CORBA::string_dup (buf);
452 CORBA::Object_var tmpobj =
453 naming_context->resolve (rsch_name);
455 RtecScheduler::Scheduler_var remote_sch =
456 RtecScheduler::Scheduler::_narrow (tmpobj.in ());
458 this->connect_ecg (local_ec.in (),
459 remote_ec.in (),
460 remote_sch.in ());
462 ACE_DEBUG ((LM_DEBUG, "connected proxy\n"));
464 tv.set (5, 0);
465 orb->run (&tv);
// NOTE(review): observer is a raw _ptr obtained from _this () and is not
// released anywhere in this function -- confirm whether a _var is intended.
467 RtecEventChannelAdmin::Observer_ptr observer =
468 this->ecg_._this ();
469 observer_handle = ec_impl.append_observer (observer);
// Same connect/disconnect exercise for the consumers.
472 for (int cd = 0; cd < this->consumer_disconnects_; ++cd)
474 this->connect_consumers (local_ec.in ());
475 this->disconnect_consumers ();
476 ACE_OS::sleep (5);
477 ACE_DEBUG ((LM_DEBUG, "Consumer disconnection %d\n", cd));
479 this->connect_consumers (local_ec.in ());
481 ACE_DEBUG ((LM_DEBUG, "connected consumer\n"));
483 this->activate_suppliers (local_ec.in ());
485 ACE_DEBUG ((LM_DEBUG, "suppliers are active\n"));
// shutdown_supplier () counts this down to decide when the shutdown event
// is sent.
487 this->running_suppliers_ = this->hp_suppliers_ + this->lp_suppliers_;
489 // Acquire the mutex for the ready mutex, blocking any supplier
490 // that may start after this point.
491 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ready_mon, this->ready_mtx_, 1);
492 this->ready_ = 1;
493 this->test_start_ = ACE_OS::gethrtime ();
494 this->ready_cnd_.broadcast ();
495 ready_mon.release ();
497 ACE_DEBUG ((LM_DEBUG, "activate the EC\n"));
499 if (this->rmt_name_.length () != 0)
501 ec_impl.remove_observer (observer_handle);
504 // Create the EC internal threads
505 ec_impl.activate ();
507 ACE_DEBUG ((LM_DEBUG, "running the test\n"));
// Blocks until the ORB is shut down; shutdown_consumer () does that once
// the last consumer has seen the shutdown event.
508 orb->run ();
510 this->test_stop_ = ACE_OS::gethrtime ();
512 ACE_DEBUG ((LM_DEBUG, "shutdown the EC\n"));
513 ec_impl.shutdown ();
515 this->dump_results ();
// Compute the schedule and dump it, only when a schedule file was
// requested on the command line.
517 if (this->schedule_file_ != 0)
519 RtecScheduler::RT_Info_Set_var infos;
520 RtecScheduler::Dependency_Set_var deps;
521 RtecScheduler::Config_Info_Set_var configs;
522 RtecScheduler::Scheduling_Anomaly_Set_var anomalies;
524 ACE_Scheduler_Factory::server ()->compute_scheduling
525 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
526 ACE_SCOPE_THREAD),
527 ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
528 ACE_SCOPE_THREAD),
529 infos.out (), deps.out (),
530 configs.out (), anomalies.out ());
532 ACE_Scheduler_Factory::dump_schedule (infos.in (),
533 deps.in (),
534 configs.in (),
535 anomalies.in (),
536 this->schedule_file_);
// Only the event channel name is unbound here; the scheduler name bound
// earlier is left in the Naming Service.
539 naming_context->unbind (channel_name);
541 if (this->rmt_name_.length () != 0)
543 this->ecg_.close ();
544 this->ecg_.shutdown ();
547 this->disconnect_consumers ();
548 this->disconnect_suppliers ();
550 ACE_DEBUG ((LM_DEBUG, "shutdown grace period\n"));
551 tv.set (5, 0);
552 orb->run (&tv);
// CORBA exceptions are printed and swallowed; the function still returns 0.
554 catch (const CORBA::SystemException& sys_ex)
556 sys_ex._tao_print_exception ("SYS_EX");
558 catch (const CORBA::Exception& ex)
560 ex._tao_print_exception ("NON SYS EX");
562 return 0;
565 RtecEventChannelAdmin::EventChannel_ptr
566 Test_ECG::get_ec (CosNaming::NamingContext_ptr naming_context,
567 const char* process_name)
569 const int bufsize = 512;
570 char buf[bufsize];
571 ACE_OS::strcpy (buf, "EventChannel@");
572 ACE_OS::strcat (buf, process_name);
574 CosNaming::Name channel_name (1);
575 channel_name.length (1);
576 channel_name[0].id = CORBA::string_dup (buf);
578 CORBA::Object_var ec_ptr =
579 naming_context->resolve (channel_name);
580 if (CORBA::is_nil (ec_ptr.in ()))
581 return RtecEventChannelAdmin::EventChannel::_nil ();
583 return RtecEventChannelAdmin::EventChannel::_narrow (ec_ptr.in ());
586 void
587 Test_ECG::disconnect_suppliers ()
589 for (int i = 0; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
591 this->suppliers_[i]->close ();
595 void
596 Test_ECG::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec)
598 int i;
599 for (i = 0; i < this->hp_suppliers_; ++i)
601 // Limit the number of messages sent by each supplier
602 int mc = this->hp_message_count_ / this->hp_suppliers_;
603 if (mc == 0)
604 mc = 1;
606 char buf[BUFSIZ];
607 ACE_OS::sprintf (buf, "hp_supplier_%02d@%s", i, this->lcl_name_.c_str ());
609 ACE_NEW (this->suppliers_[i],
610 Test_Supplier (this, this->suppliers_ + i));
612 this->suppliers_[i]->open (buf,
613 this->hps_event_a_,
614 this->hps_event_b_,
616 this->hp_interval_ * 10,
617 local_ec);
620 for (; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
622 // Limit the number of messages sent by each supplier
623 int mc = this->lp_message_count_ / this->lp_suppliers_;
624 if (mc == 0)
625 mc = 1;
627 char buf[BUFSIZ];
628 ACE_OS::sprintf (buf, "lp_supplier_%02d@%s",
629 i - this->hp_suppliers_, this->lcl_name_.c_str ());
631 ACE_NEW (this->suppliers_[i],
632 Test_Supplier (this, this->suppliers_ + i));
634 this->suppliers_[i]->open (buf,
635 this->lps_event_a_,
636 this->lps_event_b_,
638 this->lp_interval_ * 10,
639 local_ec);
643 void
644 Test_ECG::disconnect_consumers ()
646 for (int i = 0; i < this->hp_consumers_ + this->lp_consumers_; ++i)
648 this->consumers_[i]->close ();
652 void
653 Test_ECG::activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec)
657 int i;
658 for (i = 0; i < this->hp_suppliers_; ++i)
660 // Limit the number of messages sent by each supplier
661 int mc = this->hp_message_count_ / this->hp_suppliers_;
662 if (mc == 0)
663 mc = 1;
665 char buf[BUFSIZ];
666 ACE_OS::sprintf (buf, "hp_supplier_%02d@%s", i, this->lcl_name_.c_str ());
668 this->suppliers_[i]->activate (buf,
669 this->hp_interval_ * 10,
670 local_ec);
673 for (; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
675 // Limit the number of messages sent by each supplier
676 int mc = this->lp_message_count_ / this->lp_suppliers_;
677 if (mc == 0)
678 mc = 1;
680 char buf[BUFSIZ];
681 ACE_OS::sprintf (buf, "lp_supplier_%02d@%s",
682 i - this->hp_suppliers_, this->lcl_name_.c_str ());
684 this->suppliers_[i]->activate (buf,
685 this->lp_interval_ * 10,
686 local_ec);
689 catch (const CORBA::Exception&)
691 throw;
695 void
696 Test_ECG::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec)
698 int i;
699 for (i = 0; i < this->hp_consumers_; ++i)
701 char buf[BUFSIZ];
702 ACE_OS::sprintf (buf, "hp_consumer_%02d@%s", i, this->lcl_name_.c_str ());
704 ACE_NEW (this->consumers_[i],
705 Test_Consumer (this, this->consumers_ + i));
707 this->consumers_[i]->open (buf,
708 this->hpc_event_a_,
709 this->hpc_event_b_,
710 local_ec);
711 this->stats_[i].total_time_ = 0;
712 this->stats_[i].lcl_count_ = 0;
713 this->stats_[i].rmt_count_ = 0;
716 for (; i < this->hp_consumers_ + this->lp_consumers_; ++i)
718 char buf[BUFSIZ];
719 ACE_OS::sprintf (buf, "lp_consumer_%02d@%s",
720 i - this->hp_consumers_, this->lcl_name_.c_str ());
722 ACE_NEW (this->consumers_[i],
723 Test_Consumer (this, this->consumers_ + i));
725 this->consumers_[i]->open (buf,
726 this->lpc_event_a_,
727 this->lpc_event_b_,
728 local_ec);
729 this->stats_[i].total_time_ = 0;
730 this->stats_[i].lcl_count_ = 0;
731 this->stats_[i].rmt_count_ = 0;
733 this->running_consumers_ = this->hp_consumers_ + this->lp_consumers_;
736 void
737 Test_ECG::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec,
738 RtecEventChannelAdmin::EventChannel_ptr remote_ec,
739 RtecScheduler::Scheduler_ptr remote_sch)
741 RtecScheduler::Scheduler_ptr local_sch =
742 ACE_Scheduler_Factory::server ();
744 // ECG name.
745 const int bufsize = 512;
746 char ecg_name[bufsize];
747 ACE_OS::strcpy (ecg_name, "ecg_");
748 ACE_OS::strcat (ecg_name, this->lcl_name_.c_str ());
750 // We could use the same name on the local and remote scheduler,
751 // but that fails when using a global scheduler.
752 char rmt[BUFSIZ];
753 ACE_OS::strcpy (rmt, ecg_name);
754 ACE_OS::strcat (rmt, "@");
755 ACE_OS::strcat (rmt, this->rmt_name_.c_str ());
757 // We could use the same name on the local and remote scheduler,
758 // but that fails when using a global scheduler.
759 char lcl[bufsize];
760 ACE_OS::strcpy (lcl, ecg_name);
761 ACE_OS::strcat (lcl, "@");
762 ACE_OS::strcat (lcl, this->lcl_name_.c_str ());
764 this->ecg_.init (remote_ec, local_ec, remote_sch, local_sch,
765 rmt, lcl);
768 void
769 Test_ECG::push_supplier (void * /* cookie */,
770 RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
771 const RtecEventComm::EventSet &events)
773 this->wait_until_ready ();
774 // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events sent by supplier\n"));
775 // @@ TODO we could keep somekind of stats here...
776 if (!this->short_circuit_)
778 consumer->push (events);
780 else
782 int i = 0;
783 for (; i < this->hp_consumers_; ++i)
785 this->consumers_[i]->push (events);
787 for (; i < this->hp_consumers_ + this->lp_consumers_; ++i)
789 this->consumers_[i]->push (events);
794 void
795 Test_ECG::push_consumer (void *consumer_cookie,
796 ACE_hrtime_t arrival,
797 const RtecEventComm::EventSet &events)
799 int ID =
800 (reinterpret_cast<Test_Consumer**> (consumer_cookie)
801 - this->consumers_);
803 // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID));
805 if (events.length () == 0)
807 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
808 return;
811 // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
813 #if 0
814 const int bufsize = 128;
815 char buf[bufsize];
816 ACE_OS::sprintf (buf, "Consumer %d receives event in thread: ", ID);
817 print_priority_info (buf);
818 #endif
820 for (u_int i = 0; i < events.length (); ++i)
822 const RtecEventComm::Event& e = events[i];
824 if (e.header.type == ACE_ES_EVENT_SHUTDOWN)
826 this->shutdown_consumer (ID);
827 continue;
830 ACE_hrtime_t s;
831 ORBSVCS_Time::TimeT_to_hrtime (s, e.header.creation_time);
832 ACE_hrtime_t nsec = arrival - s;
833 if (this->local_source (e.header.source))
835 int& count = this->stats_[ID].lcl_count_;
837 this->stats_[ID].lcl_latency_[count] = nsec;
838 int workload = this->hp_workload_;
839 int interval = this->hp_interval_;
840 if (ID >= this->hp_consumers_)
842 workload = this->lp_workload_;
843 interval = this->lp_interval_;
846 for (int j = 0; j < workload; ++j)
848 // Eat a little CPU so the Utilization test can measure the
849 // consumed time....
850 /* takes about 40.2 usecs on a 167 MHz Ultra2 */
851 u_long n = 1279UL;
852 ACE::is_prime (n, 2, n / 2);
854 // Increment the elapsed time on this consumer.
855 ACE_hrtime_t now = ACE_OS::gethrtime ();
856 this->stats_[ID].total_time_ += now - arrival;
857 this->stats_[ID].end_[count] = now;
859 // We estimate our laxity based on the event creation
860 // time... it may not be very precise, but will do; other
861 // strategies include:
862 // + Keep track of the "current frame", then then deadline
863 // is the end of the frame.
864 // + Use the start of the test to keep the current frame.
865 // + Use the last execution.
867 CORBA::ULong tmp = ACE_U64_TO_U32 (s - now);
868 this->stats_[ID].laxity_[count] = 1 + tmp/1000.0F/interval;
869 count++;
871 else
873 int& count = this->stats_[ID].rmt_count_;
874 this->stats_[ID].rmt_latency_[count] = nsec;
875 count++;
880 void
881 Test_ECG::wait_until_ready ()
883 ACE_GUARD (TAO_SYNCH_MUTEX, ready_mon, this->ready_mtx_);
884 while (!this->ready_)
885 this->ready_cnd_.wait ();
888 void
889 Test_ECG::shutdown_supplier (void* /* supplier_cookie */,
890 RtecEventComm::PushConsumer_ptr consumer)
892 this->running_suppliers_--;
893 if (this->running_suppliers_ != 0)
894 return;
896 // We propagate a shutdown event through the system...
897 //FUZZ: disable check_for_lack_ACE_OS
898 RtecEventComm::EventSet shutdown (1);
899 //FUZZ: enable check_for_lack_ACE_OS
900 shutdown.length (1);
901 RtecEventComm::Event& s = shutdown[0];
903 s.header.source = 0;
904 s.header.ttl = 1;
906 ACE_hrtime_t t = ACE_OS::gethrtime ();
907 ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
908 s.header.type = ACE_ES_EVENT_SHUTDOWN;
909 consumer->push (shutdown);
912 void
913 Test_ECG::shutdown_consumer (int id)
915 ACE_DEBUG ((LM_DEBUG, "Shutdown consumer %d\n", id));
916 this->running_consumers_--;
918 if (this->running_consumers_ == 0)
920 if (TAO_ORB_Core_instance ()->orb () == 0)
922 ACE_ERROR ((LM_ERROR,
923 "(%P|%t) Test_ECG::shutdown_consumer, "
924 "ORB instance is 0\n"));
926 else
928 TAO_ORB_Core_instance ()->orb ()->shutdown ();
934 Test_ECG::shutdown ()
936 ACE_DEBUG ((LM_DEBUG, "Shutting down the multiple EC test\n"));
938 if (this->rmt_name_.length () != 0)
940 this->ecg_.shutdown ();
943 TAO_ORB_Core_instance ()->orb ()->shutdown ();
944 return 0;
947 void
948 Test_ECG::dump_results ()
950 const int bufsize = 512;
951 ACE_TCHAR buf[bufsize];
953 int i;
954 for (i = 0; i < this->hp_consumers_; ++i)
956 ACE_OS::sprintf (buf, ACE_TEXT("HP%02d"), i);
957 this->dump_results (buf, this->stats_[i]);
959 for (i = 0; i < this->lp_consumers_; ++i)
961 ACE_OS::sprintf (buf, ACE_TEXT("LP%02d"), i);
962 this->dump_results (buf, this->stats_[i + this->hp_consumers_]);
964 CORBA::ULong tmp = ACE_U64_TO_U32 (this->test_stop_ - this->test_start_);
965 double usec = tmp / 1000.0;
966 ACE_DEBUG ((LM_DEBUG, "Time[TOTAL]: %.3f\n", usec));
969 void
970 Test_ECG::dump_results (const ACE_TCHAR* name, Stats& stats)
972 // @@ We are reporting the information without specifics about
973 double usec = ACE_U64_TO_U32 (stats.total_time_) / 1000.0;
974 ACE_DEBUG ((LM_DEBUG, "Time[LCL,%s]: %.3f\n", name, usec));
975 int i;
976 for (i = 1; i < stats.lcl_count_ - 1; ++i)
978 usec = ACE_U64_TO_U32 (stats.lcl_latency_[i]) / 1000.0;
979 ACE_DEBUG ((LM_DEBUG, "Latency[LCL,%s]: %.3f\n", name, usec));
981 double percent = stats.laxity_[i] * 100.0;
982 ACE_DEBUG ((LM_DEBUG, "Laxity[LCL,%s]: %.3f\n", name, percent));
984 usec = ACE_U64_TO_U32 (stats.end_[i] - this->test_start_) / 1000.0;
985 ACE_DEBUG ((LM_DEBUG, "Completion[LCL,%s]: %.3f\n", name, usec));
987 for (i = 1; i < stats.rmt_count_ - 1; ++i)
989 double usec = ACE_U64_TO_U32 (stats.rmt_latency_[i]) / 1000.0;
990 ACE_DEBUG ((LM_DEBUG, "Latency[RMT,%s]: %.3f\n", name, usec));
995 Test_ECG::local_source (RtecEventComm::EventSourceID id) const
997 for (int i = 0; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
999 if (this->suppliers_[i]->supplier_id () == id)
1000 return 1;
1002 return 0;
1006 Test_ECG::parse_args (int argc, ACE_TCHAR *argv [])
1008 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("l:r:s:i:xh:w:p:d:"));
1009 int opt;
1011 while ((opt = get_opt ()) != EOF)
1013 switch (opt)
1015 case 'l':
1016 this->lcl_name_ = ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ());
1017 break;
1019 case 'r':
1020 this->rmt_name_ = ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ());
1021 break;
1023 case 's':
1024 if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT("global")) == 0)
1026 this->scheduling_type_ = Test_ECG::ss_global;
1028 else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT("local")) == 0)
1030 this->scheduling_type_ = Test_ECG::ss_local;
1032 else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT("runtime")) == 0)
1034 this->scheduling_type_ = Test_ECG::ss_runtime;
1036 else
1038 ACE_DEBUG ((LM_DEBUG,
1039 "Unknown scheduling type <%s> "
1040 "defaulting to local\n",
1041 get_opt.opt_arg ()));
1042 this->scheduling_type_ = Test_ECG::ss_local;
1044 break;
1046 case 'x':
1047 this->short_circuit_ = 1;
1048 break;
1050 case 'i':
1052 char* aux = 0;
1053 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
1054 this->consumer_disconnects_ = ACE_OS::atoi (arg);
1055 arg = ACE_OS::strtok_r (0, ",", &aux);
1056 this->supplier_disconnects_ = ACE_OS::atoi (arg);
1058 break;
1060 case 'h':
1062 char* aux = 0;
1063 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
1065 this->hp_suppliers_ = ACE_OS::atoi (arg);
1066 arg = ACE_OS::strtok_r (0, ",", &aux);
1067 this->hp_consumers_ = ACE_OS::atoi (arg);
1068 arg = ACE_OS::strtok_r (0, ",", &aux);
1069 this->hp_workload_ = ACE_OS::atoi (arg);
1070 arg = ACE_OS::strtok_r (0, ",", &aux);
1071 this->hp_interval_ = ACE_OS::atoi (arg);
1072 arg = ACE_OS::strtok_r (0, ",", &aux);
1073 this->hp_message_count_ = ACE_OS::atoi (arg);
1074 arg = ACE_OS::strtok_r (0, ",", &aux);
1075 this->hps_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1076 arg = ACE_OS::strtok_r (0, ",", &aux);
1077 this->hps_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1078 arg = ACE_OS::strtok_r (0, ",", &aux);
1079 this->hpc_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1080 arg = ACE_OS::strtok_r (0, ",", &aux);
1081 this->hpc_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1083 break;
1085 case 'w':
1087 char* aux = 0;
1088 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
1090 this->lp_suppliers_ = ACE_OS::atoi (arg);
1091 arg = ACE_OS::strtok_r (0, ",", &aux);
1092 this->lp_consumers_ = ACE_OS::atoi (arg);
1093 arg = ACE_OS::strtok_r (0, ",", &aux);
1094 this->lp_workload_ = ACE_OS::atoi (arg);
1095 arg = ACE_OS::strtok_r (0, ",", &aux);
1096 this->lp_interval_ = ACE_OS::atoi (arg);
1097 arg = ACE_OS::strtok_r (0, ",", &aux);
1098 this->lp_message_count_ = ACE_OS::atoi (arg);
1099 arg = ACE_OS::strtok_r (0, ",", &aux);
1100 this->lps_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1101 arg = ACE_OS::strtok_r (0, ",", &aux);
1102 this->lps_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1103 arg = ACE_OS::strtok_r (0, ",", &aux);
1104 this->lpc_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1105 arg = ACE_OS::strtok_r (0, ",", &aux);
1106 this->lpc_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1108 break;
1110 case 'p':
1111 this->pid_file_name_ = get_opt.opt_arg ();
1112 break;
1113 case 'd':
1114 this->schedule_file_ = get_opt.opt_arg ();
1115 break;
1117 case '?':
1118 default:
1119 ACE_DEBUG ((LM_DEBUG,
1120 "Usage: %s "
1121 "[ORB options] "
1122 "-l <local_name> "
1123 "-r <remote_name> "
1124 "-s <global|local|runtime> "
1125 "-i <consumer disc.,supplier disc.> "
1126 "-x (short circuit EC) "
1127 "-h <high priority args> "
1128 "-w <low priority args> "
1129 "-p <pid file name> "
1130 "-d <schedule file name> "
1131 "\n",
1132 argv[0]));
1133 return -1;
1137 if (this->hp_message_count_ < 0
1138 || this->hp_message_count_ >= Test_ECG::MAX_EVENTS)
1140 ACE_DEBUG ((LM_DEBUG,
1141 "%s: HP event count (%d) is out of range, "
1142 "reset to default (%d)\n",
1143 argv[0], this->lp_message_count_,
1144 160));
1145 this->hp_message_count_ = 160;
1148 if (this->lp_message_count_ < 0
1149 || this->lp_message_count_ >= Test_ECG::MAX_EVENTS)
1151 ACE_DEBUG ((LM_DEBUG,
1152 "%s: LP event count (%d) is out of range, "
1153 "reset to default (%d)\n",
1154 argv[0], this->lp_message_count_,
1155 4));
1156 this->lp_message_count_ = 4;
1159 if (this->hp_consumers_ <= 0
1160 || this->lp_consumers_ < 0
1161 || this->hp_consumers_ + this->lp_consumers_ >= Test_ECG::MAX_CONSUMERS
1162 || this->hp_suppliers_ <= 0
1163 || this->lp_suppliers_ < 0
1164 || this->hp_suppliers_ + this->lp_suppliers_ >= Test_ECG::MAX_SUPPLIERS)
1166 ACE_ERROR_RETURN ((LM_DEBUG,
1167 "%s: number of consumers (low: %d, high: %d) or "
1168 "suppliers (low: %d, high: %d) out of range\n",
1169 argv[0],
1170 lp_consumers_, hp_consumers_,
1171 lp_suppliers_, lp_suppliers_), -1);
1174 return 0;
1177 Test_Supplier::Test_Supplier (Test_ECG *test,
1178 void *cookie)
1179 : test_ (test),
1180 cookie_ (cookie),
1181 consumer_ (this)
// Register this supplier with the scheduler and connect it to @a ec.
//
// @param name           Name used to create the RT_Info entry; its CRC32
//                       becomes the supplier id.
// @param event_a        First event type published by the supplier.
// @param event_b        Second event type published by the supplier.
// @param message_count  Stored in message_count_.
// @param rate           Period handed to the scheduler.
// @param ec             Event channel to connect to.
//
// Besides the two event types, the published QoS also lists
// ACE_ES_EVENT_SHUTDOWN.
1185 void
1186 Test_Supplier::open (const char* name,
1187 int event_a,
1188 int event_b,
1189 int message_count,
1190 const RtecScheduler::Period_t& rate,
1191 RtecEventChannelAdmin::EventChannel_ptr ec)
1193 this->event_a_ = event_a;
1194 this->event_b_ = event_b;
1195 this->message_count_ = message_count;
1197 RtecScheduler::Scheduler_ptr server =
1198 ACE_Scheduler_Factory::server ();
1200 RtecScheduler::handle_t rt_info =
1201 server->create (name);
1203 // The execution times are set to reasonable values, but
1204 // actually they are changed on the real execution, i.e. we
1205 // lie to the scheduler to obtain right priorities; but we
1206 // don't care if the set is schedulable.
// All time arguments below use the same 2000 usec value.
1207 ACE_Time_Value tv (0, 2000);
1208 TimeBase::TimeT time;
1209 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
1210 ACE_DEBUG ((LM_DEBUG, "register supplier \"%C\"\n", name));
1211 server->set (rt_info,
1212 RtecScheduler::VERY_HIGH_CRITICALITY,
1213 time, time, time,
1214 rate,
1215 RtecScheduler::VERY_LOW_IMPORTANCE,
1216 time,
1218 RtecScheduler::OPERATION);
// The supplier id is derived from the name.
1220 this->supplier_id_ = ACE::crc32 (name);
1221 ACE_DEBUG ((LM_DEBUG, "ID for <%C> is %04.4x\n", name,
1222 this->supplier_id_));
// Publish both event types plus the shutdown event under this id.
1224 ACE_SupplierQOS_Factory qos;
1225 qos.insert (this->supplier_id_,
1226 this->event_a_,
1227 rt_info, 1);
1228 qos.insert (this->supplier_id_,
1229 this->event_b_,
1230 rt_info, 1);
1231 qos.insert (this->supplier_id_,
1232 ACE_ES_EVENT_SHUTDOWN,
1233 rt_info, 1);
1235 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
1236 ec->for_suppliers ();
// The proxy is kept in consumer_proxy_ so that close () can disconnect it
// later.
1238 this->consumer_proxy_ =
1239 supplier_admin->obtain_push_consumer ();
1241 RtecEventComm::PushSupplier_var objref = this->_this ();
1243 this->consumer_proxy_->connect_push_supplier (objref.in (),
1244 qos.get_SupplierQOS ());
1247 void
1248 Test_Supplier::close ()
1250 if (CORBA::is_nil (this->consumer_proxy_.in ()))
1251 return;
1253 RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
1254 this->consumer_proxy_._retn ();
1255 proxy->disconnect_push_consumer ();
// Registers an RT_Info named "consumer_<name>" with the scheduler and
// connects the consumer_ member to the event channel <ec>, subscribed to
// ACE_ES_EVENT_INTERVAL_TIMEOUT with an interval derived from <rate> / 10.
// NOTE(review): buf is 512 bytes and is filled with strcpy/strcat with no
// length check visible here; a <name> longer than 502 characters would
// overrun it -- consider a bounded copy.
// NOTE(review): in this view the embedded numbering skips 1287 inside the
// server->set() argument list, so an argument may not be visible here --
// confirm against the pristine file before touching that call.
1258 void
1259 Test_Supplier::activate (const char* name,
1260 const RtecScheduler::Period_t& rate,
1261 RtecEventChannelAdmin::EventChannel_ptr ec)
1263 RtecScheduler::Scheduler_ptr server =
1264 ACE_Scheduler_Factory::server ();
// Build the RT_Info entry name: the literal prefix "consumer_" followed
// by <name>.
1266 const int bufsize = 512;
1267 char buf[bufsize];
1268 ACE_OS::strcpy (buf, "consumer_");
1269 ACE_OS::strcat (buf, name);
1270 RtecScheduler::handle_t rt_info =
1271 server->create (buf);
1273 // The execution times are set to reasonable values, but
1274 // actually they are changed on the real execution, i.e. we
1275 // lie to the scheduler to obtain right priorities; but we
1276 // don't care if the set is schedulable.
1277 ACE_Time_Value tv (0, 2000);
1278 TimeBase::TimeT time;
1279 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
1280 ACE_DEBUG ((LM_DEBUG, "activate \"%C\"\n", buf));
1281 server->set (rt_info,
1282 RtecScheduler::VERY_HIGH_CRITICALITY,
1283 time, time, time,
1284 rate,
1285 RtecScheduler::VERY_LOW_IMPORTANCE,
1286 time,
1288 RtecScheduler::OPERATION);
1290 // Also connect our consumer for timeout events from the EC.
// The interval is one tenth of <rate>, split by ACE_ONE_SECOND_IN_USECS
// into the two ACE_Time_Value constructor arguments.
// NOTE(review): <rate> is a Period_t stored into an int here -- confirm
// the value range and units against the caller.
1291 int interval = rate / 10;
1292 ACE_Time_Value tv_timeout (interval / ACE_ONE_SECOND_IN_USECS,
1293 interval % ACE_ONE_SECOND_IN_USECS);
1294 TimeBase::TimeT timeout;
1295 ORBSVCS_Time::Time_Value_to_TimeT (timeout, tv_timeout);
// The subscription holds a single interval-timeout entry tied to rt_info.
1297 ACE_ConsumerQOS_Factory consumer_qos;
1298 consumer_qos.start_disjunction_group ();
1299 consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
1300 timeout,
1301 rt_info);
1303 // = Connect as a consumer.
1304 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
1305 ec->for_consumers ();
// Keep the supplier proxy in supplier_proxy_; disconnect_push_supplier()
// uses that member later.
1307 this->supplier_proxy_ =
1308 consumer_admin->obtain_push_supplier ();
// The object reference that is connected comes from the consumer_ member,
// not from this object itself.
1310 RtecEventComm::PushConsumer_var cref =
1311 this->consumer_._this ();
1313 this->supplier_proxy_->connect_push_consumer (
1314 cref.in (),
1315 consumer_qos.get_ConsumerQOS ());
1318 void
1319 Test_Supplier::push (const RtecEventComm::EventSet& events)
1321 #if 0
1322 const int bufsize = 128;
1323 char buf[bufsize];
1324 ACE_OS::sprintf (buf, "Supplier %d receives event in thread: ",
1325 this->supplier_id_);
1326 print_priority_info (buf);
1327 #endif
1329 if (events.length () == 0 || this->message_count_ < 0)
1331 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
1332 return;
1335 RtecEventComm::EventSet sent (events.length ());
1336 sent.length (events.length ());
1338 for (u_int i = 0; i < events.length (); ++i)
1340 const RtecEventComm::Event& e = events[i];
1341 if (e.header.type != ACE_ES_EVENT_INTERVAL_TIMEOUT)
1342 continue;
1344 // ACE_DEBUG ((LM_DEBUG, "Test_Supplier - timeout (%t)\n"));
1346 RtecEventComm::Event& s = sent[i];
1347 s.header.source = this->supplier_id_;
1348 s.header.ttl = 1;
1350 ACE_hrtime_t t = ACE_OS::gethrtime ();
1351 ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
1353 this->message_count_--;
1355 if (this->message_count_ < 0)
1357 this->test_->shutdown_supplier (this->cookie_,
1358 this->consumer_proxy_.in ());
1360 if (this->message_count_ % 2 == 0)
1362 // Generate an A event...
1363 s.header.type = this->event_a_;
1365 else
1367 s.header.type = this->event_b_;
1370 this->test_->push_supplier (this->cookie_,
1371 this->consumer_proxy_.in (),
1372 sent);
1375 void
1376 Test_Supplier::disconnect_push_supplier ()
1378 if (CORBA::is_nil (this->supplier_proxy_.in ()))
1379 return;
1381 this->supplier_proxy_->disconnect_push_supplier ();
1384 void
1385 Test_Supplier::disconnect_push_consumer ()
1389 int Test_Supplier::supplier_id () const
1391 return this->supplier_id_;
1394 Test_Consumer::Test_Consumer (Test_ECG *test,
1395 void *cookie)
1396 : test_ (test),
1397 cookie_ (cookie)
// Registers an RT_Info named <name> with the scheduler and connects this
// object to the event channel <ec> as a push consumer subscribed to
// ACE_ES_EVENT_SHUTDOWN, <event_a> and <event_b>.
// NOTE(review): in this view the embedded numbering skips 1421 and 1424
// inside the server->set() argument list, so arguments may not be visible
// here -- confirm against the pristine file before touching that call.
1401 void
1402 Test_Consumer::open (const char* name,
1403 int event_a, int event_b,
1404 RtecEventChannelAdmin::EventChannel_ptr ec)
// Obtain the scheduler from the factory and create the RT_Info handle
// that every subscription below refers to.
1406 RtecScheduler::Scheduler_ptr server =
1407 ACE_Scheduler_Factory::server ();
1409 RtecScheduler::handle_t rt_info =
1410 server->create (name);
1412 // The worst case execution time is far less than 2
1413 // milliseconds, but that is a safe estimate....
1414 ACE_Time_Value tv (0, 2000);
1415 TimeBase::TimeT time;
1416 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
1417 ACE_DEBUG ((LM_DEBUG, "register consumer \"%C\"\n", name));
1418 server->set (rt_info,
1419 RtecScheduler::VERY_HIGH_CRITICALITY,
1420 time, time, time,
1422 RtecScheduler::VERY_LOW_IMPORTANCE,
1423 time,
1425 RtecScheduler::OPERATION);
// One disjunction group holding the three event types, all tied to the
// same rt_info.
1427 ACE_ConsumerQOS_Factory qos;
1428 qos.start_disjunction_group ();
1429 qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);
1430 qos.insert_type (event_a, rt_info);
1431 qos.insert_type (event_b, rt_info);
1433 // = Connect as a consumer.
1434 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
1435 ec->for_consumers ();
// Keep the supplier proxy in supplier_proxy_; close() uses that member
// later.
1437 this->supplier_proxy_ =
1438 consumer_admin->obtain_push_supplier ();
// Connect our own object reference to the proxy with the QoS built above.
1440 RtecEventComm::PushConsumer_var objref = this->_this ();
1442 this->supplier_proxy_->connect_push_consumer (objref.in (),
1443 qos.get_ConsumerQOS ());
1446 void
1447 Test_Consumer::close ()
1449 if (CORBA::is_nil (this->supplier_proxy_.in ()))
1450 return;
1452 RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
1453 this->supplier_proxy_._retn ();
1454 proxy->disconnect_push_supplier ();
1457 void
1458 Test_Consumer::push (const RtecEventComm::EventSet& events)
1460 ACE_hrtime_t arrival = ACE_OS::gethrtime ();
1461 this->test_->push_consumer (this->cookie_, arrival, events);
1464 void
1465 Test_Consumer::disconnect_push_consumer ()
1470 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
1472 Test_ECG *test;
1474 // Dynamically allocate the Test_ECG instance so that we don't have
1475 // to worry about running out of stack space if it's large.
1476 ACE_NEW_RETURN (test,
1477 Test_ECG,
1478 -1);
1480 const int status = test->run (argc, argv);
1482 delete test;
1483 return status;