1 #include "distributer.h"
3 #include "ace/Get_Opt.h"
4 #include "orbsvcs/AV/Protocol_Factory.h"
5 #include "orbsvcs/AV/FlowSpec_Entry.h"
7 #include "tao/Strategies/advanced_resource.h"
9 typedef ACE_Unmanaged_Singleton
<Distributer
, ACE_Null_Mutex
> DISTRIBUTER
;
12 Signal_Handler::Signal_Handler ()
17 Signal_Handler::handle_signal (int signum
, siginfo_t
*, ucontext_t
*)
21 if (TAO_debug_level
> 0)
23 "In the signal handler\n"));
25 DISTRIBUTER::instance ()->done (1);
31 Distributer_Sender_StreamEndPoint::get_callback (const char *flow_name
,
32 TAO_AV_Callback
*&callback
)
34 /// Create and return the sender application callback to AVStreams
35 /// for further upcalls.
36 callback
= &this->callback_
;
38 ACE_CString fname
= flow_name
;
40 this->callback_
.flowname (fname
);
46 Distributer_Sender_StreamEndPoint::set_protocol_object (const char *flowname
,
47 TAO_AV_Protocol_Object
*object
)
49 Connection_Manager
&connection_manager
=
50 DISTRIBUTER::instance ()->connection_manager ();
52 /// Add to the map of protocol objects.
53 connection_manager
.protocol_objects ().bind (flowname
, object
);
55 /// Store the related streamctrl.
56 connection_manager
.add_streamctrl (flowname
, this);
62 Distributer_Receiver_StreamEndPoint::get_callback (const char *flow_name
,
63 TAO_AV_Callback
*&callback
)
65 /// Create and return the receiver application callback to AVStreams
66 /// for further upcalls.
67 callback
= &this->callback_
;
69 ACE_CString
flowname (flow_name
);
70 this->callback_
.flowname (flowname
);
76 Distributer_Receiver_StreamEndPoint::set_protocol_object (const char *,
77 TAO_AV_Protocol_Object
*)
79 /// Increment the stream count.
80 DISTRIBUTER::instance ()->stream_created ();
86 Distributer_Receiver_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec
&flowspec
)
88 //if (TAO_debug_level > 0)
90 "Distributer_Receiver_StreamEndPoint::handle_connection_requested\n"));
92 Connection_Manager
&connection_manager
=
93 DISTRIBUTER::instance ()->connection_manager ();
95 /// Check to see if the flow already exists. If it does then close the
96 /// old connection and setup a new one with the new sender.
98 for (CORBA::ULong i
= 0;
99 i
< flowspec
.length ();
102 TAO_Forward_FlowSpec_Entry entry
;
103 entry
.parse (flowspec
[i
]);
105 //if (TAO_debug_level > 0)
106 ACE_DEBUG ((LM_DEBUG
,
107 "Handle Connection Requested flowname %C\n",
110 ACE_CString
flowname (entry
.flowname ());
112 int const result
= connection_manager
.streamctrls ().find (flowname
);
114 /// If the flowname is found.
117 ACE_DEBUG ((LM_DEBUG
, "\nDistributer switching senders handle connection requested\n\n"));
119 ///Destroy old stream with the same flowname.
120 connection_manager
.destroy (flowname
);
123 /// Store the related streamctrl.
124 connection_manager
.add_streamctrl (flowname
.c_str (), this);
130 Distributer_Receiver_Callback::Distributer_Receiver_Callback ()
136 Distributer_Receiver_Callback::flowname ()
138 return this->flowname_
;
142 Distributer_Receiver_Callback::flowname (const ACE_CString
&flowname
)
144 this->flowname_
= flowname
;
149 Distributer_Receiver_Callback::receive_frame (ACE_Message_Block
*frame
,
153 /// Upcall from the AVStreams when there is data to be received from
155 ACE_DEBUG ((LM_DEBUG
,
156 "Distributer_Callback::receive_frame for frame %d\n",
157 this->frame_count_
++));
159 Connection_Manager::Protocol_Objects
&protocol_objects
=
160 DISTRIBUTER::instance ()->connection_manager ().protocol_objects ();
162 /// Send frame to all receivers.
163 for (Connection_Manager::Protocol_Objects::iterator iterator
= protocol_objects
.begin ();
164 iterator
!= protocol_objects
.end ();
167 int const result
= (*iterator
).int_id_
->send_frame (frame
);
170 ACE_ERROR_RETURN ((LM_ERROR
,
172 "Sender::pace_data send\n"),
180 Distributer_Receiver_Callback::handle_destroy ()
182 /// Called when the sender requests the stream to be shutdown.
183 ACE_DEBUG ((LM_DEBUG
,
184 "Distributer_Receiver_Callback::end_stream\n"));
186 DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_
.c_str ());
188 /// Decrement the stream count.
189 DISTRIBUTER::instance ()->stream_destroyed ();
195 Distributer_Sender_Callback::flowname ()
197 return this->flowname_
;
201 Distributer_Sender_Callback::flowname (const ACE_CString
&flowname
)
203 this->flowname_
= flowname
;
207 Distributer_Sender_Callback::handle_destroy ()
209 /// Called when the sender requests the stream to be shutdown.
211 ACE_DEBUG ((LM_DEBUG
,
212 "Distributer_Sender_Callback::end_stream\n"));
214 DISTRIBUTER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_
.c_str ());
216 DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_
.c_str ());
218 DISTRIBUTER::instance ()->connection_manager ().receivers ().unbind (this->flowname_
.c_str ());
223 Distributer::Distributer ()
224 : distributer_receiver_mmdevice_ (0),
225 sender_name_ ("sender"),
226 distributer_name_ ("distributer"),
232 Distributer::~Distributer ()
237 Distributer::stream_created ()
239 ++this->stream_count_
;
243 Distributer::stream_destroyed ()
245 --this->stream_count_
;
247 if (this->stream_count_
== 0)
253 Distributer::connection_manager ()
255 return this->connection_manager_
;
259 Distributer::parse_args (int argc
,
262 /// Parse command line arguments
263 ACE_Get_Opt
opts (argc
, argv
, ACE_TEXT("s:r:"));
266 while ((c
= opts ()) != -1)
271 this->sender_name_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
274 this->distributer_name_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
277 ACE_DEBUG ((LM_DEBUG
,"Unknown Option\n"));
286 Distributer::init (int argc
,
289 /// Initialize the connection class.
291 this->connection_manager_
.init (TAO_AV_CORE::instance ()->orb ());
295 /// Initialize the endpoint strategy with the orb and poa.
297 this->sender_endpoint_strategy_
.init (TAO_AV_CORE::instance ()->orb (),
298 TAO_AV_CORE::instance ()->poa ());
303 this->receiver_endpoint_strategy_
.init (TAO_AV_CORE::instance ()->orb (),
304 TAO_AV_CORE::instance ()->poa ());
308 /// Parse the command line arguments
310 this->parse_args (argc
,
315 ACE_Reactor
*reactor
=
316 TAO_AV_CORE::instance ()->reactor ();
318 if (reactor
->register_handler (SIGINT
,
319 &this->signal_handler_
) == -1)
320 ACE_ERROR_RETURN ((LM_ERROR
,
321 "Error in handler register\n"),
323 /// Register the signal handler for clean termination of the process.
325 ACE_NEW_RETURN (this->distributer_sender_mmdevice_
,
326 TAO_MMDevice (&this->sender_endpoint_strategy_
),
329 /// Servant Reference Counting to manage lifetime
330 PortableServer::ServantBase_var safe_sender_mmdevice
=
331 this->distributer_sender_mmdevice_
;
333 AVStreams::MMDevice_var distributer_sender_mmdevice
=
334 this->distributer_sender_mmdevice_
->_this ();
336 ACE_NEW_RETURN (this->distributer_receiver_mmdevice_
,
337 TAO_MMDevice (&this->receiver_endpoint_strategy_
),
340 /// Servant Reference Counting to manage lifetime
341 PortableServer::ServantBase_var safe_receiver_mmdevice
=
342 this->distributer_receiver_mmdevice_
;
344 AVStreams::MMDevice_var distributer_receiver_mmdevice
=
345 this->distributer_receiver_mmdevice_
->_this ();
349 this->connection_manager_
.bind_to_sender (this->sender_name_
,
350 this->distributer_name_
,
351 distributer_receiver_mmdevice
.in ());
353 /// Connect to sender.
354 this->connection_manager_
.connect_to_sender ();
356 /// Bind to receivers.
357 this->connection_manager_
.bind_to_receivers (this->distributer_name_
,
358 distributer_sender_mmdevice
.in ());
360 /// Connect to receivers
361 this->connection_manager_
.connect_to_receivers ();
367 Distributer::done () const
373 Distributer::shut_down ()
377 AVStreams::MMDevice_var receiver_mmdevice
=
378 this->distributer_receiver_mmdevice_
->_this ();
380 DISTRIBUTER::instance ()->connection_manager ().unbind_receiver (this->sender_name_
,
381 this->distributer_name_
,
382 receiver_mmdevice
.in ());
383 AVStreams::MMDevice_var sender_mmdevice
=
384 this->distributer_sender_mmdevice_
->_this ();
386 DISTRIBUTER::instance ()->connection_manager ().unbind_sender (this->distributer_name_
,
387 sender_mmdevice
.in ());
389 // DISTRIBUTER::instance ()->connection_manager ().destroy ();
391 catch (const CORBA::Exception
& ex
)
393 ex
._tao_print_exception ("Distributer::shut_down");
398 Distributer::done (bool done
)
404 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
408 /// Initialize the ORB first.
409 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
411 CORBA::Object_var obj
= orb
->resolve_initial_references ("RootPOA");
413 /// Get the POA_var object from Object_var.
414 PortableServer::POA_var root_poa
=
415 PortableServer::POA::_narrow (obj
.in ());
417 PortableServer::POAManager_var mgr
= root_poa
->the_POAManager ();
421 /// Initialize the AVStreams components.
422 TAO_AV_CORE::instance ()->init (orb
.in (), root_poa
.in ());
424 /// Initialize the Distributer
425 int result
= DISTRIBUTER::instance ()->init (argc
, argv
);
430 while (!DISTRIBUTER::instance ()->done ())
432 CORBA::Boolean wp
= orb
->work_pending ();
436 orb
->perform_work ();
440 DISTRIBUTER::instance ()->shut_down ();
442 // orb->shutdown(true);
444 catch (const CORBA::Exception
& ex
)
446 ex
._tao_print_exception ("main");
450 DISTRIBUTER::close (); // Explicitly finalize the Unmanaged_Singleton.
455 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
456 template ACE_Unmanaged_Singleton
<Distributer
, ACE_Null_Mutex
> *ACE_Unmanaged_Singleton
<Distributer
, ACE_Null_Mutex
>::singleton_
;
457 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */