1 // Adapted from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
5 #include "AddrServer.h"
6 #include "orbsvcs/Event_Service_Constants.h"
7 #include "orbsvcs/Event/EC_Event_Channel.h"
8 #include "orbsvcs/Event/EC_Default_Factory.h"
9 #include "orbsvcs/Event/ECG_Mcast_EH.h"
10 #include "orbsvcs/Event/ECG_UDP_Sender.h"
11 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
12 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
13 #include "tao/Strategies/advanced_resource.h"
14 #include "tao/ORB_Core.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/OS_NS_unistd.h"
18 const ACE_TCHAR
*udp_mcast_address
=
19 ACE_TEXT (ACE_DEFAULT_MULTICAST_ADDR
) ACE_TEXT(":10001");
21 static CORBA::ORB_var orb
= CORBA::ORB::_nil ();
22 static bool terminate_threads
= false;
23 static const unsigned pool_size
= 2;
24 static const int data_items
= 60000;
27 run_orb_within_thread (void *)
29 while (! terminate_threads
)
33 CORBA::Boolean there_is_work
=
37 // We use a TAO extension. The CORBA mechanism does not
38 // provide any decent way to control the duration of
39 // perform_work() or work_pending(), so just calling
40 // them results in a spin loop.
41 ACE_Time_Value
tv (0, 50000);
42 orb
->perform_work (tv
);
45 catch (const CORBA::Exception
& ex
)
47 ex
._tao_print_exception ("perform work");
56 int parse_args (int argc
, ACE_TCHAR
*argv
[]);
59 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
61 // Register the default factory in the Service Configurator.
62 // If your platform supports static constructors then you can
63 // simply use the ACE_STATIC_SVC_DEFINE() macro; unfortunately TAO
64 // must run on platforms where static constructors do not work well,
65 // so we have to explicitly invoke this function.
66 TAO_EC_Default_Factory::init_svcs ();
70 // **************** HERE IS THE ORB SETUP
72 // Create the ORB, pass the argv list for parsing.
73 orb
= CORBA::ORB_init (argc
, argv
);
75 // Parse the arguments, you usually want to do this after
76 // invoking ORB_init() because ORB_init() will remove all the
77 // -ORB options from the command line.
78 if (parse_args (argc
, argv
) == -1)
81 "Usage: Service [-m udp_mcast_addr]\n"));
85 // This is the standard code to get access to the POA and
87 // The POA starts in the holding state, if it is not activated
88 // it will not process any requests.
89 CORBA::Object_var object
=
90 orb
->resolve_initial_references ("RootPOA");
91 PortableServer::POA_var poa
=
92 PortableServer::POA::_narrow (object
.in ());
93 PortableServer::POAManager_var poa_manager
=
94 poa
->the_POAManager ();
95 poa_manager
->activate ();
97 // **************** THAT COMPLETES THE ORB SETUP
99 // **************** HERE IS THE LOCAL EVENT CHANNEL SETUP
101 // This structure is used to define the startup time event
102 // channel configuration.
103 // This structure is described in
105 // $TAO_ROOT/docs/ec_options.html
107 TAO_EC_Event_Channel_Attributes
attributes (poa
.in (),
110 // Create the Event Channel implementation class
111 TAO_EC_Event_Channel
ec_impl (attributes
);
113 // Activate the Event Channel, depending on the configuration
114 // that may involve creating some threads.
115 // But it should always be invoked because several internal data
116 // structures are initialized at that point.
119 // The event channel is activated as any other CORBA servant.
120 // In this case we use the simple implicit activation with the
122 RtecEventChannelAdmin::EventChannel_var event_channel
=
125 // **************** THAT COMPLETES THE LOCAL EVENT CHANNEL SETUP
127 // **************** HERE IS THE FEDERATION SETUP
129 // The next step is to setup the multicast gateways.
130 // There are two gateways involved, one sends the locally
131 // generated events to the federated peers, the second gateway
132 // receives multicast traffic and turns it into local events.
134 // The sender requires a helper object to select what
135 // multicast group will carry what traffic, this is the
136 // so-called 'Address Server'.
137 // The intention is that advanced applications can use different
138 // multicast groups for different events, this can exploit
139 // network interfaces that filter unwanted multicast traffic.
140 // The helper object is accessed through an IDL interface, so it
141 // can reside remotely.
142 // In this example, and in many applications, using a fixed
143 // multicast group is enough, and a local address server is the
146 // First we convert the string into an INET address, then we
147 // convert that into the right IDL structure:
148 ACE_INET_Addr
udp_addr (ACE_TEXT_ALWAYS_CHAR(udp_mcast_address
));
149 ACE_DEBUG ((LM_DEBUG
,
150 "Multicast address is: %s\n",
153 // Now we create and activate the servant
154 AddrServer
as_impl (udp_addr
);
155 RtecUDPAdmin::AddrServer_var address_server
=
158 // We need a local socket to send the data, open it and check
159 // that everything is OK:
160 TAO_ECG_UDP_Out_Endpoint
* endpointptr
= 0;
162 ACE_NEW_RETURN (endpointptr
, TAO_ECG_UDP_Out_Endpoint
, 0);
164 TAO_ECG_Refcounted_Endpoint
endpoint (endpointptr
);
165 if (endpoint
->dgram ().open (ACE_Addr::sap_any
,
166 udp_addr
.get_type()) == -1)
168 ACE_ERROR_RETURN ((LM_ERROR
, "Cannot open send endpoint\n"),
172 // Now we setup the sender:
173 PortableServer::Servant_var
<TAO_ECG_UDP_Sender
> sender
;
174 sender
= TAO_ECG_UDP_Sender::create();
176 sender
->init (event_channel
.in (),
177 address_server
.in (),
180 // Now we connect the sender as a consumer of events, it will
181 // receive any event from any source and send it to the "right"
182 // multicast group, as defined by the address server set above:
183 RtecEventChannelAdmin::ConsumerQOS sub
;
186 sub
.dependencies
.length (1);
187 sub
.dependencies
[0].event
.header
.type
=
188 ACE_ES_EVENT_ANY
; // Any event type is OK
189 sub
.dependencies
[0].event
.header
.source
=
190 ACE_ES_EVENT_SOURCE_ANY
; // Any source is OK
192 sender
->connect (sub
);
194 // To receive events we need to setup an event handler:
195 PortableServer::Servant_var
<TAO_ECG_UDP_Receiver
> receiver
;
196 receiver
= TAO_ECG_UDP_Receiver::create();
198 TAO_ECG_Mcast_EH
mcast_eh (&*receiver
);
200 // The event handler uses the ORB reactor to wait for multicast
202 mcast_eh
.reactor (orb
->orb_core ()->reactor ());
204 // The multicast Event Handler needs to know which multicast
205 // groups it should listen to. To do so it becomes an observer
206 // with the event channel, to determine the list of events
207 // required by all the local consumers.
208 // Then it registers for the multicast groups that carry those
210 mcast_eh
.open (event_channel
.in ());
212 // Again the receiver connects to the event channel as a
213 // supplier of events, using the Observer features to detect
214 // local consumers and their interests:
215 receiver
->init (event_channel
.in (),
217 address_server
.in ());
219 // The Receiver is also a supplier of events. The exact type of
220 // events is only known to the application, because it depends
221 // on the traffic carried by all the multicast groups that the
222 // different event handlers subscribe to.
223 // In this example we choose to simply describe our publications
224 // using wildcards, any event from any source. More advanced
225 // applications could use the Observer features in the event
226 // channel to update this information (and reduce the number of
227 // multicast groups that each receiver subscribes to).
228 // In a future version the event channel could perform some of
229 // those tasks automatically
230 RtecEventChannelAdmin::SupplierQOS pub
;
231 pub
.publications
.length (1);
232 pub
.publications
[0].event
.header
.type
= ACE_ES_EVENT_ANY
;
233 pub
.publications
[0].event
.header
.source
= ACE_ES_EVENT_SOURCE_ANY
;
236 receiver
->connect (pub
);
238 // **************** THAT COMPLETES THE FEDERATION SETUP
240 // **************** HERE IS THE CLIENT SETUP
242 // First let us create consumers and connect them to the event
246 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
247 event_channel
->for_consumers ();
248 consumer1
.connect (consumer_admin
.in ());
249 consumer2
.connect (consumer_admin
.in ());
251 // And now create a supplier
253 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin
=
254 event_channel
->for_suppliers ();
255 supplier
.connect (supplier_admin
.in ());
257 // **************** THAT COMPLETES THE CLIENT SETUP
259 // **************** HERE IS THE EVENT LOOP
261 // creating thread pool
262 ACE_Thread_Manager the_ace_manager
;
263 the_ace_manager
.open ();
264 int thread_pool_id
= the_ace_manager
.spawn_n (
265 pool_size
, ACE_THR_FUNC (run_orb_within_thread
), 0, THR_DETACHED
| THR_NEW_LWP
);
266 if (thread_pool_id
== -1) {
267 ACE_ERROR_RETURN ((LM_ERROR
, "Cannot spawn thread pool\n"), 1);
269 ACE_OS::sleep (1); // simple solution ensures ready thread pool
271 for (int i
= 0; i
< data_items
; i
++)
273 supplier
.perform_push ();
276 ACE_OS::sleep (2); // simple solution ensures ready receivers
277 terminate_threads
= true; // terminate thread pool
279 the_ace_manager
.wait(); // wait until all threads in the pool are stopped
281 the_ace_manager
.close ();
283 // **************** THAT COMPLETES THE EVENT LOOP
285 // **************** HERE IS THE CLEANUP CODE
287 // First the easy ones
288 supplier
.disconnect ();
289 consumer1
.disconnect ();
290 consumer2
.disconnect ();
292 // Now let us disconnect the Receiver
293 receiver
->shutdown ();
295 int r
= mcast_eh
.shutdown ();
299 ACE_ERROR_RETURN ((LM_ERROR
,
300 "Closing MCast event handler\n"), 1);
303 // And also disconnect the sender of events
306 // The event channel must be destroyed, so it can release its
307 // resources, and inform all the clients that are still
308 // connected that it is going away.
309 event_channel
->destroy ();
311 // Deactivating the event channel implementation is not strictly
312 // required, the POA will do it for us, but it is good manners:
314 // Using _this() activates with the default POA, we must gain
315 // access to that POA to deactivate the object.
316 // Notice that we 'know' that the default POA for this servant
317 // is the root POA, but the code is more robust if we don't
319 PortableServer::POA_var poa
=
320 ec_impl
._default_POA ();
321 // Get the Object Id used for the servant..
322 PortableServer::ObjectId_var oid
=
323 poa
->servant_to_id (&ec_impl
);
324 // Deactivate the object
325 poa
->deactivate_object (oid
.in ());
328 // Now we can destroy the POA, the flags mean that we want to
329 // wait until the POA is really destroyed
330 poa
->destroy (true, true);
332 // Finally destroy the ORB
335 // **************** THAT COMPLETES THE CLEANUP CODE
337 ACE_DEBUG ((LM_DEBUG
,
338 "MCast example finished\n"));
340 catch (const CORBA::Exception
& ex
)
342 ex
._tao_print_exception ("Service");
348 // ****************************************************************
350 int parse_args (int argc
, ACE_TCHAR
*argv
[])
352 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("m:"));
355 while ((c
= get_opts ()) != -1)
359 udp_mcast_address
= get_opts
.opt_arg ();
364 ACE_ERROR_RETURN ((LM_ERROR
,
366 "[-m udp_mcast_address]"
371 // Indicates successful parsing of the command line