3 #include "ace/Get_Opt.h"
4 #include "ace/High_Res_Timer.h"
6 // Create a singleton instance of the Sender.
8 // An ACE_Unmanaged_Singleton is used to avoid problems with the order
9 // of static object destruction, since the underlying singleton object
10 // contains references to static TypeCodes.
11 typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER;
13 // The time that should elapse between two consecutive frames sent.
14 ACE_Time_Value inter_frame_time;
17 Sender_StreamEndPoint::get_callback (const char *,
18 TAO_AV_Callback
*&callback
)
20 // Create and return the sender application callback to AVStreams
21 // for further upcalls.
22 callback
= &this->callback_
;
27 Sender_StreamEndPoint::set_protocol_object (const char *,
28 TAO_AV_Protocol_Object
*object
)
30 // Set the sender protocol object corresponding to the transport
32 SENDER::instance ()->protocol_object (object
);
37 Sender_StreamEndPoint::modify_QoS (AVStreams::streamQoS
&new_qos
,
38 const AVStreams::flowSpec
&/* the_flows */)
41 "Sender_StreamEndPoint::modify_QoS\n"));
43 // Check if the qos for the flow has changed.
44 if (new_qos
.length () != 0)
46 // Check which qos parameter has changed.
47 if (ACE_OS::strcmp (new_qos
[0].QoSParams
[0].property_name
, "video_frame_rate") == 0)
49 // The video frame rate for this flow has changed.
51 // Get the new value of the frame rate.
52 CORBA::Any frame_rate_any
=
53 new_qos
[0].QoSParams
[0].property_value
;
55 CORBA::Short frame_rate
;
56 frame_rate_any
>>= frame_rate
;
58 // Calculate the new inter frame time.
59 inter_frame_time
.set (1 / (double) frame_rate
);
67 : sender_mmdevice_ (0),
79 Sender::protocol_object (TAO_AV_Protocol_Object
*object
)
81 // Set the sender protocol object corresponding to the transport
83 this->protocol_object_
= object
;
87 Sender::parse_args (int argc
,
90 // Parse command line arguments
91 ACE_Get_Opt
opts (argc
, argv
, ACE_TEXT("f:p:r:d"));
94 while ((c
= opts ()) != -1)
99 this->filename_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
102 this->protocol_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
105 this->frame_rate_
= ACE_OS::atoi (opts
.opt_arg ());
111 ACE_DEBUG ((LM_DEBUG
, "Unknown Option\n"));
118 // Method to get the object reference of the receiver
120 Sender::bind_to_receiver (void)
122 CosNaming::Name
name (1);
125 CORBA::string_dup ("Receiver");
127 // Resolve the receiver object reference from the Naming Service
128 CORBA::Object_var receiver_mmdevice_obj
=
129 this->naming_client_
->resolve (name
);
131 this->receiver_mmdevice_
=
132 AVStreams::MMDevice::_narrow (receiver_mmdevice_obj
.in ());
134 if (CORBA::is_nil (this->receiver_mmdevice_
.in ()))
135 ACE_ERROR_RETURN ((LM_ERROR
,
136 "Could not resolve Receiver_MMdevice in Naming service\n"),
143 Sender::init (int argc
,
146 // Initialize the endpoint strategy with the orb and poa.
148 this->endpoint_strategy_
.init (TAO_AV_CORE::instance ()->orb (),
149 TAO_AV_CORE::instance ()->poa ());
153 // Initialize the naming services
155 this->naming_client_
.init (TAO_AV_CORE::instance ()->orb ());
159 // Parse the command line arguments
161 this->parse_args (argc
,
166 // Open file to read.
168 ACE_OS::fopen (this->filename_
.c_str (),
171 if (this->input_file_
== 0)
172 ACE_ERROR_RETURN ((LM_DEBUG
,
173 "Cannot open input file %C\n",
174 this->filename_
.c_str ()),
177 ACE_DEBUG ((LM_DEBUG
,
178 "File opened successfully\n"));
180 // Resolve the object reference of the receiver from the Naming Service.
181 result
= this->bind_to_receiver ();
184 ACE_ERROR_RETURN ((LM_ERROR
,
185 "(%P|%t) Error binding to the naming service\n"),
189 // Initialize the QoS
190 AVStreams::streamQoS_var
the_qos (new AVStreams::streamQoS
);
192 // Create the forward flow specification to describe the flow.
193 TAO_Forward_FlowSpec_Entry
entry ("Data_Receiver",
197 this->protocol_
.c_str (),
200 AVStreams::flowSpec
flow_spec (1);
201 flow_spec
.length (1);
202 flow_spec
[0] = CORBA::string_dup (entry
.entry_to_string ());
204 // Register the sender mmdevice object with the ORB
205 ACE_NEW_RETURN (this->sender_mmdevice_
,
206 TAO_MMDevice (&this->endpoint_strategy_
),
209 // Servant Reference Counting to manage lifetime
210 PortableServer::ServantBase_var safe_mmdevice
=
211 this->sender_mmdevice_
;
213 AVStreams::MMDevice_var mmdevice
=
214 this->sender_mmdevice_
->_this ();
216 ACE_NEW_RETURN (this->streamctrl_
,
220 PortableServer::ServantBase_var safe_streamctrl
=
223 // Bind/Connect the sender and receiver MMDevices.
224 CORBA::Boolean bind_result
=
225 this->streamctrl_
->bind_devs (mmdevice
.in (),
226 this->receiver_mmdevice_
.in (),
230 if (bind_result
== 0)
231 ACE_ERROR_RETURN ((LM_ERROR
,
232 "streamctrl::bind_devs failed\n"),
238 // Method to send data at the specified rate
240 Sender::pace_data (void)
244 // The time between two consecutive frames.
245 inter_frame_time
.set (1 / (double) this->frame_rate_
);
247 if (TAO_debug_level
> 0)
248 ACE_DEBUG ((LM_DEBUG
,
249 "Frame Rate = %d / second\n"
250 "Inter Frame Time = %d (msec)\n",
252 inter_frame_time
.msec ()));
256 // The time taken for sending a frame and preparing for the next frame
257 ACE_High_Res_Timer elapsed_timer
;
259 // Continue to send data till the file is read to the end.
262 // Read from the file into a message block.
263 size_t n
= ACE_OS::fread (this->mb_
.wr_ptr (),
270 // At end of file break the loop and end the sender.
271 if (TAO_debug_level
> 0)
272 ACE_DEBUG ((LM_DEBUG
,"Handle_Start:End of file\n"));
276 this->mb_
.wr_ptr (n
);
278 if (this->frame_count_
> 1)
281 // Second frame and beyond
284 // Stop the timer that was started just before the previous frame was sent.
285 elapsed_timer
.stop ();
287 // Get the time elapsed after sending the previous frame.
288 ACE_Time_Value elapsed_time
;
289 elapsed_timer
.elapsed_time (elapsed_time
);
291 if (TAO_debug_level
> 0)
292 ACE_DEBUG ((LM_DEBUG
,
293 "Elapsed Time = %d\n",
294 elapsed_time
.msec ()));
296 // Check to see if the inter frame time has elapsed.
297 if (elapsed_time
< inter_frame_time
)
299 // Inter frame time has not elapsed.
301 // Calculate the time to wait before the next frame needs to be sent.
302 ACE_Time_Value
wait_time (inter_frame_time
- elapsed_time
);
304 if (TAO_debug_level
> 0)
305 ACE_DEBUG ((LM_DEBUG
,
309 // Run the orb for the wait time so the sender can
310 // continue other orb requests.
311 TAO_AV_CORE::instance ()->orb ()->run (wait_time
);
315 // Start timer before sending the frame.
316 elapsed_timer
.start ();
320 this->protocol_object_
->send_frame (&this->mb_
);
323 ACE_ERROR_RETURN ((LM_ERROR
,
325 "Sender::pace_data send\n"),
328 ACE_DEBUG ((LM_DEBUG
,
329 "Sender::pace_data frame %d was sent successfully\n",
330 ++this->frame_count_
));
332 // Reset the message block.
337 // File reading is complete, destroy the stream.
338 AVStreams::flowSpec stop_spec
;
339 this->streamctrl_
->destroy (stop_spec
);
341 // Shut the orb down.
342 TAO_AV_CORE::instance ()->orb ()->shutdown (1);
344 catch (const CORBA::Exception
& ex
)
346 ex
._tao_print_exception ("Sender::pace_data Failed\n");
359 CORBA::ORB_init (argc
, argv
);
361 CORBA::Object_var obj
362 = orb
->resolve_initial_references ("RootPOA");
364 // Get the POA_var object from Object_var
365 PortableServer::POA_var root_poa
366 = PortableServer::POA::_narrow (obj
.in ());
368 PortableServer::POAManager_var mgr
369 = root_poa
->the_POAManager ();
373 // Initialize the AV Stream components.
374 TAO_AV_CORE::instance ()->init (orb
.in (),
377 // Initialize the Sender.
379 result
= SENDER::instance ()->init (argc
,
383 ACE_ERROR_RETURN ((LM_ERROR
,
384 "Sender::init failed\n"),
387 // Start sending data.
388 result
= SENDER::instance ()->pace_data ();
391 catch (const CORBA::Exception
& ex
)
393 ex
._tao_print_exception ("Sender Failed\n");
397 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
402 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
403 template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_;
404 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */