1 #include "DT_Creator.h"
2 #include "Thread_Task.h"
3 #include "Task_Stats.h"
4 #include "DT_Creator.h"
6 #include "tao/ORB_Core.h"
7 #include "tao/RTScheduling/Current.h"
9 #include "ace/High_Res_Timer.h"
// File-scope counter of type long wrapped in ACE_Atomic_Op with a
// TAO_SYNCH_MUTEX lock type.  Within this file it is assigned in
// DT_Creator::init from the value of the "-GuidSeed" option.
11 ACE_Atomic_Op
<TAO_SYNCH_MUTEX
, long> guid_counter
;
// DT_Creator::dt_task_init
//
// Reads the options that describe a single task from arg_shifter
// (-Importance, -Start_Time, -Iter, -Load, -JobName) and stores the task
// returned by create_thr_task() in the next free slot of dt_list_.
//
// NOTE(review): this view of the function is fragmentary (braces, some
// local declarations and the tail of the create_thr_task() call are not
// visible), so the comments below only describe the visible statements.
14 DT_Creator::dt_task_init (ACE_Arg_Shifter
& arg_shifter
)
// dt_index is a function-local static, so it keeps counting across calls.
// NOTE(review): no bound check against dt_count_ is visible here; confirm
// that at most dt_count_ tasks are configured and that dt_list_ has been
// allocated (init() allocates it when it sees -DT_Count) before this runs.
16 static int dt_index
= 0;
17 time_t start_time
= 0;
23 const ACE_TCHAR
* current_arg
= 0;
// -Importance is matched by name, consumed, and the following argument is
// taken as its value.
// NOTE(review): no null check on get_current() is visible before atoi().
25 if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-Importance")) == 0)
27 arg_shifter
.consume_arg ();
28 current_arg
= arg_shifter
.get_current ();
29 importance
= ACE_OS::atoi (current_arg
);
30 arg_shifter
.consume_arg ();
// The remaining options use get_the_parameter(), which yields the option
// value (non-null) when the current argument matches.
33 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Start_Time"))))
35 start_time
= ACE_OS::atoi (current_arg
);
36 arg_shifter
.consume_arg ();
39 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Iter"))))
41 iter
= ACE_OS::atoi (current_arg
);
42 arg_shifter
.consume_arg ();
45 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Load"))))
47 load
= ACE_OS::atoi (current_arg
);
48 arg_shifter
.consume_arg ();
51 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-JobName"))))
// NOTE(review): the C-style cast drops const from an ACE_TCHAR pointer and
// job_name aliases the argument storage rather than copying it; presumably
// the argument vector outlives the task -- confirm.
53 job_name
= (char *)current_arg
;
55 arg_shifter
.consume_arg ();
// Store the newly created task and advance the static slot index.
58 dt_list_
[dt_index
++] = this->create_thr_task (importance
,
// DT_Creator::init
//
// Sets up internal state (timer scale factor, the two locks, the log
// buffer) and then walks the command line with an ACE_Arg_Shifter,
// handling -GuidSeed, -DT_Count, -POA_Count, -JOB_Count, -DT_Task, -POA,
// -Job, -OutFile and -LogFile.  Allocation failures reported through
// ACE_NEW_RETURN make the function return -1.
//
// NOTE(review): this view is fragmentary (braces, else/return lines and
// the error paths after the failed POA/Job init are not visible).
71 DT_Creator::init (int argc
, ACE_TCHAR
*argv
[])
73 gsf_
= ACE_High_Res_Timer::global_scale_factor ();
// NOTE(review): the locks are created with plain new, unlike the other
// allocations below which go through ACE_NEW_RETURN.
74 state_lock_
= new ACE_Lock_Adapter
<TAO_SYNCH_MUTEX
>;
75 shutdown_lock_
= new ACE_Lock_Adapter
<TAO_SYNCH_MUTEX
>;
77 active_job_count_
= 0;
// The log is an array of BUFSIZ * 100 char pointers; entries are filled
// elsewhere in this file with ACE_OS::strdup() results.
78 ACE_NEW_RETURN (log
, char*[BUFSIZ
* 100],-1);
80 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
82 const ACE_TCHAR
* current_arg
= 0;
89 while (arg_shifter
.is_anything_left ())
91 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-GuidSeed"))))
93 guid_counter
= (long) ACE_OS::atoi (current_arg
);
94 arg_shifter
.consume_arg ();
// The three *_Count options size the pointer arrays that later options
// fill in.
// NOTE(review): the atoi() results are used as array sizes without a
// visible check for zero or negative values.
96 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-DT_Count"))))
98 dt_count_
= ACE_OS::atoi (current_arg
);
99 ACE_NEW_RETURN (dt_list_
, Thread_Task
*[dt_count_
], -1);
100 arg_shifter
.consume_arg ();
102 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-POA_Count"))))
104 poa_count_
= ACE_OS::atoi (current_arg
);
105 ACE_NEW_RETURN (poa_list_
, POA_Holder
*[poa_count_
], -1);
106 arg_shifter
.consume_arg ();
108 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-JOB_Count"))))
110 job_count_
= ACE_OS::atoi (current_arg
);
111 ACE_NEW_RETURN (job_list_
, Job_i
*[job_count_
], -1);
112 arg_shifter
.consume_arg ();
// -DT_Task hands the shifter to dt_task_init() to parse one task.
114 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-DT_Task")) == 0)
116 arg_shifter
.consume_arg ();
117 dt_task_init (arg_shifter
);
// -POA creates a POA_Holder in slot poa_count and lets it parse its own
// options; the holder is deleted when its init() returns -1.
// NOTE(review): poa_count (no trailing underscore) is a separate index
// from poa_count_; its declaration and increment are not visible here, and
// no bound check against poa_count_ is visible.
119 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-POA")) == 0)
121 arg_shifter
.consume_arg ();
123 ACE_NEW_RETURN (this->poa_list_
[poa_count
], POA_Holder (), -1);
125 if (this->poa_list_
[poa_count
]->init (arg_shifter
) == -1)
127 delete this->poa_list_
[poa_count
];
// -Job mirrors the -POA handling for Job_i objects and job_list_.
// NOTE(review): same caveat for job_count versus job_count_.
135 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-Job")) == 0)
137 arg_shifter
.consume_arg ();
139 ACE_NEW_RETURN (this->job_list_
[job_count
], Job_i (this), -1);
141 if (this->job_list_
[job_count
]->init (arg_shifter
) == -1)
143 delete this->job_list_
[job_count
];
151 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-OutFile"))))
153 file_name_
= current_arg
;
154 arg_shifter
.consume_arg ();
156 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-LogFile"))))
158 log_file_name_
= current_arg
;
159 arg_shifter
.consume_arg ();
// Presumably the fall-through branch for unrecognised arguments (the
// enclosing else is not visible): skip the argument without consuming it.
163 arg_shifter
.ignore_arg ();
// DT_Creator::register_synch_obj
//
// Obtains a "Synch" naming context (creating it with bind_new_context(),
// or resolving and narrowing the existing one when AlreadyBound is
// raised), builds a name from "Synch" plus the current time in seconds,
// and rebinds the synch_ servant's object reference under the context.
//
// NOTE(review): this view is fragmentary (try block, name assignments and
// the tail of the rebind() call are not visible).  The pre-existing
// comments mention "sender"/"receiver" contexts although the code only
// deals with a "Synch" context; they may have been copied from another
// example -- confirm.
171 DT_Creator::register_synch_obj (void)
173 CosNaming::Name
name (1);
176 CosNaming::NamingContext_var synch_context
;
180 // Try binding the sender context in the NS
182 CORBA::string_dup ("Synch");
184 synch_context
= this->naming_
->bind_new_context (name
);
187 // We reach here if there was no exception raised in
188 // <bind_new_context>. We then create a receiver context.
192 catch (const CosNaming::NamingContext::AlreadyBound
& )
195 // The synch context already exists, probably created by the
199 // Get the synch context.
201 CORBA::string_dup ("Synch");
203 CORBA::Object_var object
=
204 this->naming_
->resolve (name
);
206 synch_context
= CosNaming::NamingContext::_narrow (object
.in ());
// Build the per-instance name from "Synch" and the wall-clock seconds.
210 ACE_CString
synch_name ("Synch");
211 ACE_Time_Value timestamp
= ACE_OS::gettimeofday ();
214 ACE_OS::sprintf(buf
, "%lu", static_cast<unsigned long> (timestamp
.sec ()));
218 CORBA::string_dup (synch_name
.c_str ());
220 ACE_DEBUG ((LM_DEBUG
,
222 synch_name
.c_str ()));
227 Synch_var synch
= synch_
->_this ();
229 // Register the synch object with the Synch context.
230 synch_context
->rebind (name
,
// DT_Creator::activate_root_poa
//
// Resolves "RootPOA" from the ORB, narrows it to a POA, fetches its
// POAManager and activates that manager.
237 DT_Creator::activate_root_poa (void)
239 CORBA::Object_var object
=
240 this->orb_
->resolve_initial_references ("RootPOA");
242 PortableServer::POA_var root_poa
=
243 PortableServer::POA::_narrow (object
.in ());
245 PortableServer::POAManager_var poa_manager
=
246 root_poa
->the_POAManager ();
248 poa_manager
->activate ();
// DT_Creator::activate_poa_list
//
// Resolves the "RTORB" and "RootPOA" initial references and calls
// activate() on each of the first poa_count_ entries of poa_list_,
// passing both.  Emits a debug trace when TAO_debug_level > 0.
254 DT_Creator::activate_poa_list (void)
256 if (TAO_debug_level
> 0)
257 ACE_DEBUG ((LM_DEBUG
,
258 "DT_Creator::activate_poa_list\n"));
262 CORBA::Object_var object
=
263 this->orb_
->resolve_initial_references ("RTORB");
265 RTCORBA::RTORB_var rt_orb
= RTCORBA::RTORB::_narrow (object
.in ());
// The same Object_var is reused for the second lookup.
267 object
= this->orb_
->resolve_initial_references ("RootPOA");
269 PortableServer::POA_var root_poa
=
270 PortableServer::POA::_narrow (object
.in ());
272 for (int i
= 0; i
< poa_count_
; ++i
)
274 poa_list_
[i
]->activate (rt_orb
.in(), root_poa
.in ());
// DT_Creator::activate_job_list
//
// For each of the job_count_ jobs: looks up the POA named by the job
// under the root POA, activates the job servant in it, converts the
// resulting reference to a string IOR, and rebinds the job in the naming
// service under the job's name.
//
// NOTE(review): this view is fragmentary (the declaration of the local
// `job`, braces and the tail of the rebind() call are not visible).
280 DT_Creator::activate_job_list (void)
283 if (TAO_debug_level
> 0)
284 ACE_DEBUG ((LM_DEBUG
,
285 "DT_Creator::activate_job_list\n"));
289 CORBA::Object_var object
=
290 this->orb_
->resolve_initial_references ("RootPOA");
292 PortableServer::POA_var root_poa
=
293 PortableServer::POA::_narrow (object
.in ());
295 for (int i
= 0; i
< job_count_
; ++i
)
301 if (TAO_debug_level
> 0)
302 ACE_DEBUG ((LM_DEBUG
, "Activating job:%C\n", job
->name ().c_str ()));
// The second argument 0 to find_POA is the activate_it flag.
305 PortableServer::POA_var host_poa
=
306 root_poa
->find_POA (job
->poa ().c_str (), 0);
// Wrapping the raw servant pointer in a ServantBase_var hands this scope
// one servant reference to drop on exit.
// NOTE(review): presumably job_list_ entries therefore must not also be
// deleted directly elsewhere -- confirm the ownership.
308 PortableServer::ServantBase_var
servant_var (job
);
310 // Register with poa.
311 PortableServer::ObjectId_var id
;
313 id
= host_poa
->activate_object (job
);
315 CORBA::Object_var server
=
316 host_poa
->id_to_reference (id
.in ());
318 CORBA::String_var ior
=
319 orb_
->object_to_string (server
.in ());
321 const ACE_CString
&job_name
= job
->name ();
323 CosNaming::Name_var name
=
324 this->naming_
->to_name (job_name
.c_str ());
326 this->naming_
->rebind (name
.in (),
332 ACE_DEBUG ((LM_DEBUG
,
333 "Activated Job List\n"));
// DT_Creator::activate_schedule
//
// For each of the dt_count_ tasks: resolves the job named by task->job()
// from the naming service, narrows it to Job, reports the job's
// priority-model policy, and hands the Job reference back to the task via
// task->job (job.in ()).
//
// NOTE(review): this view is fragmentary (the declaration of the local
// `task`, braces, else lines and trailing ACE_DEBUG arguments are not
// visible).
337 DT_Creator::activate_schedule (void)
339 if (TAO_debug_level
> 0)
340 ACE_DEBUG ((LM_DEBUG
,
341 "Activating schedule, task count = %d\n",
346 for (int i
= 0; i
< dt_count_
; ++i
)
352 // resolve the object from the naming service
353 CosNaming::Name
name (1);
355 name
[0].id
= CORBA::string_dup (task
->job ());
357 CORBA::Object_var obj
=
358 this->naming_
->resolve (name
);
360 Job_var job
= Job::_narrow (obj
.in ());
// The debug-level guard below is commented out, so the policy check and
// its messages appear to run unconditionally.
362 // if (TAO_debug_level > 0)
364 // Check that the object is configured with some
365 // PriorityModelPolicy.
366 CORBA::Policy_var policy
=
367 job
->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
);
369 RTCORBA::PriorityModelPolicy_var priority_policy
=
370 RTCORBA::PriorityModelPolicy::_narrow (policy
.in ());
372 if (CORBA::is_nil (priority_policy
.in ()))
373 ACE_DEBUG ((LM_DEBUG
,
374 "ERROR: Priority Model Policy not exposed!\n"));
378 RTCORBA::PriorityModel priority_model =
379 priority_policy->priority_model ();
381 if (priority_model == RTCORBA::CLIENT_PROPAGATED)
382 ACE_DEBUG ((LM_DEBUG,
383 "%C priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ()));
385 ACE_DEBUG ((LM_DEBUG,
386 "%C priority_model = RTCORBA::SERVER_DECLARED\n", task->job ()));
389 //} /* if (TAO_debug_level > 0) */
// Replace the task's job name with the resolved Job reference.
391 task
->job (job
.in ());
394 if (TAO_debug_level
> 0 && dt_count_
> 0)
395 ACE_DEBUG ((LM_DEBUG
,
396 "Activated schedule, task count = %d\n",
// DT_Creator::resolve_naming_service
//
// Resolves the "NameService" initial reference, returns with an error
// message if the reference is nil, and otherwise narrows it to
// CosNaming::NamingContextExt.
//
// NOTE(review): the error return value and the target of the narrowed
// reference (presumably naming_) are not visible in this view.
402 DT_Creator::resolve_naming_service (void)
404 CORBA::Object_var naming_obj
=
405 this->orb_
->resolve_initial_references ("NameService");
407 // Need to check return value for errors.
408 if (CORBA::is_nil (naming_obj
.in ()))
409 ACE_ERROR_RETURN ((LM_ERROR
,
410 " (%P|%t) Unable to resolve the Naming Service.\n"),
414 CosNaming::NamingContextExt::_narrow (naming_obj
.in ());
// DT_Creator::create_distributable_threads
//
// Builds the thread-creation flags, runs ORB work until the synch object
// reports synched(), opens a scheduling segment on `current`, then walks
// the dt_count_ tasks: logs timing, yields until a task's start time when
// that lies in the future, and activates the task.  Finally it ends the
// scheduling segment and calls check_ifexit().
//
// NOTE(review): this view is fragmentary (declarations of flags/buf,
// braces, the try line and several call arguments are not visible).
420 DT_Creator::create_distributable_threads (RTScheduling::Current_ptr current
)
423 flags
= THR_NEW_LWP
| THR_JOINABLE
;
// Presumably OR-ed into flags (the left-hand side is not visible): the
// ORB's configured scope and scheduling policies.
425 orb_
->orb_core ()->orb_params ()->scope_policy () |
426 orb_
->orb_core ()->orb_params ()->sched_policy ();
428 ACE_DEBUG ((LM_DEBUG
,
429 "Waiting to Synch\n"));
// Keep servicing ORB requests until the synch object reports synched().
431 while (!this->synch ()->synched ())
435 this->orb_
->perform_work ();
437 catch (const CORBA::Exception
&)
443 CORBA::Policy_var sched_param
;
444 sched_param
= this->sched_param (100);
445 const char * name
= 0;
446 current
->begin_scheduling_segment (name
,
451 ACE_Time_Value (*(this->synch ()->base_time ())));
453 for (int i
= 0; i
< this->dt_count_
; i
++)
455 ACE_Time_Value
now (ACE_OS::gettimeofday ());
457 ACE_Time_Value elapsed_time
= now
- *base_time_
;
// NOTE(review): seconds are narrowed to int for the %d conversions.
460 ACE_OS::sprintf (buf
, "elapsed time = %d\n now = %d\n base_time_ = %d\n",
461 (int) elapsed_time
.sec (),
463 (int) base_time_
->sec());
// NOTE(review): log_index is advanced without a visible bound check
// against the BUFSIZ * 100 entries allocated for log in init().
465 log
[log_index
++] = ACE_OS::strdup (buf
) ;
467 ACE_hthread_t curr_thr
;
468 ACE_Thread::self (curr_thr
);
// Tasks with a non-zero start time that has not been reached yet wait for
// the remaining seconds through yield().
470 if (dt_list_
[i
]->start_time () != 0 &&
471 (elapsed_time
.sec () < dt_list_
[i
]->start_time ()))
473 time_t suspension_time
= dt_list_
[i
]->start_time () - elapsed_time
.sec ();
474 ACE_OS::sprintf (buf
, "suspension_time = %lu\n",
475 static_cast<unsigned long> (suspension_time
));
476 log
[log_index
++] = ACE_OS::strdup (buf
);
477 yield (suspension_time
, dt_list_
[i
]);
480 sched_param
= this->sched_param (dt_list_
[i
]->importance ());
481 dt_list_
[i
]->activate_task (current
,
491 current
->end_scheduling_segment (name
);
493 this->check_ifexit ();
// DT_Creator::dt_ended
//
// Called when a distributable thread finishes.  Under state_lock_ it
// reports and logs active_dt_count_, then calls check_ifexit().
//
// NOTE(review): the statement that changes active_dt_count_ (presumably a
// decrement) and the scope of the guard are not visible in this view.
497 DT_Creator::dt_ended (void)
500 ACE_GUARD (ACE_Lock
, ace_mon
, *state_lock_
);
502 if (TAO_debug_level
> 0)
503 ACE_DEBUG ((LM_DEBUG
, "Active dt count = %d\n",active_dt_count_
));
505 ACE_OS::sprintf (buf
,"Active dt count = %d\n",active_dt_count_
);
// NOTE(review): no visible bound check on log_index.
506 log
[log_index
++] = ACE_OS::strdup (buf
);
508 this->check_ifexit ();
// DT_Creator::job_ended
//
// Called when a job finishes.  Under state_lock_ it reports and logs
// active_job_count_, then calls check_ifexit().
//
// NOTE(review): the statement that changes active_job_count_ (presumably
// a decrement) and the scope of the guard are not visible in this view.
512 DT_Creator::job_ended (void)
515 ACE_GUARD (ACE_Lock
, ace_mon
, *state_lock_
);
517 if (TAO_debug_level
> 0)
518 ACE_DEBUG ((LM_DEBUG
, "Active job count = %d\n",active_job_count_
));
520 ACE_OS::sprintf (buf
,"Active job count = %d\n",active_job_count_
);
// NOTE(review): no visible bound check on log_index.
521 log
[log_index
++] = ACE_OS::strdup (buf
);
524 this->check_ifexit ();
// DT_Creator::check_ifexit
//
// Under shutdown_lock_, checks whether both active_dt_count_ and
// active_job_count_ have reached zero.  If so it dumps per-task and
// per-job statistics, writes the collected samples to file_name_, and
// writes every buffered log line to log_file_name_.
//
// NOTE(review): this view is fragmentary (the use of the static
// `shutdown` flag, braces and the actual ORB shutdown call are not
// visible).
528 DT_Creator::check_ifexit (void)
530 if (TAO_debug_level
> 0)
531 ACE_DEBUG ((LM_DEBUG
,
532 "Checking exit status Job# = %d DT# = %d\n",
// Function-local static, so its value persists across calls; presumably
// used to make the shutdown sequence run only once -- confirm.
536 static int shutdown
= 0;
539 ACE_GUARD (ACE_Lock
, ace_mon
, *shutdown_lock_
);
543 // All tasks have finished and all jobs have been shutdown.
544 if (active_dt_count_
== 0 && active_job_count_
== 0)
546 ACE_DEBUG ((LM_DEBUG
, "Shutdown in progress ...\n"));
549 for (int i = 0; i < dt_count_; i++)
551 dt_list_[i]->dump_stats ();
554 for (int i = 0; i < job_count_; i ++)
556 job_list_[i]->dump_stats ();
559 TASK_STATS::instance ()->dump_samples (file_name_
.c_str (),
560 ACE_TEXT("#Schedule Output"));
// NOTE(review): no null check on the fopen() result is visible before it
// is passed to fprintf()/fclose(); confirm one exists on the lines not
// shown here.
564 FILE* log_file
= ACE_OS::fopen (log_file_name_
.c_str (), "w");
568 // first dump what the caller has to say.
569 ACE_OS::fprintf (log_file
, "Log File\n");
571 for (int i
= 0; i
< log_index
; i
++)
573 ACE_OS::fprintf (log_file
, "%s\n", log
[i
]);
576 ACE_OS::fclose (log_file
);
578 ACE_DEBUG ((LM_DEBUG
,
579 "Log File Ready\n"));
// DT_Creator::dt_count -- accessor taking no arguments.
// NOTE(review): the body is not visible in this view; presumably it
// returns dt_count_ -- confirm.
587 DT_Creator::dt_count (void)
// Default constructor.  The visible part of the initializer list zeroes
// the active distributable-thread and active job counters.
// NOTE(review): the remaining initializers are not visible in this view.
592 DT_Creator::DT_Creator (void)
601 active_dt_count_ (0),
602 active_job_count_ (0),
// Destructor.  Releases the buffered log strings, the task objects, the
// POA holders and the shutdown lock.
//
// NOTE(review): in this view there is no delete of state_lock_, of the
// job_list_ entries, or of the log/dt_list_/poa_list_/job_list_ arrays
// themselves; several lines of the destructor are not shown, so confirm
// against the full file before treating this as a leak.
612 DT_Creator::~DT_Creator (void)
// Log entries are released with ACE_OS::free(), matching the
// ACE_OS::strdup() calls that create them in this file.
614 for (int i
= 0; i
< log_index
; ++i
)
615 ACE_OS::free (log
[i
]);
618 for (int i
= 0; i
< this->dt_count_
; ++i
)
619 delete this->dt_list_
[i
];
621 for (int i
= 0; i
< this->poa_count_
; ++i
)
622 delete this->poa_list_
[i
];
629 delete shutdown_lock_
;
// DT_Creator::log_msg
//
// Appends a heap copy (ACE_OS::strdup) of msg to the log array and
// advances log_index.
// NOTE(review): no lock and no bound check on log_index are visible here;
// confirm whether callers serialise access and whether the BUFSIZ * 100
// entries allocated in init() can be exceeded.
634 DT_Creator::log_msg (char* msg
)
636 log
[log_index
++] = ACE_OS::strdup (msg
);
// DT_Creator::orb (getter)
//
// Returns orb_.in (), i.e. the ORB pointer held by orb_.  No _duplicate
// is performed on this path, so the caller presumably must not release
// the returned pointer -- confirm.
640 DT_Creator::orb (void)
642 return this->orb_
.in ();
// DT_Creator::orb (setter)
//
// Stores a duplicated reference to the given ORB in orb_, so the caller
// keeps its own reference.
646 DT_Creator::orb (CORBA::ORB_ptr orb
)
648 this->orb_
= CORBA::ORB::_duplicate (orb
);
// DT_Creator::base_time (getter) -- returns the stored base_time_ pointer
// as is.
652 DT_Creator::base_time (void)
654 return this->base_time_
;
// DT_Creator::base_time (setter)
//
// Stores the given pointer in base_time_ without copying the pointed-to
// ACE_Time_Value.
// NOTE(review): base_time_ is dereferenced in
// create_distributable_threads(), so the pointee presumably must outlive
// that call -- confirm.
658 DT_Creator::base_time (ACE_Time_Value
* base_time
)
660 this->base_time_
= base_time
;
// DT_Creator::synch -- returns synch_.in (), the pointer held by synch_,
// with no additional reference taken on this path.
665 DT_Creator::synch (void)
667 return this->synch_
.in ();