2 #include "Thread_Task.h"
4 #include "POA_Holder.h"
7 #include "tao/ORB_Core.h"
10 #include "ace/High_Res_Timer.h"
11 #include "ace/Barrier.h"
12 #include "ace/Timer_Heap.h"
13 #include "ace/Service_Config.h"
14 #include "ace/Arg_Shifter.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Argv_Type_Converter.h"
17 #include "ace/Signal.h"
18 #include "ace/Reactor.h"
21 //***************************************************************//
22 extern "C" void handler (int)
24 ACE_Service_Config::reconfig_occurred (1);
27 //***************************************************************//
29 Activity::Activity (void)
32 active_task_count_ (0),
35 state_lock_
= new ACE_Lock_Adapter
<TAO_SYNCH_MUTEX
>;
38 Activity::~Activity (void)
45 Activity::builder (Builder
* builder
)
57 Activity::current (void)
59 return current_
.in ();
63 Activity::init (int& argc
, ACE_TCHAR
*argv
[])
65 // Copy command line parameter.
66 ACE_Argv_Type_Converter
command_line(argc
, argv
);
68 this->orb_
= CORBA::ORB_init (command_line
.get_argc(),
69 command_line
.get_TCHAR_argv());
71 CORBA::Object_var object
=
72 orb_
->resolve_initial_references ("RootPOA");
75 PortableServer::POA::_narrow (object
.in ());
77 PortableServer::POAManager_var poa_manager
=
78 root_poa_
->the_POAManager ();
81 orb_
->resolve_initial_references ("RTORB");
84 RTCORBA::RTORB::_narrow (object
.in ());
87 orb_
->resolve_initial_references ("RTCurrent");
90 RTCORBA::Current::_narrow (object
.in ());
92 poa_manager
->activate ();
94 object
= this->orb_
->resolve_initial_references ("PriorityMappingManager");
95 RTCORBA::PriorityMappingManager_var mapping_manager
=
96 RTCORBA::PriorityMappingManager::_narrow (object
.in ());
98 this->priority_mapping_
= mapping_manager
->mapping ();
104 Activity::resolve_naming_service (void)
106 CORBA::Object_var naming_obj
=
107 this->orb_
->resolve_initial_references ("NameService");
109 // Need to check return value for errors.
110 if (CORBA::is_nil (naming_obj
.in ()))
111 ACE_ERROR_RETURN ((LM_ERROR
,
112 " (%P|%t) Unable to resolve the Naming Service.\n"),
116 CosNaming::NamingContextExt::_narrow (naming_obj
.in ());
122 Activity::activate_poa_list (void)
125 int count
= builder_
->poa_list (list
);
127 for (int i
= 0; i
< count
; ++i
)
129 list
[i
]->activate (this->rt_orb_
.in(), this->root_poa_
.in ());
134 Activity::activate_job_list (void)
137 int count
= builder_
->job_list (list
);
140 for (int i
= 0; i
< count
; ++i
)
144 if (TAO_debug_level
> 0)
145 ACE_DEBUG ((LM_DEBUG
, "Activating job:%C\n", job
->name ().c_str ()));
148 PortableServer::POA_var host_poa
=
149 root_poa_
->find_POA (job
->poa ().c_str (), 0);
151 PortableServer::ServantBase_var
servant_var (job
);
153 // Register with poa.
154 PortableServer::ObjectId_var id
;
156 id
= host_poa
->activate_object (job
);
158 CORBA::Object_var server
=
159 host_poa
->id_to_reference (id
.in ());
161 CORBA::String_var ior
=
162 orb_
->object_to_string (server
.in ());
164 const ACE_CString
&job_name
= job
->name ();
166 CosNaming::Name_var name
=
167 this->naming_
->to_name (job_name
.c_str ());
169 this->naming_
->rebind (name
.in (),
172 ACE_DEBUG ((LM_DEBUG
,
173 "Registered %C with the naming service\n",
182 Activity::activate_schedule (void)
185 int count
= builder_
->task_list (list
);
187 if (TAO_debug_level
> 0)
188 ACE_DEBUG ((LM_DEBUG
, "Activating schedule, task count = %d\n",
191 ACE_NEW (barrier_
, ACE_Barrier (count
+1));
195 for (int i
= 0; i
< count
; ++i
)
199 // resolve the object from the naming service
200 CosNaming::Name
name (1);
202 name
[0].id
= CORBA::string_dup (task
->job ());
204 CORBA::Object_var obj
=
205 this->naming_
->resolve (name
);
207 Job_var job
= Job::_narrow (obj
.in ());
209 if (TAO_debug_level
> 0)
211 // Check that the object is configured with some
212 // PriorityModelPolicy.
213 CORBA::Policy_var policy
=
214 job
->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
);
216 RTCORBA::PriorityModelPolicy_var priority_policy
=
217 RTCORBA::PriorityModelPolicy::_narrow (policy
.in ());
219 if (CORBA::is_nil (priority_policy
.in ()))
220 ACE_DEBUG ((LM_DEBUG
,
221 "ERROR: Priority Model Policy not exposed!\n"));
224 RTCORBA::PriorityModel priority_model
=
225 priority_policy
->priority_model ();
227 if (priority_model
== RTCORBA::CLIENT_PROPAGATED
)
228 ACE_DEBUG ((LM_DEBUG
,
229 "%C priority_model = RTCORBA::CLIENT_PROPAGATED\n", task
->job ()));
231 ACE_DEBUG ((LM_DEBUG
,
232 "%C priority_model = RTCORBA::SERVER_DECLARED\n", task
->job ()));
234 } /* if (TAO_debug_level > 0) */
236 task
->job (job
.in ());
237 task
->activate_task (this->barrier_
, this->priority_mapping_
);
238 active_task_count_
++;
240 ACE_DEBUG ((LM_DEBUG
, "Job %C scheduled\n", task
->job ()));
243 ACE_DEBUG ((LM_DEBUG
, "(%P,%t) Waiting for tasks to synch...\n"));
245 ACE_DEBUG ((LM_DEBUG
, "(%P,%t) Tasks have synched...\n"));
249 Activity::task_ended (Periodic_Task
* /*ended_task*/)
251 ACE_DEBUG ((LM_DEBUG
, "Active task count = %d\n",active_task_count_
));
253 ACE_GUARD (ACE_Lock
, ace_mon
, *state_lock_
);
254 --active_task_count_
;
257 this->check_ifexit ();
261 Activity::job_ended (Job_i
* /*ended_job*/)
263 ACE_DEBUG ((LM_DEBUG
, "Active job count = %d\n",active_job_count_
));
265 ACE_GUARD (ACE_Lock
, ace_mon
, *state_lock_
);
269 this->check_ifexit ();
273 Activity::check_ifexit (void)
275 // All tasks have finished and all jobs have been shutdown.
276 if (active_task_count_
== 0 && active_job_count_
== 0)
278 ACE_DEBUG ((LM_DEBUG
, "Shutdown in progress ...\n"));
279 // ask all tasks to dump stats.
282 int count
= builder_
->task_list (task_list
);
284 ACE_TCHAR msg
[BUFSIZ
];
285 ACE_OS::sprintf (msg
, ACE_TEXT("# Stats generated on --\n"));
287 for (int i
= 0; i
< count
; ++i
)
289 task_list
[i
]->dump_stats (msg
);
298 Activity::get_server_priority (CORBA::Object_ptr server
)
300 // Get the Priority Model Policy from the stub.
301 CORBA::Policy_var policy
=
302 server
->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
);
304 // Narrow down to correct type.
305 RTCORBA::PriorityModelPolicy_var priority_policy
=
306 RTCORBA::PriorityModelPolicy::_narrow (policy
.in ());
308 // Make sure that we have the SERVER_DECLARED priority model.
309 RTCORBA::PriorityModel priority_model
=
310 priority_policy
->priority_model ();
311 if (priority_model
!= RTCORBA::SERVER_DECLARED
)
314 // Return the server priority.
315 return priority_policy
->server_priority ();
319 Activity::run (int argc
, ACE_TCHAR
*argv
[])
321 this->init (argc
, argv
);
323 if (this->resolve_naming_service () == -1)
326 this->activate_poa_list ();
328 this->activate_job_list ();
330 this->activate_schedule ();
332 this->create_started_flag_file (argc
, argv
);
336 ACE_Thread_Manager::instance ()->wait ();
338 CORBA::release (this->naming_
);
340 // Hack for proper cleanup.
341 this->builder_
->fini ();
347 Activity::create_started_flag_file (int argc
, ACE_TCHAR
*argv
[])
349 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
351 const ACE_TCHAR
* current_arg
= 0;
353 while (arg_shifter
.is_anything_left ())
355 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Started_Flag"))))
357 FILE *file
= ACE_OS::fopen (current_arg
, ACE_TEXT("w"));
360 ACE_ERROR ((LM_ERROR
,
361 "Unable to open %s for writing: %p\n",
364 ACE_OS::fprintf (file
, "ignore");
366 ACE_OS::fclose (file
);
368 arg_shifter
.consume_arg ();
372 arg_shifter
.ignore_arg ();
378 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
380 ACE_Service_Config::static_svcs ()->insert (&ace_svc_desc_Builder
);
382 ACE_Sig_Action
sa ((ACE_SignalHandler
) handler
, SIGHUP
);
384 ACE_Timer_Heap timer_queue
;
385 ACE_Reactor::instance ()->timer_queue (&timer_queue
);
390 ACTIVITY::instance()->run (argc
, argv
);
392 catch (const CORBA::Exception
& ex
)
394 ex
._tao_print_exception ("Caught exception:");
398 // reset stack based timer queue
399 ACE_Reactor::instance ()->timer_queue (0);
404 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton
, Activity
, ACE_Null_Mutex
);