Merge pull request #2301 from sonndinh/remove-dup-reactor-functions
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Throughput / ECT_Consumer_Driver.cpp
blob5e63b9380b6285040d3542f06ac56d3e18925b9a
1 #include "ECT_Consumer_Driver.h"
3 #include "orbsvcs/CosNamingC.h"
4 #include "orbsvcs/Event_Utilities.h"
5 #include "orbsvcs/Event_Service_Constants.h"
6 #include "orbsvcs/Time_Utilities.h"
8 #include "tao/Timeprobe.h"
9 #include "tao/debug.h"
11 #include "ace/Get_Opt.h"
12 #include <memory>
13 #include "ace/Sched_Params.h"
14 #include "ace/OS_NS_errno.h"
15 #include "ace/OS_NS_unistd.h"
17 int
18 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
20 ECT_Consumer_Driver driver;
21 return driver.run (argc, argv);
24 // ****************************************************************
26 ECT_Consumer_Driver::ECT_Consumer_Driver ()
27 : n_consumers_ (1),
28 n_suppliers_ (1),
29 type_start_ (ACE_ES_EVENT_UNDEFINED),
30 type_count_ (1),
31 stall_length_(0),
32 shutdown_event_channel_ (1),
33 pid_file_name_ (0),
34 active_count_ (0)
38 ECT_Consumer_Driver::~ECT_Consumer_Driver ()
42 int
43 ECT_Consumer_Driver::run (int argc, ACE_TCHAR* argv[])
45 try
47 this->orb_ =
48 CORBA::ORB_init (argc, argv);
50 CORBA::Object_var poa_object =
51 this->orb_->resolve_initial_references("RootPOA");
53 if (CORBA::is_nil (poa_object.in ()))
54 ACE_ERROR_RETURN ((LM_ERROR,
55 " (%P|%t) Unable to initialize the POA.\n"),
56 1);
58 PortableServer::POA_var root_poa =
59 PortableServer::POA::_narrow (poa_object.in ());
61 PortableServer::POAManager_var poa_manager =
62 root_poa->the_POAManager ();
64 if (this->parse_args (argc, argv))
65 return 1;
67 if (TAO_debug_level > 0)
69 ACE_DEBUG ((LM_DEBUG,
70 "Execution parameters:\n"
71 " consumers = <%d>\n"
72 " suppliers = <%d>\n"
73 " type start = <%d>\n"
74 " type count = <%d>\n"
75 " pid file name = <%s>\n",
77 this->n_consumers_,
78 this->n_suppliers_,
79 this->type_start_,
80 this->type_count_,
82 this->pid_file_name_?this->pid_file_name_:ACE_TEXT("nil")));
85 if (this->pid_file_name_ != 0)
87 FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
88 if (pid != 0)
90 ACE_OS::fprintf (pid, "%ld\n",
91 static_cast<long> (ACE_OS::getpid ()));
92 ACE_OS::fclose (pid);
96 int min_priority = ACE_Sched_Params::priority_min (ACE_SCHED_FIFO);
98 // Enable FIFO scheduling
99 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
100 min_priority,
101 ACE_SCOPE_PROCESS)) != 0)
103 if (ACE_OS::last_error () == EPERM)
104 ACE_DEBUG ((LM_DEBUG,
105 "%s: user is not superuser, "
106 "so remain in time-sharing class\n", argv[0]));
107 else
108 ACE_ERROR ((LM_ERROR,
109 "%s: ACE_OS::sched_params failed\n", argv[0]));
112 if (ACE_OS::thr_setprio (min_priority) == -1)
114 ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed,"
115 "no real-time features\n"));
118 CORBA::Object_var naming_obj =
119 this->orb_->resolve_initial_references ("NameService");
121 if (CORBA::is_nil (naming_obj.in ()))
122 ACE_ERROR_RETURN ((LM_ERROR,
123 " (%P|%t) Unable to get the Naming Service.\n"),
126 CosNaming::NamingContext_var naming_context =
127 CosNaming::NamingContext::_narrow (naming_obj.in ());
129 CosNaming::Name schedule_name (1);
130 schedule_name.length (1);
131 schedule_name[0].id = CORBA::string_dup ("ScheduleService");
133 CORBA::Object_var sched_obj =
134 naming_context->resolve (schedule_name);
135 if (CORBA::is_nil (sched_obj.in ()))
136 return 1;
137 RtecScheduler::Scheduler_var scheduler =
138 RtecScheduler::Scheduler::_narrow (sched_obj.in ());
140 CosNaming::Name name (1);
141 name.length (1);
142 name[0].id = CORBA::string_dup ("EventService");
144 CORBA::Object_var ec_obj =
145 naming_context->resolve (name);
147 RtecEventChannelAdmin::EventChannel_var channel;
148 if (CORBA::is_nil (ec_obj.in ()))
149 channel = RtecEventChannelAdmin::EventChannel::_nil ();
150 else
151 channel = RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in ());
153 poa_manager->activate ();
155 this->connect_consumers (scheduler.in (), channel.in ());
157 ACE_DEBUG ((LM_DEBUG, "connected consumer(s)\n"));
159 ACE_DEBUG ((LM_DEBUG, "running the test\n"));
160 for (;;)
162 ACE_Time_Value tv (1, 0);
163 this->orb_->perform_work (tv);
164 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 1);
165 if (this->active_count_ <= 0)
166 break;
168 ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
170 this->dump_results ();
172 this->disconnect_consumers ();
174 if (this->shutdown_event_channel_ != 0)
176 channel->destroy ();
179 root_poa->destroy (true, true);
181 this->orb_->destroy ();
183 catch (const CORBA::SystemException& sys_ex)
185 sys_ex._tao_print_exception ("SYS_EX");
187 catch (const CORBA::Exception& ex)
189 ex._tao_print_exception ("NON SYS EX");
191 return 0;
194 void
195 ECT_Consumer_Driver::shutdown_consumer (void*)
197 // int ID =
198 // (reinterpret_cast<Test_Consumer**> (consumer_cookie)
199 // - this->consumers_);
201 // ACE_DEBUG ((LM_DEBUG, "(%t) events received by consumer %d\n", ID));
203 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
204 this->active_count_--;
207 void
208 ECT_Consumer_Driver::connect_consumers
209 (RtecScheduler::Scheduler_ptr scheduler,
210 RtecEventChannelAdmin::EventChannel_ptr channel)
213 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
214 this->active_count_ = this->n_consumers_;
216 for (int i = 0; i < this->n_consumers_; ++i)
218 char buf[BUFSIZ];
219 ACE_OS::sprintf (buf, "consumer_%02d", i);
221 ACE_NEW (this->consumers_[i],
222 Test_Consumer (this,
223 this->consumers_ + i,
224 this->n_suppliers_,
225 this->stall_length_));
227 this->consumers_[i]->connect (scheduler,
228 buf,
229 this->type_start_,
230 this->type_count_,
231 channel);
235 void
236 ECT_Consumer_Driver::dump_results ()
238 ACE_High_Res_Timer::global_scale_factor_type gsf =
239 ACE_High_Res_Timer::global_scale_factor ();
241 ACE_Throughput_Stats throughput;
242 for (int i = 0; i < this->n_consumers_; ++i)
244 ACE_TCHAR buf[BUFSIZ];
245 ACE_OS::sprintf (buf, ACE_TEXT("consumer_%02d"), i);
247 this->consumers_[i]->dump_results (buf, gsf);
248 this->consumers_[i]->accumulate (throughput);
250 throughput.dump_results (ACE_TEXT("ECT_Consumer/totals"), gsf);
253 void
254 ECT_Consumer_Driver::disconnect_consumers ()
256 for (int i = 0; i < this->n_consumers_; ++i)
258 this->consumers_[i]->disconnect ();
260 delete this->consumers_[i];
261 this->consumers_[i] = 0;
266 ECT_Consumer_Driver::parse_args (int argc, ACE_TCHAR *argv [])
268 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("xdc:s:h:p:o:"));
269 int opt;
271 while ((opt = get_opt ()) != EOF)
273 switch (opt)
275 case 'x':
276 this->shutdown_event_channel_ = 0;
277 break;
279 case 'c':
280 this->n_consumers_ = ACE_OS::atoi (get_opt.opt_arg ());
281 break;
283 case 's':
284 this->n_suppliers_ = ACE_OS::atoi (get_opt.opt_arg ());
285 break;
287 case 'h':
289 char* aux;
290 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
292 this->type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
293 arg = ACE_OS::strtok_r (0, ",", &aux);
294 this->type_count_ = ACE_OS::atoi (arg);
296 break;
298 case 'p':
299 this->pid_file_name_ = get_opt.opt_arg ();
300 break;
302 case 'o':
303 this->stall_length_ = ACE_OS::atoi (get_opt.opt_arg ());
304 break;
306 case '?':
307 default:
308 ACE_DEBUG ((LM_DEBUG,
309 "Usage: %s "
310 "[ORB options] "
311 "-d -x "
312 "-c <n_consumers> "
313 "-s <n_suppliers> "
314 "-h <type_start,type_count> "
315 "-p <pid file name> "
316 "\n",
317 argv[0]));
318 return -1;
322 if (this->n_suppliers_ <= 0)
324 ACE_DEBUG ((LM_DEBUG,
325 "%s: number of suppliers (%d) is out of range, "
326 "reset to default (%d)\n",
327 argv[0], this->n_suppliers_,
328 1));
329 this->n_suppliers_ = 1;
332 if (this->n_consumers_ <= 0)
334 ACE_ERROR_RETURN ((LM_ERROR,
335 "%s: number of consumers or "
336 "suppliers out of range\n", argv[0]), -1);
339 if (this->type_count_ <= 0)
341 this->type_count_ = 1;
342 ACE_ERROR_RETURN ((LM_ERROR,
343 "%s: number of event types "
344 "suppliers out of range, reset to default (1)\n",
345 argv[0]), -1);
348 return 0;