Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Component_Switching / receiver.cpp
blob68262b9a5a565a395a1183d39ce20a7c3ffbdadc
1 #include "receiver.h"
2 #include "ace/Get_Opt.h"
3 #include "tao/debug.h"
5 #include "tao/Strategies/advanced_resource.h"
7 static FILE *output_file = 0;
8 /// File handle of the file into which received data is written.
10 static int done = 0;
11 /// Flag set when a signal is raised.
13 // constructor.
14 Signal_Handler::Signal_Handler ()
18 int
19 Signal_Handler::handle_signal (int signum, siginfo_t *, ucontext_t*)
21 if (signum == SIGINT)
23 if (TAO_debug_level > 0)
24 ACE_DEBUG ((LM_DEBUG,
25 "In the signal handler\n"));
27 done = 1;
29 return 0;
33 Connection_Manager *connection_manager;
35 int
36 Receiver_StreamEndPoint::get_callback (const char *flow_name,
37 TAO_AV_Callback *&callback)
39 /// Return the receiver application callback to the AVStreams for
40 /// future upcalls.
41 callback = &this->callback_;
43 ACE_CString fname = flow_name;
44 this->callback_.flowname (fname);
45 return 0;
48 CORBA::Boolean
49 Receiver_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &flowspec)
51 if (TAO_debug_level > 0)
52 ACE_DEBUG ((LM_DEBUG,
53 "In Handle Connection Requested\n"));
55 for (CORBA::ULong i = 0;
56 i < flowspec.length ();
57 i++)
59 TAO_Forward_FlowSpec_Entry entry;
60 entry.parse (flowspec[i]);
62 ACE_DEBUG ((LM_DEBUG,
63 "Handle Connection Requested flowname %C\n",
64 entry.flowname ()));
66 ACE_CString flowname (entry.flowname ());
68 /// Store the related streamctrl.
69 connection_manager->add_streamctrl (flowname.c_str (),
70 this);
72 return 1;
75 Receiver_Callback::Receiver_Callback ()
76 : frame_count_ (1)
80 ACE_CString &
81 Receiver_Callback::flowname ()
83 return this->flowname_;
86 void
87 Receiver_Callback::flowname (const ACE_CString &flowname)
89 this->flowname_ = flowname;
92 int
93 Receiver_Callback::handle_destroy ()
95 /// Called when the sender requests the stream to be shutdown.
96 ACE_DEBUG ((LM_DEBUG,
97 "Receiver_Callback::end_stream\n"));
99 /// Remove the related stream control reference.
100 connection_manager->streamctrls ().unbind (this->flowname_.c_str ());
102 return 0;
105 Receiver_Callback::receive_frame (ACE_Message_Block *frame,
106 TAO_AV_frame_info *,
107 const ACE_Addr &)
110 /// Upcall from the AVStreams when there is data to be received from
111 /// the sender.
113 ACE_DEBUG ((LM_DEBUG,
114 "Receiver_Callback::receive_frame for frame %d\n",
115 this->frame_count_++));
117 while (frame != 0)
119 /// Write the received data to the file.
120 int result =
121 ACE_OS::fwrite (frame->rd_ptr (),
122 frame->length (),
124 output_file);
126 if (result == (signed) frame->length ())
127 ACE_ERROR_RETURN ((LM_ERROR,
128 "Receiver_Callback::fwrite failed\n"),
129 -1);
131 frame = frame->cont ();
134 return 0;
137 Receiver::Receiver ()
138 : mmdevice_ (0),
139 output_file_name_ ("output"),
140 sender_name_ ("distributer"),
141 receiver_name_ ("receiver")
145 Receiver::~Receiver ()
149 ACE_CString
150 Receiver::sender_name ()
152 return this->sender_name_;
155 ACE_CString
156 Receiver::receiver_name ()
158 return this->receiver_name_;
162 Receiver::init (int,
163 ACE_TCHAR *[])
165 /// Initialize the endpoint strategy with the orb and poa.
166 int result =
167 this->reactive_strategy_.init (TAO_AV_CORE::instance ()->orb (),
168 TAO_AV_CORE::instance ()->poa ());
169 if (result != 0)
170 return result;
172 /// Initialize the connection manager.
173 result =
174 this->connection_manager_.init (TAO_AV_CORE::instance ()->orb ());
175 if (result != 0)
176 return result;
178 connection_manager = &this->connection_manager_;
180 ACE_Reactor *reactor =
181 TAO_AV_CORE::instance ()->reactor ();
183 if (reactor->register_handler (SIGINT,
184 &this->signal_handler_) == -1)
185 ACE_ERROR_RETURN ((LM_ERROR,
186 "Error in handler register\n"),
187 -1);
188 /// Register the signal handler for clean termination of the process.
190 /// Register the receiver mmdevice object with the ORB
191 ACE_NEW_RETURN (this->mmdevice_,
192 TAO_MMDevice (&this->reactive_strategy_),
193 -1);
195 /// Servant Reference Counting to manage lifetime
196 PortableServer::ServantBase_var safe_mmdevice =
197 this->mmdevice_;
199 AVStreams::MMDevice_var mmdevice =
200 this->mmdevice_->_this ();
202 /// Bind to sender.
203 this->connection_manager_.bind_to_sender (this->sender_name_,
204 this->receiver_name_,
205 mmdevice.in ());
207 /// Connect to the sender.
208 this->connection_manager_.connect_to_sender ();
210 return 0;
214 Receiver::parse_args (int argc,
215 ACE_TCHAR *argv[])
217 /// Parse the command line arguments
218 ACE_Get_Opt opts (argc,
219 argv,
220 "f:s:r:");
222 int c;
223 while ((c = opts ()) != -1)
225 switch (c)
227 case 'f':
228 this->output_file_name_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
229 break;
230 case 's':
231 this->sender_name_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
232 break;
233 case 'r':
234 this->receiver_name_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
235 break;
236 default:
237 ACE_ERROR_RETURN ((LM_ERROR,
238 "Usage: receiver -f filename"),
239 -1);
243 return 0;
246 ACE_CString
247 Receiver::output_file_name ()
249 return this->output_file_name_;
252 void
253 Receiver::shut_down ()
257 AVStreams::MMDevice_var mmdevice_obj =
258 this->mmdevice_->_this ();
261 this->connection_manager_.unbind_receiver (this->sender_name_,
262 this->receiver_name_,
263 mmdevice_obj.in ());
265 catch (const CORBA::Exception& ex)
267 ex._tao_print_exception ("Receiver::shut_down");
273 ACE_TMAIN (int argc,
274 ACE_TCHAR *argv[])
278 /// Initialize the ORB first.
279 CORBA::ORB_var orb =
280 CORBA::ORB_init (argc, argv);
282 CORBA::Object_var obj
283 = orb->resolve_initial_references ("RootPOA");
285 /// Get the POA_var object from Object_var.
286 PortableServer::POA_var root_poa =
287 PortableServer::POA::_narrow (obj.in ());
289 PortableServer::POAManager_var mgr
290 = root_poa->the_POAManager ();
292 mgr->activate ();
294 /// Initialize the AVStreams components.
295 TAO_AV_CORE::instance ()->init (orb.in (),
296 root_poa.in ());
298 Receiver receiver;
299 int result =
300 receiver.parse_args (argc,
301 argv);
302 if (result == -1)
303 return -1;
305 /// Make sure we have a valid <output_file>
306 output_file =
307 ACE_OS::fopen (receiver.output_file_name ().c_str (),
308 "w");
309 if (output_file == 0)
310 ACE_ERROR_RETURN ((LM_DEBUG,
311 "Cannot open output file %C\n",
312 receiver.output_file_name ().c_str ()),
313 -1);
315 else
316 ACE_DEBUG ((LM_DEBUG,
317 "File Opened Successfully\n"));
319 result =
320 receiver.init (argc,
321 argv);
323 if (result != 0)
324 return result;
326 while (!done)
328 orb->perform_work ();
331 receiver.shut_down ();
333 ACE_OS::fclose (output_file);
335 catch (const CORBA::Exception& ex)
337 ex._tao_print_exception ("receiver::init");
338 return -1;
341 return 0;