Changes to attempt to silence bcc64x
[ACE_TAO.git] / TAO / examples / RTCORBA / Activity / Activity.cpp
blobf20c5e9d5312f739d2da6d500bfeebbe650c57ff
1 #include "Activity.h"
2 #include "Thread_Task.h"
3 #include "Job_i.h"
4 #include "POA_Holder.h"
5 #include "Builder.h"
7 #include "tao/ORB_Core.h"
8 #include "tao/debug.h"
10 #include "ace/High_Res_Timer.h"
11 #include "ace/Barrier.h"
12 #include "ace/Timer_Heap.h"
13 #include "ace/Service_Config.h"
14 #include "ace/Arg_Shifter.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/Argv_Type_Converter.h"
17 #include "ace/Signal.h"
18 #include "ace/Reactor.h"
21 //***************************************************************//
22 extern "C" void handler (int)
24 ACE_Service_Config::reconfig_occurred (1);
27 //***************************************************************//
29 Activity::Activity ()
30 :builder_ (0),
31 barrier_ (0),
32 active_task_count_ (0),
33 active_job_count_ (0)
35 state_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>;
38 Activity::~Activity ()
40 delete state_lock_;
41 delete barrier_;
44 void
45 Activity::builder (Builder* builder)
47 builder_ = builder;
50 CORBA::ORB_ptr
51 Activity::orb ()
53 return orb_.in ();
56 RTCORBA::Current_ptr
57 Activity::current ()
59 return current_.in ();
62 int
63 Activity::init (int& argc, ACE_TCHAR *argv [])
65 // Copy command line parameter.
66 ACE_Argv_Type_Converter command_line(argc, argv);
68 this->orb_ = CORBA::ORB_init (command_line.get_argc(),
69 command_line.get_TCHAR_argv());
71 CORBA::Object_var object =
72 orb_->resolve_initial_references ("RootPOA");
74 root_poa_ =
75 PortableServer::POA::_narrow (object.in ());
77 PortableServer::POAManager_var poa_manager =
78 root_poa_->the_POAManager ();
80 object =
81 orb_->resolve_initial_references ("RTORB");
83 this->rt_orb_ =
84 RTCORBA::RTORB::_narrow (object.in ());
86 object =
87 orb_->resolve_initial_references ("RTCurrent");
89 current_ =
90 RTCORBA::Current::_narrow (object.in ());
92 poa_manager->activate ();
94 object = this->orb_->resolve_initial_references ("PriorityMappingManager");
95 RTCORBA::PriorityMappingManager_var mapping_manager =
96 RTCORBA::PriorityMappingManager::_narrow (object.in ());
98 this->priority_mapping_ = mapping_manager->mapping ();
100 return 0;
104 Activity::resolve_naming_service ()
106 CORBA::Object_var naming_obj =
107 this->orb_->resolve_initial_references ("NameService");
109 // Need to check return value for errors.
110 if (CORBA::is_nil (naming_obj.in ()))
111 ACE_ERROR_RETURN ((LM_ERROR,
112 " (%P|%t) Unable to resolve the Naming Service.\n"),
113 -1);
115 this->naming_ =
116 CosNaming::NamingContextExt::_narrow (naming_obj.in ());
118 return 0;
121 void
122 Activity::activate_poa_list ()
124 POA_LIST list;
125 int count = builder_->poa_list (list);
127 for (int i = 0; i < count; ++i)
129 list[i]->activate (this->rt_orb_.in(), this->root_poa_.in ());
133 void
134 Activity::activate_job_list ()
136 JOB_LIST list;
137 int count = builder_->job_list (list);
138 Job_i* job;
140 for (int i = 0; i < count; ++i)
142 job = list[i];
144 if (TAO_debug_level > 0)
145 ACE_DEBUG ((LM_DEBUG, "Activating job:%C\n", job->name ().c_str ()));
147 // find your poa
148 PortableServer::POA_var host_poa =
149 root_poa_->find_POA (job->poa ().c_str (), 0);
151 PortableServer::ServantBase_var servant_var (job);
153 // Register with poa.
154 PortableServer::ObjectId_var id;
156 id = host_poa->activate_object (job);
158 CORBA::Object_var server =
159 host_poa->id_to_reference (id.in ());
161 CORBA::String_var ior =
162 orb_->object_to_string (server.in ());
164 const ACE_CString &job_name = job->name ();
166 CosNaming::Name_var name =
167 this->naming_->to_name (job_name.c_str ());
169 this->naming_->rebind (name.in (),
170 server.in ());
172 ACE_DEBUG ((LM_DEBUG,
173 "Registered %C with the naming service\n",
174 job_name.c_str ()));
176 active_job_count_++;
177 } /* while */
180 void
181 Activity::activate_schedule ()
183 TASK_LIST list;
184 int count = builder_->task_list (list);
186 if (TAO_debug_level > 0)
187 ACE_DEBUG ((LM_DEBUG, "Activating schedule, task count = %d\n",
188 count));
190 ACE_NEW (barrier_, ACE_Barrier (count+1));
192 Periodic_Task* task;
194 for (int i = 0; i < count; ++i)
196 task = list[i];
198 // resolve the object from the naming service
199 CosNaming::Name name (1);
200 name.length (1);
201 name[0].id = CORBA::string_dup (task->job ());
203 CORBA::Object_var obj =
204 this->naming_->resolve (name);
206 Job_var job = Job::_narrow (obj.in ());
208 if (TAO_debug_level > 0)
210 // Check that the object is configured with some
211 // PriorityModelPolicy.
212 CORBA::Policy_var policy =
213 job->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE);
215 RTCORBA::PriorityModelPolicy_var priority_policy =
216 RTCORBA::PriorityModelPolicy::_narrow (policy.in ());
218 if (CORBA::is_nil (priority_policy.in ()))
219 ACE_DEBUG ((LM_DEBUG,
220 "ERROR: Priority Model Policy not exposed!\n"));
221 else
223 RTCORBA::PriorityModel priority_model =
224 priority_policy->priority_model ();
226 if (priority_model == RTCORBA::CLIENT_PROPAGATED)
227 ACE_DEBUG ((LM_DEBUG,
228 "%C priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ()));
229 else
230 ACE_DEBUG ((LM_DEBUG,
231 "%C priority_model = RTCORBA::SERVER_DECLARED\n", task->job ()));
233 } /* if (TAO_debug_level > 0) */
235 task->job (job.in ());
236 task->activate_task (this->barrier_, this->priority_mapping_);
237 active_task_count_++;
239 ACE_DEBUG ((LM_DEBUG, "Job %C scheduled\n", task->job ()));
242 ACE_DEBUG ((LM_DEBUG, "(%P,%t) Waiting for tasks to synch...\n"));
243 barrier_->wait ();
244 ACE_DEBUG ((LM_DEBUG, "(%P,%t) Tasks have synched...\n"));
247 void
248 Activity::task_ended (Periodic_Task* /*ended_task*/)
250 ACE_DEBUG ((LM_DEBUG, "Active task count = %d\n",active_task_count_));
252 ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
253 --active_task_count_;
256 this->check_ifexit ();
259 void
260 Activity::job_ended (Job_i* /*ended_job*/)
262 ACE_DEBUG ((LM_DEBUG, "Active job count = %d\n",active_job_count_));
264 ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
265 --active_job_count_;
268 this->check_ifexit ();
271 void
272 Activity::check_ifexit ()
274 // All tasks have finished and all jobs have been shutdown.
275 if (active_task_count_ == 0 && active_job_count_ == 0)
277 ACE_DEBUG ((LM_DEBUG, "Shutdown in progress ...\n"));
278 // ask all tasks to dump stats.
280 TASK_LIST task_list;
281 int count = builder_->task_list (task_list);
283 ACE_TCHAR msg[BUFSIZ];
284 ACE_OS::sprintf (msg, ACE_TEXT("# Stats generated on --\n"));
286 for (int i = 0; i < count; ++i)
288 task_list[i]->dump_stats (msg);
291 // shutdown the ORB
292 orb_->shutdown (false);
296 CORBA::Short
297 Activity::get_server_priority (CORBA::Object_ptr server)
299 // Get the Priority Model Policy from the stub.
300 CORBA::Policy_var policy =
301 server->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE);
303 // Narrow down to correct type.
304 RTCORBA::PriorityModelPolicy_var priority_policy =
305 RTCORBA::PriorityModelPolicy::_narrow (policy.in ());
307 // Make sure that we have the SERVER_DECLARED priority model.
308 RTCORBA::PriorityModel priority_model =
309 priority_policy->priority_model ();
310 if (priority_model != RTCORBA::SERVER_DECLARED)
311 return -1;
313 // Return the server priority.
314 return priority_policy->server_priority ();
317 void
318 Activity::run (int argc, ACE_TCHAR *argv[])
320 this->init (argc, argv);
322 if (this->resolve_naming_service () == -1)
323 return;
325 this->activate_poa_list ();
327 this->activate_job_list ();
329 this->activate_schedule ();
331 this->create_started_flag_file (argc, argv);
333 orb_->run ();
335 ACE_Thread_Manager::instance ()->wait ();
337 CORBA::release (this->naming_);
339 // Hack for proper cleanup.
340 this->builder_->fini ();
342 orb_->destroy ();
345 void
346 Activity::create_started_flag_file (int argc, ACE_TCHAR *argv[])
348 ACE_Arg_Shifter arg_shifter (argc, argv);
350 const ACE_TCHAR* current_arg = 0;
352 while (arg_shifter.is_anything_left ())
354 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Started_Flag"))))
356 FILE *file = ACE_OS::fopen (current_arg, ACE_TEXT("w"));
358 if (file == 0)
359 ACE_ERROR ((LM_ERROR,
360 "Unable to open %s for writing: %p\n",
361 current_arg));
363 ACE_OS::fprintf (file, "ignore");
365 ACE_OS::fclose (file);
367 arg_shifter.consume_arg ();
369 else
371 arg_shifter.ignore_arg ();
377 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
379 ACE_Service_Config::static_svcs ()->insert (&ace_svc_desc_Builder);
381 ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGHUP);
383 ACE_Timer_Heap timer_queue;
384 ACE_Reactor::instance ()->timer_queue (&timer_queue);
386 int rc = 0;
389 ACTIVITY::instance()->run (argc, argv);
391 catch (const CORBA::Exception& ex)
393 ex._tao_print_exception ("Caught exception:");
394 rc = 1;
397 // reset stack based timer queue
398 ACE_Reactor::instance ()->timer_queue (0);
400 return rc;
403 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton, Activity, ACE_Null_Mutex);