Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Bidirectional_Flows / sender.cpp
blob0c607516030dbd805f5fe61cefb8454130111968
1 #include "sender.h"
2 #include "tao/debug.h"
3 #include "ace/Get_Opt.h"
4 #include "ace/High_Res_Timer.h"
6 typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER;
7 // Create a singleton instance of the Sender.
9 static FILE *output_file = 0;
10 // File handle of the file into which received data is written.
12 static const ACE_TCHAR *output_file_name = ACE_TEXT ("output");
13 // File name of the file into which received data is written.
16 int
17 Sender_StreamEndPoint::get_callback (const char *,
18 TAO_AV_Callback *&callback)
20 // Create and return the sender application callback to AVStreams
21 // for further upcalls.
22 callback = &this->callback_;
23 return 0;
26 int
27 Sender_StreamEndPoint::set_protocol_object (const char *,
28 TAO_AV_Protocol_Object *object)
30 // Set the sender protocol object corresponding to the transport
31 // protocol selected.
32 SENDER::instance ()->protocol_object (object);
33 return 0;
36 Sender_Callback::Sender_Callback ()
37 : frame_count_ (1)
41 int
42 Sender_Callback::receive_frame (ACE_Message_Block *frame,
43 TAO_AV_frame_info *,
44 const ACE_Addr &)
47 // Upcall from the AVStreams when there is data to be received from
48 // the sender.
50 ACE_DEBUG ((LM_DEBUG,
51 "Sender_Callback::receive_frame for frame %d\n",
52 this->frame_count_++));
54 while (frame != 0)
56 // Write the received data to the file.
57 size_t result =
58 ACE_OS::fwrite (frame->rd_ptr (),
59 frame->length (),
61 output_file);
63 if (result == frame->length ())
64 ACE_ERROR_RETURN ((LM_ERROR,
65 "Sender_Callback::fwrite failed\n"),
66 -1);
68 frame = frame->cont ();
71 if (SENDER::instance ()->eof () == 1)
72 SENDER::instance ()->shutdown ();
73 return 0;
76 Sender::Sender ()
77 : sender_mmdevice_ (0),
78 streamctrl_ (0),
79 frame_count_ (0),
80 filename_ ("input"),
81 input_file_ (0),
82 protocol_ ("UDP"),
83 frame_rate_ (30),
84 mb_ (BUFSIZ),
85 eof_ (0)
89 void
90 Sender::protocol_object (TAO_AV_Protocol_Object *object)
92 // Set the sender protocol object corresponding to the transport
93 // protocol selected.
94 this->protocol_object_ = object;
97 int
98 Sender::eof ()
100 return this->eof_;
103 void
104 Sender::shutdown ()
108 // File reading is complete, destroy the stream.
109 AVStreams::flowSpec stop_spec;
110 this->streamctrl_->destroy (stop_spec);
112 // Shut the orb down.
113 TAO_AV_CORE::instance ()->orb ()->shutdown (0);
115 catch (const CORBA::Exception& ex)
117 ex._tao_print_exception ("shutdown\n");
122 Sender::parse_args (int argc,
123 ACE_TCHAR *argv[])
125 // Parse command line arguments
126 ACE_Get_Opt opts (argc, argv, ACE_TEXT("f:p:r:d"));
128 int c;
129 while ((c= opts ()) != -1)
131 switch (c)
133 case 'f':
134 this->filename_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
135 break;
136 case 'p':
137 this->protocol_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
138 break;
139 case 'r':
140 this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
141 break;
142 case 'd':
143 TAO_debug_level++;
144 break;
145 default:
146 ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
147 return -1;
150 return 0;
153 // Method to get the object reference of the receiver
155 Sender::bind_to_receiver ()
157 CosNaming::Name name (1);
158 name.length (1);
159 name [0].id =
160 CORBA::string_dup ("Receiver");
162 // Resolve the receiver object reference from the Naming Service
163 CORBA::Object_var receiver_mmdevice_obj =
164 this->naming_client_->resolve (name);
166 this->receiver_mmdevice_ =
167 AVStreams::MMDevice::_narrow (receiver_mmdevice_obj.in ());
169 if (CORBA::is_nil (this->receiver_mmdevice_.in ()))
170 ACE_ERROR_RETURN ((LM_ERROR,
171 "Could not resolve Receiver_MMdevice in Naming service\n"),
172 -1);
174 return 0;
178 Sender::init (int argc,
179 ACE_TCHAR *argv[])
181 // Initialize the endpoint strategy with the orb and poa.
182 int result =
183 this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
184 TAO_AV_CORE::instance ()->poa ());
185 if (result != 0)
186 return result;
188 // Initialize the naming services
189 result =
190 this->naming_client_.init (TAO_AV_CORE::instance ()->orb ());
191 if (result != 0)
192 return result;
194 // Parse the command line arguments
195 result =
196 this->parse_args (argc,
197 argv);
198 if (result != 0)
199 return result;
201 // Open file to read.
202 this->input_file_ =
203 ACE_OS::fopen (this->filename_.c_str (),
204 "r");
206 if (this->input_file_ == 0)
207 ACE_ERROR_RETURN ((LM_DEBUG,
208 "Cannot open input file %C\n",
209 this->filename_.c_str ()),
210 -1);
211 else
212 ACE_DEBUG ((LM_DEBUG,
213 "File opened successfully\n"));
215 // Resolve the object reference of the receiver from the Naming Service.
216 result = this->bind_to_receiver ();
218 if (result != 0)
219 ACE_ERROR_RETURN ((LM_ERROR,
220 "(%P|%t) Error binding to the naming service\n"),
221 -1);
224 // Initialize the QoS
225 AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
227 // Create the forward flow specification to describe the flow.
228 TAO_Forward_FlowSpec_Entry entry ("Data_Receiver",
229 "IN",
230 "USER_DEFINED",
232 this->protocol_.c_str (),
235 AVStreams::flowSpec flow_spec (1);
236 flow_spec.length (2);
237 flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
239 // Create the forward flow specification to describe the flow.
240 TAO_Forward_FlowSpec_Entry entry1 ("Data_Receiver1",
241 "OUT",
242 "USER_DEFINED",
244 this->protocol_.c_str (),
247 flow_spec [1] = CORBA::string_dup (entry1.entry_to_string ());
249 // Register the sender mmdevice object with the ORB
250 ACE_NEW_RETURN (this->sender_mmdevice_,
251 TAO_MMDevice (&this->endpoint_strategy_),
252 -1);
254 // Servant Reference Counting to manage lifetime
255 PortableServer::ServantBase_var safe_mmdevice =
256 this->sender_mmdevice_;
258 AVStreams::MMDevice_var mmdevice =
259 this->sender_mmdevice_->_this ();
261 ACE_NEW_RETURN (this->streamctrl_,
262 TAO_StreamCtrl,
263 -1);
265 PortableServer::ServantBase_var safe_streamctrl =
266 this->streamctrl_;
268 // Bind/Connect the sender and receiver MMDevices.
269 CORBA::Boolean bind_result =
270 this->streamctrl_->bind_devs (mmdevice.in (),
271 this->receiver_mmdevice_.in (),
272 the_qos.inout (),
273 flow_spec);
275 if (bind_result == 0)
276 ACE_ERROR_RETURN ((LM_ERROR,
277 "streamctrl::bind_devs failed\n"),
278 -1);
280 return 0;
283 // Method to send data at the specified rate
285 Sender::pace_data ()
287 // The time that should lapse between two consecutive frames sent.
288 ACE_Time_Value inter_frame_time;
290 // The time between two consecutive frames.
291 inter_frame_time.set (1 / (double) this->frame_rate_);
293 if (TAO_debug_level > 0)
294 ACE_DEBUG ((LM_DEBUG,
295 "Frame Rate = %d / second\n"
296 "Inter Frame Time = %d (msec)\n",
297 this->frame_rate_,
298 inter_frame_time.msec ()));
302 // The time taken for sending a frame and preparing for the next frame
303 ACE_High_Res_Timer elapsed_timer;
305 // Continue to send data till the file is read to the end.
306 while (1)
308 // Read from the file into a message block.
309 int n = ACE_OS::fread (this->mb_.wr_ptr (),
311 this->mb_.size (),
312 this->input_file_);
314 if (n < 0)
315 ACE_ERROR_RETURN ((LM_ERROR,
316 "Sender::pace_data fread failed\n"),
317 -1);
319 if (n == 0)
321 // At end of file break the loop and end the sender.
322 ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
323 this->eof_ = 1;
324 break;
327 this->mb_.wr_ptr (n);
329 if (this->frame_count_ > 1)
332 // Second frame and beyond
335 // Stop the timer that was started just before the previous frame was sent.
336 elapsed_timer.stop ();
338 // Get the time elapsed after sending the previous frame.
339 ACE_Time_Value elapsed_time;
340 elapsed_timer.elapsed_time (elapsed_time);
342 if (TAO_debug_level > 0)
343 ACE_DEBUG ((LM_DEBUG,
344 "Elapsed Time = %d\n",
345 elapsed_time.msec ()));
347 // Check to see if the inter frame time has elapsed.
348 if (elapsed_time < inter_frame_time)
350 // Inter frame time has not elapsed.
352 // Calculate the time to wait before the next frame needs to be sent.
353 ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
355 if (TAO_debug_level > 0)
356 ACE_DEBUG ((LM_DEBUG,
357 "Wait Time = %d\n",
358 wait_time.msec ()));
360 // Run the orb for the wait time so the sender can
361 // continue other orb requests.
362 TAO_AV_CORE::instance ()->orb ()->run (wait_time);
366 // Start timer before sending the frame.
367 elapsed_timer.start ();
369 // Send frame.
370 int result =
371 this->protocol_object_->send_frame (&this->mb_);
373 if (result < 0)
374 ACE_ERROR_RETURN ((LM_ERROR,
375 "send failed:%p",
376 "Sender::pace_data send\n"),
377 -1);
379 ACE_DEBUG ((LM_DEBUG,
380 "Sender::pace_data frame %d was sent successfully\n",
381 ++this->frame_count_));
383 // Reset the message block.
384 this->mb_.reset ();
385 } // end while
387 // File reading is complete, destroy the stream.
388 AVStreams::flowSpec stop_spec;
389 this->streamctrl_->destroy (stop_spec);
391 // Shut the orb down.
392 //TAO_AV_CORE::instance ()->orb ()->shutdown (1,
393 //);
395 catch (const CORBA::Exception&)
397 //ACE_PRINT_EXCEPTION (ex,
398 // "Sender::pace_data Failed\n");
399 return -1;
401 return 0;
405 ACE_TMAIN (int argc,
406 ACE_TCHAR *argv[])
410 CORBA::ORB_var orb =
411 CORBA::ORB_init (argc, argv);
413 CORBA::Object_var obj
414 = orb->resolve_initial_references ("RootPOA");
416 // Get the POA_var object from Object_var
417 PortableServer::POA_var root_poa
418 = PortableServer::POA::_narrow (obj.in ());
420 PortableServer::POAManager_var mgr
421 = root_poa->the_POAManager ();
423 mgr->activate ();
425 // Initialize the AV Stream components.
426 /* TAO_AV_CORE::instance ()->init (orb.in (),
427 root_poa.in ()); */
429 // Initialize the AVStreams components.
430 TAO_AV_CORE::instance ()->init (orb.in (), root_poa.in ());
432 // Initialize the Sender.
433 int result = 0;
434 result = SENDER::instance ()->init (argc,
435 argv);
437 if (result < 0)
438 ACE_ERROR_RETURN ((LM_ERROR,
439 "Sender::init failed\n"),
440 -1);
442 // Make sure we have a valid <output_file>
443 output_file = ACE_OS::fopen (output_file_name,
444 "w");
445 if (output_file == 0)
446 ACE_ERROR_RETURN ((LM_DEBUG,
447 "Cannot open output file %s\n",
448 output_file_name),
449 -1);
451 else
452 ACE_DEBUG ((LM_DEBUG,
453 "File Opened Successfully\n"));
455 // Start sending data.
456 result = SENDER::instance ()->pace_data ();
457 ACE_Time_Value tv(3,0);
458 orb->run (tv);
460 catch (const CORBA::Exception& ex)
462 ex._tao_print_exception ("Sender Failed\n");
463 return -1;
466 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
468 return 0;
471 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
472 template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_;
473 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */