Changes to attempt to silence bcc64x
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_MT_Mcast / MCast.cpp
blob876f00d0c1bc088e26139d76b23ab96340f0f563
1 // Adapted from: $TAO_ROOT/orbsvcs/examples/RtEC/MCast
3 #include "Consumer.h"
4 #include "Supplier.h"
5 #include "AddrServer.h"
6 #include "orbsvcs/Event_Service_Constants.h"
7 #include "orbsvcs/Event/EC_Event_Channel.h"
8 #include "orbsvcs/Event/EC_Default_Factory.h"
9 #include "orbsvcs/Event/ECG_Mcast_EH.h"
10 #include "orbsvcs/Event/ECG_UDP_Sender.h"
11 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
12 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
13 #include "tao/Strategies/advanced_resource.h"
14 #include "tao/ORB_Core.h"
15 #include "ace/Get_Opt.h"
16 #include "ace/OS_NS_unistd.h"
18 const ACE_TCHAR *udp_mcast_address =
19 ACE_TEXT (ACE_DEFAULT_MULTICAST_ADDR) ACE_TEXT(":10001");
21 static CORBA::ORB_var orb = CORBA::ORB::_nil ();
22 static bool terminate_threads = false;
23 static const unsigned pool_size = 2;
24 static const int data_items = 60000;
26 void *
27 run_orb_within_thread (void *)
29 while (! terminate_threads)
31 try
33 CORBA::Boolean there_is_work =
34 orb->work_pending ();
35 if (there_is_work)
37 // We use a TAO extension. The CORBA mechanism does not
38 // provide any decent way to control the duration of
39 // perform_work() or work_pending(), so just calling
40 // them results in a spin loop.
41 ACE_Time_Value tv (0, 50000);
42 orb->perform_work (tv);
45 catch (const CORBA::Exception& ex)
47 ex._tao_print_exception ("perform work");
49 return 0;
53 return 0;
56 int parse_args (int argc, ACE_TCHAR *argv[]);
58 int
59 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
61 // Register the default factory in the Service Configurator.
62 // If your platform supports static constructors then you can
63 // simply using the ACE_STATIC_SVC_DEFINE() macro, unfortunately TAO
64 // must run on platforms where static constructors do not work well,
65 // so we have to explicitly invoke this function.
66 TAO_EC_Default_Factory::init_svcs ();
68 try
70 // **************** HERE IS THE ORB SETUP
72 // Create the ORB, pass the argv list for parsing.
73 orb = CORBA::ORB_init (argc, argv);
75 // Parse the arguments, you usually want to do this after
76 // invoking ORB_init() because ORB_init() will remove all the
77 // -ORB options from the command line.
78 if (parse_args (argc, argv) == -1)
80 ACE_ERROR ((LM_ERROR,
81 "Usage: Service [-m udp_mcast_addr]\n"));
82 return 1;
85 // This is the standard code to get access to the POA and
86 // activate it.
87 // The POA starts in the holding state, if it is not activated
88 // it will not process any requests.
89 CORBA::Object_var object =
90 orb->resolve_initial_references ("RootPOA");
91 PortableServer::POA_var poa =
92 PortableServer::POA::_narrow (object.in ());
93 PortableServer::POAManager_var poa_manager =
94 poa->the_POAManager ();
95 poa_manager->activate ();
97 // **************** THAT COMPLETES THE ORB SETUP
99 // **************** HERE IS THE LOCAL EVENT CHANNEL SETUP
101 // This structure is used to define the startup time event
102 // channel configuration.
103 // This structure is described in
105 // $TAO_ROOT/docs/ec_options.html
107 TAO_EC_Event_Channel_Attributes attributes (poa.in (),
108 poa.in ());
110 // Create the Event Channel implementation class
111 TAO_EC_Event_Channel ec_impl (attributes);
113 // Activate the Event Channel, depending on the configuration
114 // that may involve creating some threads.
115 // But it should always be invoked because several internal data
116 // structures are initialized at that point.
117 ec_impl.activate ();
119 // The event channel is activated as any other CORBA servant.
120 // In this case we use the simple implicit activation with the
121 // RootPOA
122 RtecEventChannelAdmin::EventChannel_var event_channel =
123 ec_impl._this ();
125 // **************** THAT COMPLETES THE LOCAL EVENT CHANNEL SETUP
127 // **************** HERE IS THE FEDERATION SETUP
129 // The next step is to setup the multicast gateways.
130 // There are two gateways involved, one sends the locally
131 // generated events to the federated peers, the second gateway
132 // receives multicast traffic and turns it into local events.
134 // The sender requires a helper object to select what
135 // multicast group will carry what traffic, this is the
136 // so-called 'Address Server'.
137 // The intention is that advanced applications can use different
138 // multicast groups for different events, this can exploit
139 // network interfaces that filter unwanted multicast traffic.
140 // The helper object is accessed through an IDL interface, so it
141 // can reside remotely.
142 // In this example, and in many application, using a fixed
143 // multicast group is enough, and a local address server is the
144 // right approach.
146 // First we convert the string into an INET address, then we
147 // convert that into the right IDL structure:
148 ACE_INET_Addr udp_addr (ACE_TEXT_ALWAYS_CHAR(udp_mcast_address));
149 ACE_DEBUG ((LM_DEBUG,
150 "Multicast address is: %s\n",
151 udp_mcast_address));
153 // Now we create and activate the servant
154 AddrServer as_impl (udp_addr);
155 RtecUDPAdmin::AddrServer_var address_server =
156 as_impl._this ();
158 // We need a local socket to send the data, open it and check
159 // that everything is OK:
160 TAO_ECG_UDP_Out_Endpoint* endpointptr = 0;
162 ACE_NEW_RETURN (endpointptr, TAO_ECG_UDP_Out_Endpoint, 0);
164 TAO_ECG_Refcounted_Endpoint endpoint (endpointptr);
165 if (endpoint->dgram ().open (ACE_Addr::sap_any,
166 udp_addr.get_type()) == -1)
168 ACE_ERROR_RETURN ((LM_ERROR, "Cannot open send endpoint\n"),
172 // Now we setup the sender:
173 PortableServer::Servant_var<TAO_ECG_UDP_Sender> sender;
174 sender = TAO_ECG_UDP_Sender::create();
176 sender->init (event_channel.in (),
177 address_server.in (),
178 endpoint);
180 // Now we connect the sender as a consumer of events, it will
181 // receive any event from any source and send it to the "right"
182 // multicast group, as defined by the address server set above:
183 RtecEventChannelAdmin::ConsumerQOS sub;
184 sub.is_gateway = 1;
186 sub.dependencies.length (1);
187 sub.dependencies[0].event.header.type =
188 ACE_ES_EVENT_ANY; // first free event type
189 sub.dependencies[0].event.header.source =
190 ACE_ES_EVENT_SOURCE_ANY; // Any source is OK
192 sender->connect (sub);
194 // To receive events we need to setup an event handler:
195 PortableServer::Servant_var<TAO_ECG_UDP_Receiver> receiver;
196 receiver = TAO_ECG_UDP_Receiver::create();
198 TAO_ECG_Mcast_EH mcast_eh (&*receiver);
200 // The event handler uses the ORB reactor to wait for multicast
201 // traffic:
202 mcast_eh.reactor (orb->orb_core ()->reactor ());
204 // The multicast Event Handler needs to know to what multicast
205 // groups it should listen to. To do so it becomes an observer
206 // with the event channel, to determine the list of events
207 // required by all the local consumer.
208 // Then it register for the multicast groups that carry those
209 // events:
210 mcast_eh.open (event_channel.in ());
212 // Again the receiver connects to the event channel as a
213 // supplier of events, using the Observer features to detect
214 // local consumers and their interests:
215 receiver->init (event_channel.in (),
216 endpoint,
217 address_server.in ());
219 // The Receiver is also a supplier of events. The exact type of
220 // events is only known to the application, because it depends
221 // on the traffic carried by all the multicast groups that the
222 // different event handlers subscribe to.
223 // In this example we choose to simply describe our publications
224 // using wilcards, any event from any source. More advanced
225 // application could use the Observer features in the event
226 // channel to update this information (and reduce the number of
227 // multicast groups that each receive subscribes to).
228 // In a future version the event channel could perform some of
229 // those tasks automatically
230 RtecEventChannelAdmin::SupplierQOS pub;
231 pub.publications.length (1);
232 pub.publications[0].event.header.type = ACE_ES_EVENT_ANY;
233 pub.publications[0].event.header.source = ACE_ES_EVENT_SOURCE_ANY;
234 pub.is_gateway = 1;
236 receiver->connect (pub);
238 // **************** THAT COMPLETES THE FEDERATION SETUP
240 // **************** HERE IS THE CLIENT SETUP
242 // First let us create consumers and connect them to the event
243 // channel
244 Consumer consumer1;
245 Consumer consumer2;
246 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
247 event_channel->for_consumers ();
248 consumer1.connect (consumer_admin.in ());
249 consumer2.connect (consumer_admin.in ());
251 // And now create a supplier
252 Supplier supplier;
253 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
254 event_channel->for_suppliers ();
255 supplier.connect (supplier_admin.in ());
257 // **************** THAT COMPLETES THE CLIENT SETUP
259 // **************** HERE IS THE EVENT LOOP
261 // creating thread pool
262 ACE_Thread_Manager the_ace_manager;
263 the_ace_manager.open ();
264 int thread_pool_id = the_ace_manager.spawn_n (
265 pool_size, ACE_THR_FUNC (run_orb_within_thread), 0, THR_DETACHED | THR_NEW_LWP);
266 if (thread_pool_id == -1) {
267 ACE_ERROR_RETURN ((LM_ERROR, "Cannot spawn thread pool\n"), 1);
269 ACE_OS::sleep (1); // simple solution ensures ready thread pool
271 for (int i = 0; i < data_items; i++)
273 supplier.perform_push ();
276 ACE_OS::sleep (2); // simple solution ensures ready receivers
277 terminate_threads = true; // terminate thread pool
279 the_ace_manager.wait(); // wait until all threads in the pool are stopped
281 the_ace_manager.close ();
283 // **************** THAT COMPLETES THE EVENT LOOP
285 // **************** HERE IS THE CLEANUP CODE
287 // First the easy ones
288 supplier.disconnect ();
289 consumer1.disconnect ();
290 consumer2.disconnect ();
292 // Now let us disconnect the Receiver
293 receiver->shutdown ();
295 int r = mcast_eh.shutdown ();
297 if (r == -1)
299 ACE_ERROR_RETURN ((LM_ERROR,
300 "Closing MCast event handler\n"), 1);
303 // And also disconnect the sender of events
304 sender->shutdown ();
306 // The event channel must be destroyed, so it can release its
307 // resources, and inform all the clients that are still
308 // connected that it is going away.
309 event_channel->destroy ();
311 // Deactivating the event channel implementation is not strictly
312 // required, the POA will do it for us, but it is good manners:
314 // Using _this() activates with the default POA, we must gain
315 // access to that POA to deactivate the object.
316 // Notice that we 'know' that the default POA for this servant
317 // is the root POA, but the code is more robust if we don't
318 // rely on that.
319 PortableServer::POA_var poa =
320 ec_impl._default_POA ();
321 // Get the Object Id used for the servant..
322 PortableServer::ObjectId_var oid =
323 poa->servant_to_id (&ec_impl);
324 // Deactivate the object
325 poa->deactivate_object (oid.in ());
328 // Now we can destroy the POA, the flags mean that we want to
329 // wait until the POA is really destroyed
330 poa->destroy (true, true);
332 // Finally destroy the ORB
333 orb->destroy ();
335 // **************** THAT COMPLETES THE CLEANUP CODE
337 ACE_DEBUG ((LM_DEBUG,
338 "MCast example finished\n"));
340 catch (const CORBA::Exception& ex)
342 ex._tao_print_exception ("Service");
343 return 1;
345 return 0;
348 // ****************************************************************
350 int parse_args (int argc, ACE_TCHAR *argv[])
352 ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("m:"));
353 int c;
355 while ((c = get_opts ()) != -1)
356 switch (c)
358 case 'm':
359 udp_mcast_address = get_opts.opt_arg ();
360 break;
362 case '?':
363 default:
364 ACE_ERROR_RETURN ((LM_ERROR,
365 "usage: %s "
366 "[-m udp_mcast_address]"
367 "\n",
368 argv [0]),
369 -1);
371 // Indicates successful parsing of the command line
372 return 0;