1 #include "FT_EventService.h"
2 #include "ace/Argv_Type_Converter.h"
3 #include "ace/Thread_Manager.h"
4 #include "ace/Get_Opt.h"
5 #include "ace/INET_Addr.h"
6 #include "ace/SOCK_Connector.h"
7 #include "ace/SOCK_Stream.h"
8 #include "orbsvcs/Sched/Config_Scheduler.h"
9 #include "orbsvcs/Scheduler_Factory.h"
10 #include "orbsvcs/FtRtEvent/EventChannel/FTRTEC_ServiceActivate.h"
11 #include "orbsvcs/FtRtEvent/Utils/Log.h"
12 #include "orbsvcs/Log_Macros.h"
13 #include "ace/OS_main.h"
14 #include "ace/OS_NS_strings.h"
16 int ACE_TMAIN (int argc
, ACE_TCHAR
* argv
[])
18 FT_EventService event_service
;
19 return event_service
.run (argc
, argv
);
22 FT_EventService::FT_EventService()
23 : global_scheduler_(0)
25 , membership_(TAO_FTEC_Event_Channel::UNSPECIFIED
)
31 FT_EventService::~FT_EventService()
37 FT_EventService::run(int argc
, ACE_TCHAR
* argv
[])
42 orb_
= CORBA::ORB_init (argc
, argv
);
44 if (this->parse_args (argc
, argv
) == -1)
47 CORBA::Object_var root_poa_object
=
48 orb_
->resolve_initial_references("RootPOA");
49 if (CORBA::is_nil (root_poa_object
.in ()))
50 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
51 " (%P|%t) Unable to initialize the root POA.\n"),
54 PortableServer::POA_var root_poa
=
55 PortableServer::POA::_narrow (root_poa_object
.in ());
57 PortableServer::POAManager_var poa_manager
=
58 root_poa
->the_POAManager ();
60 poa_manager
->activate ();
62 CORBA::Object_var naming_obj
=
63 orb_
->resolve_initial_references ("NameService");
64 if (CORBA::is_nil (naming_obj
.in ()))
65 ORBSVCS_ERROR_RETURN ((LM_ERROR
,
66 " (%P|%t) Unable to initialize the Naming Service.\n"),
69 CosNaming::NamingContext_var naming_context
=
70 CosNaming::NamingContext::_narrow (naming_obj
.in ());
72 setup_scheduler(naming_context
.in());
74 poa_manager
->activate();
76 // Activate the Event channel implementation
78 TAO_FTEC_Event_Channel
ec(orb_
, root_poa
);
80 FtRtecEventChannelAdmin::EventChannel_var ec_ior
=
81 ec
.activate(membership_
);
83 if (report_factory(orb_
.in(), ec_ior
.in() )==-1)
88 catch (const CORBA::Exception
& ex
)
90 ex
._tao_print_exception ("A CORBA Exception occurred.");
95 ACE_Thread_Manager::instance()->wait();
100 FT_EventService::parse_args (int argc
, ACE_TCHAR
* argv
[])
102 /// get the membership from the environment variable
103 char* member
= ACE_OS::getenv("FTEC_MEMBERSHIP");
105 membership_
= TAO_FTEC_Event_Channel::UNSPECIFIED
;
108 if (ACE_OS::strcasecmp(member
, "PRIMARY")==0) {
109 membership_
= TAO_FTEC_Event_Channel::PRIMARY
;
111 else if (ACE_OS::strcasecmp(member
, "BACKUP")==0) {
112 membership_
= TAO_FTEC_Event_Channel::BACKUP
;
116 char* n_threads
= ACE_OS::getenv("FTEC_NUM_THREAD");
118 this->num_threads_
= 1;
120 this->num_threads_
= ACE_OS::atoi(n_threads
);
122 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("d:jn:ps:"));
125 while ((opt
= get_opt ()) != EOF
)
130 TAO_FTRTEC::Log::level(ACE_OS::atoi(get_opt
.opt_arg ()));
133 this->membership_
= TAO_FTEC_Event_Channel::BACKUP
;
136 this->num_threads_
= ACE_OS::atoi(get_opt
.opt_arg ());
139 this->membership_
= TAO_FTEC_Event_Channel::PRIMARY
;
142 // It could be just a flag (i.e. no "global" or "local"
143 // argument, but this is consistent with the EC_Multiple
144 // test and also allows for a runtime scheduling service.
146 if (ACE_OS::strcasecmp (get_opt
.opt_arg (), ACE_TEXT("global")) == 0)
148 this->global_scheduler_
= 1;
150 else if (ACE_OS::strcasecmp (get_opt
.opt_arg (), ACE_TEXT("local")) == 0)
152 this->global_scheduler_
= 0;
156 ORBSVCS_DEBUG ((LM_DEBUG
,
157 ACE_TEXT("Unknown scheduling type <%s> ")
158 ACE_TEXT("defaulting to local\n"),
159 get_opt
.opt_arg ()));
160 this->global_scheduler_
= 0;
166 ORBSVCS_DEBUG ((LM_DEBUG
,
167 ACE_TEXT("Usage: %s\n")
168 ACE_TEXT(" -j join the object group\n")
169 ACE_TEXT(" -p set as primary\n")
170 ACE_TEXT(" -s <global|local>\n")
177 if (this->num_threads_
< 1)
178 ORBSVCS_ERROR_RETURN((LM_ERROR
, "Invalid number of threads specified\n"), -1);
184 FT_EventService::setup_scheduler(CosNaming::NamingContext_ptr naming_context
)
186 RtecScheduler::Scheduler_var scheduler
;
187 if (CORBA::is_nil(naming_context
)) {
188 ACE_NEW_THROW_EX (this->sched_impl_
,
189 ACE_Config_Scheduler
,
192 scheduler
= this->sched_impl_
->_this ();
194 if (ACE_Scheduler_Factory::server(scheduler
.in()) == -1)
195 ORBSVCS_ERROR((LM_ERROR
,"Unable to install scheduler\n"));
198 // This is the name we (potentially) register the Scheduling
199 // Service in the Naming Service.
200 CosNaming::Name
schedule_name (1);
201 schedule_name
.length (1);
202 schedule_name
[0].id
= CORBA::string_dup ("ScheduleService");
207 // We must find the scheduler object reference...
209 if (this->global_scheduler_
== 0)
211 ACE_NEW_THROW_EX (this->sched_impl_
,
212 ACE_Config_Scheduler
,
215 scheduler
= this->sched_impl_
->_this ();
217 // Register the servant with the Naming Context....
218 naming_context
->rebind (schedule_name
, scheduler
.in ());
222 CORBA::Object_var tmp
=
223 naming_context
->resolve (schedule_name
);
225 scheduler
= RtecScheduler::Scheduler::_narrow (tmp
.in ());
232 FT_EventService::report_factory(CORBA::ORB_ptr orb
,
233 FtRtecEventChannelAdmin::EventChannel_ptr ec
)
236 char* addr
= ACE_OS::getenv("EventChannelFactoryAddr");
239 // instaniated by object factory, report my ior back to the factory
240 ACE_INET_Addr
factory_addr(addr
);
241 ACE_SOCK_Connector connector
;
242 ACE_SOCK_Stream stream
;
244 ORBSVCS_DEBUG((LM_DEBUG
,"connecting to %s\n",addr
));
245 if (connector
.connect(stream
, factory_addr
) == -1)
246 ORBSVCS_ERROR_RETURN((LM_ERROR
, "(%P|%t) Invalid Factory Address\n"), -1);
248 ORBSVCS_DEBUG((LM_DEBUG
,"Factory connected\n"));
249 CORBA::String_var my_ior_string
= orb
->object_to_string(ec
);
251 size_t const len
= ACE_OS::strlen(my_ior_string
.in()) ;
253 if (stream
.send_n(my_ior_string
.in(), len
) != len
)
254 ORBSVCS_ERROR_RETURN((LM_ERROR
, "(%P|%t) IOR Transmission Error\n"), -1);
265 void FT_EventService::become_primary()
267 if (this->num_threads_
> 1) {
268 task_
.activate(THR_NEW_LWP
| THR_JOINABLE
, num_threads_
-1);