2 #include "ace/Get_Opt.h"
5 #include "tao/Strategies/advanced_resource.h"
7 static FILE *output_file
= 0;
8 /// File handle of the file into which received data is written.
11 /// Flag set when a signal is raised.
14 Signal_Handler::Signal_Handler ()
19 Signal_Handler::handle_signal (int signum
, siginfo_t
*, ucontext_t
*)
23 if (TAO_debug_level
> 0)
25 "In the signal handler\n"));
33 Connection_Manager
*connection_manager
;
36 Receiver_StreamEndPoint::get_callback (const char *flow_name
,
37 TAO_AV_Callback
*&callback
)
39 /// Return the receiver application callback to the AVStreams for
41 callback
= &this->callback_
;
43 ACE_CString fname
= flow_name
;
44 this->callback_
.flowname (fname
);
49 Receiver_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec
&flowspec
)
51 if (TAO_debug_level
> 0)
53 "In Handle Connection Requested\n"));
55 for (CORBA::ULong i
= 0;
56 i
< flowspec
.length ();
59 TAO_Forward_FlowSpec_Entry entry
;
60 entry
.parse (flowspec
[i
]);
63 "Handle Connection Requested flowname %C\n",
66 ACE_CString
flowname (entry
.flowname ());
68 /// Store the related streamctrl.
69 connection_manager
->add_streamctrl (flowname
.c_str (),
75 Receiver_Callback::Receiver_Callback ()
81 Receiver_Callback::flowname ()
83 return this->flowname_
;
87 Receiver_Callback::flowname (const ACE_CString
&flowname
)
89 this->flowname_
= flowname
;
93 Receiver_Callback::handle_destroy ()
95 /// Called when the sender requests the stream to be shutdown.
97 "Receiver_Callback::end_stream\n"));
99 /// Remove the related stream control reference.
100 connection_manager
->streamctrls ().unbind (this->flowname_
.c_str ());
105 Receiver_Callback::receive_frame (ACE_Message_Block
*frame
,
110 /// Upcall from the AVStreams when there is data to be received from
113 ACE_DEBUG ((LM_DEBUG
,
114 "Receiver_Callback::receive_frame for frame %d\n",
115 this->frame_count_
++));
119 /// Write the received data to the file.
121 ACE_OS::fwrite (frame
->rd_ptr (),
126 if (result
== (signed) frame
->length ())
127 ACE_ERROR_RETURN ((LM_ERROR
,
128 "Receiver_Callback::fwrite failed\n"),
131 frame
= frame
->cont ();
137 Receiver::Receiver ()
139 output_file_name_ ("output"),
140 sender_name_ ("distributer"),
141 receiver_name_ ("receiver")
145 Receiver::~Receiver ()
150 Receiver::sender_name ()
152 return this->sender_name_
;
156 Receiver::receiver_name ()
158 return this->receiver_name_
;
165 /// Initialize the endpoint strategy with the orb and poa.
167 this->reactive_strategy_
.init (TAO_AV_CORE::instance ()->orb (),
168 TAO_AV_CORE::instance ()->poa ());
172 /// Initialize the connection manager.
174 this->connection_manager_
.init (TAO_AV_CORE::instance ()->orb ());
178 connection_manager
= &this->connection_manager_
;
180 ACE_Reactor
*reactor
=
181 TAO_AV_CORE::instance ()->reactor ();
183 if (reactor
->register_handler (SIGINT
,
184 &this->signal_handler_
) == -1)
185 ACE_ERROR_RETURN ((LM_ERROR
,
186 "Error in handler register\n"),
188 /// Register the signal handler for clean termination of the process.
190 /// Register the receiver mmdevice object with the ORB
191 ACE_NEW_RETURN (this->mmdevice_
,
192 TAO_MMDevice (&this->reactive_strategy_
),
195 /// Servant Reference Counting to manage lifetime
196 PortableServer::ServantBase_var safe_mmdevice
=
199 AVStreams::MMDevice_var mmdevice
=
200 this->mmdevice_
->_this ();
203 this->connection_manager_
.bind_to_sender (this->sender_name_
,
204 this->receiver_name_
,
207 /// Connect to the sender.
208 this->connection_manager_
.connect_to_sender ();
214 Receiver::parse_args (int argc
,
217 /// Parse the command line arguments
218 ACE_Get_Opt
opts (argc
,
223 while ((c
= opts ()) != -1)
228 this->output_file_name_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
231 this->sender_name_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
234 this->receiver_name_
= ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ());
237 ACE_ERROR_RETURN ((LM_ERROR
,
238 "Usage: receiver -f filename"),
247 Receiver::output_file_name ()
249 return this->output_file_name_
;
253 Receiver::shut_down ()
257 AVStreams::MMDevice_var mmdevice_obj
=
258 this->mmdevice_
->_this ();
261 this->connection_manager_
.unbind_receiver (this->sender_name_
,
262 this->receiver_name_
,
265 catch (const CORBA::Exception
& ex
)
267 ex
._tao_print_exception ("Receiver::shut_down");
278 /// Initialize the ORB first.
280 CORBA::ORB_init (argc
, argv
);
282 CORBA::Object_var obj
283 = orb
->resolve_initial_references ("RootPOA");
285 /// Get the POA_var object from Object_var.
286 PortableServer::POA_var root_poa
=
287 PortableServer::POA::_narrow (obj
.in ());
289 PortableServer::POAManager_var mgr
290 = root_poa
->the_POAManager ();
294 /// Initialize the AVStreams components.
295 TAO_AV_CORE::instance ()->init (orb
.in (),
300 receiver
.parse_args (argc
,
305 /// Make sure we have a valid <output_file>
307 ACE_OS::fopen (receiver
.output_file_name ().c_str (),
309 if (output_file
== 0)
310 ACE_ERROR_RETURN ((LM_DEBUG
,
311 "Cannot open output file %C\n",
312 receiver
.output_file_name ().c_str ()),
316 ACE_DEBUG ((LM_DEBUG
,
317 "File Opened Successfully\n"));
328 orb
->perform_work ();
331 receiver
.shut_down ();
333 ACE_OS::fclose (output_file
);
335 catch (const CORBA::Exception
& ex
)
337 ex
._tao_print_exception ("receiver::init");