Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Component_Switching / sender.cpp
blobd8a91aecb9b95799b7d9b3e30e68c1ad07270dc0
1 #include "sender.h"
2 #include "tao/debug.h"
3 #include "ace/Get_Opt.h"
4 #include "ace/High_Res_Timer.h"
5 #include "ace/Event_Handler.h"
7 #include "tao/Strategies/advanced_resource.h"
9 // Create a singleton instance of the Sender.
11 // An Unmanaged_Singleton is used to avoid static object destruction
12 // order related problems since the underlying singleton object
13 // contains references to static TypeCodes.
14 typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER;
16 int g_shutdown = 0;
17 /// Flag set when a signal is raised.
19 // constructor.
20 Signal_Handler::Signal_Handler ()
24 int
25 Signal_Handler::handle_signal (int signum, siginfo_t *, ucontext_t*)
27 if (signum == SIGINT)
29 if (TAO_debug_level > 0)
30 ACE_DEBUG ((LM_DEBUG,
31 "In the signal handler\n"));
33 g_shutdown = 1;
35 return 0;
38 ACE_CString &
39 Sender_Callback::flowname ()
41 return this->flowname_;
44 void
45 Sender_Callback::flowname (const ACE_CString &flowname)
47 this->flowname_ = flowname;
51 int
52 Sender_Callback::handle_destroy ()
54 SENDER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_.c_str ());
56 SENDER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());
58 SENDER::instance ()->connection_manager ().receivers ().unbind (this->flowname_.c_str ());
60 // SENDER::instance ()->remove_stream ();
62 return 0;
65 int
66 Sender_StreamEndPoint::get_callback (const char * flowname,
67 TAO_AV_Callback *&callback)
69 //SENDER::instance ()->add_stream ();
71 /// Create and return the client application callback and return to the AVStreams
72 /// for further upcalls.
73 callback = &this->callback_;
75 ACE_CString flow_name (flowname);
76 this->callback_.flowname (flow_name);
78 return 0;
81 int
82 Sender_StreamEndPoint::set_protocol_object (const char *flowname,
83 TAO_AV_Protocol_Object *object)
85 Connection_Manager &connection_manager =
86 SENDER::instance ()->connection_manager ();
88 /// Add to the map of protocol objects.
89 connection_manager.protocol_objects ().bind (flowname,
90 object);
92 /// Store the related streamctrl.
93 connection_manager.add_streamctrl (flowname,
94 this);
96 return 0;
99 CORBA::Boolean
100 Sender_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &flowspec)
102 /// If another receiver of the same flowname is in the map, destroy
103 /// the old stream.
104 for (CORBA::ULong i = 0;
105 i < flowspec.length ();
106 i++)
108 TAO_Forward_FlowSpec_Entry entry;
109 entry.parse (flowspec[i]);
111 ACE_CString flowname (entry.flowname ());
113 Connection_Manager &connection_manager =
114 SENDER::instance ()->connection_manager ();
116 int result =
117 connection_manager.protocol_objects ().find (flowname);
119 /// If the flowname is found.
120 if (result == 0)
122 ACE_DEBUG ((LM_DEBUG, "\nSender switching distributers\n\n"));
124 /// Destroy old stream with the same flowname.
125 connection_manager.destroy (flowname);
128 return 1;
131 Sender::Sender ()
132 : sender_mmdevice_ (0),
133 frame_count_ (0),
134 filename_ ("input"),
135 input_file_ (0),
136 frame_rate_ (5),
137 mb_ (BUFSIZ),
138 sender_name_ ("sender")
142 Sender::~Sender ()
144 if (TAO_debug_level > 0)
145 ACE_DEBUG ((LM_DEBUG,
146 "Sender destructor\n"));
149 void
150 Sender::shut_down ()
154 AVStreams::MMDevice_var mmdevice =
155 this->sender_mmdevice_->_this ();
157 SENDER::instance ()->connection_manager ().unbind_sender (this->sender_name_,
158 mmdevice.in ());
160 catch (const CORBA::Exception& ex)
162 ex._tao_print_exception ("Sender::shut_down Failed\n");
167 Sender::parse_args (int argc,
168 ACE_TCHAR *argv[])
170 /// Parse command line arguments
171 ACE_Get_Opt opts (argc, argv, ACE_TEXT("s:f:r:d"));
173 int c;
174 while ((c= opts ()) != -1)
176 switch (c)
178 case 'f':
179 this->filename_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
180 break;
181 case 'r':
182 this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
183 break;
184 case 's':
185 this->sender_name_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
186 break;
187 case 'd':
188 TAO_debug_level++;
189 break;
190 default:
191 ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
192 return -1;
195 return 0;
199 Sender::init (int argc,
200 ACE_TCHAR *argv[])
202 /// Initialize the endpoint strategy with the orb and poa.
203 int result =
204 this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
205 TAO_AV_CORE::instance ()->poa ());
206 if (result != 0)
207 return result;
209 /// Initialize the connection manager.
210 result =
211 this->connection_manager_.init (TAO_AV_CORE::instance ()->orb ());
212 if (result != 0)
213 return result;
215 /// Parse the command line arguments
216 result =
217 this->parse_args (argc,
218 argv);
219 if (result != 0)
220 return result;
223 ACE_Reactor *reactor =
224 TAO_AV_CORE::instance ()->reactor ();
227 if (reactor->register_handler (SIGINT,
228 &this->signal_handler_) == -1)
229 ACE_ERROR_RETURN ((LM_ERROR,
230 "Error in handler register\n"),
231 -1);
232 /// Register the signal handler for clean termination of the process.
235 /// Open file to read.
236 this->input_file_ =
237 ACE_OS::fopen (this->filename_.c_str (),
238 "r");
240 if (this->input_file_ == 0)
241 ACE_ERROR_RETURN ((LM_DEBUG,
242 "Cannot open input file %C\n",
243 this->filename_.c_str ()),
244 -1);
245 else
246 ACE_DEBUG ((LM_DEBUG,
247 "File opened successfully\n"));
249 /// Register the sender mmdevice object with the ORB
250 ACE_NEW_RETURN (this->sender_mmdevice_,
251 TAO_MMDevice (&this->endpoint_strategy_),
252 -1);
254 /// Servant Reference Counting to manage lifetime
255 PortableServer::ServantBase_var safe_mmdevice =
256 this->sender_mmdevice_;
258 AVStreams::MMDevice_var mmdevice =
259 this->sender_mmdevice_->_this ();
261 /// Register the object reference with the Naming Service and bind to
262 /// the receivers
263 this->connection_manager_.bind_to_receivers (this->sender_name_,
264 mmdevice.in ());
266 /// Connect to the receivers
267 this->connection_manager_.connect_to_receivers ();
269 return 0;
272 /// Method to send data at the specified rate
274 Sender::pace_data ()
276 /// The time that should lapse between two consecutive frames sent.
277 ACE_Time_Value inter_frame_time;
279 /// The time between two consecutive frames.
280 inter_frame_time.set (1.0 / this->frame_rate_);
282 if (TAO_debug_level > 0)
283 ACE_DEBUG ((LM_DEBUG,
284 "Frame Rate = %d / second\n"
285 "Inter Frame Time = %d (msec)\n",
286 this->frame_rate_,
287 inter_frame_time.msec ()));
291 /// The time taken for sending a frame and preparing for the next frame
292 ACE_High_Res_Timer elapsed_timer;
294 /// Continue to send data till the file is read to the end.
295 while (1)
297 if (g_shutdown == 1)
299 ACE_DEBUG ((LM_DEBUG,
300 "Shut Down called\n"));
302 this->shut_down ();
304 break;
307 /// Read from the file into a message block.
308 int n = ACE_OS::fread (this->mb_.wr_ptr (),
310 this->mb_.size (),
311 this->input_file_);
313 if (n < 0)
314 ACE_ERROR_RETURN ((LM_ERROR,
315 "Sender::pace_data fread failed\n"),
316 -1);
318 if (n == 0)
320 /// At end of file break the loop and end the sender.
321 if (TAO_debug_level > 0)
322 ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
324 this->shut_down ();
326 break;
329 this->mb_.wr_ptr (n);
331 if (this->frame_count_ > 1)
334 /// Second frame and beyond
337 /// Stop the timer that was started just before the previous frame was sent.
338 elapsed_timer.stop ();
340 /// Get the time elapsed after sending the previous frame.
341 ACE_Time_Value elapsed_time;
342 elapsed_timer.elapsed_time (elapsed_time);
344 if (TAO_debug_level > 0)
345 ACE_DEBUG ((LM_DEBUG,
346 "Elapsed Time = %d\n",
347 elapsed_time.msec ()));
349 /// Check to see if the inter frame time has elapsed.
350 if (elapsed_time < inter_frame_time)
352 /// Inter frame time has not elapsed.
354 /// Calculate the time to wait before the next frame needs to be sent.
355 ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
357 if (TAO_debug_level > 0)
358 ACE_DEBUG ((LM_DEBUG,
359 "Wait Time = %d\n",
360 wait_time.msec ()));
362 /// Run the orb for the wait time so the sender can
363 /// continue other orb requests.
364 TAO_AV_CORE::instance ()->orb ()->run (wait_time);
368 /// Start timer before sending the frame.
369 elapsed_timer.start ();
371 Connection_Manager::Protocol_Objects &protocol_objects =
372 this->connection_manager_.protocol_objects ();
374 /// Send frame to all receivers.
375 for (Connection_Manager::Protocol_Objects::iterator iterator = protocol_objects.begin ();
376 iterator != protocol_objects.end ();
377 ++iterator)
379 int result =
380 (*iterator).int_id_->send_frame (&this->mb_);
382 if (result < 0)
383 ACE_ERROR_RETURN ((LM_ERROR,
384 "send failed:%p",
385 "Sender::pace_data send\n"),
386 -1);
389 ACE_DEBUG ((LM_DEBUG,
390 "Sender::pace_data frame %d was sent successfully\n",
391 ++this->frame_count_));
393 /// Reset the message block.
394 this->mb_.reset ();
395 } /// end while
398 catch (const CORBA::Exception& ex)
400 ex._tao_print_exception ("Sender::pace_data Failed\n");
401 return -1;
403 return 0;
406 Connection_Manager &
407 Sender::connection_manager ()
409 return this->connection_manager_;
412 // void
413 // Sender::add_stream ()
414 // {
415 // this->stream_count_++;
416 // }
418 // void
419 // Sender::remove_stream ()
420 // {
421 // this->stream_count_--;
422 // }
424 // int
425 // Sender::stream_alive ()
426 // {
427 // return this->stream_count_;
428 // }
431 ACE_TMAIN (int argc,
432 ACE_TCHAR *argv[])
436 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
438 CORBA::Object_var obj
439 = orb->resolve_initial_references ("RootPOA");
441 ///Get the POA_var object from Object_var
442 PortableServer::POA_var root_poa
443 = PortableServer::POA::_narrow (obj.in ());
445 PortableServer::POAManager_var mgr
446 = root_poa->the_POAManager ();
448 mgr->activate ();
450 /// Initialize the AV Stream components.
451 TAO_AV_CORE::instance ()->init (orb.in (),
452 root_poa.in ());
454 /// Initialize the Client.
455 int result = 0;
456 result = SENDER::instance ()->init (argc,
457 argv);
459 if (result < 0)
460 ACE_ERROR_RETURN ((LM_ERROR,
461 "client::init failed\n"), -1);
463 SENDER::instance ()->pace_data ();
465 orb->destroy ();
467 catch (const CORBA::Exception& ex)
469 ex._tao_print_exception ("Sender Failed\n");
470 return -1;
473 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
475 return 0;
// Explicitly instantiate the singleton's static instance pointer when
// the platform configuration asks for it.
#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *
  ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_;
#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */