Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Multiple / EC_Multiple.cpp
blob d254b7c177354fa6359aa5938b061b04cdbc686d
1 #include "EC_Multiple.h"
3 #include "Scheduler_Runtime1.h"
4 #include "Scheduler_Runtime2.h"
5 #include "Scheduler_Runtime_Dynamic.h" /* infos_3 */
7 #include "orbsvcs/Event_Utilities.h"
8 #include "orbsvcs/Event_Service_Constants.h"
9 #include "orbsvcs/Scheduler_Factory.h"
10 #include "orbsvcs/Time_Utilities.h"
11 #include "orbsvcs/RtecEventChannelAdminC.h"
12 #include "orbsvcs/Sched/Config_Scheduler.h"
13 #include "orbsvcs/Event/EC_Event_Channel.h"
14 #include "orbsvcs/Runtime_Scheduler.h"
16 #include "tao/ORB_Core.h"
18 #include "ace/Get_Opt.h"
19 #include "ace/Auto_Ptr.h"
20 #include "ace/Sched_Params.h"
21 #include "ace/OS_NS_errno.h"
22 #include "ace/OS_NS_strings.h"
24 #if defined (sun)
25 # include <sys/lwp.h> /* for _lwp_self */
26 #endif /* sun */
28 Test_ECG::Test_ECG (void)
29 : lcl_name_ ("Test_ECG"),
30 rmt_name_ (""),
31 scheduling_type_ (Test_ECG::ss_runtime),
32 consumer_disconnects_ (0),
33 supplier_disconnects_ (0),
34 short_circuit_ (0),
35 hp_suppliers_ (1),
36 hp_consumers_ (1),
37 hp_workload_ (10),
38 hp_interval_ (25000),
39 hp_message_count_ (200),
40 hps_event_a_ (ACE_ES_EVENT_UNDEFINED),
41 hps_event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
42 hpc_event_a_ (ACE_ES_EVENT_UNDEFINED),
43 hpc_event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
44 lp_suppliers_ (0),
45 lp_consumers_ (0),
46 lp_workload_ (0),
47 lp_interval_ (100000),
48 lp_message_count_ (50),
49 lps_event_a_ (0),
50 lps_event_b_ (0),
51 lpc_event_a_ (0),
52 lpc_event_b_ (0),
53 schedule_file_ (0),
54 pid_file_name_ (0),
55 ready_ (0),
56 ready_cnd_ (ready_mtx_)
60 void
61 print_priority_info (const char *const name)
63 #if defined (ACE_HAS_PTHREADS) || defined (sun)
64 #if defined (ACE_HAS_PTHREADS)
65 struct sched_param param;
66 int policy, status;
68 if ((status = pthread_getschedparam (pthread_self (), &policy,
69 &param)) == 0) {
70 # ifdef sun
71 ACE_DEBUG ((LM_DEBUG,
72 "%C (%lu|%u); policy is %d, priority is %d\n",
73 name,
74 ACE_OS::getpid (),
75 _lwp_self (),
76 pthread_self (),
77 policy, param.sched_priority));
78 # else /* ! sun */
79 ACE_DEBUG ((LM_DEBUG,
80 "%C (%lu|%u); policy is %d, priority is %d\n",
81 name,
82 ACE_OS::getpid (),
84 pthread_self (),
85 policy, param.sched_priority ));
86 # endif /* ! sun */
87 } else {
88 ACE_DEBUG ((LM_DEBUG,"pthread_getschedparam failed: %d\n", status));
90 #else
91 ACE_UNUSED_ARG (name);
92 #endif /* ACE_HAS_PTHREADS */
94 #ifdef sun
95 // Find what scheduling class the thread's LWP is in.
96 //FUZZ: disable check_for_lack_ACE_OS
97 ACE_Sched_Params sched_params (ACE_SCHED_OTHER, 0);
98 //FUZZ: enable check_for_lack_ACE_OS
99 if (ACE_OS::lwp_getparams (sched_params) == -1)
101 ACE_OS::perror ("ACE_OS::lwp_getparams");
102 return;
104 else if (sched_params.policy () == ACE_SCHED_FIFO ||
105 sched_params.policy () == ACE_SCHED_RR)
107 // This thread's LWP is in the RT class.
108 ACE_DEBUG ((LM_DEBUG,
109 "RT class; priority: %d, quantum: %u msec\n",
110 sched_params.priority (),
111 sched_params.quantum ().msec ()));
113 else
115 ACE_DEBUG ((LM_DEBUG,
116 "TS class; priority: %d\n",
117 sched_params.priority ()));
119 #endif /* sun */
120 #else
121 ACE_UNUSED_ARG (name);
122 #endif /* ACE_HAS_PTHREADS || sun */
// Run the complete test: initialize the ORB and the RootPOA, parse the
// command line, create the scheduler and the local event channel,
// optionally connect to a remote event channel through the gateway
// (ecg_), run the ORB event loop and finally dump the results and
// tear everything down.
//
// Returns 1 when parse_args () reports a problem or the ready mutex
// cannot be acquired, -1 when a scheduler cannot be set up, and 0
// otherwise -- including when a CORBA exception was caught and printed.
//
// NOTE(review): the return type, the "try" keyword, the brace-only
// lines and the return values of the ACE_ERROR_RETURN calls are not
// visible in this listing -- confirm against the repository.
126 Test_ECG::run (int argc, ACE_TCHAR* argv[])
// ORB and POA initialization.
130 CORBA::ORB_var orb =
131 CORBA::ORB_init (argc, argv);
133 CORBA::Object_var poa_object =
134 orb->resolve_initial_references("RootPOA");
136 if (CORBA::is_nil (poa_object.in ()))
137 ACE_ERROR_RETURN ((LM_ERROR,
138 " (%P|%t) Unable to initialize the POA.\n"),
141 PortableServer::POA_var root_poa =
142 PortableServer::POA::_narrow (poa_object.in ());
144 PortableServer::POAManager_var poa_manager =
145 root_poa->the_POAManager ();
147 poa_manager->activate ();
// The test's own options are parsed after ORB_init () has seen the
// command line.
149 if (this->parse_args (argc, argv))
150 return 1;
152 ACE_DEBUG ((LM_DEBUG,
153 "Execution parameters:\n"
154 " lcl name = <%C>\n"
155 " rmt name = <%C>\n"
156 " scheduler type = <%d>\n"
157 " consumer disconnects = <%d>\n"
158 " supplier disconnects = <%d>\n"
159 " short circuit EC = <%d>\n"
160 " HP suppliers = <%d>\n"
161 " HP consumers = <%d>\n"
162 " HP workload = <%d> (iterations)\n"
163 " HP interval between events = <%d> (usecs)\n"
164 " HP message count = <%d>\n"
165 " HP supplier Event A = <%d>\n"
166 " HP supplier Event B = <%d>\n"
167 " HP consumer Event A = <%d>\n"
168 " HP consumer Event B = <%d>\n"
169 " LP suppliers = <%d>\n"
170 " LP consumers = <%d>\n"
171 " LP workload = <%d> (iterations)\n"
172 " LP interval between events = <%d> (usecs)\n"
173 " LP message count = <%d>\n"
174 " LP supplier Event A = <%d>\n"
175 " LP supplier Event B = <%d>\n"
176 " LP consumer Event A = <%d>\n"
177 " LP consumer Event B = <%d>\n"
178 " schedule_file = <%s>\n"
179 " pid file name = <%s>\n",
180 this->lcl_name_.length () ? this->lcl_name_.c_str () : "nil",
181 this->rmt_name_.length () ? this->rmt_name_.c_str () : "nil",
182 this->scheduling_type_,
183 this->consumer_disconnects_,
184 this->supplier_disconnects_,
185 this->short_circuit_,
187 this->hp_suppliers_,
188 this->hp_consumers_,
189 this->hp_workload_,
190 this->hp_interval_,
191 this->hp_message_count_,
192 this->hps_event_a_,
193 this->hps_event_b_,
194 this->hpc_event_a_,
195 this->hpc_event_b_,
197 this->lp_suppliers_,
198 this->lp_consumers_,
199 this->lp_workload_,
200 this->lp_interval_,
201 this->lp_message_count_,
202 this->lps_event_a_,
203 this->lps_event_b_,
204 this->lpc_event_a_,
205 this->lpc_event_b_,
207 this->schedule_file_?this->schedule_file_:ACE_TEXT("nil"),
208 this->pid_file_name_?this->pid_file_name_:ACE_TEXT("nil")) );
210 print_priority_info ("Test_ECG::run (Main)");
// Write our process id to the file named with -p, if any.  A failure
// to open the file is silently ignored.
212 if (this->pid_file_name_ != 0)
214 FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
215 if (pid != 0)
217 ACE_OS::fprintf (pid, "%ld\n",
218 static_cast<long> (ACE_OS::getpid ()));
219 ACE_OS::fclose (pid);
223 int min_priority =
224 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO);
225 // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
227 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
228 min_priority,
229 ACE_SCOPE_PROCESS)) != 0)
231 if (ACE_OS::last_error () == EPERM)
232 ACE_DEBUG ((LM_DEBUG,
233 "%s: user is not superuser, "
234 "so remain in time-sharing class\n", argv[0]));
235 else
236 ACE_ERROR ((LM_ERROR,
237 "%s: ACE_OS::sched_params failed\n", argv[0]));
240 if (ACE_OS::thr_setprio (min_priority) == -1)
242 ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed\n"));
245 print_priority_info ("Test_ECG::run (Main after thr_setprio)");
// The Naming Service is used to publish the scheduler and the event
// channel and to find the remote ones.
247 CORBA::Object_var naming_obj =
248 orb->resolve_initial_references ("NameService");
250 if (CORBA::is_nil (naming_obj.in ()))
251 ACE_ERROR_RETURN ((LM_ERROR,
252 " (%P|%t) Unable to get the Naming Service.\n"),
255 CosNaming::NamingContext_var naming_context =
256 CosNaming::NamingContext::_narrow (naming_obj.in ());
// Create the scheduler servant selected by scheduling_type_.  No
// servant is created for the global type; the runtime type picks one
// of the precomputed schedules by local name (ECM1, ECM2 or ECM3) and
// falls back to a config scheduler for any other name.
258 auto_ptr<POA_RtecScheduler::Scheduler> scheduler_impl;
259 RtecScheduler::Scheduler_var scheduler;
261 switch (this->scheduling_type_)
263 default:
264 ACE_ERROR ((LM_WARNING, "Unknown scheduling type %d\n",
265 this->scheduling_type_));
266 /*FALLTHROUGH*/
267 case Test_ECG::ss_global:
268 break;
270 case Test_ECG::ss_local:
272 auto_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl (new ACE_Config_Scheduler);
273 scheduler_impl = auto_scheduler_impl;
275 if (scheduler_impl.get () == 0)
276 return -1;
277 scheduler = scheduler_impl->_this ();
278 break;
280 case Test_ECG::ss_runtime:
281 if (ACE_OS::strcmp (this->lcl_name_.c_str (), "ECM1") == 0)
283 // This setups Scheduler_Factory to use the runtime version
284 ACE_Scheduler_Factory::use_runtime (
285 sizeof (runtime_configs_1)/sizeof (runtime_configs_1[0]),
286 runtime_configs_1,
287 sizeof (runtime_infos_1)/sizeof (runtime_infos_1[0]),
288 runtime_infos_1);
290 auto_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl
291 (new ACE_Runtime_Scheduler (runtime_configs_1_size,
292 runtime_configs_1,
293 runtime_infos_1_size,
294 runtime_infos_1));
295 scheduler_impl = auto_scheduler_impl;
297 if (scheduler_impl.get () == 0)
298 return -1;
299 scheduler = scheduler_impl->_this ();
301 else if (ACE_OS::strcmp (this->lcl_name_.c_str (), "ECM2") == 0)
303 // This setups Scheduler_Factory to use the runtime version
304 ACE_Scheduler_Factory::use_runtime (
305 sizeof (runtime_configs_2)/sizeof (runtime_configs_2[0]),
306 runtime_configs_2,
307 sizeof (runtime_infos_2)/sizeof (runtime_infos_2[0]),
308 runtime_infos_2);
310 auto_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl
311 (new ACE_Runtime_Scheduler (runtime_configs_2_size,
312 runtime_configs_2,
313 runtime_infos_2_size,
314 runtime_infos_2));
315 scheduler_impl = auto_scheduler_impl;
317 if (scheduler_impl.get () == 0)
318 return -1;
319 scheduler = scheduler_impl->_this ();
321 else if (ACE_OS::strcmp (this->lcl_name_.c_str (), "ECM3") == 0)
323 // This setups Scheduler_Factory to use the runtime version
324 ACE_Scheduler_Factory::use_runtime (
325 sizeof (runtime_configs_3)/sizeof (runtime_configs_3[0]),
326 runtime_configs_3,
327 sizeof (runtime_infos_3)/sizeof (runtime_infos_3[0]),
328 runtime_infos_3);
330 auto_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl
331 (new ACE_Runtime_Scheduler (runtime_configs_3_size,
332 runtime_configs_3,
333 runtime_infos_3_size,
334 runtime_infos_3));
335 scheduler_impl = auto_scheduler_impl;
337 if (scheduler_impl.get () == 0)
338 return -1;
339 scheduler = scheduler_impl->_this ();
341 else
343 ACE_ERROR ((LM_WARNING,
344 "Unknown name <%C> defaulting to "
345 "config scheduler\n", this->lcl_name_.c_str ()));
347 auto_ptr<POA_RtecScheduler::Scheduler> auto_scheduler_impl (new ACE_Config_Scheduler);
348 scheduler_impl = auto_scheduler_impl;
350 if (scheduler_impl.get () == 0)
351 return -1;
352 scheduler = scheduler_impl->_this ();
354 break;
358 // We use this buffer to generate the names of the local
359 // services.
// NOTE(review): the names are assembled with strcpy/strcat, so a local
// or remote name close to 500 characters would overflow the buffer.
360 const int bufsize = 512;
361 char buf[bufsize];
363 // Register the scheduler with the naming service.
364 switch (this->scheduling_type_)
366 default:
367 case Test_ECG::ss_global:
368 break;
370 case Test_ECG::ss_local:
371 case Test_ECG::ss_runtime:
373 CORBA::String_var str =
374 orb->object_to_string (scheduler.in ());
375 ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%C>\n",
376 str.in ()));
378 ACE_OS::strcpy (buf, "ScheduleService@");
379 ACE_OS::strcat (buf, this->lcl_name_.c_str ());
381 // Register the servant with the Naming Context....
382 CosNaming::Name schedule_name (1);
383 schedule_name.length (1);
384 schedule_name[0].id = CORBA::string_dup (buf);
385 naming_context->bind (schedule_name, scheduler.in ());
387 if (ACE_Scheduler_Factory::use_config (naming_context.in (),
388 buf) == -1)
389 return -1;
391 break;
394 // Create the EventService implementation, but don't start its
395 // internal threads.
396 TAO_EC_Event_Channel_Attributes attr (root_poa.in (),
397 root_poa.in ());
398 TAO_EC_Event_Channel ec_impl (attr);
400 // Register Event_Service with the Naming Service.
401 RtecEventChannelAdmin::EventChannel_var ec =
402 ec_impl._this ();
404 CORBA::String_var str =
405 orb->object_to_string (ec.in ());
407 ACE_OS::sleep (5);
408 ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%C>\n", str.in ()));
// The channel is published as "EventChannel@<local name>", the same
// name get_ec () resolves.
410 ACE_OS::strcpy (buf, "EventChannel@");
411 ACE_OS::strcat (buf, this->lcl_name_.c_str ());
413 CosNaming::Name channel_name (1);
414 channel_name.length (1);
415 channel_name[0].id = CORBA::string_dup (buf);
416 naming_context->bind (channel_name, ec.in ());
418 ACE_DEBUG ((LM_DEBUG, "waiting to start\n"));
// With a remote peer configured, run the ORB for up to 15 seconds
// before continuing -- presumably to give the peer time to start.
420 ACE_Time_Value tv (15, 0);
422 if (this->rmt_name_.length () != 0)
424 orb->run (&tv);
427 ACE_DEBUG ((LM_DEBUG, "starting....\n"));
429 RtecEventChannelAdmin::EventChannel_var local_ec =
430 this->get_ec (naming_context.in (),
431 this->lcl_name_.c_str ());
433 ACE_DEBUG ((LM_DEBUG, "located local EC\n"));
// Exercise the requested number of supplier connect/disconnect cycles
// before making the connection used by the test itself.
435 for (int sd = 0; sd < this->supplier_disconnects_; ++sd)
437 this->connect_suppliers (local_ec.in ());
438 this->disconnect_suppliers ();
439 ACE_OS::sleep (5);
440 ACE_DEBUG ((LM_DEBUG, "Supplier disconnection %d\n", sd));
443 this->connect_suppliers (local_ec.in ());
445 ACE_DEBUG ((LM_DEBUG, "connected supplier\n"));
// With a remote peer: locate its event channel and scheduler, connect
// the gateway between the two channels and register the gateway as an
// observer of the local channel.
447 RtecEventChannelAdmin::Observer_Handle observer_handle = 0;
448 if (this->rmt_name_.length () != 0)
450 tv.set (5, 0);
451 orb->run (&tv);
453 RtecEventChannelAdmin::EventChannel_var remote_ec =
454 this->get_ec (naming_context.in (),
455 this->rmt_name_.c_str ());
456 ACE_DEBUG ((LM_DEBUG, "located remote EC\n"));
458 CosNaming::Name rsch_name (1);
459 rsch_name.length (1);
460 ACE_OS::strcpy (buf, "ScheduleService");
461 if (this->scheduling_type_ != Test_ECG::ss_global)
463 ACE_OS::strcat (buf, "@");
464 ACE_OS::strcat (buf, this->rmt_name_.c_str ());
466 rsch_name[0].id = CORBA::string_dup (buf);
467 CORBA::Object_var tmpobj =
468 naming_context->resolve (rsch_name);
470 RtecScheduler::Scheduler_var remote_sch =
471 RtecScheduler::Scheduler::_narrow (tmpobj.in ());
473 this->connect_ecg (local_ec.in (),
474 remote_ec.in (),
475 remote_sch.in ());
477 ACE_DEBUG ((LM_DEBUG, "connected proxy\n"));
479 tv.set (5, 0);
480 orb->run (&tv);
482 RtecEventChannelAdmin::Observer_ptr observer =
483 this->ecg_._this ();
484 observer_handle = ec_impl.append_observer (observer);
// Same connect/disconnect exercise for the consumers.
487 for (int cd = 0; cd < this->consumer_disconnects_; ++cd)
489 this->connect_consumers (local_ec.in ());
490 this->disconnect_consumers ();
491 ACE_OS::sleep (5);
492 ACE_DEBUG ((LM_DEBUG, "Consumer disconnection %d\n", cd));
494 this->connect_consumers (local_ec.in ());
496 ACE_DEBUG ((LM_DEBUG, "connected consumer\n"));
498 this->activate_suppliers (local_ec.in ());
500 ACE_DEBUG ((LM_DEBUG, "suppliers are active\n"));
502 this->running_suppliers_ = this->hp_suppliers_ + this->lp_suppliers_;
504 // Acquire the mutex for the ready mutex, blocking any supplier
505 // that may start after this point.
// Setting ready_ and broadcasting releases every thread blocked in
// wait_until_ready (); test_start_ is the reference time used by
// dump_results ().
506 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ready_mon, this->ready_mtx_, 1);
507 this->ready_ = 1;
508 this->test_start_ = ACE_OS::gethrtime ();
509 this->ready_cnd_.broadcast ();
510 ready_mon.release ();
512 ACE_DEBUG ((LM_DEBUG, "activate the EC\n"));
514 if (this->rmt_name_.length () != 0)
516 ec_impl.remove_observer (observer_handle);
519 // Create the EC internal threads
520 ec_impl.activate ();
// This call returns once the ORB is shut down; shutdown_consumer ()
// does that when the last consumer has received the shutdown event.
522 ACE_DEBUG ((LM_DEBUG, "running the test\n"));
523 orb->run ();
525 this->test_stop_ = ACE_OS::gethrtime ();
527 ACE_DEBUG ((LM_DEBUG, "shutdown the EC\n"));
528 ec_impl.shutdown ();
530 this->dump_results ();
// When -d was given, compute the schedule and dump it to that file.
532 if (this->schedule_file_ != 0)
534 RtecScheduler::RT_Info_Set_var infos;
535 RtecScheduler::Dependency_Set_var deps;
536 RtecScheduler::Config_Info_Set_var configs;
537 RtecScheduler::Scheduling_Anomaly_Set_var anomalies;
539 ACE_Scheduler_Factory::server ()->compute_scheduling
540 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
541 ACE_SCOPE_THREAD),
542 ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
543 ACE_SCOPE_THREAD),
544 infos.out (), deps.out (),
545 configs.out (), anomalies.out ());
547 ACE_Scheduler_Factory::dump_schedule (infos.in (),
548 deps.in (),
549 configs.in (),
550 anomalies.in (),
551 this->schedule_file_);
// Tear down: withdraw the channel name, close the gateway and
// disconnect all participants.
// NOTE(review): the scheduler name bound earlier is not unbound here.
554 naming_context->unbind (channel_name);
556 if (this->rmt_name_.length () != 0)
558 this->ecg_.close ();
559 this->ecg_.shutdown ();
562 this->disconnect_consumers ();
563 this->disconnect_suppliers ();
565 ACE_DEBUG ((LM_DEBUG, "shutdown grace period\n"));
566 tv.set (5, 0);
567 orb->run (&tv);
// CORBA exceptions are printed and otherwise ignored; the function
// still returns 0 afterwards.
569 catch (const CORBA::SystemException& sys_ex)
571 sys_ex._tao_print_exception ("SYS_EX");
573 catch (const CORBA::Exception& ex)
575 ex._tao_print_exception ("NON SYS EX");
577 return 0;
580 RtecEventChannelAdmin::EventChannel_ptr
581 Test_ECG::get_ec (CosNaming::NamingContext_ptr naming_context,
582 const char* process_name)
584 const int bufsize = 512;
585 char buf[bufsize];
586 ACE_OS::strcpy (buf, "EventChannel@");
587 ACE_OS::strcat (buf, process_name);
589 CosNaming::Name channel_name (1);
590 channel_name.length (1);
591 channel_name[0].id = CORBA::string_dup (buf);
593 CORBA::Object_var ec_ptr =
594 naming_context->resolve (channel_name);
595 if (CORBA::is_nil (ec_ptr.in ()))
596 return RtecEventChannelAdmin::EventChannel::_nil ();
598 return RtecEventChannelAdmin::EventChannel::_narrow (ec_ptr.in ());
601 void
602 Test_ECG::disconnect_suppliers (void)
604 for (int i = 0; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
606 this->suppliers_[i]->close ();
610 void
611 Test_ECG::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec)
613 int i;
614 for (i = 0; i < this->hp_suppliers_; ++i)
616 // Limit the number of messages sent by each supplier
617 int mc = this->hp_message_count_ / this->hp_suppliers_;
618 if (mc == 0)
619 mc = 1;
621 char buf[BUFSIZ];
622 ACE_OS::sprintf (buf, "hp_supplier_%02d@%s", i, this->lcl_name_.c_str ());
624 ACE_NEW (this->suppliers_[i],
625 Test_Supplier (this, this->suppliers_ + i));
627 this->suppliers_[i]->open (buf,
628 this->hps_event_a_,
629 this->hps_event_b_,
631 this->hp_interval_ * 10,
632 local_ec);
635 for (; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
637 // Limit the number of messages sent by each supplier
638 int mc = this->lp_message_count_ / this->lp_suppliers_;
639 if (mc == 0)
640 mc = 1;
642 char buf[BUFSIZ];
643 ACE_OS::sprintf (buf, "lp_supplier_%02d@%s",
644 i - this->hp_suppliers_, this->lcl_name_.c_str ());
646 ACE_NEW (this->suppliers_[i],
647 Test_Supplier (this, this->suppliers_ + i));
649 this->suppliers_[i]->open (buf,
650 this->lps_event_a_,
651 this->lps_event_b_,
653 this->lp_interval_ * 10,
654 local_ec);
658 void
659 Test_ECG::disconnect_consumers (void)
661 for (int i = 0; i < this->hp_consumers_ + this->lp_consumers_; ++i)
663 this->consumers_[i]->close ();
667 void
668 Test_ECG::activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec)
672 int i;
673 for (i = 0; i < this->hp_suppliers_; ++i)
675 // Limit the number of messages sent by each supplier
676 int mc = this->hp_message_count_ / this->hp_suppliers_;
677 if (mc == 0)
678 mc = 1;
680 char buf[BUFSIZ];
681 ACE_OS::sprintf (buf, "hp_supplier_%02d@%s", i, this->lcl_name_.c_str ());
683 this->suppliers_[i]->activate (buf,
684 this->hp_interval_ * 10,
685 local_ec);
688 for (; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
690 // Limit the number of messages sent by each supplier
691 int mc = this->lp_message_count_ / this->lp_suppliers_;
692 if (mc == 0)
693 mc = 1;
695 char buf[BUFSIZ];
696 ACE_OS::sprintf (buf, "lp_supplier_%02d@%s",
697 i - this->hp_suppliers_, this->lcl_name_.c_str ());
699 this->suppliers_[i]->activate (buf,
700 this->lp_interval_ * 10,
701 local_ec);
704 catch (const CORBA::Exception&)
706 throw;
710 void
711 Test_ECG::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec)
713 int i;
714 for (i = 0; i < this->hp_consumers_; ++i)
716 char buf[BUFSIZ];
717 ACE_OS::sprintf (buf, "hp_consumer_%02d@%s", i, this->lcl_name_.c_str ());
719 ACE_NEW (this->consumers_[i],
720 Test_Consumer (this, this->consumers_ + i));
722 this->consumers_[i]->open (buf,
723 this->hpc_event_a_,
724 this->hpc_event_b_,
725 local_ec);
726 this->stats_[i].total_time_ = 0;
727 this->stats_[i].lcl_count_ = 0;
728 this->stats_[i].rmt_count_ = 0;
731 for (; i < this->hp_consumers_ + this->lp_consumers_; ++i)
733 char buf[BUFSIZ];
734 ACE_OS::sprintf (buf, "lp_consumer_%02d@%s",
735 i - this->hp_consumers_, this->lcl_name_.c_str ());
737 ACE_NEW (this->consumers_[i],
738 Test_Consumer (this, this->consumers_ + i));
740 this->consumers_[i]->open (buf,
741 this->lpc_event_a_,
742 this->lpc_event_b_,
743 local_ec);
744 this->stats_[i].total_time_ = 0;
745 this->stats_[i].lcl_count_ = 0;
746 this->stats_[i].rmt_count_ = 0;
748 this->running_consumers_ = this->hp_consumers_ + this->lp_consumers_;
751 void
752 Test_ECG::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec,
753 RtecEventChannelAdmin::EventChannel_ptr remote_ec,
754 RtecScheduler::Scheduler_ptr remote_sch)
756 RtecScheduler::Scheduler_ptr local_sch =
757 ACE_Scheduler_Factory::server ();
759 // ECG name.
760 const int bufsize = 512;
761 char ecg_name[bufsize];
762 ACE_OS::strcpy (ecg_name, "ecg_");
763 ACE_OS::strcat (ecg_name, this->lcl_name_.c_str ());
765 // We could use the same name on the local and remote scheduler,
766 // but that fails when using a global scheduler.
767 char rmt[BUFSIZ];
768 ACE_OS::strcpy (rmt, ecg_name);
769 ACE_OS::strcat (rmt, "@");
770 ACE_OS::strcat (rmt, this->rmt_name_.c_str ());
772 // We could use the same name on the local and remote scheduler,
773 // but that fails when using a global scheduler.
774 char lcl[bufsize];
775 ACE_OS::strcpy (lcl, ecg_name);
776 ACE_OS::strcat (lcl, "@");
777 ACE_OS::strcat (lcl, this->lcl_name_.c_str ());
779 this->ecg_.init (remote_ec, local_ec, remote_sch, local_sch,
780 rmt, lcl);
783 void
784 Test_ECG::push_supplier (void * /* cookie */,
785 RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
786 const RtecEventComm::EventSet &events)
788 this->wait_until_ready ();
789 // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events sent by supplier\n"));
790 // @@ TODO we could keep somekind of stats here...
791 if (!this->short_circuit_)
793 consumer->push (events);
795 else
797 int i = 0;
798 for (; i < this->hp_consumers_; ++i)
800 this->consumers_[i]->push (events);
802 for (; i < this->hp_consumers_ + this->lp_consumers_; ++i)
804 this->consumers_[i]->push (events);
// Called when a consumer receives @a events, which arrived at time
// @a arrival.  Records per-consumer statistics in stats_ and handles
// the shutdown event.
//
// @a consumer_cookie is interpreted as a pointer into consumers_; its
// offset from the start of the array is the consumer's index
// (connect_consumers () passes "consumers_ + i" when it creates each
// consumer -- presumably that value comes back here as the cookie).
//
// NOTE(review): brace-only lines are not visible in this listing.
809 void
810 Test_ECG::push_consumer (void *consumer_cookie,
811 ACE_hrtime_t arrival,
812 const RtecEventComm::EventSet &events)
814 int ID =
815 (reinterpret_cast<Test_Consumer**> (consumer_cookie)
816 - this->consumers_);
818 // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID));
820 if (events.length () == 0)
822 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
823 return;
826 // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
828 #if 0
829 const int bufsize = 128;
830 char buf[bufsize];
831 ACE_OS::sprintf (buf, "Consumer %d receives event in thread: ", ID);
832 print_priority_info (buf);
833 #endif
835 for (u_int i = 0; i < events.length (); ++i)
837 const RtecEventComm::Event& e = events[i];
// A shutdown event only updates the running-consumer count; it is not
// included in the statistics.
839 if (e.header.type == ACE_ES_EVENT_SHUTDOWN)
841 this->shutdown_consumer (ID);
842 continue;
// Latency: arrival time minus the creation time stored in the header.
845 ACE_hrtime_t s;
846 ORBSVCS_Time::TimeT_to_hrtime (s, e.header.creation_time);
847 ACE_hrtime_t nsec = arrival - s;
848 if (this->local_source (e.header.source))
// Event from one of our own suppliers: record latency, simulate the
// workload and record completion time and laxity.
// NOTE(review): count is used as an array index without a visible
// bounds check -- confirm the arrays are large enough for the
// configured message counts.
850 int& count = this->stats_[ID].lcl_count_;
852 this->stats_[ID].lcl_latency_[count] = nsec;
// Consumers with an index at or above hp_consumers_ are the
// low-priority ones and use the LP workload and interval.
853 int workload = this->hp_workload_;
854 int interval = this->hp_interval_;
855 if (ID >= this->hp_consumers_)
857 workload = this->lp_workload_;
858 interval = this->lp_interval_;
861 for (int j = 0; j < workload; ++j)
863 // Eat a little CPU so the Utilization test can measure the
864 // consumed time....
865 /* takes about 40.2 usecs on a 167 MHz Ultra2 */
866 u_long n = 1279UL;
867 ACE::is_prime (n, 2, n / 2);
869 // Increment the elapsed time on this consumer.
870 ACE_hrtime_t now = ACE_OS::gethrtime ();
871 this->stats_[ID].total_time_ += now - arrival;
872 this->stats_[ID].end_[count] = now;
874 // We estimate our laxity based on the event creation
875 // time... it may not be very precise, but will do; other
876 // strategies include:
877 // + Keep track of the "current frame", then then deadline
878 // is the end of the frame.
879 // + Use the start of the test to keep the current frame.
880 // + Use the last execution.
// NOTE(review): "now" is taken after the creation time "s", so
// "s - now" looks like it would be negative; if ACE_hrtime_t is
// unsigned the difference wraps before it is truncated to 32 bits.
// Confirm the intended sign and type of this computation.
882 CORBA::ULong tmp = ACE_U64_TO_U32 (s - now);
883 this->stats_[ID].laxity_[count] = 1 + tmp/1000.0F/interval;
884 count++;
886 else
// Event from a remote source: only the latency is recorded.
888 int& count = this->stats_[ID].rmt_count_;
889 this->stats_[ID].rmt_latency_[count] = nsec;
890 count++;
895 void
896 Test_ECG::wait_until_ready (void)
898 ACE_GUARD (TAO_SYNCH_MUTEX, ready_mon, this->ready_mtx_);
899 while (!this->ready_)
900 this->ready_cnd_.wait ();
903 void
904 Test_ECG::shutdown_supplier (void* /* supplier_cookie */,
905 RtecEventComm::PushConsumer_ptr consumer)
908 this->running_suppliers_--;
909 if (this->running_suppliers_ != 0)
910 return;
912 // We propagate a shutdown event through the system...
913 //FUZZ: disable check_for_lack_ACE_OS
914 RtecEventComm::EventSet shutdown (1);
915 //FUZZ: enable check_for_lack_ACE_OS
916 shutdown.length (1);
917 RtecEventComm::Event& s = shutdown[0];
919 s.header.source = 0;
920 s.header.ttl = 1;
922 ACE_hrtime_t t = ACE_OS::gethrtime ();
923 ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
924 s.header.type = ACE_ES_EVENT_SHUTDOWN;
925 consumer->push (shutdown);
928 void
929 Test_ECG::shutdown_consumer (int id)
931 ACE_DEBUG ((LM_DEBUG, "Shutdown consumer %d\n", id));
932 this->running_consumers_--;
934 if (this->running_consumers_ == 0)
936 if (TAO_ORB_Core_instance ()->orb () == 0)
938 ACE_ERROR ((LM_ERROR,
939 "(%P|%t) Test_ECG::shutdown_consumer, "
940 "ORB instance is 0\n"));
942 else
944 TAO_ORB_Core_instance ()->orb ()->shutdown ();
950 Test_ECG::shutdown (void)
952 ACE_DEBUG ((LM_DEBUG, "Shutting down the multiple EC test\n"));
954 if (this->rmt_name_.length () != 0)
956 this->ecg_.shutdown ();
959 TAO_ORB_Core_instance ()->orb ()->shutdown ();
960 return 0;
963 void
964 Test_ECG::dump_results (void)
966 const int bufsize = 512;
967 ACE_TCHAR buf[bufsize];
969 int i;
970 for (i = 0; i < this->hp_consumers_; ++i)
972 ACE_OS::sprintf (buf, ACE_TEXT("HP%02d"), i);
973 this->dump_results (buf, this->stats_[i]);
975 for (i = 0; i < this->lp_consumers_; ++i)
977 ACE_OS::sprintf (buf, ACE_TEXT("LP%02d"), i);
978 this->dump_results (buf, this->stats_[i + this->hp_consumers_]);
980 CORBA::ULong tmp = ACE_U64_TO_U32 (this->test_stop_ - this->test_start_);
981 double usec = tmp / 1000.0;
982 ACE_DEBUG ((LM_DEBUG, "Time[TOTAL]: %.3f\n", usec));
985 void
986 Test_ECG::dump_results (const ACE_TCHAR* name, Stats& stats)
988 // @@ We are reporting the information without specifics about
989 double usec = ACE_U64_TO_U32 (stats.total_time_) / 1000.0;
990 ACE_DEBUG ((LM_DEBUG, "Time[LCL,%s]: %.3f\n", name, usec));
991 int i;
992 for (i = 1; i < stats.lcl_count_ - 1; ++i)
994 usec = ACE_U64_TO_U32 (stats.lcl_latency_[i]) / 1000.0;
995 ACE_DEBUG ((LM_DEBUG, "Latency[LCL,%s]: %.3f\n", name, usec));
997 double percent = stats.laxity_[i] * 100.0;
998 ACE_DEBUG ((LM_DEBUG, "Laxity[LCL,%s]: %.3f\n", name, percent));
1000 usec = ACE_U64_TO_U32 (stats.end_[i] - this->test_start_) / 1000.0;
1001 ACE_DEBUG ((LM_DEBUG, "Completion[LCL,%s]: %.3f\n", name, usec));
1003 for (i = 1; i < stats.rmt_count_ - 1; ++i)
1005 double usec = ACE_U64_TO_U32 (stats.rmt_latency_[i]) / 1000.0;
1006 ACE_DEBUG ((LM_DEBUG, "Latency[RMT,%s]: %.3f\n", name, usec));
1011 Test_ECG::local_source (RtecEventComm::EventSourceID id) const
1013 for (int i = 0; i < this->hp_suppliers_ + this->lp_suppliers_; ++i)
1015 if (this->suppliers_[i]->supplier_id () == id)
1016 return 1;
1018 return 0;
1022 Test_ECG::parse_args (int argc, ACE_TCHAR *argv [])
1024 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("l:r:s:i:xh:w:p:d:"));
1025 int opt;
1027 while ((opt = get_opt ()) != EOF)
1029 switch (opt)
1031 case 'l':
1032 this->lcl_name_ = ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ());
1033 break;
1035 case 'r':
1036 this->rmt_name_ = ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ());
1037 break;
1039 case 's':
1040 if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT("global")) == 0)
1042 this->scheduling_type_ = Test_ECG::ss_global;
1044 else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT("local")) == 0)
1046 this->scheduling_type_ = Test_ECG::ss_local;
1048 else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT("runtime")) == 0)
1050 this->scheduling_type_ = Test_ECG::ss_runtime;
1052 else
1054 ACE_DEBUG ((LM_DEBUG,
1055 "Unknown scheduling type <%s> "
1056 "defaulting to local\n",
1057 get_opt.opt_arg ()));
1058 this->scheduling_type_ = Test_ECG::ss_local;
1060 break;
1062 case 'x':
1063 this->short_circuit_ = 1;
1064 break;
1066 case 'i':
1068 char* aux = 0;
1069 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
1070 this->consumer_disconnects_ = ACE_OS::atoi (arg);
1071 arg = ACE_OS::strtok_r (0, ",", &aux);
1072 this->supplier_disconnects_ = ACE_OS::atoi (arg);
1074 break;
1076 case 'h':
1078 char* aux = 0;
1079 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
1081 this->hp_suppliers_ = ACE_OS::atoi (arg);
1082 arg = ACE_OS::strtok_r (0, ",", &aux);
1083 this->hp_consumers_ = ACE_OS::atoi (arg);
1084 arg = ACE_OS::strtok_r (0, ",", &aux);
1085 this->hp_workload_ = ACE_OS::atoi (arg);
1086 arg = ACE_OS::strtok_r (0, ",", &aux);
1087 this->hp_interval_ = ACE_OS::atoi (arg);
1088 arg = ACE_OS::strtok_r (0, ",", &aux);
1089 this->hp_message_count_ = ACE_OS::atoi (arg);
1090 arg = ACE_OS::strtok_r (0, ",", &aux);
1091 this->hps_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1092 arg = ACE_OS::strtok_r (0, ",", &aux);
1093 this->hps_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1094 arg = ACE_OS::strtok_r (0, ",", &aux);
1095 this->hpc_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1096 arg = ACE_OS::strtok_r (0, ",", &aux);
1097 this->hpc_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1099 break;
1101 case 'w':
1103 char* aux = 0;
1104 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
1106 this->lp_suppliers_ = ACE_OS::atoi (arg);
1107 arg = ACE_OS::strtok_r (0, ",", &aux);
1108 this->lp_consumers_ = ACE_OS::atoi (arg);
1109 arg = ACE_OS::strtok_r (0, ",", &aux);
1110 this->lp_workload_ = ACE_OS::atoi (arg);
1111 arg = ACE_OS::strtok_r (0, ",", &aux);
1112 this->lp_interval_ = ACE_OS::atoi (arg);
1113 arg = ACE_OS::strtok_r (0, ",", &aux);
1114 this->lp_message_count_ = ACE_OS::atoi (arg);
1115 arg = ACE_OS::strtok_r (0, ",", &aux);
1116 this->lps_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1117 arg = ACE_OS::strtok_r (0, ",", &aux);
1118 this->lps_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1119 arg = ACE_OS::strtok_r (0, ",", &aux);
1120 this->lpc_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1121 arg = ACE_OS::strtok_r (0, ",", &aux);
1122 this->lpc_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
1124 break;
1126 case 'p':
1127 this->pid_file_name_ = get_opt.opt_arg ();
1128 break;
1129 case 'd':
1130 this->schedule_file_ = get_opt.opt_arg ();
1131 break;
1133 case '?':
1134 default:
1135 ACE_DEBUG ((LM_DEBUG,
1136 "Usage: %s "
1137 "[ORB options] "
1138 "-l <local_name> "
1139 "-r <remote_name> "
1140 "-s <global|local|runtime> "
1141 "-i <consumer disc.,supplier disc.> "
1142 "-x (short circuit EC) "
1143 "-h <high priority args> "
1144 "-w <low priority args> "
1145 "-p <pid file name> "
1146 "-d <schedule file name> "
1147 "\n",
1148 argv[0]));
1149 return -1;
1153 if (this->hp_message_count_ < 0
1154 || this->hp_message_count_ >= Test_ECG::MAX_EVENTS)
1156 ACE_DEBUG ((LM_DEBUG,
1157 "%s: HP event count (%d) is out of range, "
1158 "reset to default (%d)\n",
1159 argv[0], this->lp_message_count_,
1160 160));
1161 this->hp_message_count_ = 160;
1164 if (this->lp_message_count_ < 0
1165 || this->lp_message_count_ >= Test_ECG::MAX_EVENTS)
1167 ACE_DEBUG ((LM_DEBUG,
1168 "%s: LP event count (%d) is out of range, "
1169 "reset to default (%d)\n",
1170 argv[0], this->lp_message_count_,
1171 4));
1172 this->lp_message_count_ = 4;
1175 if (this->hp_consumers_ <= 0
1176 || this->lp_consumers_ < 0
1177 || this->hp_consumers_ + this->lp_consumers_ >= Test_ECG::MAX_CONSUMERS
1178 || this->hp_suppliers_ <= 0
1179 || this->lp_suppliers_ < 0
1180 || this->hp_suppliers_ + this->lp_suppliers_ >= Test_ECG::MAX_SUPPLIERS)
1182 ACE_ERROR_RETURN ((LM_DEBUG,
1183 "%s: number of consumers (low: %d, high: %d) or "
1184 "suppliers (low: %d, high: %d) out of range\n",
1185 argv[0],
1186 lp_consumers_, hp_consumers_,
1187 lp_suppliers_, lp_suppliers_), -1);
1190 return 0;
1193 Test_Supplier::Test_Supplier (Test_ECG *test,
1194 void *cookie)
1195 : test_ (test),
1196 cookie_ (cookie),
1197 consumer_ (this)
// Prepare this supplier and connect it to the event channel <ec>.
//
// Steps, in order:
//   1. remember the two event types and the message budget;
//   2. create a scheduler RT_Info called <name> and describe it with
//      server->set();
//   3. derive supplier_id_ from <name> via ACE::crc32();
//   4. publish event_a, event_b and ACE_ES_EVENT_SHUTDOWN in the
//      supplier QoS;
//   5. obtain a push-consumer proxy from <ec> and connect to it.
//
// <rate> is passed straight through to server->set().
1201 void
1202 Test_Supplier::open (const char* name,
1203 int event_a,
1204 int event_b,
1205 int message_count,
1206 const RtecScheduler::Period_t& rate,
1207 RtecEventChannelAdmin::EventChannel_ptr ec)
1209 this->event_a_ = event_a;
1210 this->event_b_ = event_b;
1211 this->message_count_ = message_count;
1213 RtecScheduler::Scheduler_ptr server =
1214 ACE_Scheduler_Factory::server ();
1216 RtecScheduler::handle_t rt_info =
1217 server->create (name);
1219 // The execution times are set to reasonable values, but
1220 // actually they are changed on the real execution, i.e. we
1221 // lie to the scheduler to obtain right priorities; but we
1222 // don't care if the set is schedulable.
// tv is (0 seconds, 2000 microseconds); the converted value is reused
// for every time-valued argument of set() below.
1223 ACE_Time_Value tv (0, 2000);
1224 TimeBase::TimeT time;
1225 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
1226 ACE_DEBUG ((LM_DEBUG, "register supplier \"%C\"\n", name));
1227 server->set (rt_info,
1228 RtecScheduler::VERY_HIGH_CRITICALITY,
1229 time, time, time,
1230 rate,
1231 RtecScheduler::VERY_LOW_IMPORTANCE,
1232 time,
// NOTE(review): the embedded listing numbers jump from 1232 to 1234, so
// one argument line of this set() call appears to be missing from this
// extract -- confirm against the upstream file.
1234 RtecScheduler::OPERATION);
// The id used as the event source is computed from the name, so it is
// the same for every supplier opened with the same <name>.
1236 this->supplier_id_ = ACE::crc32 (name);
1237 ACE_DEBUG ((LM_DEBUG, "ID for <%C> is %04.4x\n", name,
1238 this->supplier_id_));
// Three publications, all tied to the same rt_info handle.
1240 ACE_SupplierQOS_Factory qos;
1241 qos.insert (this->supplier_id_,
1242 this->event_a_,
1243 rt_info, 1);
1244 qos.insert (this->supplier_id_,
1245 this->event_b_,
1246 rt_info, 1);
1247 qos.insert (this->supplier_id_,
1248 ACE_ES_EVENT_SHUTDOWN,
1249 rt_info, 1);
// = Connect as a supplier.
1251 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
1252 ec->for_suppliers ();
1254 this->consumer_proxy_ =
1255 supplier_admin->obtain_push_consumer ();
1257 RtecEventComm::PushSupplier_var objref = this->_this ();
1259 this->consumer_proxy_->connect_push_supplier (objref.in (),
1260 qos.get_SupplierQOS ());
1263 void
1264 Test_Supplier::close (void)
1266 if (CORBA::is_nil (this->consumer_proxy_.in ()))
1267 return;
1269 RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
1270 this->consumer_proxy_._retn ();
1271 proxy->disconnect_push_consumer ();
// Connect the embedded consumer_ object to <ec> so that it receives
// ACE_ES_EVENT_INTERVAL_TIMEOUT events.
//
// A separate scheduler RT_Info named "consumer_" + <name> is created
// and described for that connection.  The timeout interval requested
// from the channel is <rate> / 10.
1274 void
1275 Test_Supplier::activate (const char* name,
1276 const RtecScheduler::Period_t& rate,
1277 RtecEventChannelAdmin::EventChannel_ptr ec)
1279 RtecScheduler::Scheduler_ptr server =
1280 ACE_Scheduler_Factory::server ();
// Build the RT_Info name "consumer_<name>" in a fixed 512-byte buffer.
// NOTE(review): strcpy/strcat do not check the length of <name>; a name
// longer than the remaining space would overflow buf.
1282 const int bufsize = 512;
1283 char buf[bufsize];
1284 ACE_OS::strcpy (buf, "consumer_");
1285 ACE_OS::strcat (buf, name);
1286 RtecScheduler::handle_t rt_info =
1287 server->create (buf);
1289 // The execution times are set to reasonable values, but
1290 // actually they are changed on the real execution, i.e. we
1291 // lie to the scheduler to obtain right priorities; but we
1292 // don't care if the set is schedulable.
// tv is (0 seconds, 2000 microseconds); the converted value is reused
// for every time-valued argument of set() below.
1293 ACE_Time_Value tv (0, 2000);
1294 TimeBase::TimeT time;
1295 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
1296 ACE_DEBUG ((LM_DEBUG, "activate \"%C\"\n", buf));
1297 server->set (rt_info,
1298 RtecScheduler::VERY_HIGH_CRITICALITY,
1299 time, time, time,
1300 rate,
1301 RtecScheduler::VERY_LOW_IMPORTANCE,
1302 time,
// NOTE(review): the embedded listing numbers jump from 1302 to 1304, so
// one argument line of this set() call appears to be missing from this
// extract -- confirm against the upstream file.
1304 RtecScheduler::OPERATION);
1306 // Also connect our consumer for timeout events from the EC.
// The quotient is split into whole seconds and a remainder using
// ACE_ONE_SECOND_IN_USECS, i.e. it is treated as microseconds.
// NOTE(review): presumably the division by 10 converts <rate> from
// 100 ns units to microseconds -- confirm against the Period_t
// definition.
1307 int interval = rate / 10;
1308 ACE_Time_Value tv_timeout (interval / ACE_ONE_SECOND_IN_USECS,
1309 interval % ACE_ONE_SECOND_IN_USECS);
1310 TimeBase::TimeT timeout;
1311 ORBSVCS_Time::Time_Value_to_TimeT (timeout, tv_timeout);
// The subscription holds a single entry: the interval timeout.
1313 ACE_ConsumerQOS_Factory consumer_qos;
1314 consumer_qos.start_disjunction_group ();
1315 consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
1316 timeout,
1317 rt_info);
1319 // = Connect as a consumer.
1320 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
1321 ec->for_consumers ();
1323 this->supplier_proxy_ =
1324 consumer_admin->obtain_push_supplier ();
// The object reference registered with the proxy is that of the
// embedded consumer_ member, not of this supplier itself.
1326 RtecEventComm::PushConsumer_var cref =
1327 this->consumer_._this ();
1329 this->supplier_proxy_->connect_push_consumer (
1330 cref.in (),
1331 consumer_qos.get_ConsumerQOS ());
1334 void
1335 Test_Supplier::push (const RtecEventComm::EventSet& events)
1337 #if 0
1338 const int bufsize = 128;
1339 char buf[bufsize];
1340 ACE_OS::sprintf (buf, "Supplier %d receives event in thread: ",
1341 this->supplier_id_);
1342 print_priority_info (buf);
1343 #endif
1345 if (events.length () == 0 || this->message_count_ < 0)
1347 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
1348 return;
1351 RtecEventComm::EventSet sent (events.length ());
1352 sent.length (events.length ());
1354 for (u_int i = 0; i < events.length (); ++i)
1356 const RtecEventComm::Event& e = events[i];
1357 if (e.header.type != ACE_ES_EVENT_INTERVAL_TIMEOUT)
1358 continue;
1360 // ACE_DEBUG ((LM_DEBUG, "Test_Supplier - timeout (%t)\n"));
1362 RtecEventComm::Event& s = sent[i];
1363 s.header.source = this->supplier_id_;
1364 s.header.ttl = 1;
1366 ACE_hrtime_t t = ACE_OS::gethrtime ();
1367 ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t);
1369 this->message_count_--;
1371 if (this->message_count_ < 0)
1373 this->test_->shutdown_supplier (this->cookie_,
1374 this->consumer_proxy_.in ());
1376 if (this->message_count_ % 2 == 0)
1378 // Generate an A event...
1379 s.header.type = this->event_a_;
1381 else
1383 s.header.type = this->event_b_;
1386 this->test_->push_supplier (this->cookie_,
1387 this->consumer_proxy_.in (),
1388 sent);
1391 void
1392 Test_Supplier::disconnect_push_supplier (void)
1394 if (CORBA::is_nil (this->supplier_proxy_.in ()))
1395 return;
1397 this->supplier_proxy_->disconnect_push_supplier ();
1400 void
1401 Test_Supplier::disconnect_push_consumer (void)
1405 int Test_Supplier::supplier_id (void) const
1407 return this->supplier_id_;
1410 Test_Consumer::Test_Consumer (Test_ECG *test,
1411 void *cookie)
1412 : test_ (test),
1413 cookie_ (cookie)
// Register this consumer with the scheduler and connect it to the
// event channel <ec>.
//
// A scheduler RT_Info named <name> is created and described, then the
// consumer subscribes to ACE_ES_EVENT_SHUTDOWN, <event_a> and <event_b>
// and connects to a push-supplier proxy obtained from <ec>.
1417 void
1418 Test_Consumer::open (const char* name,
1419 int event_a, int event_b,
1420 RtecEventChannelAdmin::EventChannel_ptr ec)
1422 RtecScheduler::Scheduler_ptr server =
1423 ACE_Scheduler_Factory::server ();
1425 RtecScheduler::handle_t rt_info =
1426 server->create (name);
1428 // The worst case execution time is far less than 2
1429 // milliseconds, but that is a safe estimate....
1430 ACE_Time_Value tv (0, 2000);
1431 TimeBase::TimeT time;
1432 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
1433 ACE_DEBUG ((LM_DEBUG, "register consumer \"%C\"\n", name));
1434 server->set (rt_info,
1435 RtecScheduler::VERY_HIGH_CRITICALITY,
1436 time, time, time,
// NOTE(review): the embedded listing numbers jump from 1436 to 1438, so
// one argument line of this set() call appears to be missing from this
// extract -- confirm against the upstream file.
1438 RtecScheduler::VERY_LOW_IMPORTANCE,
1439 time,
// NOTE(review): likewise the numbering jumps from 1439 to 1441; a
// second argument line appears to be missing here.
1441 RtecScheduler::OPERATION);
// All three subscriptions share one disjunction group and the same
// rt_info handle.
1443 ACE_ConsumerQOS_Factory qos;
1444 qos.start_disjunction_group ();
1445 qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);
1446 qos.insert_type (event_a, rt_info);
1447 qos.insert_type (event_b, rt_info);
1449 // = Connect as a consumer.
1450 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
1451 ec->for_consumers ();
1453 this->supplier_proxy_ =
1454 consumer_admin->obtain_push_supplier ();
1456 RtecEventComm::PushConsumer_var objref = this->_this ();
1458 this->supplier_proxy_->connect_push_consumer (objref.in (),
1459 qos.get_ConsumerQOS ());
1462 void
1463 Test_Consumer::close (void)
1465 if (CORBA::is_nil (this->supplier_proxy_.in ()))
1466 return;
1468 RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
1469 this->supplier_proxy_._retn ();
1470 proxy->disconnect_push_supplier ();
1473 void
1474 Test_Consumer::push (const RtecEventComm::EventSet& events)
1476 ACE_hrtime_t arrival = ACE_OS::gethrtime ();
1477 this->test_->push_consumer (this->cookie_, arrival, events);
1480 void
1481 Test_Consumer::disconnect_push_consumer (void)
1486 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
1488 Test_ECG *test;
1490 // Dynamically allocate the Test_ECG instance so that we don't have
1491 // to worry about running out of stack space if it's large.
1492 ACE_NEW_RETURN (test,
1493 Test_ECG,
1494 -1);
1496 const int status = test->run (argc, argv);
1498 delete test;
1499 return status;