Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / TAO / orbsvcs / FTRT_Event_Service / Event_Service / FT_EventService.cpp
blob40368f9daef5abe3742663cea3f2175a0500ebd0
1 #include "FT_EventService.h"
2 #include "ace/Argv_Type_Converter.h"
3 #include "ace/Thread_Manager.h"
4 #include "ace/Get_Opt.h"
5 #include "ace/INET_Addr.h"
6 #include "ace/SOCK_Connector.h"
7 #include "ace/SOCK_Stream.h"
8 #include "orbsvcs/Sched/Config_Scheduler.h"
9 #include "orbsvcs/Scheduler_Factory.h"
10 #include "orbsvcs/FtRtEvent/EventChannel/FTRTEC_ServiceActivate.h"
11 #include "orbsvcs/FtRtEvent/Utils/Log.h"
12 #include "orbsvcs/Log_Macros.h"
13 #include "ace/OS_main.h"
14 #include "ace/OS_NS_strings.h"
16 int ACE_TMAIN (int argc, ACE_TCHAR* argv[])
18 FT_EventService event_service;
19 return event_service.run (argc, argv);
22 FT_EventService::FT_EventService()
23 : global_scheduler_(0)
24 , sched_impl_(0)
25 , membership_(TAO_FTEC_Event_Channel::UNSPECIFIED)
26 , num_threads_(1)
27 , task_(orb_)
31 FT_EventService::~FT_EventService()
33 delete sched_impl_;
36 int
37 FT_EventService::run(int argc, ACE_TCHAR* argv[])
39 try
41 // Initialize ORB.
42 orb_ = CORBA::ORB_init (argc, argv);
44 if (this->parse_args (argc, argv) == -1)
45 return 1;
47 CORBA::Object_var root_poa_object =
48 orb_->resolve_initial_references("RootPOA");
49 if (CORBA::is_nil (root_poa_object.in ()))
50 ORBSVCS_ERROR_RETURN ((LM_ERROR,
51 " (%P|%t) Unable to initialize the root POA.\n"),
52 1);
54 PortableServer::POA_var root_poa =
55 PortableServer::POA::_narrow (root_poa_object.in ());
57 PortableServer::POAManager_var poa_manager =
58 root_poa->the_POAManager ();
60 poa_manager->activate ();
62 CORBA::Object_var naming_obj =
63 orb_->resolve_initial_references ("NameService");
64 if (CORBA::is_nil (naming_obj.in ()))
65 ORBSVCS_ERROR_RETURN ((LM_ERROR,
66 " (%P|%t) Unable to initialize the Naming Service.\n"),
67 1);
69 CosNaming::NamingContext_var naming_context =
70 CosNaming::NamingContext::_narrow (naming_obj.in ());
72 setup_scheduler(naming_context.in());
74 poa_manager->activate();
76 // Activate the Event channel implementation
78 TAO_FTEC_Event_Channel ec(orb_, root_poa);
80 FtRtecEventChannelAdmin::EventChannel_var ec_ior =
81 ec.activate(membership_);
83 if (report_factory(orb_.in(), ec_ior.in() )==-1)
84 return -1;
86 orb_->run();
88 catch (const CORBA::Exception& ex)
90 ex._tao_print_exception ("A CORBA Exception occurred.");
91 return -1;
95 ACE_Thread_Manager::instance()->wait();
96 return 0;
99 int
100 FT_EventService::parse_args (int argc, ACE_TCHAR* argv [])
102 /// get the membership from the environment variable
103 char* member = ACE_OS::getenv("FTEC_MEMBERSHIP");
105 membership_ = TAO_FTEC_Event_Channel::UNSPECIFIED;
107 if (member) {
108 if (ACE_OS::strcasecmp(member, "PRIMARY")==0) {
109 membership_ = TAO_FTEC_Event_Channel::PRIMARY;
111 else if (ACE_OS::strcasecmp(member, "BACKUP")==0) {
112 membership_ = TAO_FTEC_Event_Channel::BACKUP;
116 char* n_threads = ACE_OS::getenv("FTEC_NUM_THREAD");
118 this->num_threads_ = 1;
119 if (n_threads)
120 this->num_threads_ = ACE_OS::atoi(n_threads);
122 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("d:jn:ps:"));
123 int opt;
125 while ((opt = get_opt ()) != EOF)
127 switch (opt)
129 case 'd':
130 TAO_FTRTEC::Log::level(ACE_OS::atoi(get_opt.opt_arg ()));
131 break;
132 case 'j':
133 this->membership_ = TAO_FTEC_Event_Channel::BACKUP;
134 break;
135 case 'n':
136 this->num_threads_ = ACE_OS::atoi(get_opt.opt_arg ());
137 break;
138 case 'p':
139 this->membership_ = TAO_FTEC_Event_Channel::PRIMARY;
140 break;
141 case 's':
142 // It could be just a flag (i.e. no "global" or "local"
143 // argument, but this is consistent with the EC_Multiple
144 // test and also allows for a runtime scheduling service.
146 if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT("global")) == 0)
148 this->global_scheduler_ = 1;
150 else if (ACE_OS::strcasecmp (get_opt.opt_arg (), ACE_TEXT("local")) == 0)
152 this->global_scheduler_ = 0;
154 else
156 ORBSVCS_DEBUG ((LM_DEBUG,
157 ACE_TEXT("Unknown scheduling type <%s> ")
158 ACE_TEXT("defaulting to local\n"),
159 get_opt.opt_arg ()));
160 this->global_scheduler_ = 0;
162 break;
164 case '?':
165 default:
166 ORBSVCS_DEBUG ((LM_DEBUG,
167 ACE_TEXT("Usage: %s\n")
168 ACE_TEXT(" -j join the object group\n")
169 ACE_TEXT(" -p set as primary\n")
170 ACE_TEXT(" -s <global|local>\n")
171 ACE_TEXT("\n"),
172 argv[0]));
173 return -1;
177 if (this->num_threads_ < 1)
178 ORBSVCS_ERROR_RETURN((LM_ERROR, "Invalid number of threads specified\n"), -1);
180 return 0;
183 void
184 FT_EventService::setup_scheduler(CosNaming::NamingContext_ptr naming_context)
186 RtecScheduler::Scheduler_var scheduler;
187 if (CORBA::is_nil(naming_context)) {
188 ACE_NEW_THROW_EX (this->sched_impl_,
189 ACE_Config_Scheduler,
190 CORBA::NO_MEMORY());
192 scheduler = this->sched_impl_->_this ();
194 if (ACE_Scheduler_Factory::server(scheduler.in()) == -1)
195 ORBSVCS_ERROR((LM_ERROR,"Unable to install scheduler\n"));
197 else {
198 // This is the name we (potentially) register the Scheduling
199 // Service in the Naming Service.
200 CosNaming::Name schedule_name (1);
201 schedule_name.length (1);
202 schedule_name[0].id = CORBA::string_dup ("ScheduleService");
205 if (1)
207 // We must find the scheduler object reference...
209 if (this->global_scheduler_ == 0)
211 ACE_NEW_THROW_EX (this->sched_impl_,
212 ACE_Config_Scheduler,
213 CORBA::NO_MEMORY());
215 scheduler = this->sched_impl_->_this ();
217 // Register the servant with the Naming Context....
218 naming_context->rebind (schedule_name, scheduler.in ());
220 else
222 CORBA::Object_var tmp =
223 naming_context->resolve (schedule_name);
225 scheduler = RtecScheduler::Scheduler::_narrow (tmp.in ());
232 FT_EventService::report_factory(CORBA::ORB_ptr orb,
233 FtRtecEventChannelAdmin::EventChannel_ptr ec)
235 try{
236 char* addr = ACE_OS::getenv("EventChannelFactoryAddr");
238 if (addr != 0) {
239 // instaniated by object factory, report my ior back to the factory
240 ACE_INET_Addr factory_addr(addr);
241 ACE_SOCK_Connector connector;
242 ACE_SOCK_Stream stream;
244 ORBSVCS_DEBUG((LM_DEBUG,"connecting to %s\n",addr));
245 if (connector.connect(stream, factory_addr) == -1)
246 ORBSVCS_ERROR_RETURN((LM_ERROR, "(%P|%t) Invalid Factory Address\n"), -1);
248 ORBSVCS_DEBUG((LM_DEBUG,"Factory connected\n"));
249 CORBA::String_var my_ior_string = orb->object_to_string(ec);
251 int const len = ACE_OS::strlen(my_ior_string.in()) ;
253 if (stream.send_n(my_ior_string.in(), len) != len)
254 ORBSVCS_ERROR_RETURN((LM_ERROR, "(%P|%t) IOR Transmission Error\n"), -1);
256 stream.close();
259 catch (...){
260 return -1;
262 return 0;
265 void FT_EventService::become_primary()
267 if (this->num_threads_ > 1) {
268 task_.activate(THR_NEW_LWP | THR_JOINABLE, num_threads_-1);