Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Multiple_Flows / sender.cpp
blobbcd859a1f5b61a29043c1ce1e79084a8b400b585
#include "sender.h"
#include "tao/debug.h"
#include "ace/Get_Opt.h"
#include "ace/High_Res_Timer.h"

#include <cstdio>
6 // Create a singleton instance of the Sender.
8 // An Unmanaged_Singleton is used to avoid static object destruction
9 // order related problems since the underlying singleton object
10 // contains references to static TypeCodes.
11 typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER;
14 int
15 Sender_StreamEndPoint::get_callback (const char *,
16 TAO_AV_Callback *&callback)
18 // Create and return the sender application callback to AVStreams
19 // for further upcalls.
20 callback = &this->callback_;
21 return 0;
24 int
25 Sender_StreamEndPoint::set_protocol_object (const char *,
26 TAO_AV_Protocol_Object *object)
28 // Set the sender protocol object corresponding to the transport
29 // protocol selected.
30 SENDER::instance ()->protocol_object (object);
31 return 0;
34 Sender::Sender ()
35 : sender_mmdevice_ (0),
36 streamctrl_ (0),
37 frame_count_ (0),
38 filename_ ("input"),
39 input_file_ (0),
40 protocol_ ("UDP"),
41 frame_rate_ (30),
42 mb_ (BUFSIZ)
46 void
47 Sender::protocol_object (TAO_AV_Protocol_Object *object)
49 // Set the sender protocol object corresponding to the transport
50 // protocol selected.
51 this->protocol_object_.insert (object);
54 void
55 Sender::shutdown ()
57 // File reading is complete, destroy the stream.
58 AVStreams::flowSpec stop_spec;
59 this->streamctrl_->destroy (stop_spec);
61 // Shut the orb down.
62 TAO_AV_CORE::instance ()->orb ()->shutdown (0);
65 int
66 Sender::parse_args (int argc,
67 ACE_TCHAR *argv[])
69 // Parse command line arguments
70 ACE_Get_Opt opts (argc, argv, ACE_TEXT("f:p:r:d"));
72 int c;
73 while ((c= opts ()) != -1)
75 switch (c)
77 case 'f':
78 this->filename_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
79 break;
80 case 'p':
81 this->protocol_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
82 break;
83 case 'r':
84 this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
85 break;
86 case 'd':
87 TAO_debug_level++;
88 break;
89 default:
90 ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
91 return -1;
94 return 0;
97 // Method to get the object reference of the receiver
98 int
99 Sender::bind_to_receiver ()
101 CosNaming::Name name (1);
102 name.length (1);
103 name [0].id =
104 CORBA::string_dup ("Receiver");
106 // Resolve the receiver object reference from the Naming Service
107 CORBA::Object_var receiver_mmdevice_obj =
108 this->naming_client_->resolve (name);
110 this->receiver_mmdevice_ =
111 AVStreams::MMDevice::_narrow (receiver_mmdevice_obj.in ());
113 if (CORBA::is_nil (this->receiver_mmdevice_.in ()))
114 ACE_ERROR_RETURN ((LM_ERROR,
115 "Could not resolve Receiver_MMdevice in Naming service\n"),
116 -1);
118 return 0;
// Sender::init -- one-time set-up of the sender.
//
// In order: initialises the endpoint strategy and the naming client with
// the ORB (and POA) held by TAO_AV_CORE, parses the command line, opens
// the input file, resolves the receiver through bind_to_receiver (),
// builds a two-entry flow spec ("Data_Receiver_One" / "Data_Receiver_Two"),
// creates the sender MMDevice and the stream controller servants, and
// finally binds the sender and receiver MMDevices with bind_devs ().
//
// Returns 0 on success.  A failing endpoint-strategy, naming-client or
// parse_args step returns that step's non-zero result; every other
// failure returns -1.
//
// NOTE(review): in this listing the return-type line and several short
// lines (braces, and some arguments of the two TAO_Forward_FlowSpec_Entry
// constructors) are not visible -- check against the repository copy
// before editing the code below.
122 Sender::init (int argc,
123 ACE_TCHAR *argv[])
125 // Initialize the endpoint strategy with the orb and poa.
126 int result =
127 this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
128 TAO_AV_CORE::instance ()->poa ());
129 if (result != 0)
130 return result;
132 // Initialize the naming services
133 result =
134 this->naming_client_.init (TAO_AV_CORE::instance ()->orb ());
135 if (result != 0)
136 return result;
138 // Parse the command line arguments
139 result =
140 this->parse_args (argc,
141 argv);
142 if (result != 0)
143 return result;
145 // Open file to read.
// The file is opened with mode "r".  None of the later failure returns
// in this function close it again.
146 this->input_file_ =
147 ACE_OS::fopen (this->filename_.c_str (),
148 "r");
// The open failure is reported at LM_DEBUG priority (the other failures
// in this function use LM_ERROR) and ends initialisation with -1.
150 if (this->input_file_ == 0)
151 ACE_ERROR_RETURN ((LM_DEBUG,
152 "Cannot open input file %C\n",
153 this->filename_.c_str ()),
154 -1);
155 else
156 ACE_DEBUG ((LM_DEBUG,
157 "File opened successfully\n"));
159 // Resolve the object reference of the receiver from the Naming Service.
160 result = this->bind_to_receiver ();
162 if (result != 0)
163 ACE_ERROR_RETURN ((LM_ERROR,
164 "(%P|%t) Error binding to the naming service\n"),
165 -1);
168 // Initialize the QoS
169 AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
171 // Create the forward flow specification to describe the flow.
// First flow entry; the carrier protocol comes from protocol_ (-p option).
// NOTE(review): the remaining constructor arguments are not visible in
// this listing.
172 TAO_Forward_FlowSpec_Entry entry ("Data_Receiver_One",
173 "IN",
174 "USER_DEFINED",
176 this->protocol_.c_str (),
178 // &addr1_local, // ACE_INET_Addr addr1_local ("IP1:Port1");
// The sequence is constructed with an argument of 1 and then given a
// length of 2, one slot per flow entry.
180 AVStreams::flowSpec flow_spec (1);
181 flow_spec.length (2);
182 flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
184 // Create the forward flow specification to describe the flow.
// Second flow entry, built the same way as the first.
185 TAO_Forward_FlowSpec_Entry entry1 ("Data_Receiver_Two",
186 "IN",
187 "USER_DEFINED",
189 this->protocol_.c_str (),
191 // &addr2_local, // ACE_INET_Addr addr2_local ("IP2:Port2");
193 flow_spec [1] = CORBA::string_dup (entry1.entry_to_string ());
195 // Register the sender mmdevice object with the ORB
// ACE_NEW_RETURN is given -1 as the value to return if the allocation
// fails.
196 ACE_NEW_RETURN (this->sender_mmdevice_,
197 TAO_MMDevice (&this->endpoint_strategy_),
198 -1);
200 // Servant Reference Counting to manage lifetime
201 PortableServer::ServantBase_var safe_mmdevice =
202 this->sender_mmdevice_;
204 AVStreams::MMDevice_var mmdevice =
205 this->sender_mmdevice_->_this ();
// Stream controller servant, created and held the same way as the
// MMDevice above.
207 ACE_NEW_RETURN (this->streamctrl_,
208 TAO_StreamCtrl,
209 -1);
211 PortableServer::ServantBase_var safe_streamctrl =
212 this->streamctrl_;
214 // Bind/Connect the sender and receiver MMDevices.
215 CORBA::Boolean bind_result =
216 this->streamctrl_->bind_devs (mmdevice.in (),
217 this->receiver_mmdevice_.in (),
218 the_qos.inout (),
219 flow_spec);
// A zero (false) result from bind_devs is treated as failure.
221 if (bind_result == 0)
222 ACE_ERROR_RETURN ((LM_ERROR,
223 "streamctrl::bind_devs failed\n"),
224 -1);
226 return 0;
229 // Method to send data at the specified rate
231 Sender::pace_data ()
233 // The time that should lapse between two consecutive frames sent.
234 ACE_Time_Value inter_frame_time;
236 // The time between two consecutive frames.
237 inter_frame_time.set (1 / (double) this->frame_rate_);
239 if (TAO_debug_level > 0)
240 ACE_DEBUG ((LM_DEBUG,
241 "Frame Rate = %d / second\n"
242 "Inter Frame Time = %d (msec)\n",
243 this->frame_rate_,
244 inter_frame_time.msec ()));
248 // The time taken for sending a frame and preparing for the next frame
249 ACE_High_Res_Timer elapsed_timer;
251 // Continue to send data till the file is read to the end.
252 while (1)
254 // Read from the file into a message block.
255 int n = ACE_OS::fread (this->mb_.wr_ptr (),
257 this->mb_.size (),
258 this->input_file_);
260 if (n < 0)
261 ACE_ERROR_RETURN ((LM_ERROR,
262 "Sender::pace_data fread failed\n"),
263 -1);
265 if (n == 0)
267 // At end of file break the loop and end the sender.
268 if (TAO_debug_level > 0)
269 ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
270 break;
273 this->mb_.wr_ptr (n);
275 if (this->frame_count_ > 1)
278 // Second frame and beyond
281 // Stop the timer that was started just before the previous frame was sent.
282 elapsed_timer.stop ();
284 // Get the time elapsed after sending the previous frame.
285 ACE_Time_Value elapsed_time;
286 elapsed_timer.elapsed_time (elapsed_time);
288 if (TAO_debug_level > 0)
289 ACE_DEBUG ((LM_DEBUG,
290 "Elapsed Time = %d\n",
291 elapsed_time.msec ()));
293 // Check to see if the inter frame time has elapsed.
294 if (elapsed_time < inter_frame_time)
296 // Inter frame time has not elapsed.
298 // Calculate the time to wait before the next frame needs to be sent.
299 ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
301 if (TAO_debug_level > 0)
302 ACE_DEBUG ((LM_DEBUG,
303 "Wait Time = %d\n",
304 wait_time.msec ()));
306 // Run the orb for the wait time so the sender can
307 // continue other orb requests.
308 TAO_AV_CORE::instance ()->orb ()->run (wait_time);
312 // Start timer before sending the frame.
313 elapsed_timer.start ();
315 ProtocolObject_SetItor end = this->protocol_object_.end ();
316 for (ProtocolObject_SetItor begin = this->protocol_object_.begin ();
317 begin != end; ++begin)
319 // Send frame.
320 int result =
321 (*begin)->send_frame (&this->mb_);
323 if (result < 0)
324 ACE_ERROR_RETURN ((LM_ERROR,
325 "send failed:%p",
326 "Sender::pace_data send\n"),
327 -1);
331 ACE_DEBUG ((LM_DEBUG,
332 "Sender::pace_data frame %d was sent successfully\n",
333 ++this->frame_count_));
335 // Reset the message block.
336 this->mb_.reset ();
337 } // end while
339 ACE_OS::sleep (1);
341 this->shutdown ();
343 catch (const CORBA::Exception& ex)
345 ex._tao_print_exception ("Sender::pace_data Failed\n");
346 return -1;
348 return 0;
352 ACE_TMAIN (int argc,
353 ACE_TCHAR *argv[])
357 CORBA::ORB_var orb =
358 CORBA::ORB_init (argc, argv);
360 CORBA::Object_var obj
361 = orb->resolve_initial_references ("RootPOA");
363 // Get the POA_var object from Object_var
364 PortableServer::POA_var root_poa
365 = PortableServer::POA::_narrow (obj.in ());
367 PortableServer::POAManager_var mgr
368 = root_poa->the_POAManager ();
370 mgr->activate ();
372 // Initialize the AV Stream components.
373 TAO_AV_CORE::instance ()->init (orb.in (),
374 root_poa.in ());
376 // Initialize the Sender.
377 int result = 0;
378 result = SENDER::instance ()->init (argc,
379 argv);
381 if (result < 0)
382 ACE_ERROR_RETURN ((LM_ERROR,
383 "Sender::init failed\n"),
384 -1);
386 // Start sending data.
387 result = SENDER::instance ()->pace_data ();
389 catch (const CORBA::Exception& ex)
391 ex._tao_print_exception ("Sender Failed\n");
392 return -1;
395 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
397 return 0;
// Explicit instantiation of the singleton's static pointer member, emitted
// only when ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION is
// defined.
#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *
  ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_;
#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */