3 #include "ace/Get_Opt.h"
4 #include "ace/High_Res_Timer.h"
5 #include "ace/Event_Handler.h"
7 #include "tao/Strategies/advanced_resource.h"
9 // SENDER is the singleton type through which the Sender instance is reached.
11 // An Unmanaged_Singleton is used to avoid static object destruction
12 // order related problems since the underlying singleton object
13 // contains references to static TypeCodes.  Being unmanaged, it is
// finalized explicitly with SENDER::close ().
14 typedef ACE_Unmanaged_Singleton
<Sender
, ACE_Null_Mutex
> SENDER
;
17 /// Flag set when a signal is raised.
/// Default constructor; takes no arguments.
20 Signal_Handler::Signal_Handler (void)
/// Signal callback.  The signal number arrives in @a signum; the
/// siginfo_t and ucontext_t pointers are left unnamed.  The
/// "In the signal handler" message is guarded by TAO_debug_level > 0.
25 Signal_Handler::handle_signal (int signum
, siginfo_t
*, ucontext_t
*)
29 if (TAO_debug_level
> 0)
31 "In the signal handler\n"));
/// Accessor: return the flow name held in flowname_.
40 Sender_Callback::flowname (void)
42 return this->flowname_
;
/// Mutator: store @a flowname in this callback's flowname_ member.
46 Sender_Callback::flowname (const ACE_CString
&flowname
)
48 this->flowname_
= flowname
;
/// Remove the connection-manager entries keyed by this callback's flow
/// name from three maps: protocol objects, stream controls and receivers.
53 Sender_Callback::handle_destroy (void)
55 SENDER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_
.c_str ());
57 SENDER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_
.c_str ());
59 SENDER::instance ()->connection_manager ().receivers ().unbind (this->flowname_
.c_str ());
61 // SENDER::instance ()->remove_stream ();
/// Hand this endpoint's callback_ member back through @a callback and
/// tag that callback with @a flowname.
67 Sender_StreamEndPoint::get_callback (const char * flowname
,
68 TAO_AV_Callback
*&callback
)
70 //SENDER::instance ()->add_stream ();
72 /// Point the out-parameter at the application's callback object so
73 /// that AVStreams can make further upcalls on it.
74 callback
= &this->callback_
;
/// Record the flow name on the callback.
76 ACE_CString
flow_name (flowname
);
77 this->callback_
.flowname (flow_name
);
/// Record the protocol object for @a flowname with the sender's
/// connection manager and store the stream control related to that flow.
/// (The trailing arguments of both calls are not visible in this excerpt.)
83 Sender_StreamEndPoint::set_protocol_object (const char *flowname
,
84 TAO_AV_Protocol_Object
*object
)
86 Connection_Manager
&connection_manager
=
87 SENDER::instance ()->connection_manager ();
89 /// Add to the map of protocol objects.
90 connection_manager
.protocol_objects ().bind (flowname
,
93 /// Store the related streamctrl.
94 connection_manager
.add_streamctrl (flowname
,
/// Walk every entry of @a flowspec; for each one, parse out its flow
/// name and look it up in the connection manager's protocol-object map.
/// A flow name that is already present has its old stream destroyed.
101 Sender_StreamEndPoint::handle_preconnect (AVStreams::flowSpec
&flowspec
)
103 /// If another receiver of the same flowname is in the map, destroy
/// the old stream before the new connection is made.
105 for (CORBA::ULong i
= 0;
106 i
< flowspec
.length ();
109 TAO_Forward_FlowSpec_Entry entry
;
110 entry
.parse (flowspec
[i
]);
112 ACE_CString
flowname (entry
.flowname ());
114 Connection_Manager
&connection_manager
=
115 SENDER::instance ()->connection_manager ();
118 connection_manager
.protocol_objects ().find (flowname
);
120 /// If the flowname is found.
123 ACE_DEBUG ((LM_DEBUG
, "\nSender switching distributers\n\n"));
125 /// Destroy old stream with the same flowname.
126 connection_manager
.destroy (flowname
);
/// Construct the sender: the MMDevice pointer starts out null and the
/// sender name defaults to "sender".  (Any other member initializers
/// are not visible in this excerpt.)
132 Sender::Sender (void)
133 : sender_mmdevice_ (0),
139 sender_name_ ("sender")
/// Destructor; prints "Sender destructor" when TAO_debug_level is above zero.
143 Sender::~Sender (void)
145 if (TAO_debug_level
> 0)
146 ACE_DEBUG ((LM_DEBUG
,
147 "Sender destructor\n"));
/// Obtain the MMDevice object reference from the servant and ask the
/// connection manager to unbind this sender, identified by sender_name_.
/// A CORBA::Exception raised along the way is caught and printed.
151 Sender::shut_down (void)
155 AVStreams::MMDevice_var mmdevice
=
156 this->sender_mmdevice_
->_this ();
158 SENDER::instance ()->connection_manager ().unbind_sender (this->sender_name_
,
162 catch (const CORBA::Exception
& ex
)
164 ex
._tao_print_exception ("Sender::shut_down Failed\n");
/// Read the sender's settings from the command line.  The option string
/// is "s:f:r:d"; the visible assignments fill filename_, frame_rate_
/// (converted with atoi) and sender_name_ from option arguments, and an
/// unrecognized option is reported as "Unknown Option".
/// NOTE(review): the case labels are not visible here, so which option
/// letter feeds which member should be confirmed against the full switch.
169 Sender::parse_args (int argc
,
172 /// Parse command line arguments
173 ACE_Get_Opt
opts (argc
, argv
, ACE_TEXT("s:f:r:d"));
176 while ((c
= opts ()) != -1)
181 this->filename_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
184 this->frame_rate_
= ACE_OS::atoi (opts
.opt_arg ());
187 this->sender_name_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
193 ACE_DEBUG ((LM_DEBUG
, "Unknown Option\n"));
/// Bring the sender up.  In order: initialize the endpoint strategy and
/// the connection manager from the AV core's ORB (and POA), parse the
/// command line, register the SIGINT handler with the AV core's reactor,
/// open the input file, create the sender MMDevice servant, then bind to
/// and connect to the receivers.  The two visible failure paths
/// (handler registration, file open) leave through ACE_ERROR_RETURN.
201 Sender::init (int argc
,
204 /// Initialize the endpoint strategy with the orb and poa.
206 this->endpoint_strategy_
.init (TAO_AV_CORE::instance ()->orb (),
207 TAO_AV_CORE::instance ()->poa ());
211 /// Initialize the connection manager.
213 this->connection_manager_
.init (TAO_AV_CORE::instance ()->orb ());
217 /// Parse the command line arguments
219 this->parse_args (argc
,
225 ACE_Reactor *reactor =
226 TAO_AV_CORE::instance ()->reactor ();
229 if (reactor->register_handler (SIGINT,
230 &this->signal_handler_) == -1)
231 ACE_ERROR_RETURN ((LM_ERROR,
232 "Error in handler register\n"),
234 /// The SIGINT handler registered above allows clean termination of the process.
237 /// Open file to read.
239 ACE_OS::fopen (this->filename_
.c_str (),
242 if (this->input_file_
== 0)
/// A file that cannot be opened is an error, so it is reported at
/// LM_ERROR priority like the other ACE_ERROR_RETURN calls in this file.
243 ACE_ERROR_RETURN ((LM_ERROR
,
244 "Cannot open input file %C\n",
245 this->filename_
.c_str ()),
248 ACE_DEBUG ((LM_DEBUG
,
249 "File opened successfully\n"));
251 /// Register the sender mmdevice object with the ORB
252 ACE_NEW_RETURN (this->sender_mmdevice_
,
253 TAO_MMDevice (&this->endpoint_strategy_
),
256 /// Servant Reference Counting to manage lifetime
257 PortableServer::ServantBase_var safe_mmdevice
=
258 this->sender_mmdevice_
;
260 AVStreams::MMDevice_var mmdevice
=
261 this->sender_mmdevice_
->_this ();
263 /// Register the object reference with the Naming Service and bind to
/// the receivers.
265 this->connection_manager_
.bind_to_receivers (this->sender_name_
,
268 /// Connect to the receivers
269 this->connection_manager_
.connect_to_receivers ();
274 /// Send the input file to every connected receiver, one frame at a
/// time, paced so that consecutive frames are at least
/// 1 / frame_rate_ seconds apart.  While waiting between frames the ORB
/// is run so other requests can be serviced.  A CORBA::Exception is
/// caught and printed.
276 Sender::pace_data (void)
278 /// The time that should lapse between two consecutive frames sent.
279 ACE_Time_Value inter_frame_time
;
/// NOTE(review): frame_rate_ is filled by atoi in parse_args; a value of
/// zero would make the division below non-finite -- confirm it is
/// validated elsewhere.
281 /// Derive the inter-frame interval (in seconds) from the frame rate.
282 inter_frame_time
.set (1.0 / this->frame_rate_
);
/// NOTE(review): "%d" is used with the result of msec () here and
/// further down; confirm that the specifier matches msec ()'s return type.
284 if (TAO_debug_level
> 0)
285 ACE_DEBUG ((LM_DEBUG
,
286 "Frame Rate = %d / second\n"
287 "Inter Frame Time = %d (msec)\n",
289 inter_frame_time
.msec ()));
293 /// The time taken for sending a frame and preparing for the next frame
294 ACE_High_Res_Timer elapsed_timer
;
296 /// Continue to send data till the file is read to the end.
302 ACE_DEBUG ((LM_DEBUG
,
303 "Shut Down called\n"));
310 /// Read from the file into a message block.
311 int n
= ACE_OS::fread (this->mb_
.wr_ptr (),
317 ACE_ERROR_RETURN ((LM_ERROR
,
318 "Sender::pace_data fread failed\n"),
323 /// At end of file break the loop and end the sender.
324 if (TAO_debug_level
> 0)
325 ACE_DEBUG ((LM_DEBUG
,"Handle_Start:End of file\n"));
/// Advance the write pointer past the n bytes just read.
333 this->mb_
.wr_ptr (n
);
335 if (this->frame_count_
> 1)
338 /// Second frame and beyond
341 /// Stop the timer that was started just before the previous frame was sent.
342 elapsed_timer
.stop ();
344 /// Get the time elapsed after sending the previous frame.
345 ACE_Time_Value elapsed_time
;
346 elapsed_timer
.elapsed_time (elapsed_time
);
348 if (TAO_debug_level
> 0)
349 ACE_DEBUG ((LM_DEBUG
,
350 "Elapsed Time = %d\n",
351 elapsed_time
.msec ()));
353 /// Check to see if the inter frame time has elapsed.
354 if (elapsed_time
< inter_frame_time
)
356 /// Inter frame time has not elapsed.
358 /// Calculate the time to wait before the next frame needs to be sent.
359 ACE_Time_Value
wait_time (inter_frame_time
- elapsed_time
);
361 if (TAO_debug_level
> 0)
362 ACE_DEBUG ((LM_DEBUG
,
366 /// Run the ORB for the wait time so the sender can keep
367 /// servicing other ORB requests while it waits.
368 TAO_AV_CORE::instance ()->orb ()->run (wait_time
);
373 /// Start timer before sending the frame.
374 elapsed_timer
.start ();
376 Connection_Manager::Protocol_Objects
&protocol_objects
=
377 this->connection_manager_
.protocol_objects ();
379 /// Send frame to all receivers.
380 for (Connection_Manager::Protocol_Objects::iterator iterator
= protocol_objects
.begin ();
381 iterator
!= protocol_objects
.end ();
385 (*iterator
).int_id_
->send_frame (&this->mb_
);
388 ACE_ERROR_RETURN ((LM_ERROR
,
390 "Sender::pace_data send\n"),
/// The frame counter is incremented as part of this log statement.
394 ACE_DEBUG ((LM_DEBUG
,
395 "Sender::pace_data frame %d was sent successfully\n",
396 ++this->frame_count_
));
398 /// Reset the message block.
404 catch (const CORBA::Exception
& ex
)
406 ex
._tao_print_exception ("Sender::pace_data Failed\n");
/// Accessor: return this sender's connection_manager_ member.
413 Sender::connection_manager (void)
415 return this->connection_manager_
;
419 // Sender::add_stream (void)
421 // this->stream_count_++;
425 // Sender::remove_stream (void)
427 // this->stream_count_--;
431 // Sender::stream_alive (void)
433 // return this->stream_count_;
/// Program body (the entry-point signature is not visible in this
/// excerpt): initialize the ORB, resolve the RootPOA and its manager,
/// initialize the AV core, initialize the Sender singleton and pace the
/// data out.  A CORBA::Exception is caught and printed, and the
/// singleton is closed explicitly at the end.
442 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
444 CORBA::Object_var obj
445 = orb
->resolve_initial_references ("RootPOA");
447 /// Narrow the generic object reference to a POA reference.
448 PortableServer::POA_var root_poa
449 = PortableServer::POA::_narrow (obj
.in ());
451 PortableServer::POAManager_var mgr
452 = root_poa
->the_POAManager ();
456 /// Initialize the AV Stream components.
457 TAO_AV_CORE::instance ()->init (orb
.in (),
460 /// Initialize the Sender.
462 result
= SENDER::instance ()->init (argc
,
466 ACE_ERROR_RETURN ((LM_ERROR
,
467 "client::init failed\n"), -1);
469 SENDER::instance ()->pace_data ();
473 catch (const CORBA::Exception
& ex
)
475 ex
._tao_print_exception ("Sender Failed\n");
479 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
// Explicit instantiation of the singleton's static singleton_ pointer,
// compiled only when the toolchain defines
// ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION.
484 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
485 template ACE_Unmanaged_Singleton
<Sender
, ACE_Null_Mutex
> *ACE_Unmanaged_Singleton
<Sender
, ACE_Null_Mutex
>::singleton_
;
486 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */