Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Bidirectional_Flows / receiver.cpp
blobc4a529fc542bcb3911b511fa828aeb6ee1d79fae
1 #include "receiver.h"
2 #include "ace/Get_Opt.h"
3 #include "ace/High_Res_Timer.h"
5 static FILE *output_file = 0;
6 // File handle of the file into which received data is written.
8 static const ACE_TCHAR *output_file_name = ACE_TEXT ("output");
9 // File name of the file into which received data is written.
11 typedef ACE_Unmanaged_Singleton<Receiver,ACE_Null_Mutex> RECEIVER;
12 //Create a singleton instance of the receiver.
14 int
15 Receiver_StreamEndPoint::get_callback (const char *,
16 TAO_AV_Callback *&callback)
18 // Return the receiver application callback to the AVStreams for
19 // future upcalls.
20 callback = &this->callback_;
21 return 0;
25 int
26 Receiver_StreamEndPoint::set_protocol_object (const char * flowname,
27 TAO_AV_Protocol_Object *object)
29 // Set the sender protocol object corresponding to the transport
30 // protocol selected.
31 if (ACE_OS::strcmp (flowname, "Data_Receiver1") == 0)
32 RECEIVER::instance ()->protocol_object (object);
33 return 0;
36 Receiver_Callback::Receiver_Callback ()
37 : frame_count_ (1),
38 mb_ (BUFSIZ)
42 int
43 Receiver_Callback::receive_frame (ACE_Message_Block *frame,
44 TAO_AV_frame_info *,
45 const ACE_Addr &)
48 // Upcall from the AVStreams when there is data to be received from
49 // the sender.
51 ACE_DEBUG ((LM_DEBUG,
52 "Receiver_Callback::receive_frame for frame %d\n",
53 this->frame_count_++));
55 while (frame != 0)
57 // Write the received data to the file.
58 size_t result =
59 ACE_OS::fwrite (frame->rd_ptr (),
60 frame->length (),
62 output_file);
64 if (result == frame->length ())
65 ACE_ERROR_RETURN ((LM_ERROR,
66 "Receiver_Callback::fwrite failed\n"),
67 -1);
69 frame = frame->cont ();
72 // Read from the file into a message block.
73 int n = ACE_OS::fread (this->mb_.wr_ptr (),
75 this->mb_.size (),
76 RECEIVER::instance ()->input_file ());
78 if (n < 0)
79 ACE_DEBUG ((LM_DEBUG,
80 "Receiver::receive_frame fread failed\n"));
82 if (n == 0)
84 // At end of file break the loop and end the sender.
85 ACE_DEBUG ((LM_DEBUG,"End of file\n"));
87 else
89 this->mb_.wr_ptr (n);
91 // Send frame.
92 int result =
93 RECEIVER::instance ()->protocol_object ()->send_frame (&this->mb_);
95 if (result < 0)
96 ACE_DEBUG ((LM_DEBUG,
97 "Send Frame Failed\n"));
99 // Reset the message block.
100 this->mb_.reset ();
102 return 0;
106 Receiver_Callback::handle_destroy ()
108 // Called when the distributer requests the stream to be shutdown.
109 ACE_DEBUG ((LM_DEBUG,
110 "Receiver_Callback::handle_destroy\n"));
112 static int count = 0;
113 ++count;
115 if (count < 2)
119 TAO_AV_CORE::instance ()->orb ()->shutdown (0);
121 catch (const CORBA::Exception& ex)
123 ex._tao_print_exception (
124 "Receiver_Callback::handle_destroy Failed\n");
125 return -1;
130 return 0;
133 Receiver::Receiver ()
134 : mmdevice_ (0),
135 frame_rate_ (30),
136 input_file_ (0),
137 frame_count_ (0),
138 filename_ ("input"),
139 mb_ (BUFSIZ)
143 Receiver::~Receiver ()
147 void
148 Receiver::protocol_object (TAO_AV_Protocol_Object *object)
150 // Set the sender protocol object corresponding to the transport
151 // protocol selected.
152 this->protocol_object_ = object;
156 Receiver::parse_args (int argc,
157 ACE_TCHAR *argv[])
159 // Parse command line arguments
160 ACE_Get_Opt opts (argc, argv, ACE_TEXT("f:r:d"));
162 int c;
163 while ((c= opts ()) != -1)
165 switch (c)
167 case 'f':
168 this->filename_ = ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ());
169 break;
170 case 'r':
171 this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
172 break;
173 case 'd':
174 TAO_debug_level++;
175 break;
176 default:
177 ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
178 return -1;
181 return 0;
185 Receiver::init (int argc,
186 ACE_TCHAR *argv[])
188 // Initialize the endpoint strategy with the orb and poa.
189 int result =
190 this->reactive_strategy_.init (TAO_AV_CORE::instance ()->orb (),
191 TAO_AV_CORE::instance ()->poa ());
192 if (result != 0)
193 return result;
195 // Parse the command line arguments
196 result =
197 this->parse_args (argc,
198 argv);
199 if (result != 0)
200 return result;
202 // Open file to read.
203 this->input_file_ =
204 ACE_OS::fopen (this->filename_.c_str (), "r");
206 if (this->input_file_ == 0)
207 ACE_ERROR_RETURN ((LM_DEBUG,
208 "Cannot open input file %C\n",
209 this->filename_.c_str ()),
210 -1);
211 else
212 ACE_DEBUG ((LM_DEBUG,
213 "Input File opened successfully\n"));
215 // Register the receiver mmdevice object with the ORB
216 ACE_NEW_RETURN (this->mmdevice_,
217 TAO_MMDevice (&this->reactive_strategy_),
218 -1);
220 // Servant Reference Counting to manage lifetime
221 PortableServer::ServantBase_var safe_mmdevice =
222 this->mmdevice_;
224 CORBA::Object_var mmdevice =
225 this->mmdevice_->_this ();
227 // Register the mmdevice with the naming service.
228 CosNaming::Name name (1);
229 name.length (1);
230 name [0].id =
231 CORBA::string_dup ("Receiver");
233 // Initialize the naming services
234 if (this->naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
235 ACE_ERROR_RETURN ((LM_ERROR,
236 "Unable to initialize "
237 "the TAO_Naming_Client\n"),
238 -1);
240 // Register the receiver object with the naming server.
241 this->naming_client_->rebind (name,
242 mmdevice.in ());
244 return 0;
247 TAO_AV_Protocol_Object*
248 Receiver::protocol_object ()
250 return this->protocol_object_;
253 FILE *
254 Receiver::input_file ()
256 return this->input_file_;
260 ACE_TMAIN (int argc,
261 ACE_TCHAR *argv[])
265 // Initialize the ORB first.
266 CORBA::ORB_var orb =
267 CORBA::ORB_init (argc, argv);
269 CORBA::Object_var obj
270 = orb->resolve_initial_references ("RootPOA");
272 // Get the POA_var object from Object_var.
273 PortableServer::POA_var root_poa =
274 PortableServer::POA::_narrow (obj.in ());
276 PortableServer::POAManager_var mgr
277 = root_poa->the_POAManager ();
279 mgr->activate ();
281 // Initialize the AVStreams components.
282 TAO_AV_CORE::instance ()->init (orb.in (), root_poa.in ());
284 // Make sure we have a valid <output_file>
285 output_file = ACE_OS::fopen (output_file_name,
286 "w");
287 if (output_file == 0)
288 ACE_ERROR_RETURN ((LM_DEBUG,
289 "Cannot open output file %s\n",
290 output_file_name),
291 -1);
293 else
294 ACE_DEBUG ((LM_DEBUG,
295 "Output File Opened Successfully\n"));
297 int result =
298 RECEIVER::instance ()->init (argc,
299 argv);
301 // // Start sending data.
302 // result = SENDER::instance ()->pace_data ();
304 if (result != 0)
305 return result;
307 orb->run ();
309 // Hack for now....
310 ACE_OS::sleep (1);
312 //orb->destroy ();
314 catch (const CORBA::Exception& ex)
316 ex._tao_print_exception ("receiver::init");
317 return -1;
320 ACE_OS::fclose (output_file);
322 RECEIVER::close (); // Explicitly finalize the Unmanaged_Singleton.
324 return 0;
#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
// Explicitly instantiate the static instance pointer of the singleton
// type behind the RECEIVER typedef when the configuration asks for it.
template ACE_Unmanaged_Singleton<Receiver, ACE_Null_Mutex> *
  ACE_Unmanaged_Singleton<Receiver, ACE_Null_Mutex>::singleton_;
#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */