Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Component_Switching / sender.cpp
blobb7b47d1a48758429cf9c53b3162ffd7cabd91066
1 #include "sender.h"
2 #include "tao/debug.h"
3 #include "ace/Get_Opt.h"
4 #include "ace/High_Res_Timer.h"
5 #include "ace/Event_Handler.h"
7 #include "tao/Strategies/advanced_resource.h"
9 // Create a singleton instance of the Sender.
11 // An Unmanaged_Singleton is used to avoid static object destruction
12 // order related problems since the underlying singleton object
13 // contains references to static TypeCodes.
14 typedef ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> SENDER;
16 int g_shutdown = 0;
17 /// Flag set when a signal is raised.
19 // constructor.
20 Signal_Handler::Signal_Handler (void)
24 int
25 Signal_Handler::handle_signal (int signum, siginfo_t *, ucontext_t*)
27 if (signum == SIGINT)
29 if (TAO_debug_level > 0)
30 ACE_DEBUG ((LM_DEBUG,
31 "In the signal handler\n"));
33 g_shutdown = 1;
36 return 0;
39 ACE_CString &
40 Sender_Callback::flowname (void)
42 return this->flowname_;
45 void
46 Sender_Callback::flowname (const ACE_CString &flowname)
48 this->flowname_ = flowname;
52 int
53 Sender_Callback::handle_destroy (void)
55 SENDER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_.c_str ());
57 SENDER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());
59 SENDER::instance ()->connection_manager ().receivers ().unbind (this->flowname_.c_str ());
61 // SENDER::instance ()->remove_stream ();
63 return 0;
66 int
67 Sender_StreamEndPoint::get_callback (const char * flowname,
68 TAO_AV_Callback *&callback)
70 //SENDER::instance ()->add_stream ();
72 /// Create and return the client application callback and return to the AVStreams
73 /// for further upcalls.
74 callback = &this->callback_;
76 ACE_CString flow_name (flowname);
77 this->callback_.flowname (flow_name);
79 return 0;
82 int
83 Sender_StreamEndPoint::set_protocol_object (const char *flowname,
84 TAO_AV_Protocol_Object *object)
86 Connection_Manager &connection_manager =
87 SENDER::instance ()->connection_manager ();
89 /// Add to the map of protocol objects.
90 connection_manager.protocol_objects ().bind (flowname,
91 object);
93 /// Store the related streamctrl.
94 connection_manager.add_streamctrl (flowname,
95 this);
97 return 0;
100 CORBA::Boolean
101 Sender_StreamEndPoint::handle_preconnect (AVStreams::flowSpec &flowspec)
103 /// If another receiver of the same flowname is in the map, destroy
104 /// the old stream.
105 for (CORBA::ULong i = 0;
106 i < flowspec.length ();
107 i++)
109 TAO_Forward_FlowSpec_Entry entry;
110 entry.parse (flowspec[i]);
112 ACE_CString flowname (entry.flowname ());
114 Connection_Manager &connection_manager =
115 SENDER::instance ()->connection_manager ();
117 int result =
118 connection_manager.protocol_objects ().find (flowname);
120 /// If the flowname is found.
121 if (result == 0)
123 ACE_DEBUG ((LM_DEBUG, "\nSender switching distributers\n\n"));
125 /// Destroy old stream with the same flowname.
126 connection_manager.destroy (flowname);
129 return 1;
132 Sender::Sender (void)
133 : sender_mmdevice_ (0),
134 frame_count_ (0),
135 filename_ ("input"),
136 input_file_ (0),
137 frame_rate_ (5),
138 mb_ (BUFSIZ),
139 sender_name_ ("sender")
143 Sender::~Sender (void)
145 if (TAO_debug_level > 0)
146 ACE_DEBUG ((LM_DEBUG,
147 "Sender destructor\n"));
150 void
151 Sender::shut_down (void)
155 AVStreams::MMDevice_var mmdevice =
156 this->sender_mmdevice_->_this ();
158 SENDER::instance ()->connection_manager ().unbind_sender (this->sender_name_,
159 mmdevice.in ());
162 catch (const CORBA::Exception& ex)
164 ex._tao_print_exception ("Sender::shut_down Failed\n");
169 Sender::parse_args (int argc,
170 ACE_TCHAR *argv[])
172 /// Parse command line arguments
173 ACE_Get_Opt opts (argc, argv, ACE_TEXT("s:f:r:d"));
175 int c;
176 while ((c= opts ()) != -1)
178 switch (c)
180 case 'f':
181 this->filename_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
182 break;
183 case 'r':
184 this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
185 break;
186 case 's':
187 this->sender_name_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
188 break;
189 case 'd':
190 TAO_debug_level++;
191 break;
192 default:
193 ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
194 return -1;
197 return 0;
201 Sender::init (int argc,
202 ACE_TCHAR *argv[])
204 /// Initialize the endpoint strategy with the orb and poa.
205 int result =
206 this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
207 TAO_AV_CORE::instance ()->poa ());
208 if (result != 0)
209 return result;
211 /// Initialize the connection manager.
212 result =
213 this->connection_manager_.init (TAO_AV_CORE::instance ()->orb ());
214 if (result != 0)
215 return result;
217 /// Parse the command line arguments
218 result =
219 this->parse_args (argc,
220 argv);
221 if (result != 0)
222 return result;
225 ACE_Reactor *reactor =
226 TAO_AV_CORE::instance ()->reactor ();
229 if (reactor->register_handler (SIGINT,
230 &this->signal_handler_) == -1)
231 ACE_ERROR_RETURN ((LM_ERROR,
232 "Error in handler register\n"),
233 -1);
234 /// Register the signal handler for clean termination of the process.
237 /// Open file to read.
238 this->input_file_ =
239 ACE_OS::fopen (this->filename_.c_str (),
240 "r");
242 if (this->input_file_ == 0)
243 ACE_ERROR_RETURN ((LM_DEBUG,
244 "Cannot open input file %C\n",
245 this->filename_.c_str ()),
246 -1);
247 else
248 ACE_DEBUG ((LM_DEBUG,
249 "File opened successfully\n"));
251 /// Register the sender mmdevice object with the ORB
252 ACE_NEW_RETURN (this->sender_mmdevice_,
253 TAO_MMDevice (&this->endpoint_strategy_),
254 -1);
256 /// Servant Reference Counting to manage lifetime
257 PortableServer::ServantBase_var safe_mmdevice =
258 this->sender_mmdevice_;
260 AVStreams::MMDevice_var mmdevice =
261 this->sender_mmdevice_->_this ();
263 /// Register the object reference with the Naming Service and bind to
264 /// the receivers
265 this->connection_manager_.bind_to_receivers (this->sender_name_,
266 mmdevice.in ());
268 /// Connect to the receivers
269 this->connection_manager_.connect_to_receivers ();
271 return 0;
274 /// Method to send data at the specified rate
276 Sender::pace_data (void)
278 /// The time that should lapse between two consecutive frames sent.
279 ACE_Time_Value inter_frame_time;
281 /// The time between two consecutive frames.
282 inter_frame_time.set (1.0 / this->frame_rate_);
284 if (TAO_debug_level > 0)
285 ACE_DEBUG ((LM_DEBUG,
286 "Frame Rate = %d / second\n"
287 "Inter Frame Time = %d (msec)\n",
288 this->frame_rate_,
289 inter_frame_time.msec ()));
293 /// The time taken for sending a frame and preparing for the next frame
294 ACE_High_Res_Timer elapsed_timer;
296 /// Continue to send data till the file is read to the end.
297 while (1)
300 if (g_shutdown == 1)
302 ACE_DEBUG ((LM_DEBUG,
303 "Shut Down called\n"));
305 this->shut_down ();
307 break;
310 /// Read from the file into a message block.
311 int n = ACE_OS::fread (this->mb_.wr_ptr (),
313 this->mb_.size (),
314 this->input_file_);
316 if (n < 0)
317 ACE_ERROR_RETURN ((LM_ERROR,
318 "Sender::pace_data fread failed\n"),
319 -1);
321 if (n == 0)
323 /// At end of file break the loop and end the sender.
324 if (TAO_debug_level > 0)
325 ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
327 this->shut_down ();
329 break;
333 this->mb_.wr_ptr (n);
335 if (this->frame_count_ > 1)
338 /// Second frame and beyond
341 /// Stop the timer that was started just before the previous frame was sent.
342 elapsed_timer.stop ();
344 /// Get the time elapsed after sending the previous frame.
345 ACE_Time_Value elapsed_time;
346 elapsed_timer.elapsed_time (elapsed_time);
348 if (TAO_debug_level > 0)
349 ACE_DEBUG ((LM_DEBUG,
350 "Elapsed Time = %d\n",
351 elapsed_time.msec ()));
353 /// Check to see if the inter frame time has elapsed.
354 if (elapsed_time < inter_frame_time)
356 /// Inter frame time has not elapsed.
358 /// Calculate the time to wait before the next frame needs to be sent.
359 ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
361 if (TAO_debug_level > 0)
362 ACE_DEBUG ((LM_DEBUG,
363 "Wait Time = %d\n",
364 wait_time.msec ()));
366 /// Run the orb for the wait time so the sender can
367 /// continue other orb requests.
368 TAO_AV_CORE::instance ()->orb ()->run (wait_time);
373 /// Start timer before sending the frame.
374 elapsed_timer.start ();
376 Connection_Manager::Protocol_Objects &protocol_objects =
377 this->connection_manager_.protocol_objects ();
379 /// Send frame to all receivers.
380 for (Connection_Manager::Protocol_Objects::iterator iterator = protocol_objects.begin ();
381 iterator != protocol_objects.end ();
382 ++iterator)
384 int result =
385 (*iterator).int_id_->send_frame (&this->mb_);
387 if (result < 0)
388 ACE_ERROR_RETURN ((LM_ERROR,
389 "send failed:%p",
390 "Sender::pace_data send\n"),
391 -1);
394 ACE_DEBUG ((LM_DEBUG,
395 "Sender::pace_data frame %d was sent successfully\n",
396 ++this->frame_count_));
398 /// Reset the message block.
399 this->mb_.reset ();
401 } /// end while
404 catch (const CORBA::Exception& ex)
406 ex._tao_print_exception ("Sender::pace_data Failed\n");
407 return -1;
409 return 0;
412 Connection_Manager &
413 Sender::connection_manager (void)
415 return this->connection_manager_;
418 // void
419 // Sender::add_stream (void)
420 // {
421 // this->stream_count_++;
422 // }
424 // void
425 // Sender::remove_stream (void)
426 // {
427 // this->stream_count_--;
428 // }
430 // int
431 // Sender::stream_alive (void)
432 // {
433 // return this->stream_count_;
434 // }
437 ACE_TMAIN (int argc,
438 ACE_TCHAR *argv[])
442 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
444 CORBA::Object_var obj
445 = orb->resolve_initial_references ("RootPOA");
447 ///Get the POA_var object from Object_var
448 PortableServer::POA_var root_poa
449 = PortableServer::POA::_narrow (obj.in ());
451 PortableServer::POAManager_var mgr
452 = root_poa->the_POAManager ();
454 mgr->activate ();
456 /// Initialize the AV Stream components.
457 TAO_AV_CORE::instance ()->init (orb.in (),
458 root_poa.in ());
460 /// Initialize the Client.
461 int result = 0;
462 result = SENDER::instance ()->init (argc,
463 argv);
465 if (result < 0)
466 ACE_ERROR_RETURN ((LM_ERROR,
467 "client::init failed\n"), -1);
469 SENDER::instance ()->pace_data ();
471 orb->destroy ();
473 catch (const CORBA::Exception& ex)
475 ex._tao_print_exception ("Sender Failed\n");
476 return -1;
479 SENDER::close (); // Explicitly finalize the Unmanaged_Singleton.
481 return 0;
484 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
485 template ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Sender, ACE_Null_Mutex>::singleton_;
486 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */