Merge pull request #2303 from jwillemsen/jwi-803
[ACE_TAO.git] / TAO / examples / Kokyu_dsrt_schedulers / muf_example / client.cpp
blobc550dcc65f40f7b88e34ea6b6308ce934f967007
1 #include "ace/Get_Opt.h"
2 #include "ace/Task.h"
3 #include "ace/High_Res_Timer.h"
4 #include "ace/OS_NS_unistd.h"
5 #include "tao/RTScheduling/RTScheduler_Manager.h"
6 #include "testC.h"
7 #include "MUF_Scheduler.h"
8 #include "orbsvcs/Time_Utilities.h"
10 const ACE_TCHAR *ior = ACE_TEXT("file://test.ior");
11 int niterations = 5;
12 int do_shutdown = 0;
13 int enable_dynamic_scheduling = 0;
14 int enable_yield = 1;
16 /**
17 * Run a server thread
19 * Use the ACE_Task_Base class to run server threads
21 class Worker : public ACE_Task_Base
23 public:
24 Worker (CORBA::ORB_ptr orb,
25 Simple_Server_ptr server_ptr,
26 RTScheduling::Current_ptr current,
27 MUF_Scheduler* scheduler,
28 TimeBase::TimeT deadline,
29 TimeBase::TimeT estimated_initial_execution_time,
30 long criticality,
31 CORBA::Long server_load);
32 // int sleep_time);
33 // ctor
35 virtual int svc ();
36 // The thread entry point.
38 private:
39 CORBA::ORB_var orb_;
40 // The orb
42 Simple_Server_var server_;
43 RTScheduling::Current_var scheduler_current_;
44 MUF_Scheduler* scheduler_;
45 TimeBase::TimeT deadline_;
46 TimeBase::TimeT estimated_initial_execution_time_;
47 long criticality_;
48 CORBA::Long server_load_;
49 int sleep_time_;
52 int
53 parse_args (int argc, ACE_TCHAR *argv[])
55 ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("xk:i:ds"));
56 int c;
58 while ((c = get_opts ()) != -1)
59 switch (c)
61 case 'x':
62 do_shutdown = 1;
63 break;
65 case 'k':
66 ior = get_opts.opt_arg ();
67 break;
69 case 'i':
70 niterations = ACE_OS::atoi (get_opts.opt_arg ());
71 break;
73 case 'd':
74 enable_dynamic_scheduling = 1;
75 break;
77 case 's':
78 enable_yield = 0;
79 break;
81 case '?':
82 default:
83 ACE_ERROR_RETURN ((LM_ERROR,
84 "usage: %s "
85 "-k <ior> "
86 "-i <niterations> "
87 "-d (enable dynamic scheduling)"
88 "-s (disable yield)"
89 "\n",
90 argv [0]),
91 -1);
93 // Indicates successful parsing of the command line
94 return 0;
97 int
98 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
100 MUF_Scheduler* scheduler=0;
101 RTScheduling::Current_var current;
102 int prio;
103 int max_prio;
104 ACE_Sched_Params::Policy sched_policy = ACE_SCHED_RR;
105 int sched_scope = ACE_SCOPE_THREAD;
106 long flags;
108 if (sched_policy == ACE_SCHED_RR)
109 flags = THR_NEW_LWP | THR_BOUND | THR_JOINABLE | THR_SCHED_RR;
110 else
111 flags = THR_NEW_LWP | THR_BOUND | THR_JOINABLE | THR_SCHED_FIFO;
113 ACE_hthread_t main_thr_handle;
114 ACE_Thread::self (main_thr_handle);
116 max_prio = ACE_Sched_Params::priority_max (sched_policy,
117 sched_scope);
119 //FUZZ: disable check_for_lack_ACE_OS
120 ACE_Sched_Params sched_params (sched_policy, max_prio);
121 //FUZZ: enable check_for_lack_ACE_OS
123 ACE_OS::sched_params (sched_params);
125 if (ACE_Thread::getprio (main_thr_handle, prio) == -1)
127 if (errno == ENOTSUP)
129 ACE_ERROR((LM_ERROR,
130 ACE_TEXT ("getprio not supported\n")));
132 else
134 ACE_ERROR ((LM_ERROR,
135 ACE_TEXT ("%p\n")
136 ACE_TEXT ("thr_getprio failed")));
140 ACE_DEBUG ((LM_DEBUG, "(%t): main thread prio is %d\n", prio));
144 RTScheduling::Scheduler_var sched_owner;
146 CORBA::ORB_var orb =
147 CORBA::ORB_init (argc, argv);
149 if (parse_args (argc, argv) != 0)
150 return 1;
152 CORBA::Object_var object =
153 orb->string_to_object (ior);
155 Simple_Server_var server =
156 Simple_Server::_narrow (object.in ());
158 if (CORBA::is_nil (server.in ()))
160 ACE_ERROR_RETURN ((LM_ERROR,
161 "Object reference <%s> is nil.\n",
162 ior),
166 if (enable_dynamic_scheduling)
168 ACE_DEBUG ((LM_DEBUG, "Dyn Sched enabled\n"));
169 CORBA::Object_var manager_obj =
170 orb->resolve_initial_references ("RTSchedulerManager");
172 TAO_RTScheduler_Manager_var manager =
173 TAO_RTScheduler_Manager::_narrow (manager_obj.in ());
175 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type;
176 if (enable_yield)
178 disp_impl_type = Kokyu::DSRT_CV_BASED;
180 else
182 disp_impl_type = Kokyu::DSRT_OS_BASED;
185 ACE_NEW_RETURN (scheduler,
186 MUF_Scheduler (orb.in (),
187 disp_impl_type,
188 sched_policy,
189 sched_scope), -1);
190 sched_owner = scheduler;
192 manager->rtscheduler (scheduler);
194 CORBA::Object_var object =
195 orb->resolve_initial_references ("RTScheduler_Current");
197 current =
198 RTScheduling::Current::_narrow (object.in ());
201 TimeBase::TimeT deadline;
202 TimeBase::TimeT exec_time;
203 int criticality=0;
205 ORBSVCS_Time::Time_Value_to_TimeT (deadline,
206 ACE_OS::gettimeofday () +
207 ACE_Time_Value (50,0) );
209 ORBSVCS_Time::Time_Value_to_TimeT (exec_time,
210 ACE_OS::gettimeofday () +
211 ACE_Time_Value (10,0) );
213 Worker worker1 (orb.in (),
214 server.in (),
215 current.in (),
216 scheduler,
217 deadline,
218 exec_time,
219 criticality,
220 30);
222 if (worker1.activate (flags, 1, 0, max_prio) != 0)
224 ACE_ERROR ((LM_ERROR,
225 "(%t|%T) cannot activate worker thread.\n"));
228 ACE_OS::sleep(2);
230 ORBSVCS_Time::Time_Value_to_TimeT (deadline,
231 ACE_OS::gettimeofday () +
232 ACE_Time_Value (30,0) );
234 ORBSVCS_Time::Time_Value_to_TimeT (exec_time,
235 ACE_OS::gettimeofday () +
236 ACE_Time_Value (10,0) );
238 criticality = 0;
239 Worker worker2 (orb.in (),
240 server.in (),
241 current.in (),
242 scheduler,
243 deadline,
244 exec_time,
245 criticality,
246 10);
248 if (worker2.activate (flags, 1, 0, max_prio) != 0)
250 ACE_ERROR ((LM_ERROR,
251 "(%t|%T) cannot activate scheduler thread in RT mode.\n"));
254 ORBSVCS_Time::Time_Value_to_TimeT (deadline,
255 ACE_OS::gettimeofday () +
256 ACE_Time_Value (100,0) );
258 ORBSVCS_Time::Time_Value_to_TimeT (exec_time,
259 ACE_OS::gettimeofday () +
260 ACE_Time_Value (10,0) );
261 criticality = 1;
263 Worker worker3 (orb.in (),
264 server.in (),
265 current.in (),
266 scheduler,
267 deadline,
268 exec_time,
269 criticality,
271 if (worker3.activate (flags, 1, 0, max_prio) != 0)
273 ACE_ERROR ((LM_ERROR,
274 "(%t|%T) cannot activate scheduler thread in RT mode.\n"));
277 worker1.wait ();
278 worker2.wait ();
279 worker3.wait ();
281 ACE_DEBUG ((LM_DEBUG,
282 "(%t): wait for worker threads done in main thread\n"));
284 if (do_shutdown)
286 if (enable_dynamic_scheduling)
288 MUF_Scheduling::SchedulingParameter sched_param;
289 sched_param.criticality = 0;
290 sched_param.deadline = 0;
291 sched_param.estimated_initial_execution_time = 0;
292 CORBA::Policy_var sched_param_policy =
293 scheduler->create_scheduling_parameter (sched_param);
294 CORBA::Policy_ptr implicit_sched_param = 0;
295 current->begin_scheduling_segment (0,
296 sched_param_policy.in (),
297 implicit_sched_param);
300 ACE_DEBUG ((LM_DEBUG, "(%t): about to call server shutdown\n"));
301 server->shutdown ();
303 ACE_DEBUG ((LM_DEBUG, "after shutdown call in main thread\n"));
306 if (enable_dynamic_scheduling)
308 current->end_scheduling_segment (0);
312 scheduler->shutdown ();
313 ACE_DEBUG ((LM_DEBUG, "scheduler shutdown done\n"));
315 orb->destroy ();
317 catch (const CORBA::Exception& ex)
319 ex._tao_print_exception ("Exception caught:");
320 return 1;
323 return 0;
326 // ****************************************************************
328 Worker::Worker (CORBA::ORB_ptr orb,
329 Simple_Server_ptr server_ptr,
330 RTScheduling::Current_ptr current,
331 MUF_Scheduler* scheduler,
332 TimeBase::TimeT deadline,
333 TimeBase::TimeT estimated_initial_execution_time,
334 long criticality,
335 CORBA::Long server_load)
336 // int sleep_time)
337 : orb_ (CORBA::ORB::_duplicate (orb)),
338 server_ (Simple_Server::_duplicate (server_ptr)),
339 scheduler_current_ (RTScheduling::Current::_duplicate (current)),
340 scheduler_ (scheduler),
341 deadline_ (deadline),
342 estimated_initial_execution_time_ ( estimated_initial_execution_time),
343 criticality_ (criticality),
344 server_load_ (server_load)
345 // sleep_time_ (sleep_time)
350 Worker::svc ()
352 const char * name = 0;
354 ACE_DEBUG ((LM_DEBUG, "(%t|%T):about to sleep for %d sec\n", sleep_time_));
355 ACE_OS::sleep (sleep_time_);
356 ACE_DEBUG ((LM_DEBUG, "(%t|%T):woke up from sleep for %d sec\n", sleep_time_));
358 ACE_hthread_t thr_handle;
359 ACE_Thread::self (thr_handle);
360 int prio;
362 if (ACE_Thread::getprio (thr_handle, prio) == -1)
364 if (errno == ENOTSUP)
366 ACE_ERROR((LM_ERROR,
367 ACE_TEXT ("getprio not supported\n")));
369 else
371 ACE_ERROR ((LM_ERROR,
372 ACE_TEXT ("%p\n")
373 ACE_TEXT ("thr_getprio failed")));
377 ACE_DEBUG ((LM_DEBUG, "(%t|%T) worker activated with prio %d\n", prio));
379 if (enable_dynamic_scheduling)
381 MUF_Scheduling::SchedulingParameter sched_param;
382 CORBA::Policy_var sched_param_policy;
383 sched_param.criticality = criticality_;
384 sched_param.deadline = deadline_;
385 sched_param.estimated_initial_execution_time = 0;
386 sched_param_policy = scheduler_->create_scheduling_parameter (sched_param);
387 CORBA::Policy_ptr implicit_sched_param = 0;
388 ACE_DEBUG ((LM_DEBUG, "(%t|%T):before begin_sched_segment\n"));
389 scheduler_current_->begin_scheduling_segment (name,
390 sched_param_policy.in (),
391 implicit_sched_param);
392 ACE_DEBUG ((LM_DEBUG, "(%t|%T):after begin_sched_segment\n"));
395 ACE_DEBUG ((LM_DEBUG, "(%t|%T):about to make two way call\n"));
396 server_->test_method (server_load_);
397 ACE_DEBUG ((LM_DEBUG, "(%t|%T):two way call done\n"));
399 if (enable_dynamic_scheduling)
401 scheduler_current_->end_scheduling_segment (name);
404 ACE_DEBUG ((LM_DEBUG, "client worker thread (%t) done\n"));
406 return 0;