Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Multicast_Full_Profile / ftp.cpp
blob2a384f9129189b572fbdf63edb2628a6b8a44d71
#include "ftp.h"
#include "tao/debug.h"
#include "ace/OS_NS_stdlib.h"
4 FTP_Client_Callback::FTP_Client_Callback (void)
5 :count_ (0)
9 int
10 FTP_Client_Callback::handle_end_stream (void)
12 TAO_AV_CORE::instance ()->orb ()->shutdown ();
13 return 0;
16 void
17 FTP_Client_Callback::get_timeout (ACE_Time_Value *&tv,
18 void *&)
20 ACE_Time_Value *timeout;
21 ACE_NEW (timeout,
22 ACE_Time_Value(2));
23 tv = timeout;
26 int
27 FTP_Client_Callback::handle_timeout (void *)
29 ACE_Message_Block mb (BUFSIZ);
30 ACE_DEBUG ((LM_DEBUG,"FTP_Client_Callback::get_frame\n"));
31 char *buf = mb.rd_ptr ();
33 int n = ACE_OS::fread(buf,1,mb.size (),CLIENT::instance ()->file ());
34 if (n < 0)
36 ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread end of file\n"),-1);
38 if (n == 0)
40 if (feof (CLIENT::instance ()->file ()))
42 // wait for sometime for the data to be flushed to the other side.
43 this->count_++;
44 if (this->count_ == 2)
46 try
48 ACE_DEBUG ((LM_DEBUG,"handle_timeout:End of file\n"));
49 AVStreams::flowSpec stop_spec (1);
50 ACE_DEBUG ((LM_DEBUG, "Just before Orb Shutdown\n"));
51 TAO_AV_CORE::instance ()->orb ()->shutdown (0);
52 return 0;
54 catch (const CORBA::Exception& ex)
56 ex._tao_print_exception (
57 "FTP_Client_Callback::handle_timeout\n");
58 return -1;
61 else
62 return 0;
64 else
65 ACE_ERROR_RETURN ((LM_ERROR,"FTP_Client_Flow_Handler::fread error\n"),-1);
67 mb.wr_ptr (n);
68 int result = this->protocol_object_->send_frame (&mb);
69 if (result < 0)
70 ACE_ERROR_RETURN ((LM_ERROR,"send failed:%p","FTP_Client_Flow_Handler::send\n"),-1);
71 ACE_DEBUG ((LM_DEBUG,"handle_timeout::buffer sent successfully\n"));
72 return 0;
75 FTP_Client_Producer::FTP_Client_Producer (void)
76 :TAO_FlowProducer ("Data",CLIENT::instance ()->protocols (),CLIENT::instance ()->format ())
80 int
81 FTP_Client_Producer::set_protocol_object (const char *,
82 TAO_AV_Protocol_Object *object)
84 this->callback_->set_protocol_object (object);
85 return 0;
88 int
89 FTP_Client_Producer::get_callback (const char *,
90 TAO_AV_Callback *&callback)
92 ACE_NEW_RETURN (this->callback_,
93 FTP_Client_Callback,
94 -1);
95 callback = this->callback_;
96 return 0;
99 int
100 Client::parse_args (int argc, ACE_TCHAR *argv[])
102 ACE_Get_Opt opts (argc, argv, ACE_TEXT("f:a:p:sd"));
104 this->use_sfp_ = 0;
105 int c;
106 while ((c= opts ()) != -1)
108 switch (c)
110 case 'f':
111 this->filename_ = opts.opt_arg ();
112 break;
113 case 'a':
114 this->address_ = ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ()));
115 break;
116 case 'p':
117 this->protocol_ = ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts.opt_arg ()));
118 break;
119 case 's':
120 this->use_sfp_ = 1;
121 break;
122 case 'd':
123 TAO_debug_level++;
124 break;
125 default:
126 ACE_DEBUG ((LM_DEBUG,"Unknown option\n"));
127 return -1;
131 return 0;
134 FILE *
135 Client::file (void)
137 return this->fp_;
140 char*
141 Client::flowname (void)
143 return this->flowname_;
146 AVStreams::protocolSpec
147 Client::protocols (void)
149 AVStreams::protocolSpec protocols (1);
150 protocols.length (1);
151 char buf [BUFSIZ];
152 ACE_OS::sprintf (buf, "%s=%s", this->protocol_, this->address_);
153 protocols [0] = CORBA::string_dup (buf);
154 return protocols;
157 const char *
158 Client::format (void)
160 return "UNS:ftp";
163 const char *
164 Client::address (void)
167 return this->address_;
170 TAO_StreamCtrl*
171 Client::streamctrl (void)
173 return &this->streamctrl_;
176 Client::Client (void)
177 : client_mmdevice_ (&endpoint_strategy_),
178 fdev_ (0),
179 address_ (ACE_OS::strdup ("224.9.9.2:10002")),
180 fp_ (0),
181 protocol_ (ACE_OS::strdup ("UDP"))
183 endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (), TAO_AV_CORE::instance ()->poa ());
189 Client::bind_to_server (const char *name)
193 // Initialize the naming services
194 if (my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
195 ACE_ERROR_RETURN ((LM_ERROR,
196 " (%P|%t) Unable to initialize "
197 "the TAO_Naming_Client.\n"),
198 -1);
200 CosNaming::Name server_mmdevice_name (1);
201 server_mmdevice_name.length (1);
202 server_mmdevice_name [0].id = name;
203 CORBA::Object_var server_mmdevice_obj =
204 my_naming_client_->resolve (server_mmdevice_name);
206 this->server_mmdevice_ =
207 AVStreams::MMDevice::_narrow (server_mmdevice_obj.in ());
209 if (CORBA::is_nil (this->server_mmdevice_.in ()))
210 ACE_ERROR_RETURN ((LM_ERROR,
211 " could not resolve Server_Mmdevice in Naming service <%C>\n",
212 name),
213 -1);
215 catch (const CORBA::Exception& ex)
217 ex._tao_print_exception (
218 "Command_Handler::resolve_reference");
219 return -1;
221 return 0;
225 Client::init (int argc, ACE_TCHAR *argv[])
227 this->argc_ = argc;
228 this->argv_ = argv;
232 PortableServer::POAManager_var mgr
233 = TAO_AV_CORE::instance ()->poa ()->the_POAManager ();
235 mgr->activate ();
237 this->parse_args (this->argc_, this->argv_);
239 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Parsed Address %C\n", this->address_));
241 ACE_NEW_RETURN (this->fdev_,
242 FTP_Client_FDev,
243 -1);
245 ACE_NEW_RETURN (this->flowname_,
246 char [BUFSIZ],
248 ACE_OS::sprintf (this->flowname_,
249 "Data");
250 this->fdev_->flowname (this->flowname ());
251 AVStreams::MMDevice_var mmdevice = this->client_mmdevice_._this ();
252 AVStreams::FDev_var fdev = this->fdev_->_this ();
253 mmdevice->add_fdev (fdev.in ());
255 if (this->my_naming_client_.init (TAO_AV_CORE::instance ()->orb ()) != 0)
256 ACE_ERROR_RETURN ((LM_ERROR,
257 " (%P|%t) Unable to initialize "
258 "the TAO_Naming_Client.\n"),
259 -1);
261 this->fp_ = ACE_OS::fopen (this->filename_, "r");
262 if (this->fp_ != 0)
264 ACE_DEBUG ((LM_DEBUG,"file opened successfully\n"));
267 catch (const CORBA::Exception& ex)
269 ex._tao_print_exception ("Client::init");
270 return -1;
272 return 0;
276 Client::run (void)
280 char flow_protocol_str [BUFSIZ];
281 if (this->use_sfp_)
282 ACE_OS::strcpy (flow_protocol_str, "sfp:1.0");
283 else
284 ACE_OS::strcpy (flow_protocol_str, "");
285 AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
286 AVStreams::flowSpec flow_spec (1);
287 // Bind the client and server mmdevices.
288 ACE_DEBUG ((LM_DEBUG, "(%N,%l) Parsed Address %C\n", this->address_));
289 ACE_INET_Addr *addr = new ACE_INET_Addr(this->address_);
290 TAO_Forward_FlowSpec_Entry entry (this->flowname_,
291 "IN",
292 "USER_DEFINED",
293 flow_protocol_str,
294 this->protocol_,
295 addr);
296 ACE_DEBUG ((LM_DEBUG, "(%N,%l) flowspec: %C\n", entry.entry_to_string() ));
297 flow_spec.length (1);
298 flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
300 AVStreams::MMDevice_var client_mmdevice =
301 this->client_mmdevice_._this ();
303 CORBA::Boolean result =
304 this->streamctrl_.bind_devs (client_mmdevice.in (),
305 AVStreams::MMDevice::_nil (),
306 the_qos.inout (),
307 flow_spec);
309 if (result == 0)
310 ACE_ERROR_RETURN ((LM_ERROR,"streamctrl::bind_devs for client_mmdevice failed\n"),-1);
311 if (this->bind_to_server ("Server_MMDevice1") == -1)
312 ACE_ERROR_RETURN ((LM_ERROR,
313 "(%P|%t) Error binding to the naming service\n"),
314 -1);
315 result = this->streamctrl_.bind_devs (AVStreams::MMDevice::_nil (),
316 this->server_mmdevice_.in (),
317 the_qos.inout (),
318 flow_spec);
320 if (result == 0)
321 ACE_ERROR_RETURN ((LM_ERROR,"(%N,%l) streamctrl::bind_devs for mmdevice 1 failed\n"),-1);
322 if (this->bind_to_server ("Server_MMDevice2") == -1)
323 ACE_ERROR_RETURN ((LM_ERROR,
324 "(%P|%t) Error binding to the naming service\n"),
325 -1);
326 result = this->streamctrl_.bind_devs (AVStreams::MMDevice::_nil (),
327 this->server_mmdevice_.in (),
328 the_qos.inout (),
329 flow_spec);
331 if (result == 0)
332 ACE_ERROR_RETURN ((LM_ERROR,"(%N,%l) streamctrl::bind_devs for mmdevice 2 failed\n"),-1);
333 AVStreams::flowSpec start_spec (1);
334 start_spec.length (1);
335 start_spec [0] = CORBA::string_dup (this->flowname_);
336 this->streamctrl_.start (start_spec);
337 // Schedule a timer for the for the flow handler.
338 //TAO_AV_CORE::instance ()->run ();
340 ACE_Time_Value tv (10000,0);
341 TAO_AV_CORE::instance ()->orb ()->run (tv);
343 ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
345 ACE_DEBUG ((LM_DEBUG, "Exited the TAO_AV_Core::run\n"));
347 catch (const CORBA::Exception& ex)
349 ex._tao_print_exception ("Client::run");
350 return -1;
352 return 0;
356 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
360 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
362 CORBA::Object_var obj
363 = orb->resolve_initial_references ("RootPOA");
365 PortableServer::POA_var poa
366 = PortableServer::POA::_narrow (obj.in ());
368 TAO_AV_CORE::instance ()->init (orb.in (),
369 poa.in ());
371 catch (const CORBA::Exception& ex)
373 ex._tao_print_exception ("server::init");
374 return -1;
377 int result = 0;
378 result = CLIENT::instance ()->init (argc, argv);
379 if (result < 0)
380 ACE_ERROR_RETURN ((LM_ERROR,"client::init failed\n"),1);
381 result = CLIENT::instance ()->run ();
382 if (result < 0)
383 ACE_ERROR_RETURN ((LM_ERROR,"client::run failed\n"),1);
385 CLIENT::close (); // Explicitly finalize the Unmanaged_Singleton.
387 return result;
// Explicit instantiation of the singleton's static member, emitted only
// when ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION is defined.
#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
template ACE_Unmanaged_Singleton<Client, ACE_Null_Mutex> *
  ACE_Unmanaged_Singleton<Client, ACE_Null_Mutex>::singleton_;
#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */