=default for generated implementation copy ctor
[ACE_TAO.git] / TAO / examples / RTScheduling / DT_Creator.cpp
blob0d01b0bbf5b6245c37d34fa6298aaf5c9b9a9d89
1 #include "DT_Creator.h"
2 #include "Thread_Task.h"
3 #include "Task_Stats.h"
4 #include "DT_Creator.h"
6 #include "tao/ORB_Core.h"
7 #include "tao/RTScheduling/Current.h"
9 #include "ace/High_Res_Timer.h"
11 ACE_Atomic_Op<TAO_SYNCH_MUTEX, long> guid_counter;
13 int
14 DT_Creator::dt_task_init (ACE_Arg_Shifter& arg_shifter)
16 static int dt_index = 0;
17 time_t start_time = 0;
18 int load = 0;
19 int iter = 0;
20 int importance = 0;
21 char *job_name = 0;
22 int dist = 0;
23 const ACE_TCHAR* current_arg = 0;
25 if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-Importance")) == 0)
27 arg_shifter.consume_arg ();
28 current_arg = arg_shifter.get_current ();
29 importance = ACE_OS::atoi (current_arg);
30 arg_shifter.consume_arg ();
33 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Start_Time"))))
35 start_time = ACE_OS::atoi (current_arg);
36 arg_shifter.consume_arg ();
39 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Iter"))))
41 iter = ACE_OS::atoi (current_arg);
42 arg_shifter.consume_arg ();
45 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Load"))))
47 load = ACE_OS::atoi (current_arg);
48 arg_shifter.consume_arg ();
51 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-JobName"))))
53 job_name = (char *)current_arg;
54 dist = 1;
55 arg_shifter.consume_arg ();
58 dt_list_ [dt_index++] = this->create_thr_task (importance,
59 start_time,
60 load,
61 iter,
62 dist,
63 job_name);
65 return 0;
68 int log_index = 0;
70 int
71 DT_Creator::init (int argc, ACE_TCHAR *argv [])
73 gsf_ = ACE_High_Res_Timer::global_scale_factor ();
74 state_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>;
75 shutdown_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>;
76 active_dt_count_ = 0;
77 active_job_count_ = 0;
78 ACE_NEW_RETURN (log, char*[BUFSIZ * 100],-1);
80 ACE_Arg_Shifter arg_shifter (argc, argv);
82 const ACE_TCHAR* current_arg = 0;
84 dt_count_ = 0;
85 poa_count_ = 0;
86 int poa_count = 0;
87 job_count_ = 0;
88 int job_count = 0;
89 while (arg_shifter.is_anything_left ())
91 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-GuidSeed"))))
93 guid_counter = (long) ACE_OS::atoi (current_arg);
94 arg_shifter.consume_arg ();
96 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-DT_Count"))))
98 dt_count_ = ACE_OS::atoi (current_arg);
99 ACE_NEW_RETURN (dt_list_, Thread_Task*[dt_count_], -1);
100 arg_shifter.consume_arg ();
102 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-POA_Count"))))
104 poa_count_ = ACE_OS::atoi (current_arg);
105 ACE_NEW_RETURN (poa_list_, POA_Holder*[poa_count_], -1);
106 arg_shifter.consume_arg ();
108 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-JOB_Count"))))
110 job_count_ = ACE_OS::atoi (current_arg);
111 ACE_NEW_RETURN (job_list_, Job_i*[job_count_], -1);
112 arg_shifter.consume_arg ();
114 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-DT_Task")) == 0)
116 arg_shifter.consume_arg ();
117 dt_task_init (arg_shifter);
119 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-POA")) == 0)
121 arg_shifter.consume_arg ();
123 ACE_NEW_RETURN (this->poa_list_[poa_count], POA_Holder (), -1);
125 if (this->poa_list_[poa_count]->init (arg_shifter) == -1)
127 delete this->poa_list_[poa_count];
128 return -1;
130 else
132 poa_count++;
135 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-Job")) == 0)
137 arg_shifter.consume_arg ();
139 ACE_NEW_RETURN (this->job_list_[job_count], Job_i (this), -1);
141 if (this->job_list_[job_count]->init (arg_shifter) == -1)
143 delete this->job_list_[job_count];
144 return -1;
146 else
148 job_count++;
151 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-OutFile"))))
153 file_name_ = current_arg;
154 arg_shifter.consume_arg ();
156 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-LogFile"))))
158 log_file_name_ = current_arg;
159 arg_shifter.consume_arg ();
161 else
163 arg_shifter.ignore_arg ();
167 return 0;
170 void
171 DT_Creator::register_synch_obj ()
173 CosNaming::Name name (1);
174 name.length (1);
176 CosNaming::NamingContext_var synch_context;
180 // Try binding the sender context in the NS
181 name [0].id =
182 CORBA::string_dup ("Synch");
184 synch_context = this->naming_->bind_new_context (name);
187 // We reach here if there was no exception raised in
188 // <bind_new_context>. We then create a receiver context.
192 catch (const CosNaming::NamingContext::AlreadyBound& )
195 // The synch context already exists, probably created by the
196 // receiver(s).
199 // Get the synch context.
200 name [0].id =
201 CORBA::string_dup ("Synch");
203 CORBA::Object_var object =
204 this->naming_->resolve (name);
206 synch_context = CosNaming::NamingContext::_narrow (object.in ());
209 ACE_CString synch_name ("Synch");
210 ACE_Time_Value timestamp = ACE_OS::gettimeofday ();
212 char buf [BUFSIZ];
213 ACE_OS::sprintf(buf, "%lu", static_cast<unsigned long> (timestamp.sec ()));
214 synch_name += buf;
216 name [0].id =
217 CORBA::string_dup (synch_name.c_str ());
219 ACE_DEBUG ((LM_DEBUG,
220 "Synch Name %C\n\n",
221 synch_name.c_str ()));
223 ACE_NEW (synch_,
224 Synch_i);
226 Synch_var synch = synch_->_this ();
228 // Register the synch object with the Synch context.
229 synch_context->rebind (name,
230 synch.in ());
235 DT_Creator::activate_root_poa ()
237 CORBA::Object_var object =
238 this->orb_->resolve_initial_references ("RootPOA");
240 PortableServer::POA_var root_poa =
241 PortableServer::POA::_narrow (object.in ());
243 PortableServer::POAManager_var poa_manager =
244 root_poa->the_POAManager ();
246 poa_manager->activate ();
248 return 0;
251 void
252 DT_Creator::activate_poa_list ()
254 if (TAO_debug_level > 0)
255 ACE_DEBUG ((LM_DEBUG,
256 "DT_Creator::activate_poa_list\n"));
258 if (poa_count_ > 0)
260 CORBA::Object_var object =
261 this->orb_->resolve_initial_references ("RTORB");
263 RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (object.in ());
265 object = this->orb_->resolve_initial_references ("RootPOA");
267 PortableServer::POA_var root_poa =
268 PortableServer::POA::_narrow (object.in ());
270 for (int i = 0; i < poa_count_; ++i)
272 poa_list_[i]->activate (rt_orb.in(), root_poa.in ());
277 void
278 DT_Creator::activate_job_list ()
280 if (TAO_debug_level > 0)
281 ACE_DEBUG ((LM_DEBUG,
282 "DT_Creator::activate_job_list\n"));
284 Job_i* job;
286 CORBA::Object_var object =
287 this->orb_->resolve_initial_references ("RootPOA");
289 PortableServer::POA_var root_poa =
290 PortableServer::POA::_narrow (object.in ());
292 for (int i = 0; i < job_count_; ++i)
294 ++active_job_count_;
296 job = job_list_[i];
298 if (TAO_debug_level > 0)
299 ACE_DEBUG ((LM_DEBUG, "Activating job:%C\n", job->name ().c_str ()));
301 // find your poa
302 PortableServer::POA_var host_poa =
303 root_poa->find_POA (job->poa ().c_str (), 0);
305 PortableServer::ServantBase_var servant_var (job);
307 // Register with poa.
308 PortableServer::ObjectId_var id;
310 id = host_poa->activate_object (job);
312 CORBA::Object_var server =
313 host_poa->id_to_reference (id.in ());
315 CORBA::String_var ior =
316 orb_->object_to_string (server.in ());
318 const ACE_CString &job_name = job->name ();
320 CosNaming::Name_var name =
321 this->naming_->to_name (job_name.c_str ());
323 this->naming_->rebind (name.in (),
324 server.in ());
325 } /* while */
327 ACE_DEBUG ((LM_DEBUG,
328 "Activated Job List\n"));
331 void
332 DT_Creator::activate_schedule ()
334 if (TAO_debug_level > 0)
335 ACE_DEBUG ((LM_DEBUG,
336 "Activating schedule, task count = %d\n",
337 dt_count_));
339 Thread_Task* task;
341 for (int i = 0; i < dt_count_; ++i)
343 task = dt_list_[i];
345 if (task->dist ())
347 // resolve the object from the naming service
348 CosNaming::Name name (1);
349 name.length (1);
350 name[0].id = CORBA::string_dup (task->job ());
352 CORBA::Object_var obj =
353 this->naming_->resolve (name);
355 Job_var job = Job::_narrow (obj.in ());
357 // if (TAO_debug_level > 0)
358 // {
359 // Check that the object is configured with some
360 // PriorityModelPolicy.
361 CORBA::Policy_var policy =
362 job->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE);
364 RTCORBA::PriorityModelPolicy_var priority_policy =
365 RTCORBA::PriorityModelPolicy::_narrow (policy.in ());
367 if (CORBA::is_nil (priority_policy.in ()))
368 ACE_DEBUG ((LM_DEBUG,
369 "ERROR: Priority Model Policy not exposed!\n"));
370 else
373 RTCORBA::PriorityModel priority_model =
374 priority_policy->priority_model ();
376 if (priority_model == RTCORBA::CLIENT_PROPAGATED)
377 ACE_DEBUG ((LM_DEBUG,
378 "%C priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ()));
379 else
380 ACE_DEBUG ((LM_DEBUG,
381 "%C priority_model = RTCORBA::SERVER_DECLARED\n", task->job ()));
384 //} /* if (TAO_debug_level > 0) */
386 task->job (job.in ());
389 if (TAO_debug_level > 0 && dt_count_ > 0)
390 ACE_DEBUG ((LM_DEBUG,
391 "Activated schedule, task count = %d\n",
392 dt_count_));
396 DT_Creator::resolve_naming_service ()
398 CORBA::Object_var naming_obj =
399 this->orb_->resolve_initial_references ("NameService");
401 // Need to check return value for errors.
402 if (CORBA::is_nil (naming_obj.in ()))
403 ACE_ERROR_RETURN ((LM_ERROR,
404 " (%P|%t) Unable to resolve the Naming Service.\n"),
405 -1);
407 this->naming_ =
408 CosNaming::NamingContextExt::_narrow (naming_obj.in ());
410 return 0;
413 void
414 DT_Creator::create_distributable_threads (RTScheduling::Current_ptr current)
416 long flags;
417 flags = THR_NEW_LWP | THR_JOINABLE;
418 flags |=
419 orb_->orb_core ()->orb_params ()->scope_policy () |
420 orb_->orb_core ()->orb_params ()->sched_policy ();
422 ACE_DEBUG ((LM_DEBUG,
423 "Waiting to Synch\n"));
425 while (!this->synch ()->synched ())
429 this->orb_->perform_work ();
431 catch (const CORBA::Exception &)
433 return;
437 CORBA::Policy_var sched_param;
438 sched_param = this->sched_param (100);
439 const char * name = 0;
440 current->begin_scheduling_segment (name,
441 sched_param.in (),
442 sched_param.in ());
444 ACE_NEW (base_time_,
445 ACE_Time_Value (*(this->synch ()->base_time ())));
447 for (int i = 0; i < this->dt_count_; i++)
449 ACE_Time_Value now (ACE_OS::gettimeofday ());
451 ACE_Time_Value elapsed_time = now - *base_time_;
453 char buf [BUFSIZ];
454 ACE_OS::sprintf (buf, "elapsed time = %d\n now = %d\n base_time_ = %d\n",
455 (int) elapsed_time.sec (),
456 (int) now.sec (),
457 (int) base_time_->sec());
459 log [log_index++] = ACE_OS::strdup (buf) ;
461 ACE_hthread_t curr_thr;
462 ACE_Thread::self (curr_thr);
464 if (dt_list_ [i]->start_time () != 0 &&
465 (elapsed_time.sec () < dt_list_[i]->start_time ()))
467 time_t suspension_time = dt_list_[i]->start_time () - elapsed_time.sec ();
468 ACE_OS::sprintf (buf, "suspension_time = %lu\n",
469 static_cast<unsigned long> (suspension_time));
470 log [log_index++] = ACE_OS::strdup (buf);
471 yield (suspension_time, dt_list_[i]);
474 sched_param = this->sched_param (dt_list_ [i]->importance ());
475 dt_list_ [i]->activate_task (current,
476 sched_param.in (),
477 flags,
478 base_time_);
480 ++active_dt_count_;
483 this->wait ();
485 current->end_scheduling_segment (name);
487 this->check_ifexit ();
490 void
491 DT_Creator::dt_ended ()
494 ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
495 --active_dt_count_;
496 if (TAO_debug_level > 0)
497 ACE_DEBUG ((LM_DEBUG, "Active dt count = %d\n",active_dt_count_));
498 char buf [BUFSIZ];
499 ACE_OS::sprintf (buf,"Active dt count = %d\n",active_dt_count_);
500 log [log_index++] = ACE_OS::strdup (buf);
502 this->check_ifexit ();
505 void
506 DT_Creator::job_ended ()
509 ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
510 --active_job_count_;
511 if (TAO_debug_level > 0)
512 ACE_DEBUG ((LM_DEBUG, "Active job count = %d\n",active_job_count_));
513 char buf [BUFSIZ];
514 ACE_OS::sprintf (buf,"Active job count = %d\n",active_job_count_);
515 log [log_index++] = ACE_OS::strdup (buf);
518 this->check_ifexit ();
521 void
522 DT_Creator::check_ifexit ()
524 if (TAO_debug_level > 0)
525 ACE_DEBUG ((LM_DEBUG,
526 "Checking exit status Job# = %d DT# = %d\n",
527 active_job_count_,
528 active_dt_count_));
530 static int shutdown = 0;
533 ACE_GUARD (ACE_Lock, ace_mon, *shutdown_lock_);
535 if (!shutdown)
537 // All tasks have finished and all jobs have been shutdown.
538 if (active_dt_count_ == 0 && active_job_count_ == 0)
540 ACE_DEBUG ((LM_DEBUG, "Shutdown in progress ...\n"));
543 for (int i = 0; i < dt_count_; i++)
545 dt_list_[i]->dump_stats ();
548 for (int i = 0; i < job_count_; i ++)
550 job_list_[i]->dump_stats ();
553 TASK_STATS::instance ()->dump_samples (file_name_.c_str (),
554 ACE_TEXT("#Schedule Output"));
556 shutdown = 1;
558 FILE* log_file = ACE_OS::fopen (log_file_name_.c_str (), "w");
560 if (log_file != 0)
562 // first dump what the caller has to say.
563 ACE_OS::fprintf (log_file, "Log File\n");
565 for (int i = 0; i < log_index; i++)
567 ACE_OS::fprintf (log_file, "%s\n", log [i]);
570 ACE_OS::fclose (log_file);
572 ACE_DEBUG ((LM_DEBUG,
573 "Log File Ready\n"));
580 DT_Creator::dt_count ()
582 return dt_count_;
585 DT_Creator::DT_Creator ()
586 : dt_list_ (0),
587 poa_list_ (0),
588 job_list_ (0),
589 dt_count_ (0),
590 poa_count_ (0),
591 job_count_ (0),
592 state_lock_ (0),
593 shutdown_lock_ (0),
594 active_dt_count_ (0),
595 active_job_count_ (0),
596 log (0),
597 base_time_ (0),
598 file_name_ (),
599 log_file_name_ (),
600 gsf_ (0),
601 synch_ (0)
605 DT_Creator::~DT_Creator ()
607 for (int i = 0; i < log_index; ++i)
608 ACE_OS::free (log[i]);
609 delete[] log;
611 for (int i = 0; i < this->dt_count_; ++i)
612 delete this->dt_list_[i];
613 delete[] dt_list_;
614 for (int i = 0; i < this->poa_count_; ++i)
615 delete this->poa_list_[i];
616 delete[] poa_list_;
617 delete[] job_list_;
619 delete base_time_;
621 delete state_lock_;
622 delete shutdown_lock_;
626 void
627 DT_Creator::log_msg (char* msg)
629 log [log_index++] = ACE_OS::strdup (msg);
632 CORBA::ORB_ptr
633 DT_Creator::orb ()
635 return this->orb_.in ();
638 void
639 DT_Creator::orb (CORBA::ORB_ptr orb)
641 this->orb_ = CORBA::ORB::_duplicate (orb);
644 ACE_Time_Value*
645 DT_Creator::base_time ()
647 return this->base_time_;
650 void
651 DT_Creator::base_time (ACE_Time_Value* base_time)
653 this->base_time_ = base_time;
657 Synch_i*
658 DT_Creator::synch ()
660 return this->synch_.in ();