Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Modify_QoS / sender.cpp
blob2acff23330307a3979751023bbcbd39c9bf2c8f2
1 #include "sender.h"
2 #include "tao/debug.h"
3 #include "ace/Get_Opt.h"
4 #include "ace/High_Res_Timer.h"
6 // Create a singleton instance of the Sender.
8 // An Unmanaged_Singleton is used to avoid static object destruction
9 // order related problems since the underlying singleton object
10 // contains references to static TypeCodes.
11 typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER;
13 // The time that should lapse between two consecutive frames sent.
14 ACE_Time_Value inter_frame_time;
16 int
17 Sender_StreamEndPoint::get_callback (const char *,
18 TAO_AV_Callback *&callback)
20 // Create and return the sender application callback to AVStreams
21 // for further upcalls.
22 callback = &this->callback_;
23 return 0;
26 int
27 Sender_StreamEndPoint::set_protocol_object (const char *,
28 TAO_AV_Protocol_Object *object)
30 // Set the sender protocol object corresponding to the transport
31 // protocol selected.
32 SENDER::instance ()->protocol_object (object);
33 return 0;
36 CORBA::Boolean
37 Sender_StreamEndPoint::modify_QoS (AVStreams::streamQoS &new_qos,
38 const AVStreams::flowSpec &/* the_flows */)
40 ACE_DEBUG ((LM_DEBUG,
41 "Sender_StreamEndPoint::modify_QoS\n"));
43 // Check if the qos for the flow has changed.
44 if (new_qos.length () != 0)
46 // Check which qos parameter has changed.
47 if (ACE_OS::strcmp (new_qos [0].QoSParams [0].property_name, "video_frame_rate") == 0)
49 // The video frame rate for this flow has changed.
51 // Get the new value of the frame rate.
52 CORBA::Any frame_rate_any =
53 new_qos [0].QoSParams [0].property_value;
55 CORBA::Short frame_rate;
56 frame_rate_any >>= frame_rate;
58 // Calculate the new inter frame time.
59 inter_frame_time.set (1 / (double) frame_rate);
63 return 1;
66 Sender::Sender ()
67 : sender_mmdevice_ (0),
68 streamctrl_ (0),
69 frame_count_ (0),
70 filename_ ("input"),
71 input_file_ (0),
72 protocol_ ("UDP"),
73 frame_rate_ (1),
74 mb_ (BUFSIZ)
78 void
79 Sender::protocol_object (TAO_AV_Protocol_Object *object)
81 // Set the sender protocol object corresponding to the transport
82 // protocol selected.
83 this->protocol_object_ = object;
86 int
87 Sender::parse_args (int argc,
88 ACE_TCHAR *argv[])
90 // Parse command line arguments
91 ACE_Get_Opt opts (argc, argv, ACE_TEXT("f:p:r:d"));
93 int c;
94 while ((c= opts ()) != -1)
96 switch (c)
98 case 'f':
99 this->filename_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
100 break;
101 case 'p':
102 this->protocol_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
103 break;
104 case 'r':
105 this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
106 break;
107 case 'd':
108 TAO_debug_level++;
109 break;
110 default:
111 ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
112 return -1;
115 return 0;
118 // Method to get the object reference of the receiver
120 Sender::bind_to_receiver ()
122 CosNaming::Name name (1);
123 name.length (1);
124 name [0].id =
125 CORBA::string_dup ("Receiver");
127 // Resolve the receiver object reference from the Naming Service
128 CORBA::Object_var receiver_mmdevice_obj =
129 this->naming_client_->resolve (name);
131 this->receiver_mmdevice_ =
132 AVStreams::MMDevice::_narrow (receiver_mmdevice_obj.in ());
134 if (CORBA::is_nil (this->receiver_mmdevice_.in ()))
135 ACE_ERROR_RETURN ((LM_ERROR,
136 "Could not resolve Receiver_MMdevice in Naming service\n"),
137 -1);
139 return 0;
143 Sender::init (int argc,
144 ACE_TCHAR *argv[])
146 // Initialize the endpoint strategy with the orb and poa.
147 int result =
148 this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
149 TAO_AV_CORE::instance ()->poa ());
150 if (result != 0)
151 return result;
153 // Initialize the naming services
154 result =
155 this->naming_client_.init (TAO_AV_CORE::instance ()->orb ());
156 if (result != 0)
157 return result;
159 // Parse the command line arguments
160 result =
161 this->parse_args (argc,
162 argv);
163 if (result != 0)
164 return result;
166 // Open file to read.
167 this->input_file_ =
168 ACE_OS::fopen (this->filename_.c_str (),
169 "r");
171 if (this->input_file_ == 0)
172 ACE_ERROR_RETURN ((LM_DEBUG,
173 "Cannot open input file %C\n",
174 this->filename_.c_str ()),
175 -1);
176 else
177 ACE_DEBUG ((LM_DEBUG,
178 "File opened successfully\n"));
180 // Resolve the object reference of the receiver from the Naming Service.
181 result = this->bind_to_receiver ();
183 if (result != 0)
184 ACE_ERROR_RETURN ((LM_ERROR,
185 "(%P|%t) Error binding to the naming service\n"),
186 -1);
189 // Initialize the QoS
190 AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
192 // Create the forward flow specification to describe the flow.
193 TAO_Forward_FlowSpec_Entry entry ("Data_Receiver",
194 "IN",
195 "USER_DEFINED",
197 this->protocol_.c_str (),
200 AVStreams::flowSpec flow_spec (1);
201 flow_spec.length (1);
202 flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
204 // Register the sender mmdevice object with the ORB
205 ACE_NEW_RETURN (this->sender_mmdevice_,
206 TAO_MMDevice (&this->endpoint_strategy_),
207 -1);
209 // Servant Reference Counting to manage lifetime
210 PortableServer::ServantBase_var safe_mmdevice =
211 this->sender_mmdevice_;
213 AVStreams::MMDevice_var mmdevice =
214 this->sender_mmdevice_->_this ();
216 ACE_NEW_RETURN (this->streamctrl_,
217 TAO_StreamCtrl,
218 -1);
220 PortableServer::ServantBase_var safe_streamctrl =
221 this->streamctrl_;
223 // Bind/Connect the sender and receiver MMDevices.
224 CORBA::Boolean bind_result =
225 this->streamctrl_->bind_devs (mmdevice.in (),
226 this->receiver_mmdevice_.in (),
227 the_qos.inout (),
228 flow_spec);
230 if (bind_result == 0)
231 ACE_ERROR_RETURN ((LM_ERROR,
232 "streamctrl::bind_devs failed\n"),
233 -1);
235 return 0;
238 // Method to send data at the specified rate
240 Sender::pace_data ()
242 // The time between two consecutive frames.
243 inter_frame_time.set (1 / (double) this->frame_rate_);
245 if (TAO_debug_level > 0)
246 ACE_DEBUG ((LM_DEBUG,
247 "Frame Rate = %d / second\n"
248 "Inter Frame Time = %d (msec)\n",
249 this->frame_rate_,
250 inter_frame_time.msec ()));
254 // The time taken for sending a frame and preparing for the next frame
255 ACE_High_Res_Timer elapsed_timer;
257 // Continue to send data till the file is read to the end.
258 while (1)
260 // Read from the file into a message block.
261 size_t n = ACE_OS::fread (this->mb_.wr_ptr (),
263 this->mb_.size (),
264 this->input_file_);
266 if (n == 0)
268 // At end of file break the loop and end the sender.
269 if (TAO_debug_level > 0)
270 ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
271 break;
274 this->mb_.wr_ptr (n);
276 if (this->frame_count_ > 1)
279 // Second frame and beyond
282 // Stop the timer that was started just before the previous frame was sent.
283 elapsed_timer.stop ();
285 // Get the time elapsed after sending the previous frame.
286 ACE_Time_Value elapsed_time;
287 elapsed_timer.elapsed_time (elapsed_time);
289 if (TAO_debug_level > 0)
290 ACE_DEBUG ((LM_DEBUG,
291 "Elapsed Time = %d\n",
292 elapsed_time.msec ()));
294 // Check to see if the inter frame time has elapsed.
295 if (elapsed_time < inter_frame_time)
297 // Inter frame time has not elapsed.
299 // Calculate the time to wait before the next frame needs to be sent.
300 ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
302 if (TAO_debug_level > 0)
303 ACE_DEBUG ((LM_DEBUG,
304 "Wait Time = %d\n",
305 wait_time.msec ()));
307 // Run the orb for the wait time so the sender can
308 // continue other orb requests.
309 TAO_AV_CORE::instance ()->orb ()->run (wait_time);
313 // Start timer before sending the frame.
314 elapsed_timer.start ();
316 // Send frame.
317 int result =
318 this->protocol_object_->send_frame (&this->mb_);
320 if (result < 0)
321 ACE_ERROR_RETURN ((LM_ERROR,
322 "send failed:%p",
323 "Sender::pace_data send\n"),
324 -1);
326 ACE_DEBUG ((LM_DEBUG,
327 "Sender::pace_data frame %d was sent successfully\n",
328 ++this->frame_count_));
330 // Reset the message block.
331 this->mb_.reset ();
332 } // end while
334 // File reading is complete, destroy the stream.
335 AVStreams::flowSpec stop_spec;
336 this->streamctrl_->destroy (stop_spec);
338 // Shut the orb down.
339 TAO_AV_CORE::instance ()->orb ()->shutdown (1);
341 catch (const CORBA::Exception& ex)
343 ex._tao_print_exception ("Sender::pace_data Failed\n");
344 return -1;
346 return 0;
350 ACE_TMAIN (int argc,
351 ACE_TCHAR *argv[])
355 CORBA::ORB_var orb =
356 CORBA::ORB_init (argc, argv);
358 CORBA::Object_var obj
359 = orb->resolve_initial_references ("RootPOA");
361 // Get the POA_var object from Object_var
362 PortableServer::POA_var root_poa
363 = PortableServer::POA::_narrow (obj.in ());
365 PortableServer::POAManager_var mgr
366 = root_poa->the_POAManager ();
368 mgr->activate ();
370 // Initialize the AV Stream components.
371 TAO_AV_CORE::instance ()->init (orb.in (),
372 root_poa.in ());
374 // Initialize the Sender.
375 int result = 0;
376 result = SENDER::instance ()->init (argc,
377 argv);
379 if (result < 0)
380 ACE_ERROR_RETURN ((LM_ERROR,
381 "Sender::init failed\n"),
382 -1);
384 // Start sending data.
385 result = SENDER::instance ()->pace_data ();
387 catch (const CORBA::Exception& ex)
389 ex._tao_print_exception ("Sender Failed\n");
390 return -1;
393 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
395 return 0;
398 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
399 template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_;
400 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */