Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / examples / RTScheduling / DT_Creator.cpp
blobd1ee3959cec23ca0f3703d82fa6c8aa151cf45b4
1 #include "DT_Creator.h"
2 #include "Thread_Task.h"
3 #include "Task_Stats.h"
4 #include "DT_Creator.h"
6 #include "tao/ORB_Core.h"
7 #include "tao/RTScheduling/Current.h"
9 #include "ace/High_Res_Timer.h"
11 ACE_Atomic_Op<TAO_SYNCH_MUTEX, long> guid_counter;
13 int
14 DT_Creator::dt_task_init (ACE_Arg_Shifter& arg_shifter)
16 static int dt_index = 0;
17 time_t start_time = 0;
18 int load = 0;
19 int iter = 0;
20 int importance = 0;
21 char *job_name = 0;
22 int dist = 0;
23 const ACE_TCHAR* current_arg = 0;
25 if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-Importance")) == 0)
27 arg_shifter.consume_arg ();
28 current_arg = arg_shifter.get_current ();
29 importance = ACE_OS::atoi (current_arg);
30 arg_shifter.consume_arg ();
33 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Start_Time"))))
35 start_time = ACE_OS::atoi (current_arg);
36 arg_shifter.consume_arg ();
39 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Iter"))))
41 iter = ACE_OS::atoi (current_arg);
42 arg_shifter.consume_arg ();
45 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Load"))))
47 load = ACE_OS::atoi (current_arg);
48 arg_shifter.consume_arg ();
51 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-JobName"))))
53 job_name = (char *)current_arg;
54 dist = 1;
55 arg_shifter.consume_arg ();
58 dt_list_ [dt_index++] = this->create_thr_task (importance,
59 start_time,
60 load,
61 iter,
62 dist,
63 job_name);
65 return 0;
68 int log_index = 0;
70 int
71 DT_Creator::init (int argc, ACE_TCHAR *argv [])
73 gsf_ = ACE_High_Res_Timer::global_scale_factor ();
74 state_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>;
75 shutdown_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>;
76 active_dt_count_ = 0;
77 active_job_count_ = 0;
78 ACE_NEW_RETURN (log, char*[BUFSIZ * 100],-1);
80 ACE_Arg_Shifter arg_shifter (argc, argv);
82 const ACE_TCHAR* current_arg = 0;
84 dt_count_ = 0;
85 poa_count_ = 0;
86 int poa_count = 0;
87 job_count_ = 0;
88 int job_count = 0;
89 while (arg_shifter.is_anything_left ())
91 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-GuidSeed"))))
93 guid_counter = (long) ACE_OS::atoi (current_arg);
94 arg_shifter.consume_arg ();
96 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-DT_Count"))))
98 dt_count_ = ACE_OS::atoi (current_arg);
99 ACE_NEW_RETURN (dt_list_, Thread_Task*[dt_count_], -1);
100 arg_shifter.consume_arg ();
102 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-POA_Count"))))
104 poa_count_ = ACE_OS::atoi (current_arg);
105 ACE_NEW_RETURN (poa_list_, POA_Holder*[poa_count_], -1);
106 arg_shifter.consume_arg ();
108 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-JOB_Count"))))
110 job_count_ = ACE_OS::atoi (current_arg);
111 ACE_NEW_RETURN (job_list_, Job_i*[job_count_], -1);
112 arg_shifter.consume_arg ();
114 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-DT_Task")) == 0)
116 arg_shifter.consume_arg ();
117 dt_task_init (arg_shifter);
119 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-POA")) == 0)
121 arg_shifter.consume_arg ();
123 ACE_NEW_RETURN (this->poa_list_[poa_count], POA_Holder (), -1);
125 if (this->poa_list_[poa_count]->init (arg_shifter) == -1)
127 delete this->poa_list_[poa_count];
128 return -1;
130 else
132 poa_count++;
135 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-Job")) == 0)
137 arg_shifter.consume_arg ();
139 ACE_NEW_RETURN (this->job_list_[job_count], Job_i (this), -1);
141 if (this->job_list_[job_count]->init (arg_shifter) == -1)
143 delete this->job_list_[job_count];
144 return -1;
146 else
148 job_count++;
151 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-OutFile"))))
153 file_name_ = current_arg;
154 arg_shifter.consume_arg ();
156 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-LogFile"))))
158 log_file_name_ = current_arg;
159 arg_shifter.consume_arg ();
161 else
163 arg_shifter.ignore_arg ();
167 return 0;
170 void
171 DT_Creator::register_synch_obj (void)
173 CosNaming::Name name (1);
174 name.length (1);
176 CosNaming::NamingContext_var synch_context;
180 // Try binding the sender context in the NS
181 name [0].id =
182 CORBA::string_dup ("Synch");
184 synch_context = this->naming_->bind_new_context (name);
187 // We reach here if there was no exception raised in
188 // <bind_new_context>. We then create a receiver context.
192 catch (const CosNaming::NamingContext::AlreadyBound& )
195 // The synch context already exists, probably created by the
196 // receiver(s).
199 // Get the synch context.
200 name [0].id =
201 CORBA::string_dup ("Synch");
203 CORBA::Object_var object =
204 this->naming_->resolve (name);
206 synch_context = CosNaming::NamingContext::_narrow (object.in ());
210 ACE_CString synch_name ("Synch");
211 ACE_Time_Value timestamp = ACE_OS::gettimeofday ();
213 char buf [BUFSIZ];
214 ACE_OS::sprintf(buf, "%lu", static_cast<unsigned long> (timestamp.sec ()));
215 synch_name += buf;
217 name [0].id =
218 CORBA::string_dup (synch_name.c_str ());
220 ACE_DEBUG ((LM_DEBUG,
221 "Synch Name %C\n\n",
222 synch_name.c_str ()));
224 ACE_NEW (synch_,
225 Synch_i);
227 Synch_var synch = synch_->_this ();
229 // Register the synch object with the Synch context.
230 synch_context->rebind (name,
231 synch.in ());
237 DT_Creator::activate_root_poa (void)
239 CORBA::Object_var object =
240 this->orb_->resolve_initial_references ("RootPOA");
242 PortableServer::POA_var root_poa =
243 PortableServer::POA::_narrow (object.in ());
245 PortableServer::POAManager_var poa_manager =
246 root_poa->the_POAManager ();
248 poa_manager->activate ();
250 return 0;
253 void
254 DT_Creator::activate_poa_list (void)
256 if (TAO_debug_level > 0)
257 ACE_DEBUG ((LM_DEBUG,
258 "DT_Creator::activate_poa_list\n"));
260 if (poa_count_ > 0)
262 CORBA::Object_var object =
263 this->orb_->resolve_initial_references ("RTORB");
265 RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (object.in ());
267 object = this->orb_->resolve_initial_references ("RootPOA");
269 PortableServer::POA_var root_poa =
270 PortableServer::POA::_narrow (object.in ());
272 for (int i = 0; i < poa_count_; ++i)
274 poa_list_[i]->activate (rt_orb.in(), root_poa.in ());
279 void
280 DT_Creator::activate_job_list (void)
283 if (TAO_debug_level > 0)
284 ACE_DEBUG ((LM_DEBUG,
285 "DT_Creator::activate_job_list\n"));
287 Job_i* job;
289 CORBA::Object_var object =
290 this->orb_->resolve_initial_references ("RootPOA");
292 PortableServer::POA_var root_poa =
293 PortableServer::POA::_narrow (object.in ());
295 for (int i = 0; i < job_count_; ++i)
297 ++active_job_count_;
299 job = job_list_[i];
301 if (TAO_debug_level > 0)
302 ACE_DEBUG ((LM_DEBUG, "Activating job:%C\n", job->name ().c_str ()));
304 // find your poa
305 PortableServer::POA_var host_poa =
306 root_poa->find_POA (job->poa ().c_str (), 0);
308 PortableServer::ServantBase_var servant_var (job);
310 // Register with poa.
311 PortableServer::ObjectId_var id;
313 id = host_poa->activate_object (job);
315 CORBA::Object_var server =
316 host_poa->id_to_reference (id.in ());
318 CORBA::String_var ior =
319 orb_->object_to_string (server.in ());
321 const ACE_CString &job_name = job->name ();
323 CosNaming::Name_var name =
324 this->naming_->to_name (job_name.c_str ());
326 this->naming_->rebind (name.in (),
327 server.in ());
330 } /* while */
332 ACE_DEBUG ((LM_DEBUG,
333 "Activated Job List\n"));
336 void
337 DT_Creator::activate_schedule (void)
339 if (TAO_debug_level > 0)
340 ACE_DEBUG ((LM_DEBUG,
341 "Activating schedule, task count = %d\n",
342 dt_count_));
344 Thread_Task* task;
346 for (int i = 0; i < dt_count_; ++i)
348 task = dt_list_[i];
350 if (task->dist ())
352 // resolve the object from the naming service
353 CosNaming::Name name (1);
354 name.length (1);
355 name[0].id = CORBA::string_dup (task->job ());
357 CORBA::Object_var obj =
358 this->naming_->resolve (name);
360 Job_var job = Job::_narrow (obj.in ());
362 // if (TAO_debug_level > 0)
363 // {
364 // Check that the object is configured with some
365 // PriorityModelPolicy.
366 CORBA::Policy_var policy =
367 job->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE);
369 RTCORBA::PriorityModelPolicy_var priority_policy =
370 RTCORBA::PriorityModelPolicy::_narrow (policy.in ());
372 if (CORBA::is_nil (priority_policy.in ()))
373 ACE_DEBUG ((LM_DEBUG,
374 "ERROR: Priority Model Policy not exposed!\n"));
375 else
378 RTCORBA::PriorityModel priority_model =
379 priority_policy->priority_model ();
381 if (priority_model == RTCORBA::CLIENT_PROPAGATED)
382 ACE_DEBUG ((LM_DEBUG,
383 "%C priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ()));
384 else
385 ACE_DEBUG ((LM_DEBUG,
386 "%C priority_model = RTCORBA::SERVER_DECLARED\n", task->job ()));
389 //} /* if (TAO_debug_level > 0) */
391 task->job (job.in ());
394 if (TAO_debug_level > 0 && dt_count_ > 0)
395 ACE_DEBUG ((LM_DEBUG,
396 "Activated schedule, task count = %d\n",
397 dt_count_));
402 DT_Creator::resolve_naming_service (void)
404 CORBA::Object_var naming_obj =
405 this->orb_->resolve_initial_references ("NameService");
407 // Need to check return value for errors.
408 if (CORBA::is_nil (naming_obj.in ()))
409 ACE_ERROR_RETURN ((LM_ERROR,
410 " (%P|%t) Unable to resolve the Naming Service.\n"),
411 -1);
413 this->naming_ =
414 CosNaming::NamingContextExt::_narrow (naming_obj.in ());
416 return 0;
419 void
420 DT_Creator::create_distributable_threads (RTScheduling::Current_ptr current)
422 long flags;
423 flags = THR_NEW_LWP | THR_JOINABLE;
424 flags |=
425 orb_->orb_core ()->orb_params ()->scope_policy () |
426 orb_->orb_core ()->orb_params ()->sched_policy ();
428 ACE_DEBUG ((LM_DEBUG,
429 "Waiting to Synch\n"));
431 while (!this->synch ()->synched ())
435 this->orb_->perform_work ();
437 catch (const CORBA::Exception &)
439 return;
443 CORBA::Policy_var sched_param;
444 sched_param = this->sched_param (100);
445 const char * name = 0;
446 current->begin_scheduling_segment (name,
447 sched_param.in (),
448 sched_param.in ());
450 ACE_NEW (base_time_,
451 ACE_Time_Value (*(this->synch ()->base_time ())));
453 for (int i = 0; i < this->dt_count_; i++)
455 ACE_Time_Value now (ACE_OS::gettimeofday ());
457 ACE_Time_Value elapsed_time = now - *base_time_;
459 char buf [BUFSIZ];
460 ACE_OS::sprintf (buf, "elapsed time = %d\n now = %d\n base_time_ = %d\n",
461 (int) elapsed_time.sec (),
462 (int) now.sec (),
463 (int) base_time_->sec());
465 log [log_index++] = ACE_OS::strdup (buf) ;
467 ACE_hthread_t curr_thr;
468 ACE_Thread::self (curr_thr);
470 if (dt_list_ [i]->start_time () != 0 &&
471 (elapsed_time.sec () < dt_list_[i]->start_time ()))
473 time_t suspension_time = dt_list_[i]->start_time () - elapsed_time.sec ();
474 ACE_OS::sprintf (buf, "suspension_time = %lu\n",
475 static_cast<unsigned long> (suspension_time));
476 log [log_index++] = ACE_OS::strdup (buf);
477 yield (suspension_time, dt_list_[i]);
480 sched_param = this->sched_param (dt_list_ [i]->importance ());
481 dt_list_ [i]->activate_task (current,
482 sched_param.in (),
483 flags,
484 base_time_);
486 ++active_dt_count_;
489 this->wait ();
491 current->end_scheduling_segment (name);
493 this->check_ifexit ();
496 void
497 DT_Creator::dt_ended (void)
500 ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
501 --active_dt_count_;
502 if (TAO_debug_level > 0)
503 ACE_DEBUG ((LM_DEBUG, "Active dt count = %d\n",active_dt_count_));
504 char buf [BUFSIZ];
505 ACE_OS::sprintf (buf,"Active dt count = %d\n",active_dt_count_);
506 log [log_index++] = ACE_OS::strdup (buf);
508 this->check_ifexit ();
511 void
512 DT_Creator::job_ended (void)
515 ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
516 --active_job_count_;
517 if (TAO_debug_level > 0)
518 ACE_DEBUG ((LM_DEBUG, "Active job count = %d\n",active_job_count_));
519 char buf [BUFSIZ];
520 ACE_OS::sprintf (buf,"Active job count = %d\n",active_job_count_);
521 log [log_index++] = ACE_OS::strdup (buf);
524 this->check_ifexit ();
527 void
528 DT_Creator::check_ifexit (void)
530 if (TAO_debug_level > 0)
531 ACE_DEBUG ((LM_DEBUG,
532 "Checking exit status Job# = %d DT# = %d\n",
533 active_job_count_,
534 active_dt_count_));
536 static int shutdown = 0;
539 ACE_GUARD (ACE_Lock, ace_mon, *shutdown_lock_);
541 if (!shutdown)
543 // All tasks have finished and all jobs have been shutdown.
544 if (active_dt_count_ == 0 && active_job_count_ == 0)
546 ACE_DEBUG ((LM_DEBUG, "Shutdown in progress ...\n"));
549 for (int i = 0; i < dt_count_; i++)
551 dt_list_[i]->dump_stats ();
554 for (int i = 0; i < job_count_; i ++)
556 job_list_[i]->dump_stats ();
559 TASK_STATS::instance ()->dump_samples (file_name_.c_str (),
560 ACE_TEXT("#Schedule Output"));
562 shutdown = 1;
564 FILE* log_file = ACE_OS::fopen (log_file_name_.c_str (), "w");
566 if (log_file != 0)
568 // first dump what the caller has to say.
569 ACE_OS::fprintf (log_file, "Log File\n");
571 for (int i = 0; i < log_index; i++)
573 ACE_OS::fprintf (log_file, "%s\n", log [i]);
576 ACE_OS::fclose (log_file);
578 ACE_DEBUG ((LM_DEBUG,
579 "Log File Ready\n"));
587 DT_Creator::dt_count (void)
589 return dt_count_;
592 DT_Creator::DT_Creator (void)
593 : dt_list_ (0),
594 poa_list_ (0),
595 job_list_ (0),
596 dt_count_ (0),
597 poa_count_ (0),
598 job_count_ (0),
599 state_lock_ (0),
600 shutdown_lock_ (0),
601 active_dt_count_ (0),
602 active_job_count_ (0),
603 log (0),
604 base_time_ (0),
605 file_name_ (),
606 log_file_name_ (),
607 gsf_ (0),
608 synch_ (0)
612 DT_Creator::~DT_Creator (void)
614 for (int i = 0; i < log_index; ++i)
615 ACE_OS::free (log[i]);
616 delete[] log;
618 for (int i = 0; i < this->dt_count_; ++i)
619 delete this->dt_list_[i];
620 delete[] dt_list_;
621 for (int i = 0; i < this->poa_count_; ++i)
622 delete this->poa_list_[i];
623 delete[] poa_list_;
624 delete[] job_list_;
626 delete base_time_;
628 delete state_lock_;
629 delete shutdown_lock_;
633 void
634 DT_Creator::log_msg (char* msg)
636 log [log_index++] = ACE_OS::strdup (msg);
639 CORBA::ORB_ptr
640 DT_Creator::orb (void)
642 return this->orb_.in ();
645 void
646 DT_Creator::orb (CORBA::ORB_ptr orb)
648 this->orb_ = CORBA::ORB::_duplicate (orb);
651 ACE_Time_Value*
652 DT_Creator::base_time (void)
654 return this->base_time_;
657 void
658 DT_Creator::base_time (ACE_Time_Value* base_time)
660 this->base_time_ = base_time;
664 Synch_i*
665 DT_Creator::synch (void)
667 return this->synch_.in ();