2 #include "Thread_Task.h"
4 #include "POA_Holder.h"
7 #include "tao/ORB_Core.h"
10 #include "ace/High_Res_Timer.h"
11 #include "ace/Barrier.h"
12 #include "ace/Timer_Heap.h"
13 #include "ace/Service_Config.h"
14 #include "ace/Arg_Shifter.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Argv_Type_Converter.h"
17 #include "ace/Signal.h"
18 #include "ace/Reactor.h"
21 //***************************************************************//
22 extern "C" void handler (int)
24 ACE_Service_Config::reconfig_occurred (1);
27 //***************************************************************//
32 active_task_count_ (0),
35 state_lock_
= new ACE_Lock_Adapter
<TAO_SYNCH_MUTEX
>;
38 Activity::~Activity ()
45 Activity::builder (Builder
* builder
)
59 return current_
.in ();
63 Activity::init (int& argc
, ACE_TCHAR
*argv
[])
65 // Copy command line parameter.
66 ACE_Argv_Type_Converter
command_line(argc
, argv
);
68 this->orb_
= CORBA::ORB_init (command_line
.get_argc(),
69 command_line
.get_TCHAR_argv());
71 CORBA::Object_var object
=
72 orb_
->resolve_initial_references ("RootPOA");
75 PortableServer::POA::_narrow (object
.in ());
77 PortableServer::POAManager_var poa_manager
=
78 root_poa_
->the_POAManager ();
81 orb_
->resolve_initial_references ("RTORB");
84 RTCORBA::RTORB::_narrow (object
.in ());
87 orb_
->resolve_initial_references ("RTCurrent");
90 RTCORBA::Current::_narrow (object
.in ());
92 poa_manager
->activate ();
94 object
= this->orb_
->resolve_initial_references ("PriorityMappingManager");
95 RTCORBA::PriorityMappingManager_var mapping_manager
=
96 RTCORBA::PriorityMappingManager::_narrow (object
.in ());
98 this->priority_mapping_
= mapping_manager
->mapping ();
104 Activity::resolve_naming_service ()
106 CORBA::Object_var naming_obj
=
107 this->orb_
->resolve_initial_references ("NameService");
109 // Need to check return value for errors.
110 if (CORBA::is_nil (naming_obj
.in ()))
111 ACE_ERROR_RETURN ((LM_ERROR
,
112 " (%P|%t) Unable to resolve the Naming Service.\n"),
116 CosNaming::NamingContextExt::_narrow (naming_obj
.in ());
122 Activity::activate_poa_list ()
125 int count
= builder_
->poa_list (list
);
127 for (int i
= 0; i
< count
; ++i
)
129 list
[i
]->activate (this->rt_orb_
.in(), this->root_poa_
.in ());
134 Activity::activate_job_list ()
137 int count
= builder_
->job_list (list
);
140 for (int i
= 0; i
< count
; ++i
)
144 if (TAO_debug_level
> 0)
145 ACE_DEBUG ((LM_DEBUG
, "Activating job:%C\n", job
->name ().c_str ()));
148 PortableServer::POA_var host_poa
=
149 root_poa_
->find_POA (job
->poa ().c_str (), 0);
151 PortableServer::ServantBase_var
servant_var (job
);
153 // Register with poa.
154 PortableServer::ObjectId_var id
;
156 id
= host_poa
->activate_object (job
);
158 CORBA::Object_var server
=
159 host_poa
->id_to_reference (id
.in ());
161 CORBA::String_var ior
=
162 orb_
->object_to_string (server
.in ());
164 const ACE_CString
&job_name
= job
->name ();
166 CosNaming::Name_var name
=
167 this->naming_
->to_name (job_name
.c_str ());
169 this->naming_
->rebind (name
.in (),
172 ACE_DEBUG ((LM_DEBUG
,
173 "Registered %C with the naming service\n",
181 Activity::activate_schedule ()
184 int count
= builder_
->task_list (list
);
186 if (TAO_debug_level
> 0)
187 ACE_DEBUG ((LM_DEBUG
, "Activating schedule, task count = %d\n",
190 ACE_NEW (barrier_
, ACE_Barrier (count
+1));
194 for (int i
= 0; i
< count
; ++i
)
198 // resolve the object from the naming service
199 CosNaming::Name
name (1);
201 name
[0].id
= CORBA::string_dup (task
->job ());
203 CORBA::Object_var obj
=
204 this->naming_
->resolve (name
);
206 Job_var job
= Job::_narrow (obj
.in ());
208 if (TAO_debug_level
> 0)
210 // Check that the object is configured with some
211 // PriorityModelPolicy.
212 CORBA::Policy_var policy
=
213 job
->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
);
215 RTCORBA::PriorityModelPolicy_var priority_policy
=
216 RTCORBA::PriorityModelPolicy::_narrow (policy
.in ());
218 if (CORBA::is_nil (priority_policy
.in ()))
219 ACE_DEBUG ((LM_DEBUG
,
220 "ERROR: Priority Model Policy not exposed!\n"));
223 RTCORBA::PriorityModel priority_model
=
224 priority_policy
->priority_model ();
226 if (priority_model
== RTCORBA::CLIENT_PROPAGATED
)
227 ACE_DEBUG ((LM_DEBUG
,
228 "%C priority_model = RTCORBA::CLIENT_PROPAGATED\n", task
->job ()));
230 ACE_DEBUG ((LM_DEBUG
,
231 "%C priority_model = RTCORBA::SERVER_DECLARED\n", task
->job ()));
233 } /* if (TAO_debug_level > 0) */
235 task
->job (job
.in ());
236 task
->activate_task (this->barrier_
, this->priority_mapping_
);
237 active_task_count_
++;
239 ACE_DEBUG ((LM_DEBUG
, "Job %C scheduled\n", task
->job ()));
242 ACE_DEBUG ((LM_DEBUG
, "(%P,%t) Waiting for tasks to synch...\n"));
244 ACE_DEBUG ((LM_DEBUG
, "(%P,%t) Tasks have synched...\n"));
248 Activity::task_ended (Periodic_Task
* /*ended_task*/)
250 ACE_DEBUG ((LM_DEBUG
, "Active task count = %d\n",active_task_count_
));
252 ACE_GUARD (ACE_Lock
, ace_mon
, *state_lock_
);
253 --active_task_count_
;
256 this->check_ifexit ();
260 Activity::job_ended (Job_i
* /*ended_job*/)
262 ACE_DEBUG ((LM_DEBUG
, "Active job count = %d\n",active_job_count_
));
264 ACE_GUARD (ACE_Lock
, ace_mon
, *state_lock_
);
268 this->check_ifexit ();
272 Activity::check_ifexit ()
274 // All tasks have finished and all jobs have been shutdown.
275 if (active_task_count_
== 0 && active_job_count_
== 0)
277 ACE_DEBUG ((LM_DEBUG
, "Shutdown in progress ...\n"));
278 // ask all tasks to dump stats.
281 int count
= builder_
->task_list (task_list
);
283 ACE_TCHAR msg
[BUFSIZ
];
284 ACE_OS::sprintf (msg
, ACE_TEXT("# Stats generated on --\n"));
286 for (int i
= 0; i
< count
; ++i
)
288 task_list
[i
]->dump_stats (msg
);
292 orb_
->shutdown (false);
297 Activity::get_server_priority (CORBA::Object_ptr server
)
299 // Get the Priority Model Policy from the stub.
300 CORBA::Policy_var policy
=
301 server
->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
);
303 // Narrow down to correct type.
304 RTCORBA::PriorityModelPolicy_var priority_policy
=
305 RTCORBA::PriorityModelPolicy::_narrow (policy
.in ());
307 // Make sure that we have the SERVER_DECLARED priority model.
308 RTCORBA::PriorityModel priority_model
=
309 priority_policy
->priority_model ();
310 if (priority_model
!= RTCORBA::SERVER_DECLARED
)
313 // Return the server priority.
314 return priority_policy
->server_priority ();
318 Activity::run (int argc
, ACE_TCHAR
*argv
[])
320 this->init (argc
, argv
);
322 if (this->resolve_naming_service () == -1)
325 this->activate_poa_list ();
327 this->activate_job_list ();
329 this->activate_schedule ();
331 this->create_started_flag_file (argc
, argv
);
335 ACE_Thread_Manager::instance ()->wait ();
337 CORBA::release (this->naming_
);
339 // Hack for proper cleanup.
340 this->builder_
->fini ();
346 Activity::create_started_flag_file (int argc
, ACE_TCHAR
*argv
[])
348 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
350 const ACE_TCHAR
* current_arg
= 0;
352 while (arg_shifter
.is_anything_left ())
354 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Started_Flag"))))
356 FILE *file
= ACE_OS::fopen (current_arg
, ACE_TEXT("w"));
359 ACE_ERROR ((LM_ERROR
,
360 "Unable to open %s for writing: %p\n",
363 ACE_OS::fprintf (file
, "ignore");
365 ACE_OS::fclose (file
);
367 arg_shifter
.consume_arg ();
371 arg_shifter
.ignore_arg ();
377 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
379 ACE_Service_Config::static_svcs ()->insert (&ace_svc_desc_Builder
);
381 ACE_Sig_Action
sa ((ACE_SignalHandler
) handler
, SIGHUP
);
383 ACE_Timer_Heap timer_queue
;
384 ACE_Reactor::instance ()->timer_queue (&timer_queue
);
389 ACTIVITY::instance()->run (argc
, argv
);
391 catch (const CORBA::Exception
& ex
)
393 ex
._tao_print_exception ("Caught exception:");
397 // reset stack based timer queue
398 ACE_Reactor::instance ()->timer_queue (0);
403 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton
, Activity
, ACE_Null_Mutex
);