Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / examples / RTCORBA / Activity / Activity.cpp
blob89ff0aa17e5f831f850c9be4f4e64ef818439fe6
1 #include "Activity.h"
2 #include "Thread_Task.h"
3 #include "Job_i.h"
4 #include "POA_Holder.h"
5 #include "Builder.h"
7 #include "tao/ORB_Core.h"
8 #include "tao/debug.h"
10 #include "ace/High_Res_Timer.h"
11 #include "ace/Barrier.h"
12 #include "ace/Timer_Heap.h"
13 #include "ace/Service_Config.h"
14 #include "ace/Arg_Shifter.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Argv_Type_Converter.h"
17 #include "ace/Signal.h"
18 #include "ace/Reactor.h"
21 //***************************************************************//
22 extern "C" void handler (int)
24 ACE_Service_Config::reconfig_occurred (1);
27 //***************************************************************//
29 Activity::Activity (void)
30 :builder_ (0),
31 barrier_ (0),
32 active_task_count_ (0),
33 active_job_count_ (0)
35 state_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>;
38 Activity::~Activity (void)
40 delete state_lock_;
41 delete barrier_;
44 void
45 Activity::builder (Builder* builder)
47 builder_ = builder;
50 CORBA::ORB_ptr
51 Activity::orb (void)
53 return orb_.in ();
56 RTCORBA::Current_ptr
57 Activity::current (void)
59 return current_.in ();
62 int
63 Activity::init (int& argc, ACE_TCHAR *argv [])
65 // Copy command line parameter.
66 ACE_Argv_Type_Converter command_line(argc, argv);
68 this->orb_ = CORBA::ORB_init (command_line.get_argc(),
69 command_line.get_TCHAR_argv());
71 CORBA::Object_var object =
72 orb_->resolve_initial_references ("RootPOA");
74 root_poa_ =
75 PortableServer::POA::_narrow (object.in ());
77 PortableServer::POAManager_var poa_manager =
78 root_poa_->the_POAManager ();
80 object =
81 orb_->resolve_initial_references ("RTORB");
83 this->rt_orb_ =
84 RTCORBA::RTORB::_narrow (object.in ());
86 object =
87 orb_->resolve_initial_references ("RTCurrent");
89 current_ =
90 RTCORBA::Current::_narrow (object.in ());
92 poa_manager->activate ();
94 object = this->orb_->resolve_initial_references ("PriorityMappingManager");
95 RTCORBA::PriorityMappingManager_var mapping_manager =
96 RTCORBA::PriorityMappingManager::_narrow (object.in ());
98 this->priority_mapping_ = mapping_manager->mapping ();
100 return 0;
104 Activity::resolve_naming_service (void)
106 CORBA::Object_var naming_obj =
107 this->orb_->resolve_initial_references ("NameService");
109 // Need to check return value for errors.
110 if (CORBA::is_nil (naming_obj.in ()))
111 ACE_ERROR_RETURN ((LM_ERROR,
112 " (%P|%t) Unable to resolve the Naming Service.\n"),
113 -1);
115 this->naming_ =
116 CosNaming::NamingContextExt::_narrow (naming_obj.in ());
118 return 0;
121 void
122 Activity::activate_poa_list (void)
124 POA_LIST list;
125 int count = builder_->poa_list (list);
127 for (int i = 0; i < count; ++i)
129 list[i]->activate (this->rt_orb_.in(), this->root_poa_.in ());
133 void
134 Activity::activate_job_list (void)
136 JOB_LIST list;
137 int count = builder_->job_list (list);
138 Job_i* job;
140 for (int i = 0; i < count; ++i)
142 job = list[i];
144 if (TAO_debug_level > 0)
145 ACE_DEBUG ((LM_DEBUG, "Activating job:%C\n", job->name ().c_str ()));
147 // find your poa
148 PortableServer::POA_var host_poa =
149 root_poa_->find_POA (job->poa ().c_str (), 0);
151 PortableServer::ServantBase_var servant_var (job);
153 // Register with poa.
154 PortableServer::ObjectId_var id;
156 id = host_poa->activate_object (job);
158 CORBA::Object_var server =
159 host_poa->id_to_reference (id.in ());
161 CORBA::String_var ior =
162 orb_->object_to_string (server.in ());
164 const ACE_CString &job_name = job->name ();
166 CosNaming::Name_var name =
167 this->naming_->to_name (job_name.c_str ());
169 this->naming_->rebind (name.in (),
170 server.in ());
172 ACE_DEBUG ((LM_DEBUG,
173 "Registered %C with the naming service\n",
174 job_name.c_str ()));
176 active_job_count_++;
178 } /* while */
181 void
182 Activity::activate_schedule (void)
184 TASK_LIST list;
185 int count = builder_->task_list (list);
187 if (TAO_debug_level > 0)
188 ACE_DEBUG ((LM_DEBUG, "Activating schedule, task count = %d\n",
189 count));
191 ACE_NEW (barrier_, ACE_Barrier (count+1));
193 Periodic_Task* task;
195 for (int i = 0; i < count; ++i)
197 task = list[i];
199 // resolve the object from the naming service
200 CosNaming::Name name (1);
201 name.length (1);
202 name[0].id = CORBA::string_dup (task->job ());
204 CORBA::Object_var obj =
205 this->naming_->resolve (name);
207 Job_var job = Job::_narrow (obj.in ());
209 if (TAO_debug_level > 0)
211 // Check that the object is configured with some
212 // PriorityModelPolicy.
213 CORBA::Policy_var policy =
214 job->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE);
216 RTCORBA::PriorityModelPolicy_var priority_policy =
217 RTCORBA::PriorityModelPolicy::_narrow (policy.in ());
219 if (CORBA::is_nil (priority_policy.in ()))
220 ACE_DEBUG ((LM_DEBUG,
221 "ERROR: Priority Model Policy not exposed!\n"));
222 else
224 RTCORBA::PriorityModel priority_model =
225 priority_policy->priority_model ();
227 if (priority_model == RTCORBA::CLIENT_PROPAGATED)
228 ACE_DEBUG ((LM_DEBUG,
229 "%C priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ()));
230 else
231 ACE_DEBUG ((LM_DEBUG,
232 "%C priority_model = RTCORBA::SERVER_DECLARED\n", task->job ()));
234 } /* if (TAO_debug_level > 0) */
236 task->job (job.in ());
237 task->activate_task (this->barrier_, this->priority_mapping_);
238 active_task_count_++;
240 ACE_DEBUG ((LM_DEBUG, "Job %C scheduled\n", task->job ()));
243 ACE_DEBUG ((LM_DEBUG, "(%P,%t) Waiting for tasks to synch...\n"));
244 barrier_->wait ();
245 ACE_DEBUG ((LM_DEBUG, "(%P,%t) Tasks have synched...\n"));
248 void
249 Activity::task_ended (Periodic_Task* /*ended_task*/)
251 ACE_DEBUG ((LM_DEBUG, "Active task count = %d\n",active_task_count_));
253 ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
254 --active_task_count_;
257 this->check_ifexit ();
260 void
261 Activity::job_ended (Job_i* /*ended_job*/)
263 ACE_DEBUG ((LM_DEBUG, "Active job count = %d\n",active_job_count_));
265 ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
266 --active_job_count_;
269 this->check_ifexit ();
272 void
273 Activity::check_ifexit (void)
275 // All tasks have finished and all jobs have been shutdown.
276 if (active_task_count_ == 0 && active_job_count_ == 0)
278 ACE_DEBUG ((LM_DEBUG, "Shutdown in progress ...\n"));
279 // ask all tasks to dump stats.
281 TASK_LIST task_list;
282 int count = builder_->task_list (task_list);
284 ACE_TCHAR msg[BUFSIZ];
285 ACE_OS::sprintf (msg, ACE_TEXT("# Stats generated on --\n"));
287 for (int i = 0; i < count; ++i)
289 task_list[i]->dump_stats (msg);
292 // shutdown the ORB
293 orb_->shutdown (0);
297 CORBA::Short
298 Activity::get_server_priority (CORBA::Object_ptr server)
300 // Get the Priority Model Policy from the stub.
301 CORBA::Policy_var policy =
302 server->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE);
304 // Narrow down to correct type.
305 RTCORBA::PriorityModelPolicy_var priority_policy =
306 RTCORBA::PriorityModelPolicy::_narrow (policy.in ());
308 // Make sure that we have the SERVER_DECLARED priority model.
309 RTCORBA::PriorityModel priority_model =
310 priority_policy->priority_model ();
311 if (priority_model != RTCORBA::SERVER_DECLARED)
312 return -1;
314 // Return the server priority.
315 return priority_policy->server_priority ();
318 void
319 Activity::run (int argc, ACE_TCHAR *argv[])
321 this->init (argc, argv);
323 if (this->resolve_naming_service () == -1)
324 return;
326 this->activate_poa_list ();
328 this->activate_job_list ();
330 this->activate_schedule ();
332 this->create_started_flag_file (argc, argv);
334 orb_->run ();
336 ACE_Thread_Manager::instance ()->wait ();
338 CORBA::release (this->naming_);
340 // Hack for proper cleanup.
341 this->builder_->fini ();
343 orb_->destroy ();
346 void
347 Activity::create_started_flag_file (int argc, ACE_TCHAR *argv[])
349 ACE_Arg_Shifter arg_shifter (argc, argv);
351 const ACE_TCHAR* current_arg = 0;
353 while (arg_shifter.is_anything_left ())
355 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Started_Flag"))))
357 FILE *file = ACE_OS::fopen (current_arg, ACE_TEXT("w"));
359 if (file == 0)
360 ACE_ERROR ((LM_ERROR,
361 "Unable to open %s for writing: %p\n",
362 current_arg));
364 ACE_OS::fprintf (file, "ignore");
366 ACE_OS::fclose (file);
368 arg_shifter.consume_arg ();
370 else
372 arg_shifter.ignore_arg ();
378 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
380 ACE_Service_Config::static_svcs ()->insert (&ace_svc_desc_Builder);
382 ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGHUP);
384 ACE_Timer_Heap timer_queue;
385 ACE_Reactor::instance ()->timer_queue (&timer_queue);
387 int rc = 0;
390 ACTIVITY::instance()->run (argc, argv);
392 catch (const CORBA::Exception& ex)
394 ex._tao_print_exception ("Caught exception:");
395 rc = 1;
398 // reset stack based timer queue
399 ACE_Reactor::instance ()->timer_queue (0);
401 return rc;
404 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton, Activity, ACE_Null_Mutex);