1 #include "DT_Creator.h"
2 #include "Thread_Task.h"
3 #include "Task_Stats.h"
4 #include "DT_Creator.h"
6 #include "tao/ORB_Core.h"
7 #include "tao/RTScheduling/Current.h"
9 #include "ace/High_Res_Timer.h"
11 ACE_Atomic_Op
<TAO_SYNCH_MUTEX
, long> guid_counter
;
14 DT_Creator::dt_task_init (ACE_Arg_Shifter
& arg_shifter
)
16 static int dt_index
= 0;
17 time_t start_time
= 0;
23 const ACE_TCHAR
* current_arg
= 0;
25 if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-Importance")) == 0)
27 arg_shifter
.consume_arg ();
28 current_arg
= arg_shifter
.get_current ();
29 importance
= ACE_OS::atoi (current_arg
);
30 arg_shifter
.consume_arg ();
33 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Start_Time"))))
35 start_time
= ACE_OS::atoi (current_arg
);
36 arg_shifter
.consume_arg ();
39 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Iter"))))
41 iter
= ACE_OS::atoi (current_arg
);
42 arg_shifter
.consume_arg ();
45 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Load"))))
47 load
= ACE_OS::atoi (current_arg
);
48 arg_shifter
.consume_arg ();
51 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-JobName"))))
53 job_name
= (char *)current_arg
;
55 arg_shifter
.consume_arg ();
58 dt_list_
[dt_index
++] = this->create_thr_task (importance
,
71 DT_Creator::init (int argc
, ACE_TCHAR
*argv
[])
73 gsf_
= ACE_High_Res_Timer::global_scale_factor ();
74 state_lock_
= new ACE_Lock_Adapter
<TAO_SYNCH_MUTEX
>;
75 shutdown_lock_
= new ACE_Lock_Adapter
<TAO_SYNCH_MUTEX
>;
77 active_job_count_
= 0;
78 ACE_NEW_RETURN (log
, char*[BUFSIZ
* 100],-1);
80 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
82 const ACE_TCHAR
* current_arg
= 0;
89 while (arg_shifter
.is_anything_left ())
91 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-GuidSeed"))))
93 guid_counter
= (long) ACE_OS::atoi (current_arg
);
94 arg_shifter
.consume_arg ();
96 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-DT_Count"))))
98 dt_count_
= ACE_OS::atoi (current_arg
);
99 ACE_NEW_RETURN (dt_list_
, Thread_Task
*[dt_count_
], -1);
100 arg_shifter
.consume_arg ();
102 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-POA_Count"))))
104 poa_count_
= ACE_OS::atoi (current_arg
);
105 ACE_NEW_RETURN (poa_list_
, POA_Holder
*[poa_count_
], -1);
106 arg_shifter
.consume_arg ();
108 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-JOB_Count"))))
110 job_count_
= ACE_OS::atoi (current_arg
);
111 ACE_NEW_RETURN (job_list_
, Job_i
*[job_count_
], -1);
112 arg_shifter
.consume_arg ();
114 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-DT_Task")) == 0)
116 arg_shifter
.consume_arg ();
117 dt_task_init (arg_shifter
);
119 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-POA")) == 0)
121 arg_shifter
.consume_arg ();
123 ACE_NEW_RETURN (this->poa_list_
[poa_count
], POA_Holder (), -1);
125 if (this->poa_list_
[poa_count
]->init (arg_shifter
) == -1)
127 delete this->poa_list_
[poa_count
];
135 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-Job")) == 0)
137 arg_shifter
.consume_arg ();
139 ACE_NEW_RETURN (this->job_list_
[job_count
], Job_i (this), -1);
141 if (this->job_list_
[job_count
]->init (arg_shifter
) == -1)
143 delete this->job_list_
[job_count
];
151 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-OutFile"))))
153 file_name_
= current_arg
;
154 arg_shifter
.consume_arg ();
156 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-LogFile"))))
158 log_file_name_
= current_arg
;
159 arg_shifter
.consume_arg ();
163 arg_shifter
.ignore_arg ();
171 DT_Creator::register_synch_obj ()
173 CosNaming::Name
name (1);
176 CosNaming::NamingContext_var synch_context
;
180 // Try binding the sender context in the NS
182 CORBA::string_dup ("Synch");
184 synch_context
= this->naming_
->bind_new_context (name
);
187 // We reach here if there was no exception raised in
188 // <bind_new_context>. We then create a receiver context.
192 catch (const CosNaming::NamingContext::AlreadyBound
& )
195 // The synch context already exists, probably created by the
199 // Get the synch context.
201 CORBA::string_dup ("Synch");
203 CORBA::Object_var object
=
204 this->naming_
->resolve (name
);
206 synch_context
= CosNaming::NamingContext::_narrow (object
.in ());
209 ACE_CString
synch_name ("Synch");
210 ACE_Time_Value timestamp
= ACE_OS::gettimeofday ();
213 ACE_OS::sprintf(buf
, "%lu", static_cast<unsigned long> (timestamp
.sec ()));
217 CORBA::string_dup (synch_name
.c_str ());
219 ACE_DEBUG ((LM_DEBUG
,
221 synch_name
.c_str ()));
226 Synch_var synch
= synch_
->_this ();
228 // Register the synch object with the Synch context.
229 synch_context
->rebind (name
,
235 DT_Creator::activate_root_poa ()
237 CORBA::Object_var object
=
238 this->orb_
->resolve_initial_references ("RootPOA");
240 PortableServer::POA_var root_poa
=
241 PortableServer::POA::_narrow (object
.in ());
243 PortableServer::POAManager_var poa_manager
=
244 root_poa
->the_POAManager ();
246 poa_manager
->activate ();
252 DT_Creator::activate_poa_list ()
254 if (TAO_debug_level
> 0)
255 ACE_DEBUG ((LM_DEBUG
,
256 "DT_Creator::activate_poa_list\n"));
260 CORBA::Object_var object
=
261 this->orb_
->resolve_initial_references ("RTORB");
263 RTCORBA::RTORB_var rt_orb
= RTCORBA::RTORB::_narrow (object
.in ());
265 object
= this->orb_
->resolve_initial_references ("RootPOA");
267 PortableServer::POA_var root_poa
=
268 PortableServer::POA::_narrow (object
.in ());
270 for (int i
= 0; i
< poa_count_
; ++i
)
272 poa_list_
[i
]->activate (rt_orb
.in(), root_poa
.in ());
278 DT_Creator::activate_job_list ()
280 if (TAO_debug_level
> 0)
281 ACE_DEBUG ((LM_DEBUG
,
282 "DT_Creator::activate_job_list\n"));
286 CORBA::Object_var object
=
287 this->orb_
->resolve_initial_references ("RootPOA");
289 PortableServer::POA_var root_poa
=
290 PortableServer::POA::_narrow (object
.in ());
292 for (int i
= 0; i
< job_count_
; ++i
)
298 if (TAO_debug_level
> 0)
299 ACE_DEBUG ((LM_DEBUG
, "Activating job:%C\n", job
->name ().c_str ()));
302 PortableServer::POA_var host_poa
=
303 root_poa
->find_POA (job
->poa ().c_str (), 0);
305 PortableServer::ServantBase_var
servant_var (job
);
307 // Register with poa.
308 PortableServer::ObjectId_var id
;
310 id
= host_poa
->activate_object (job
);
312 CORBA::Object_var server
=
313 host_poa
->id_to_reference (id
.in ());
315 CORBA::String_var ior
=
316 orb_
->object_to_string (server
.in ());
318 const ACE_CString
&job_name
= job
->name ();
320 CosNaming::Name_var name
=
321 this->naming_
->to_name (job_name
.c_str ());
323 this->naming_
->rebind (name
.in (),
327 ACE_DEBUG ((LM_DEBUG
,
328 "Activated Job List\n"));
332 DT_Creator::activate_schedule ()
334 if (TAO_debug_level
> 0)
335 ACE_DEBUG ((LM_DEBUG
,
336 "Activating schedule, task count = %d\n",
341 for (int i
= 0; i
< dt_count_
; ++i
)
347 // resolve the object from the naming service
348 CosNaming::Name
name (1);
350 name
[0].id
= CORBA::string_dup (task
->job ());
352 CORBA::Object_var obj
=
353 this->naming_
->resolve (name
);
355 Job_var job
= Job::_narrow (obj
.in ());
357 // if (TAO_debug_level > 0)
359 // Check that the object is configured with some
360 // PriorityModelPolicy.
361 CORBA::Policy_var policy
=
362 job
->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
);
364 RTCORBA::PriorityModelPolicy_var priority_policy
=
365 RTCORBA::PriorityModelPolicy::_narrow (policy
.in ());
367 if (CORBA::is_nil (priority_policy
.in ()))
368 ACE_DEBUG ((LM_DEBUG
,
369 "ERROR: Priority Model Policy not exposed!\n"));
373 RTCORBA::PriorityModel priority_model =
374 priority_policy->priority_model ();
376 if (priority_model == RTCORBA::CLIENT_PROPAGATED)
377 ACE_DEBUG ((LM_DEBUG,
378 "%C priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ()));
380 ACE_DEBUG ((LM_DEBUG,
381 "%C priority_model = RTCORBA::SERVER_DECLARED\n", task->job ()));
384 //} /* if (TAO_debug_level > 0) */
386 task
->job (job
.in ());
389 if (TAO_debug_level
> 0 && dt_count_
> 0)
390 ACE_DEBUG ((LM_DEBUG
,
391 "Activated schedule, task count = %d\n",
396 DT_Creator::resolve_naming_service ()
398 CORBA::Object_var naming_obj
=
399 this->orb_
->resolve_initial_references ("NameService");
401 // Need to check return value for errors.
402 if (CORBA::is_nil (naming_obj
.in ()))
403 ACE_ERROR_RETURN ((LM_ERROR
,
404 " (%P|%t) Unable to resolve the Naming Service.\n"),
408 CosNaming::NamingContextExt::_narrow (naming_obj
.in ());
414 DT_Creator::create_distributable_threads (RTScheduling::Current_ptr current
)
417 flags
= THR_NEW_LWP
| THR_JOINABLE
;
419 orb_
->orb_core ()->orb_params ()->scope_policy () |
420 orb_
->orb_core ()->orb_params ()->sched_policy ();
422 ACE_DEBUG ((LM_DEBUG
,
423 "Waiting to Synch\n"));
425 while (!this->synch ()->synched ())
429 this->orb_
->perform_work ();
431 catch (const CORBA::Exception
&)
437 CORBA::Policy_var sched_param
;
438 sched_param
= this->sched_param (100);
439 const char * name
= 0;
440 current
->begin_scheduling_segment (name
,
445 ACE_Time_Value (*(this->synch ()->base_time ())));
447 for (int i
= 0; i
< this->dt_count_
; i
++)
449 ACE_Time_Value
now (ACE_OS::gettimeofday ());
451 ACE_Time_Value elapsed_time
= now
- *base_time_
;
454 ACE_OS::sprintf (buf
, "elapsed time = %d\n now = %d\n base_time_ = %d\n",
455 (int) elapsed_time
.sec (),
457 (int) base_time_
->sec());
459 log
[log_index
++] = ACE_OS::strdup (buf
) ;
461 ACE_hthread_t curr_thr
;
462 ACE_Thread::self (curr_thr
);
464 if (dt_list_
[i
]->start_time () != 0 &&
465 (elapsed_time
.sec () < dt_list_
[i
]->start_time ()))
467 time_t suspension_time
= dt_list_
[i
]->start_time () - elapsed_time
.sec ();
468 ACE_OS::sprintf (buf
, "suspension_time = %lu\n",
469 static_cast<unsigned long> (suspension_time
));
470 log
[log_index
++] = ACE_OS::strdup (buf
);
471 yield (suspension_time
, dt_list_
[i
]);
474 sched_param
= this->sched_param (dt_list_
[i
]->importance ());
475 dt_list_
[i
]->activate_task (current
,
485 current
->end_scheduling_segment (name
);
487 this->check_ifexit ();
491 DT_Creator::dt_ended ()
494 ACE_GUARD (ACE_Lock
, ace_mon
, *state_lock_
);
496 if (TAO_debug_level
> 0)
497 ACE_DEBUG ((LM_DEBUG
, "Active dt count = %d\n",active_dt_count_
));
499 ACE_OS::sprintf (buf
,"Active dt count = %d\n",active_dt_count_
);
500 log
[log_index
++] = ACE_OS::strdup (buf
);
502 this->check_ifexit ();
506 DT_Creator::job_ended ()
509 ACE_GUARD (ACE_Lock
, ace_mon
, *state_lock_
);
511 if (TAO_debug_level
> 0)
512 ACE_DEBUG ((LM_DEBUG
, "Active job count = %d\n",active_job_count_
));
514 ACE_OS::sprintf (buf
,"Active job count = %d\n",active_job_count_
);
515 log
[log_index
++] = ACE_OS::strdup (buf
);
518 this->check_ifexit ();
522 DT_Creator::check_ifexit ()
524 if (TAO_debug_level
> 0)
525 ACE_DEBUG ((LM_DEBUG
,
526 "Checking exit status Job# = %d DT# = %d\n",
530 static int shutdown
= 0;
533 ACE_GUARD (ACE_Lock
, ace_mon
, *shutdown_lock_
);
537 // All tasks have finished and all jobs have been shutdown.
538 if (active_dt_count_
== 0 && active_job_count_
== 0)
540 ACE_DEBUG ((LM_DEBUG
, "Shutdown in progress ...\n"));
543 for (int i = 0; i < dt_count_; i++)
545 dt_list_[i]->dump_stats ();
548 for (int i = 0; i < job_count_; i ++)
550 job_list_[i]->dump_stats ();
553 TASK_STATS::instance ()->dump_samples (file_name_
.c_str (),
554 ACE_TEXT("#Schedule Output"));
558 FILE* log_file
= ACE_OS::fopen (log_file_name_
.c_str (), "w");
562 // first dump what the caller has to say.
563 ACE_OS::fprintf (log_file
, "Log File\n");
565 for (int i
= 0; i
< log_index
; i
++)
567 ACE_OS::fprintf (log_file
, "%s\n", log
[i
]);
570 ACE_OS::fclose (log_file
);
572 ACE_DEBUG ((LM_DEBUG
,
573 "Log File Ready\n"));
580 DT_Creator::dt_count ()
585 DT_Creator::DT_Creator ()
594 active_dt_count_ (0),
595 active_job_count_ (0),
605 DT_Creator::~DT_Creator ()
607 for (int i
= 0; i
< log_index
; ++i
)
608 ACE_OS::free (log
[i
]);
611 for (int i
= 0; i
< this->dt_count_
; ++i
)
612 delete this->dt_list_
[i
];
614 for (int i
= 0; i
< this->poa_count_
; ++i
)
615 delete this->poa_list_
[i
];
622 delete shutdown_lock_
;
627 DT_Creator::log_msg (char* msg
)
629 log
[log_index
++] = ACE_OS::strdup (msg
);
635 return this->orb_
.in ();
639 DT_Creator::orb (CORBA::ORB_ptr orb
)
641 this->orb_
= CORBA::ORB::_duplicate (orb
);
645 DT_Creator::base_time ()
647 return this->base_time_
;
651 DT_Creator::base_time (ACE_Time_Value
* base_time
)
653 this->base_time_
= base_time
;
660 return this->synch_
.in ();