Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Component_Switching / distributer.cpp
blob5a7d703a2e85ccb3b68ee190b19a7659c3ab3d80
1 #include "distributer.h"
2 #include "tao/debug.h"
3 #include "ace/Get_Opt.h"
4 #include "orbsvcs/AV/Protocol_Factory.h"
5 #include "orbsvcs/AV/FlowSpec_Entry.h"
7 #include "tao/Strategies/advanced_resource.h"
// Singleton accessor for the one Distributer object used throughout this
// file (reached via DISTRIBUTER::instance ()).  Being an unmanaged
// singleton, main finalizes it explicitly with DISTRIBUTER::close ().
9 typedef ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER;
11 // constructor.
12 Signal_Handler::Signal_Handler ()
16 int
17 Signal_Handler::handle_signal (int signum, siginfo_t *, ucontext_t*)
19 if (signum == SIGINT)
21 if (TAO_debug_level > 0)
22 ACE_DEBUG ((LM_DEBUG,
23 "In the signal handler\n"));
25 DISTRIBUTER::instance ()->done (1);
27 return 0;
30 int
31 Distributer_Sender_StreamEndPoint::get_callback (const char *flow_name,
32 TAO_AV_Callback *&callback)
34 /// Create and return the sender application callback to AVStreams
35 /// for further upcalls.
36 callback = &this->callback_;
38 ACE_CString fname = flow_name;
40 this->callback_.flowname (fname);
42 return 0;
45 int
46 Distributer_Sender_StreamEndPoint::set_protocol_object (const char *flowname,
47 TAO_AV_Protocol_Object *object)
49 Connection_Manager &connection_manager =
50 DISTRIBUTER::instance ()->connection_manager ();
52 /// Add to the map of protocol objects.
53 connection_manager.protocol_objects ().bind (flowname, object);
55 /// Store the related streamctrl.
56 connection_manager.add_streamctrl (flowname, this);
58 return 0;
61 int
62 Distributer_Receiver_StreamEndPoint::get_callback (const char *flow_name,
63 TAO_AV_Callback *&callback)
65 /// Create and return the receiver application callback to AVStreams
66 /// for further upcalls.
67 callback = &this->callback_;
69 ACE_CString flowname (flow_name);
70 this->callback_.flowname (flowname);
72 return 0;
75 int
76 Distributer_Receiver_StreamEndPoint::set_protocol_object (const char *,
77 TAO_AV_Protocol_Object *)
79 /// Increment the stream count.
80 DISTRIBUTER::instance ()->stream_created ();
82 return 0;
85 CORBA::Boolean
86 Distributer_Receiver_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &flowspec)
88 //if (TAO_debug_level > 0)
89 ACE_DEBUG ((LM_DEBUG,
90 "Distributer_Receiver_StreamEndPoint::handle_connection_requested\n"));
92 Connection_Manager &connection_manager =
93 DISTRIBUTER::instance ()->connection_manager ();
95 /// Check to see if the flow already exists. If it does then close the
96 /// old connection and setup a new one with the new sender.
98 for (CORBA::ULong i = 0;
99 i < flowspec.length ();
100 i++)
102 TAO_Forward_FlowSpec_Entry entry;
103 entry.parse (flowspec[i]);
105 //if (TAO_debug_level > 0)
106 ACE_DEBUG ((LM_DEBUG,
107 "Handle Connection Requested flowname %C\n",
108 entry.flowname ()));
110 ACE_CString flowname (entry.flowname ());
112 int const result = connection_manager.streamctrls ().find (flowname);
114 /// If the flowname is found.
115 if (result == 0)
117 ACE_DEBUG ((LM_DEBUG, "\nDistributer switching senders handle connection requested\n\n"));
119 ///Destroy old stream with the same flowname.
120 connection_manager.destroy (flowname);
123 /// Store the related streamctrl.
124 connection_manager.add_streamctrl (flowname.c_str (), this);
126 return true;
130 Distributer_Receiver_Callback::Distributer_Receiver_Callback ()
131 : frame_count_ (1)
135 ACE_CString &
136 Distributer_Receiver_Callback::flowname ()
138 return this->flowname_;
141 void
142 Distributer_Receiver_Callback::flowname (const ACE_CString &flowname)
144 this->flowname_ = flowname;
149 Distributer_Receiver_Callback::receive_frame (ACE_Message_Block *frame,
150 TAO_AV_frame_info *,
151 const ACE_Addr &)
153 /// Upcall from the AVStreams when there is data to be received from
154 /// the sender.
155 ACE_DEBUG ((LM_DEBUG,
156 "Distributer_Callback::receive_frame for frame %d\n",
157 this->frame_count_++));
159 Connection_Manager::Protocol_Objects &protocol_objects =
160 DISTRIBUTER::instance ()->connection_manager ().protocol_objects ();
162 /// Send frame to all receivers.
163 for (Connection_Manager::Protocol_Objects::iterator iterator = protocol_objects.begin ();
164 iterator != protocol_objects.end ();
165 ++iterator)
167 int const result = (*iterator).int_id_->send_frame (frame);
169 if (result < 0)
170 ACE_ERROR_RETURN ((LM_ERROR,
171 "send failed:%p",
172 "Sender::pace_data send\n"),
173 -1);
176 return 0;
180 Distributer_Receiver_Callback::handle_destroy ()
182 /// Called when the sender requests the stream to be shutdown.
183 ACE_DEBUG ((LM_DEBUG,
184 "Distributer_Receiver_Callback::end_stream\n"));
186 DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());
188 /// Decrement the stream count.
189 DISTRIBUTER::instance ()->stream_destroyed ();
191 return 0;
194 ACE_CString &
195 Distributer_Sender_Callback::flowname ()
197 return this->flowname_;
200 void
201 Distributer_Sender_Callback::flowname (const ACE_CString &flowname)
203 this->flowname_ = flowname;
207 Distributer_Sender_Callback::handle_destroy ()
209 /// Called when the sender requests the stream to be shutdown.
211 ACE_DEBUG ((LM_DEBUG,
212 "Distributer_Sender_Callback::end_stream\n"));
214 DISTRIBUTER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_.c_str ());
216 DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());
218 DISTRIBUTER::instance ()->connection_manager ().receivers ().unbind (this->flowname_.c_str ());
220 return 0;
223 Distributer::Distributer ()
224 : distributer_receiver_mmdevice_ (0),
225 sender_name_ ("sender"),
226 distributer_name_ ("distributer"),
227 done_ (false),
228 stream_count_ (0)
232 Distributer::~Distributer ()
236 void
237 Distributer::stream_created ()
239 ++this->stream_count_;
242 void
243 Distributer::stream_destroyed ()
245 --this->stream_count_;
247 if (this->stream_count_ == 0)
248 this->done_ = true;
252 Connection_Manager &
253 Distributer::connection_manager ()
255 return this->connection_manager_;
259 Distributer::parse_args (int argc,
260 ACE_TCHAR *argv[])
262 /// Parse command line arguments
263 ACE_Get_Opt opts (argc, argv, ACE_TEXT("s:r:"));
265 int c;
266 while ((c= opts ()) != -1)
268 switch (c)
270 case 's':
271 this->sender_name_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
272 break;
273 case 'r':
274 this->distributer_name_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
275 break;
276 default:
277 ACE_DEBUG ((LM_DEBUG,"Unknown Option\n"));
278 return -1;
281 return 0;
286 Distributer::init (int argc,
287 ACE_TCHAR *argv[])
289 /// Initialize the connection class.
290 int result =
291 this->connection_manager_.init (TAO_AV_CORE::instance ()->orb ());
292 if (result != 0)
293 return result;
295 /// Initialize the endpoint strategy with the orb and poa.
296 result =
297 this->sender_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
298 TAO_AV_CORE::instance ()->poa ());
299 if (result != 0)
300 return result;
302 result =
303 this->receiver_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
304 TAO_AV_CORE::instance ()->poa ());
305 if (result != 0)
306 return result;
308 /// Parse the command line arguments
309 result =
310 this->parse_args (argc,
311 argv);
312 if (result != 0)
313 return result;
315 ACE_Reactor *reactor =
316 TAO_AV_CORE::instance ()->reactor ();
318 if (reactor->register_handler (SIGINT,
319 &this->signal_handler_) == -1)
320 ACE_ERROR_RETURN ((LM_ERROR,
321 "Error in handler register\n"),
322 -1);
323 /// Register the signal handler for clean termination of the process.
325 ACE_NEW_RETURN (this->distributer_sender_mmdevice_,
326 TAO_MMDevice (&this->sender_endpoint_strategy_),
327 -1);
329 /// Servant Reference Counting to manage lifetime
330 PortableServer::ServantBase_var safe_sender_mmdevice =
331 this->distributer_sender_mmdevice_;
333 AVStreams::MMDevice_var distributer_sender_mmdevice =
334 this->distributer_sender_mmdevice_->_this ();
336 ACE_NEW_RETURN (this->distributer_receiver_mmdevice_,
337 TAO_MMDevice (&this->receiver_endpoint_strategy_),
338 -1);
340 /// Servant Reference Counting to manage lifetime
341 PortableServer::ServantBase_var safe_receiver_mmdevice =
342 this->distributer_receiver_mmdevice_;
344 AVStreams::MMDevice_var distributer_receiver_mmdevice =
345 this->distributer_receiver_mmdevice_->_this ();
348 /// Bind to sender.
349 this->connection_manager_.bind_to_sender (this->sender_name_,
350 this->distributer_name_,
351 distributer_receiver_mmdevice.in ());
353 /// Connect to sender.
354 this->connection_manager_.connect_to_sender ();
356 /// Bind to receivers.
357 this->connection_manager_.bind_to_receivers (this->distributer_name_,
358 distributer_sender_mmdevice.in ());
360 /// Connect to receivers
361 this->connection_manager_.connect_to_receivers ();
363 return 0;
366 bool
367 Distributer::done () const
369 return this->done_;
372 void
373 Distributer::shut_down ()
377 AVStreams::MMDevice_var receiver_mmdevice =
378 this->distributer_receiver_mmdevice_->_this ();
380 DISTRIBUTER::instance ()->connection_manager ().unbind_receiver (this->sender_name_,
381 this->distributer_name_,
382 receiver_mmdevice.in ());
383 AVStreams::MMDevice_var sender_mmdevice =
384 this->distributer_sender_mmdevice_->_this ();
386 DISTRIBUTER::instance ()->connection_manager ().unbind_sender (this->distributer_name_,
387 sender_mmdevice.in ());
389 // DISTRIBUTER::instance ()->connection_manager ().destroy ();
391 catch (const CORBA::Exception& ex)
393 ex._tao_print_exception ("Distributer::shut_down");
397 void
398 Distributer::done (bool done)
400 this->done_ = done;
404 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
408 /// Initialize the ORB first.
409 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
411 CORBA::Object_var obj = orb->resolve_initial_references ("RootPOA");
413 /// Get the POA_var object from Object_var.
414 PortableServer::POA_var root_poa =
415 PortableServer::POA::_narrow (obj.in ());
417 PortableServer::POAManager_var mgr = root_poa->the_POAManager ();
419 mgr->activate ();
421 /// Initialize the AVStreams components.
422 TAO_AV_CORE::instance ()->init (orb.in (), root_poa.in ());
424 /// Initialize the Distributer
425 int result = DISTRIBUTER::instance ()->init (argc, argv);
427 if (result != 0)
428 return result;
430 while (!DISTRIBUTER::instance ()->done ())
432 CORBA::Boolean wp = orb->work_pending ();
434 if (wp)
436 orb->perform_work ();
440 DISTRIBUTER::instance ()->shut_down ();
442 // orb->shutdown(true);
444 catch (const CORBA::Exception& ex)
446 ex._tao_print_exception ("main");
447 return -1;
450 DISTRIBUTER::close (); // Explicitly finalize the Unmanaged_Singleton.
452 return 0;
// Explicit instantiation of the DISTRIBUTER singleton's static
// singleton_ pointer, compiled only when the platform macro below
// says static template members must be instantiated explicitly.
455 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
456 template ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex>::singleton_;
457 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */