3 #include "ace/Get_Opt.h"
4 #include "ace/High_Res_Timer.h"
5 #include "ace/Event_Handler.h"
7 #include "tao/Strategies/advanced_resource.h"
9 // Singleton type through which the single Sender object is accessed.
11 // An Unmanaged_Singleton is used to avoid static object destruction
12 // order related problems since the underlying singleton object
13 // contains references to static TypeCodes.
// Because the singleton is unmanaged, it has to be finalized explicitly
// with SENDER::close ().
14 typedef ACE_Unmanaged_Singleton
<Sender
, ACE_Null_Mutex
> SENDER
;
17 /// Flag set when a signal is raised.
/// Default constructor of the signal handler; takes no arguments.
20 Signal_Handler::Signal_Handler ()
/// Signal callback.
/// @param signum  Number of the signal being delivered.
/// The siginfo_t and ucontext_t parameters are unnamed and therefore unused.
25 Signal_Handler::handle_signal (int signum
, siginfo_t
*, ucontext_t
*)
/// Trace entry into the handler only when TAO debugging is enabled.
29 if (TAO_debug_level
> 0)
31 "In the signal handler\n"));
/// Getter: returns the flow name held by this callback.
39 Sender_Callback::flowname ()
41 return this->flowname_
;
/// Setter: stores the given flow name in this callback.
/// @param flowname  Flow name to remember.
45 Sender_Callback::flowname (const ACE_CString
&flowname
)
47 this->flowname_
= flowname
;
/// Removes this callback's flow from the sender's connection manager by
/// unbinding the stored flow name from three collections in turn:
/// protocol objects, stream controls and receivers.
52 Sender_Callback::handle_destroy ()
54 SENDER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_
.c_str ());
56 SENDER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_
.c_str ());
58 SENDER::instance ()->connection_manager ().receivers ().unbind (this->flowname_
.c_str ());
60 // SENDER::instance ()->remove_stream ();
/// Supplies the callback object that will receive upcalls for a flow.
/// @param flowname  Name of the flow the callback is requested for.
/// @param callback  Out-parameter; set to this endpoint's own callback member.
66 Sender_StreamEndPoint::get_callback (const char * flowname
,
67 TAO_AV_Callback
*&callback
)
69 //SENDER::instance ()->add_stream ();
71 /// Hand this endpoint's own callback member back to the caller
72 /// (AVStreams) so that further upcalls are delivered to it.
73 callback
= &this->callback_
;
/// Record the flow name on the callback object.
75 ACE_CString
flow_name (flowname
);
76 this->callback_
.flowname (flow_name
);
/// Registers a flow with the sender's connection manager: binds the flow
/// name in the protocol-object map and stores the related stream control.
/// @param flowname  Name of the flow.
/// @param object    Protocol object associated with that flow.
82 Sender_StreamEndPoint::set_protocol_object (const char *flowname
,
83 TAO_AV_Protocol_Object
*object
)
/// All bookkeeping goes through the singleton sender's connection manager.
85 Connection_Manager
&connection_manager
=
86 SENDER::instance ()->connection_manager ();
88 /// Add to the map of protocol objects.
89 connection_manager
.protocol_objects ().bind (flowname
,
92 /// Store the related streamctrl.
93 connection_manager
.add_streamctrl (flowname
,
/// Pre-connect hook: walks every entry of the flow spec and, when a flow of
/// the same name is already registered, destroys the old stream.
/// @param flowspec  Flow specification entries to inspect.
100 Sender_StreamEndPoint::handle_preconnect (AVStreams::flowSpec
&flowspec
)
102 /// If a stream with the same flowname is already in the map, destroy it.
104 for (CORBA::ULong i
= 0;
105 i
< flowspec
.length ();
/// Parse the i-th flow spec entry to extract its flow name.
108 TAO_Forward_FlowSpec_Entry entry
;
109 entry
.parse (flowspec
[i
]);
111 ACE_CString
flowname (entry
.flowname ());
113 Connection_Manager
&connection_manager
=
114 SENDER::instance ()->connection_manager ();
/// Look the flow name up among the registered protocol objects.
117 connection_manager
.protocol_objects ().find (flowname
);
119 /// If the flowname is found.
122 ACE_DEBUG ((LM_DEBUG
, "\nSender switching distributers\n\n"));
124 /// Destroy old stream with the same flowname.
125 connection_manager
.destroy (flowname
);
/// Member initializers: the MMDevice servant pointer starts out null and
/// the sender name defaults to "sender".
132 : sender_mmdevice_ (0),
138 sender_name_ ("sender")
/// Trace destruction of the sender when TAO debugging is enabled.
144 if (TAO_debug_level
> 0)
145 ACE_DEBUG ((LM_DEBUG
,
146 "Sender destructor\n"));
/// Obtain the MMDevice object reference from the servant.
154 AVStreams::MMDevice_var mmdevice
=
155 this->sender_mmdevice_
->_this ();
/// Remove this sender's binding (keyed by its name) via the connection manager.
157 SENDER::instance ()->connection_manager ().unbind_sender (this->sender_name_
,
/// Print any CORBA exception raised by the calls above.
160 catch (const CORBA::Exception
& ex
)
162 ex
._tao_print_exception ("Sender::shut_down Failed\n");
/// Parses the sender's command line. The option string "s:f:r:d" accepts
/// -s, -f and -r (each taking an argument) and a bare -d.
/// Sets filename_, frame_rate_ and sender_name_ from the option arguments.
167 Sender::parse_args (int argc
,
170 /// Parse command line arguments
171 ACE_Get_Opt
opts (argc
, argv
, ACE_TEXT("s:f:r:d"));
/// Keep going until the option parser reports -1 (no more options).
174 while ((c
= opts ()) != -1)
/// Name of the input file to send.
179 this->filename_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
/// Frame rate as an integer.
/// NOTE(review): pace_data () computes 1.0 / frame_rate_; a value of 0
/// (presumably what atoi yields for non-numeric text) is not rejected
/// here -- confirm it is validated elsewhere.
182 this->frame_rate_
= ACE_OS::atoi (opts
.opt_arg ());
/// Name of this sender; overrides the constructor default.
185 this->sender_name_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
/// Any unrecognised option is only reported.
191 ACE_DEBUG ((LM_DEBUG
, "Unknown Option\n"));
/// Initializes the sender. In order: endpoint strategy, connection manager,
/// command-line options, SIGINT handler registration, input file, MMDevice
/// servant, binding to the receivers and connecting to them.
/// Failures visible here are reported through ACE_ERROR_RETURN / ACE_NEW_RETURN.
199 Sender::init (int argc
,
202 /// Initialize the endpoint strategy with the orb and poa.
204 this->endpoint_strategy_
.init (TAO_AV_CORE::instance ()->orb (),
205 TAO_AV_CORE::instance ()->poa ());
209 /// Initialize the connection manager.
211 this->connection_manager_
.init (TAO_AV_CORE::instance ()->orb ());
215 /// Parse the command line arguments
217 this->parse_args (argc
,
/// The signal handler is registered with the reactor owned by TAO_AV_CORE.
223 ACE_Reactor *reactor =
224 TAO_AV_CORE::instance ()->reactor ();
227 if (reactor->register_handler (SIGINT,
228 &this->signal_handler_) == -1)
229 ACE_ERROR_RETURN ((LM_ERROR,
230 "Error in handler register\n"),
232 /// (The registration above installs the SIGINT handler for clean termination of the process.)
235 /// Open file to read.
237 ACE_OS::fopen (this->filename_
.c_str (),
/// A null file pointer means the open failed.
/// NOTE(review): this error path logs at LM_DEBUG, unlike the other
/// ACE_ERROR_RETURN calls in this file which use LM_ERROR -- confirm intent.
240 if (this->input_file_
== 0)
241 ACE_ERROR_RETURN ((LM_DEBUG
,
242 "Cannot open input file %C\n",
243 this->filename_
.c_str ()),
246 ACE_DEBUG ((LM_DEBUG
,
247 "File opened successfully\n"));
249 /// Register the sender mmdevice object with the ORB
250 ACE_NEW_RETURN (this->sender_mmdevice_
,
251 TAO_MMDevice (&this->endpoint_strategy_
),
254 /// Servant Reference Counting to manage lifetime
255 PortableServer::ServantBase_var safe_mmdevice
=
256 this->sender_mmdevice_
;
/// Obtain the MMDevice object reference from the servant.
258 AVStreams::MMDevice_var mmdevice
=
259 this->sender_mmdevice_
->_this ();
261 /// Register the object reference with the Naming Service and bind to the receivers.
263 this->connection_manager_
.bind_to_receivers (this->sender_name_
,
266 /// Connect to the receivers
267 this->connection_manager_
.connect_to_receivers ();
272 /// Method to send data at the specified rate
/// Reads the input file into the message block one frame at a time and
/// sends each frame to every protocol object known to the connection
/// manager. From the second frame on, it runs the ORB for whatever part of
/// the inter-frame time has not yet elapsed before sending the next frame.
276 /// The time that should lapse between two consecutive frames sent.
277 ACE_Time_Value inter_frame_time
;
279 /// The time between two consecutive frames.
/// NOTE(review): divides by frame_rate_, which parse_args () fills from
/// ACE_OS::atoi without a visible check for 0 -- confirm it cannot be 0.
280 inter_frame_time
.set (1.0 / this->frame_rate_
);
/// NOTE(review): inter_frame_time.msec () is printed with %d here (and
/// elapsed_time.msec () further down) -- confirm the specifier matches
/// the return type of msec ().
282 if (TAO_debug_level
> 0)
283 ACE_DEBUG ((LM_DEBUG
,
284 "Frame Rate = %d / second\n"
285 "Inter Frame Time = %d (msec)\n",
287 inter_frame_time
.msec ()));
291 /// The time taken for sending a frame and preparing for the next frame
292 ACE_High_Res_Timer elapsed_timer
;
294 /// Continue to send data till the file is read to the end.
299 ACE_DEBUG ((LM_DEBUG
,
300 "Shut Down called\n"));
307 /// Read from the file into a message block.
308 int n
= ACE_OS::fread (this->mb_
.wr_ptr (),
314 ACE_ERROR_RETURN ((LM_ERROR
,
315 "Sender::pace_data fread failed\n"),
320 /// At end of file break the loop and end the sender.
321 if (TAO_debug_level
> 0)
322 ACE_DEBUG ((LM_DEBUG
,"Handle_Start:End of file\n"));
/// Advance the write pointer past the n bytes just read.
329 this->mb_
.wr_ptr (n
);
/// Pacing only applies once a previous frame has been sent.
331 if (this->frame_count_
> 1)
334 /// Second frame and beyond
337 /// Stop the timer that was started just before the previous frame was sent.
338 elapsed_timer
.stop ();
340 /// Get the time elapsed after sending the previous frame.
341 ACE_Time_Value elapsed_time
;
342 elapsed_timer
.elapsed_time (elapsed_time
);
344 if (TAO_debug_level
> 0)
345 ACE_DEBUG ((LM_DEBUG
,
346 "Elapsed Time = %d\n",
347 elapsed_time
.msec ()));
349 /// Check to see if the inter frame time has elapsed.
350 if (elapsed_time
< inter_frame_time
)
352 /// Inter frame time has not elapsed.
354 /// Calculate the time to wait before the next frame needs to be sent.
355 ACE_Time_Value
wait_time (inter_frame_time
- elapsed_time
);
357 if (TAO_debug_level
> 0)
358 ACE_DEBUG ((LM_DEBUG
,
362 /// Run the orb for the wait time so the sender can
363 /// continue other orb requests.
364 TAO_AV_CORE::instance ()->orb ()->run (wait_time
);
368 /// Start timer before sending the frame.
369 elapsed_timer
.start ();
371 Connection_Manager::Protocol_Objects
&protocol_objects
=
372 this->connection_manager_
.protocol_objects ();
374 /// Send frame to all receivers.
375 for (Connection_Manager::Protocol_Objects::iterator iterator
= protocol_objects
.begin ();
376 iterator
!= protocol_objects
.end ();
/// int_id_ of each map entry is the protocol object used to send the frame.
380 (*iterator
).int_id_
->send_frame (&this->mb_
);
383 ACE_ERROR_RETURN ((LM_ERROR
,
385 "Sender::pace_data send\n"),
/// NOTE(review): frame_count_ is incremented inside the ACE_DEBUG
/// argument list; if the logging macros are compiled out the increment
/// would presumably disappear with them -- confirm.
389 ACE_DEBUG ((LM_DEBUG
,
390 "Sender::pace_data frame %d was sent successfully\n",
391 ++this->frame_count_
));
393 /// Reset the message block.
/// Print any CORBA exception raised while pacing data.
398 catch (const CORBA::Exception
& ex
)
400 ex
._tao_print_exception ("Sender::pace_data Failed\n");
/// Accessor: returns the sender's connection manager member.
407 Sender::connection_manager ()
409 return this->connection_manager_
;
413 // Sender::add_stream ()
415 // this->stream_count_++;
419 // Sender::remove_stream ()
421 // this->stream_count_--;
425 // Sender::stream_alive ()
427 // return this->stream_count_;
/// Program entry: initializes the ORB, resolves the RootPOA and its POA
/// manager, initializes the AV stream core and the sender singleton, and
/// then paces the data out.
436 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
/// Resolve the RootPOA as a generic object reference.
438 CORBA::Object_var obj
439 = orb
->resolve_initial_references ("RootPOA");
441 ///Get the POA_var object from Object_var
442 PortableServer::POA_var root_poa
443 = PortableServer::POA::_narrow (obj
.in ());
445 PortableServer::POAManager_var mgr
446 = root_poa
->the_POAManager ();
450 /// Initialize the AV Stream components.
451 TAO_AV_CORE::instance ()->init (orb
.in (),
454 /// Initialize the Client.
456 result
= SENDER::instance ()->init (argc
,
460 ACE_ERROR_RETURN ((LM_ERROR
,
461 "client::init failed\n"), -1);
/// Send the file contents.
/// NOTE(review): the result of pace_data () is not checked here.
463 SENDER::instance ()->pace_data ();
/// Print any CORBA exception raised during start-up or sending.
467 catch (const CORBA::Exception
& ex
)
469 ex
._tao_print_exception ("Sender Failed\n");
473 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
// Explicit instantiation of the static singleton_ pointer of
// ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>, compiled only when
// ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION is defined.
478 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
479 template ACE_Unmanaged_Singleton
<Sender
, ACE_Null_Mutex
> *ACE_Unmanaged_Singleton
<Sender
, ACE_Null_Mutex
>::singleton_
;
480 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */