Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Pluggable_Flow_Protocol / sender.cpp
blob330e26f925e8db96437c9cf2d8a42e79627515f2
1 #include "sender.h"
2 #include "tao/debug.h"
3 #include "ace/Get_Opt.h"
4 #include "ace/High_Res_Timer.h"
6 // Create a singleton instance of the Sender.
8 // An Unmanaged_Singleton is used to avoid static object destruction
9 // order related problems since the underlying singleton object
10 // contains references to static TypeCodes.
11 typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER;
14 int
15 Sender_StreamEndPoint::get_callback (const char *,
16 TAO_AV_Callback *&callback)
18 // Create and return the sender application callback to AVStreams
19 // for further upcalls.
20 callback = &this->callback_;
21 return 0;
24 int
25 Sender_StreamEndPoint::set_protocol_object (const char *,
26 TAO_AV_Protocol_Object *object)
28 // Set the sender protocol object corresponding to the transport
29 // protocol selected.
30 SENDER::instance ()->protocol_object (object);
31 return 0;
34 Sender::Sender ()
35 : sender_mmdevice_ (0),
36 streamctrl_ (0),
37 frame_count_ (0),
38 filename_ ("input"),
39 input_file_ (0),
40 protocol_ ("UDP"),
41 frame_rate_ (30),
42 mb_ (BUFSIZ)
46 void
47 Sender::protocol_object (TAO_AV_Protocol_Object *object)
49 // Set the sender protocol object corresponding to the transport
50 // protocol selected.
51 this->protocol_object_ = object;
54 int
55 Sender::parse_args (int argc,
56 ACE_TCHAR *argv[])
58 // Parse command line arguments
59 ACE_Get_Opt opts (argc, argv, ACE_TEXT("f:p:r:d"));
61 int c;
62 while ((c= opts ()) != -1)
64 switch (c)
66 case 'f':
67 this->filename_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
68 break;
69 case 'p':
70 this->protocol_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
71 break;
72 case 'r':
73 this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
74 break;
75 case 'd':
76 TAO_debug_level++;
77 break;
78 default:
79 ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
80 return -1;
83 return 0;
86 // Method to get the object reference of the receiver
87 int
88 Sender::bind_to_receiver ()
90 CosNaming::Name name (1);
91 name.length (1);
92 name [0].id =
93 CORBA::string_dup ("Receiver");
95 // Resolve the receiver object reference from the Naming Service
96 CORBA::Object_var receiver_mmdevice_obj =
97 this->naming_client_->resolve (name);
99 this->receiver_mmdevice_ =
100 AVStreams::MMDevice::_narrow (receiver_mmdevice_obj.in ());
102 if (CORBA::is_nil (this->receiver_mmdevice_.in ()))
103 ACE_ERROR_RETURN ((LM_ERROR,
104 "Could not resolve Receiver_MMdevice in Naming service\n"),
105 -1);
107 return 0;
111 Sender::init (int argc,
112 ACE_TCHAR *argv[])
114 // Initialize the endpoint strategy with the orb and poa.
115 int result =
116 this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
117 TAO_AV_CORE::instance ()->poa ());
118 if (result != 0)
119 return result;
121 // Initialize the naming services
122 result =
123 this->naming_client_.init (TAO_AV_CORE::instance ()->orb ());
124 if (result != 0)
125 return result;
127 // Parse the command line arguments
128 result =
129 this->parse_args (argc,
130 argv);
131 if (result != 0)
132 return result;
134 // Open file to read.
135 this->input_file_ =
136 ACE_OS::fopen (this->filename_.c_str (),
137 "r");
139 if (this->input_file_ == 0)
140 ACE_ERROR_RETURN ((LM_DEBUG,
141 "Cannot open input file %C\n",
142 this->filename_.c_str ()),
143 -1);
144 else
145 ACE_DEBUG ((LM_DEBUG,
146 "File opened successfully\n"));
148 // Resolve the object reference of the receiver from the Naming Service.
149 result = this->bind_to_receiver ();
151 if (result != 0)
152 ACE_ERROR_RETURN ((LM_ERROR,
153 "(%P|%t) Error binding to the naming service\n"),
154 -1);
157 // Initialize the QoS
158 AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
160 // Create the forward flow specification to describe the flow.
161 TAO_Forward_FlowSpec_Entry entry ("Data_Receiver",
162 "IN",
163 "USER_DEFINED",
164 "TS",
165 this->protocol_.c_str (),
168 AVStreams::flowSpec flow_spec (1);
169 flow_spec.length (1);
170 flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
172 // Register the sender mmdevice object with the ORB
173 ACE_NEW_RETURN (this->sender_mmdevice_,
174 TAO_MMDevice (&this->endpoint_strategy_),
175 -1);
177 // Servant Reference Counting to manage lifetime
178 PortableServer::ServantBase_var safe_mmdevice =
179 this->sender_mmdevice_;
181 AVStreams::MMDevice_var mmdevice =
182 this->sender_mmdevice_->_this ();
184 ACE_NEW_RETURN (this->streamctrl_,
185 TAO_StreamCtrl,
186 -1);
188 PortableServer::ServantBase_var safe_streamctrl =
189 this->streamctrl_;
191 // Bind/Connect the sender and receiver MMDevices.
192 CORBA::Boolean bind_result =
193 this->streamctrl_->bind_devs (mmdevice.in (),
194 this->receiver_mmdevice_.in (),
195 the_qos.inout (),
196 flow_spec);
198 if (bind_result == 0)
199 ACE_ERROR_RETURN ((LM_ERROR,
200 "streamctrl::bind_devs failed\n"),
201 -1);
203 return 0;
206 // Method to send data at the specified rate
208 Sender::pace_data ()
210 // The time that should lapse between two consecutive frames sent.
211 ACE_Time_Value inter_frame_time;
213 // The time between two consecutive frames.
214 inter_frame_time.set (1 / (double) this->frame_rate_);
216 if (TAO_debug_level > 0)
217 ACE_DEBUG ((LM_DEBUG,
218 "Frame Rate = %d / second\n"
219 "Inter Frame Time = %d (msec)\n",
220 this->frame_rate_,
221 inter_frame_time.msec ()));
225 // The time taken for sending a frame and preparing for the next frame
226 ACE_High_Res_Timer elapsed_timer;
228 // Continue to send data till the file is read to the end.
229 while (1)
231 // Read from the file into a message block.
232 int n = ACE_OS::fread (this->mb_.wr_ptr (),
234 this->mb_.size (),
235 this->input_file_);
237 if (n < 0)
238 ACE_ERROR_RETURN ((LM_ERROR,
239 "Sender::pace_data fread failed\n"),
240 -1);
242 if (n == 0)
244 // At end of file break the loop and end the sender.
245 if (TAO_debug_level > 0)
246 ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
247 break;
250 this->mb_.wr_ptr (n);
252 if (this->frame_count_ > 1)
255 // Second frame and beyond
258 // Stop the timer that was started just before the previous frame was sent.
259 elapsed_timer.stop ();
261 // Get the time elapsed after sending the previous frame.
262 ACE_Time_Value elapsed_time;
263 elapsed_timer.elapsed_time (elapsed_time);
265 if (TAO_debug_level > 0)
266 ACE_DEBUG ((LM_DEBUG,
267 "Elapsed Time = %d\n",
268 elapsed_time.msec ()));
270 // Check to see if the inter frame time has elapsed.
271 if (elapsed_time < inter_frame_time)
273 // Inter frame time has not elapsed.
275 // Calculate the time to wait before the next frame needs to be sent.
276 ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
278 if (TAO_debug_level > 0)
279 ACE_DEBUG ((LM_DEBUG,
280 "Wait Time = %d\n",
281 wait_time.msec ()));
283 // Run the orb for the wait time so the sender can
284 // continue other orb requests.
285 TAO_AV_CORE::instance ()->orb ()->run (wait_time);
289 // Start timer before sending the frame.
290 elapsed_timer.start ();
292 // Send frame.
293 int result =
294 this->protocol_object_->send_frame (&this->mb_);
296 if (result < 0)
297 ACE_ERROR_RETURN ((LM_ERROR,
298 "send failed:%p",
299 "Sender::pace_data send\n"),
300 -1);
302 ACE_DEBUG ((LM_DEBUG,
303 "Sender::pace_data frame %d was sent successfully\n",
304 ++this->frame_count_));
306 // Reset the message block.
307 this->mb_.reset ();
308 } // end while
310 // File reading is complete, destroy the stream.
311 AVStreams::flowSpec stop_spec;
312 this->streamctrl_->destroy (stop_spec);
314 // Shut the orb down.
315 TAO_AV_CORE::instance ()->orb ()->shutdown (0);
317 catch (const CORBA::Exception& ex)
319 ex._tao_print_exception ("Sender::pace_data Failed\n");
320 return -1;
322 return 0;
326 ACE_TMAIN (int argc,
327 ACE_TCHAR *argv[])
331 ACE_High_Res_Timer::global_scale_factor ();
333 CORBA::ORB_var orb =
334 CORBA::ORB_init (argc, argv);
336 CORBA::Object_var obj
337 = orb->resolve_initial_references ("RootPOA");
339 // Get the POA_var object from Object_var
340 PortableServer::POA_var root_poa
341 = PortableServer::POA::_narrow (obj.in ());
343 PortableServer::POAManager_var mgr
344 = root_poa->the_POAManager ();
346 mgr->activate ();
348 // Initialize the AV Stream components.
349 TAO_AV_CORE::instance ()->init (orb.in (),
350 root_poa.in ());
352 // Initialize the Sender.
353 int result = 0;
354 result = SENDER::instance ()->init (argc,
355 argv);
357 if (result < 0)
358 ACE_ERROR_RETURN ((LM_ERROR,
359 "Sender::init failed\n"),
360 -1);
362 // Start sending data.
363 result = SENDER::instance ()->pace_data ();
365 catch (const CORBA::Exception& ex)
367 ex._tao_print_exception ("Sender Failed\n");
368 return -1;
371 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
373 return 0;
#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
// Builds that define the macro above get an explicit instantiation of
// the static singleton_ member of the Sender singleton type.
template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *
  ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_;
#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */