1 #include "ECT_Supplier_Driver.h"
3 #include "orbsvcs/CosNamingC.h"
4 #include "orbsvcs/Event_Utilities.h"
5 #include "orbsvcs/Event_Service_Constants.h"
6 #include "orbsvcs/Time_Utilities.h"
8 #include "tao/Timeprobe.h"
11 #include "ace/Get_Opt.h"
13 #include "ace/Sched_Params.h"
14 #include "ace/OS_NS_errno.h"
15 #include "ace/OS_NS_unistd.h"
// Program entry point.  Creates an ECT_Supplier_Driver on the stack and
// returns whatever its run() method returns, forwarding the command-line
// arguments unchanged.
18 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
20 ECT_Supplier_Driver driver
;
21 return driver
.run (argc
, argv
);
24 // ****************************************************************
// Default constructor.  The visible member initializer sets type_start_ to
// ACE_ES_EVENT_UNDEFINED; the trailing comma indicates that further member
// initializers follow it.
26 ECT_Supplier_Driver::ECT_Supplier_Driver ()
32 type_start_ (ACE_ES_EVENT_UNDEFINED
),
// Destructor.  The Test_Supplier objects held in suppliers_ are deleted in
// disconnect_suppliers().
// NOTE(review): confirm nothing else needs releasing here.
38 ECT_Supplier_Driver::~ECT_Supplier_Driver ()
// Takes a single opaque pointer argument that is left unnamed, so it cannot
// be used by the body.
// NOTE(review): presumably a no-op on the supplier side -- confirm.
43 ECT_Supplier_Driver::shutdown_consumer (void*)
// Runs the supplier side of the test from start to finish:
//   1. initializes the ORB and obtains the RootPOA and its POA manager;
//   2. parses the test options with parse_args();
//   3. optionally prints the execution parameters and writes a pid file;
//   4. tries to enable FIFO scheduling and sets the main thread priority;
//   5. resolves "ScheduleService" and "EventService" via the Naming Service;
//   6. activates the POA manager, then connects and activates the suppliers;
//   7. waits for the supplier threads, dumps the results, disconnects the
//      suppliers and destroys the root POA.
// CORBA::SystemException and, after it, any other CORBA::Exception are caught
// and printed.
//
// @param argc  Argument count; passed to CORBA::ORB_init and parse_args.
// @param argv  Argument vector; passed to CORBA::ORB_init and parse_args, and
//              argv[0] is used as the program name in diagnostics.
48 ECT_Supplier_Driver::run (int argc
, ACE_TCHAR
* argv
[])
// Initialize the ORB from the command-line arguments.
53 CORBA::ORB_init (argc
, argv
);
// Resolve the RootPOA; a nil reference is reported as an error.
55 CORBA::Object_var poa_object
=
56 orb
->resolve_initial_references("RootPOA");
58 if (CORBA::is_nil (poa_object
.in ()))
59 ACE_ERROR_RETURN ((LM_ERROR
,
60 " (%P|%t) Unable to initialize the POA.\n"),
// Narrow to the POA interface and fetch its manager (activated further down).
63 PortableServer::POA_var root_poa
=
64 PortableServer::POA::_narrow (poa_object
.in ());
66 PortableServer::POAManager_var poa_manager
=
67 root_poa
->the_POAManager ();
// Parse the test-specific options; the branch is taken on a non-zero result.
69 if (this->parse_args (argc
, argv
))
// Only when TAO_debug_level is above zero: print the execution parameters.
72 if (TAO_debug_level
> 0)
75 "Execution parameters:\n"
77 " burst count = <%d>\n"
78 " burst size = <%d>\n"
79 " event size = <%d>\n"
80 " burst pause = <%d>\n"
81 " type start = <%d>\n"
82 " type count = <%d>\n"
83 " pid file name = <%s>\n",
// A null pid file name is printed as "nil".
93 this->pid_file_name_
?this->pid_file_name_
:ACE_TEXT("nil")));
// If a pid file name was supplied, write this process id to that file.
96 if (this->pid_file_name_
!= 0)
98 FILE* pid
= ACE_OS::fopen (this->pid_file_name_
, "w");
101 ACE_OS::fprintf (pid
, "%ld\n",
102 static_cast<long> (ACE_OS::getpid ()));
103 ACE_OS::fclose (pid
);
// Lowest priority value of the FIFO scheduling class; used for the main
// thread below.
107 int min_priority
= ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
);
109 // Enable FIFO scheduling
110 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
112 ACE_SCOPE_PROCESS
)) != 0)
// EPERM is reported at debug level as "not superuser"; the LM_ERROR message
// below is presumably for every other failure.
114 if (ACE_OS::last_error () == EPERM
)
115 ACE_DEBUG ((LM_DEBUG
,
116 "%s: user is not superuser, "
117 "so remain in time-sharing class\n", argv
[0]));
119 ACE_ERROR ((LM_ERROR
,
120 "%s: ACE_OS::sched_params failed\n", argv
[0]));
// Apply min_priority to the calling (main) thread; a failure is logged.
123 if (ACE_OS::thr_setprio (min_priority
) == -1)
125 ACE_ERROR ((LM_ERROR
, "(%P|%t) main thr_setprio failed,"
126 "no real-time features\n"));
// Resolve the Naming Service; a nil reference is reported as an error.
129 CORBA::Object_var naming_obj
=
130 orb
->resolve_initial_references ("NameService");
132 if (CORBA::is_nil (naming_obj
.in ()))
133 ACE_ERROR_RETURN ((LM_ERROR
,
134 " (%P|%t) Unable to get the Naming Service.\n"),
137 CosNaming::NamingContext_var naming_context
=
138 CosNaming::NamingContext::_narrow (naming_obj
.in ());
// Look up "ScheduleService" in the naming context and narrow it to a
// scheduler reference.
140 CosNaming::Name
schedule_name (1);
141 schedule_name
.length (1);
142 schedule_name
[0].id
= CORBA::string_dup ("ScheduleService");
144 CORBA::Object_var sched_obj
=
145 naming_context
->resolve (schedule_name
);
146 if (CORBA::is_nil (sched_obj
.in ()))
148 RtecScheduler::Scheduler_var scheduler
=
149 RtecScheduler::Scheduler::_narrow (sched_obj
.in ());
// Look up "EventService" in the same naming context.
150 CosNaming::Name
name (1);
152 name
[0].id
= CORBA::string_dup ("EventService");
154 CORBA::Object_var ec_obj
=
155 naming_context
->resolve (name
);
// A nil object yields a nil channel reference; otherwise narrow it to the
// event channel interface.
157 RtecEventChannelAdmin::EventChannel_var channel
;
158 if (CORBA::is_nil (ec_obj
.in ()))
159 channel
= RtecEventChannelAdmin::EventChannel::_nil ();
161 channel
= RtecEventChannelAdmin::EventChannel::_narrow (ec_obj
.in ());
// Activate the POA manager before connecting the suppliers.
163 poa_manager
->activate ();
165 this->connect_suppliers (scheduler
.in (),
168 ACE_DEBUG ((LM_DEBUG
, "connected supplier(s)\n"));
170 this->activate_suppliers ();
172 ACE_DEBUG ((LM_DEBUG
, "suppliers are active\n"));
174 // Wait for the supplier threads...
175 if (ACE_Thread_Manager::instance ()->wait () == -1)
177 ACE_ERROR ((LM_ERROR
, "Thread_Manager wait failed\n"));
181 ACE_DEBUG ((LM_DEBUG
, "suppliers finished\n"));
// Report the measurements first, then disconnect (which also deletes) the
// suppliers.
183 this->dump_results ();
185 this->disconnect_suppliers ();
187 ACE_DEBUG ((LM_DEBUG
, "suppliers disconnected\n"));
189 // @@ Deactivate the suppliers (as CORBA Objects?)
// Destroy the root POA, passing true for both arguments of destroy().
191 root_poa
->destroy (true, true);
195 ACE_DEBUG ((LM_DEBUG
, "orb and poa destroyed\n"));
// System exceptions are caught first and labelled "SYS_EX"; any other CORBA
// exception is labelled "NON SYS EX".
197 catch (const CORBA::SystemException
& sys_ex
)
199 sys_ex
._tao_print_exception ("SYS_EX");
201 catch (const CORBA::Exception
& ex
)
203 ex
._tao_print_exception ("NON SYS EX");
// Creates and connects the suppliers.  For each index i in
// [0, n_suppliers_) this formats "supplier_%02d" into buf, allocates a
// Test_Supplier (constructed with this driver) into suppliers_[i] via
// ACE_NEW, and calls connect() on it.
//
// @param scheduler  Scheduler reference; passed as the first argument of each
//                   supplier's connect().
// @param channel    Event channel reference.
//                   NOTE(review): its use is not visible in these lines;
//                   presumably it is also forwarded to connect() -- confirm.
209 ECT_Supplier_Driver::connect_suppliers
210 (RtecScheduler::Scheduler_ptr scheduler
,
211 RtecEventChannelAdmin::EventChannel_ptr channel
)
213 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
// Build the per-supplier name, e.g. "supplier_00".
216 ACE_OS::sprintf (buf
, "supplier_%02d", i
);
218 ACE_NEW (this->suppliers_
[i
], Test_Supplier (this));
220 this->suppliers_
[i
]->connect (scheduler
,
// Calls activate() on each of the first n_suppliers_ entries of suppliers_,
// in index order.  The entries are created in connect_suppliers().
233 ECT_Supplier_Driver::activate_suppliers ()
235 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
237 this->suppliers_
[i
]->activate ();
// Disconnects and destroys the suppliers.  For each of the first
// n_suppliers_ entries this calls disconnect(), deletes the object, and
// resets the slot to 0 so no dangling pointer is left in suppliers_.
242 ECT_Supplier_Driver::disconnect_suppliers ()
244 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
246 this->suppliers_
[i
]->disconnect ();
248 delete this->suppliers_
[i
];
249 this->suppliers_
[i
] = 0;
// Prints the per-supplier results followed by the combined totals.
// Each supplier dumps its own results under the name "supplier_NN" and adds
// its figures to a shared ACE_Throughput_Stats accumulator, which is dumped
// at the end as "ECT_Supplier/totals".
254 ECT_Supplier_Driver::dump_results ()
// Global scale factor of the high-resolution timer; handed to every
// dump_results() call below.
256 ACE_High_Res_Timer::global_scale_factor_type gsf
=
257 ACE_High_Res_Timer::global_scale_factor ();
259 ACE_Throughput_Stats throughput
;
260 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
// Per-supplier label, e.g. "supplier_00".
262 ACE_TCHAR buf
[BUFSIZ
];
263 ACE_OS::sprintf (buf
, ACE_TEXT("supplier_%02d"), i
);
265 this->suppliers_
[i
]->dump_results (buf
, gsf
);
266 this->suppliers_
[i
]->accumulate (throughput
);
// Totals accumulated across all suppliers.
268 throughput
.dump_results (ACE_TEXT("ECT_Supplier/totals"), gsf
);
272 ECT_Supplier_Driver::parse_args (int argc
, ACE_TCHAR
*argv
[])
274 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("ds:u:n:t:b:h:p:"));
277 while ((opt
= get_opt ()) != EOF
)
282 this->n_suppliers_
= ACE_OS::atoi (get_opt
.opt_arg ());
286 this->burst_count_
= ACE_OS::atoi (get_opt
.opt_arg ());
290 this->burst_size_
= ACE_OS::atoi (get_opt
.opt_arg ());
294 this->event_size_
= ACE_OS::atoi (get_opt
.opt_arg ());
298 this->burst_pause_
= ACE_OS::atoi (get_opt
.opt_arg ());
304 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
306 this->type_start_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
307 arg
= ACE_OS::strtok_r (0, ",", &aux
);
308 this->type_count_
= ACE_OS::atoi (arg
);
313 this->pid_file_name_
= get_opt
.opt_arg ();
318 ACE_DEBUG ((LM_DEBUG
,
324 "-b <event payload size> "
325 "-t <burst pause (usecs)> "
326 "-h <type_start,type_count> "
327 "-p <pid file name> "
334 if (this->burst_count_
<= 0)
336 ACE_DEBUG ((LM_DEBUG
,
337 "%s: burst count (%d) is out of range, "
338 "reset to default (%d)\n",
339 argv
[0], this->burst_count_
,
341 this->burst_count_
= 100;
344 if (this->burst_size_
<= 0)
346 ACE_DEBUG ((LM_DEBUG
,
347 "%s: burst size (%d) is out of range, "
348 "reset to default (%d)\n",
349 argv
[0], this->burst_size_
,
351 this->burst_size_
= 10;
354 if (this->event_size_
< 0)
356 ACE_DEBUG ((LM_DEBUG
,
357 "%s: event size (%d) is out of range, "
358 "reseting to default (%d)\n",
359 argv
[0], this->event_size_
,
361 this->event_size_
= 128;
364 if (this->n_suppliers_
<= 0)
366 this->n_suppliers_
= 1;
367 ACE_ERROR_RETURN ((LM_ERROR
,
368 "%s: number of suppliers out of range, "
369 "reset to default (%d)\n",