Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Multicast / ftp.cpp
blobb367afdda76565a361050d8c6c8a30edda971e51
1 #include "ftp.h"
3 FTP_Client_Callback::FTP_Client_Callback ()
4 : count_ (0)
8 int
9 FTP_Client_Callback::handle_end_stream ()
11 TAO_AV_CORE::instance ()->orb ()->shutdown ();
12 return 0;
15 FTP_Client_StreamEndPoint::FTP_Client_StreamEndPoint ()
16 : callback_ (0)
20 void
21 FTP_Client_Callback::get_timeout (ACE_Time_Value *&tv,
22 void *&)
24 ACE_Time_Value *timeout;
25 ACE_NEW (timeout,
26 ACE_Time_Value(2));
27 tv = timeout;
30 int
31 FTP_Client_Callback::handle_timeout (void *)
33 ACE_Message_Block mb (BUFSIZ);
34 ACE_DEBUG ((LM_DEBUG,"FTP_Client_Callback::get_frame\n"));
35 char *buf = mb.rd_ptr ();
36 //cerr << "message block size" << mb.size () << endl;
37 int n = ACE_OS::fread(buf,1,mb.size (),CLIENT::instance ()->file ());
38 if (n < 0)
40 ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread end of file\n"),-1);
42 if (n == 0)
44 if (feof (CLIENT::instance ()->file ()))
46 // wait for sometime for the data to be flushed to the other side.
47 this->count_++;
48 if (this->count_ == 2)
50 try
52 ACE_DEBUG ((LM_DEBUG,"handle_timeout:End of file\n"));
53 AVStreams::flowSpec stop_spec (1);
54 //ACE_DECLARE_NEW_CORBA_ENV;
55 CLIENT::instance ()->streamctrl ()->stop (stop_spec);
56 // CLIENT::instance ()->streamctrl ()->destroy (stop_spec);
57 TAO_AV_CORE::instance ()->orb ()->shutdown (0);
58 return 0;
60 catch (const CORBA::Exception& ex)
62 ex._tao_print_exception (
63 "FTP_Client_Callback::handle_timeout\n");
64 return -1;
67 else
68 return 0;
70 else
71 ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread error\n"),-1);
73 //cerr << "read bytes = " << n << endl;
74 mb.wr_ptr (n);
75 int result = this->protocol_object_->send_frame (&mb);
76 if (result < 0)
77 ACE_ERROR_RETURN ((LM_ERROR,"send failed:%p","FTP_Client_Flow_Handler::send\n"),-1);
78 ACE_DEBUG ((LM_DEBUG,"handle_timeout::buffer sent successfully\n"));
79 return 0;
82 int
83 FTP_Client_StreamEndPoint::get_callback (const char *,
84 TAO_AV_Callback *&callback)
86 ACE_NEW_RETURN (this->callback_,
87 FTP_Client_Callback,
88 -1);
89 callback = this->callback_;
90 return 0;
93 int
94 FTP_Client_StreamEndPoint::set_protocol_object (const char *flowname,
95 TAO_AV_Protocol_Object *object)
97 this->callback_->set_protocol_object (object);
98 ACE_CString flow_string (flowname);
99 return 0;
103 Endpoint_Reactive_Strategy::Endpoint_Reactive_Strategy (CORBA::ORB_ptr orb,
104 PortableServer::POA_ptr poa,
105 Client *client)
106 : client_ (client)
108 this->init (orb, poa);
112 Endpoint_Reactive_Strategy::make_stream_endpoint (FTP_Client_StreamEndPoint *&endpoint)
114 ACE_DEBUG ((LM_DEBUG,"Endpoint_Reactive_Strategy::make_stream_endpoint\n"));
115 ACE_NEW_RETURN (endpoint,
116 FTP_Client_StreamEndPoint,
117 -1);
118 return 0;
122 Client::parse_args (int argc,
123 ACE_TCHAR *argv[])
125 ACE_Get_Opt opts (argc, argv, ACE_TEXT("f:a:p:s"));
127 this->use_sfp_ = 0;
128 int c;
129 while ((c= opts ()) != -1)
131 switch (c)
133 case 'f':
134 this->filename_ = ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ()));
135 break;
136 case 'a':
137 this->address_ = ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ()));
138 break;
139 case 'p':
140 this->protocol_ = ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ()));
141 break;
142 case 's':
143 this->use_sfp_ = 1;
144 break;
145 default:
146 ACE_DEBUG ((LM_DEBUG, "Unknown option\n"));
147 return -1;
150 return 0;
153 FILE *
154 Client::file ()
156 return this->fp_;
159 char*
160 Client::flowname ()
162 return this->flowname_;
165 TAO_StreamCtrl*
166 Client::streamctrl ()
168 return &this->streamctrl_;
171 Client::Client ()
172 :endpoint_strategy_ (TAO_AV_CORE::instance ()->orb (), TAO_AV_CORE::instance ()->poa (),this),
173 client_mmdevice_ (&endpoint_strategy_),
174 address_ (ACE_OS::strdup ("224.9.9.2:12345")),
175 fp_ (0),
176 protocol_ (ACE_OS::strdup ("UDP"))
182 Client::bind_to_server (const char *name)
186 // Initialize the naming services
187 CosNaming::Name server_mmdevice_name (1);
188 server_mmdevice_name.length (1);
189 server_mmdevice_name [0].id = name;
190 CORBA::Object_var server_mmdevice_obj =
191 my_naming_client_->resolve (server_mmdevice_name);
193 this->server_mmdevice_ =
194 AVStreams::MMDevice::_narrow (server_mmdevice_obj.in ());
196 if (CORBA::is_nil (this->server_mmdevice_.in ()))
197 ACE_ERROR_RETURN ((LM_ERROR,
198 " could not resolve Server_Mmdevice in Naming service <%C>\n",
199 name),
200 -1);
202 catch (const CORBA::Exception& ex)
204 ex._tao_print_exception ("Client::bind_to_server\n");
205 return -1;
207 return 0;
211 Client::init (int argc, ACE_TCHAR *argv[])
213 PortableServer::POAManager_var mgr
214 = TAO_AV_CORE::instance ()->poa ()->the_POAManager ();
216 mgr->activate ();
218 this->argc_ = argc;
219 this->argv_ = argv;
221 // Increase the debug_level so that we can see the output
222 this->parse_args (this->argc_, this->argv_);
224 if (this->my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
225 ACE_ERROR_RETURN ((LM_ERROR,
226 " (%P|%t) Unable to initialize "
227 "the TAO_Naming_Client.\n"),
228 -1);
230 this->fp_ = ACE_OS::fopen (this->filename_,"r");
231 if (this->fp_ != 0)
233 ACE_DEBUG ((LM_DEBUG,"file opened successfully\n"));
235 else
237 ACE_ERROR_RETURN ((LM_ERROR, "ERROR: file %C could not be opened\n",
238 this->filename_), -1);
241 return 0;
245 Client::run ()
249 char flow_protocol_str [BUFSIZ];
250 if (this->use_sfp_)
251 ACE_OS::strcpy (flow_protocol_str,"sfp:1.0");
252 else
253 ACE_OS::strcpy (flow_protocol_str,"");
254 AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
255 AVStreams::flowSpec flow_spec (1);
257 ACE_INET_Addr addr (this->address_);
258 ACE_NEW_RETURN (this->flowname_,
259 char [BUFSIZ],
261 ACE_OS::sprintf (this->flowname_,
262 "Data_%s",
263 this->protocol_);
264 TAO_Forward_FlowSpec_Entry entry (this->flowname_,
265 "IN",
266 "USER_DEFINED",
267 flow_protocol_str,
268 this->protocol_,
269 &addr);
270 flow_spec.length (1);
271 flow_spec [0] = entry.entry_to_string ();
272 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Flowspec: %C\n", entry.entry_to_string() ));
274 AVStreams::MMDevice_var client_mmdevice
275 = this->client_mmdevice_._this ();
277 CORBA::Boolean result =
278 this->streamctrl_.bind_devs (client_mmdevice.in (),
279 AVStreams::MMDevice::_nil (),
280 the_qos.inout (),
281 flow_spec);
282 if (this->bind_to_server ("Server_MMDevice1") == -1)
283 ACE_ERROR_RETURN ((LM_ERROR,
284 "(%P|%t) Error binding to the naming service\n"),
285 -1);
286 result = this->streamctrl_.bind_devs (AVStreams::MMDevice::_nil (),
287 this->server_mmdevice_.in (),
288 the_qos.inout (),
289 flow_spec);
290 if (this->bind_to_server ("Server_MMDevice2") == -1)
291 ACE_ERROR_RETURN ((LM_ERROR,
292 "(%P|%t) Error binding to the naming service\n"),
293 -1);
294 result = this->streamctrl_.bind_devs (AVStreams::MMDevice::_nil (),
295 this->server_mmdevice_.in (),
296 the_qos.inout (),
297 flow_spec);
299 if (result == 0)
300 ACE_ERROR_RETURN ((LM_ERROR,"streamctrl::bind_devs failed\n"),-1);
301 AVStreams::flowSpec start_spec (1);
302 start_spec.length (1);
303 start_spec [0] = CORBA::string_dup (this->flowname_);
304 this->streamctrl_.start (start_spec);
305 // Schedule a timer for the for the flow handler.
306 //TAO_AV_CORE::instance ()->run ();
307 ACE_Time_Value tv (10000,0);
309 TAO_AV_CORE::instance ()->orb ()->run (tv);
311 ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
313 ACE_DEBUG ((LM_DEBUG, "Exited the TAO_AV_Core::run\n"));
315 catch (const CORBA::Exception& ex)
317 ex._tao_print_exception ("Client::run");
318 return -1;
320 return 0;
324 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
328 CORBA::ORB_var orb = CORBA::ORB_init (argc,
329 argv);
330 CORBA::Object_var obj
331 = orb->resolve_initial_references ("RootPOA");
333 PortableServer::POA_var poa
334 = PortableServer::POA::_narrow (obj.in ());
336 TAO_AV_CORE::instance ()->init (orb.in (),
337 poa.in ());
339 int result = 0;
340 result = CLIENT::instance ()->init (argc, argv);
341 if (result < 0)
342 ACE_ERROR_RETURN ((LM_ERROR,"client::init failed\n"),1);
343 result = CLIENT::instance ()->run ();
344 if (result < 0)
345 ACE_ERROR_RETURN ((LM_ERROR,"client::run failed\n"),1);
347 catch (const CORBA::Exception& ex)
350 ex._tao_print_exception ("Client Failed\n");
351 return -1;
354 CLIENT::close (); // Explicitly finalize the Unmanaged_Singleton.
356 return 0;
// Explicit instantiation of the singleton's static pointer member, emitted
// only when ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION is defined.
#ifdef ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION
template ACE_Unmanaged_Singleton<Client, ACE_Null_Mutex> *
  ACE_Unmanaged_Singleton<Client, ACE_Null_Mutex>::singleton_;
#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */