4 FTP_Client_Callback::FTP_Client_Callback ()
10 FTP_Client_Callback::handle_end_stream ()
12 TAO_AV_CORE::instance ()->stop_run ();
17 FTP_Client_Callback::get_timeout (ACE_Time_Value
*&tv
,
20 ACE_Time_Value
*timeout
;
27 FTP_Client_Callback::handle_timeout (void *)
31 ACE_Message_Block
mb (BUFSIZ
);
32 ACE_DEBUG ((LM_DEBUG
,"FTP_Client_Callback::handle_timeout\n"));
33 char *buf
= mb
.rd_ptr ();
34 int n
= ACE_OS::fread(buf
,1,mb
.size (),CLIENT::instance ()->file ());
37 ACE_ERROR_RETURN ((LM_ERROR
,"FTP_Client_Flow_Handler::fread end of file\n"),-1);
42 if (feof (CLIENT::instance ()->file ()))
44 // Wait for some time so that the data can be flushed to the other side.
46 if (this->count_
== 2)
48 ACE_DEBUG ((LM_DEBUG
,"handle_timeout:End of file\n"));
49 AVStreams::flowSpec
stop_spec (1);
50 CLIENT::instance ()->streamctrl ()->stop (stop_spec
);
51 //CLIENT::instance ()->streamctrl ()->destroy (stop_spec);
52 TAO_AV_CORE::instance ()->orb ()->shutdown (0);
59 ACE_ERROR_RETURN ((LM_ERROR
,"FTP_Client_Flow_Handler::fread error\n"),-1);
62 int result
= this->protocol_object_
->send_frame (&mb
);
64 ACE_ERROR_RETURN ((LM_ERROR
,"send failed:%p","FTP_Client_Flow_Handler::send\n"),-1);
65 ACE_DEBUG ((LM_DEBUG
,"handle_timeout::buffer sent successfully\n"));
67 catch (const CORBA::Exception
& ex
)
69 ex
._tao_print_exception (
70 "FTP_Client_Callback::handle_timeout Failed");
76 FTP_Client_Producer::FTP_Client_Producer ()
77 :TAO_FlowProducer ("Data",CLIENT::instance ()->protocols (),CLIENT::instance ()->format ())
82 FTP_Client_Producer::set_protocol_object (const char *,
83 TAO_AV_Protocol_Object
*object
)
85 this->callback_
->set_protocol_object (object
);
90 FTP_Client_Producer::get_callback (const char *,
91 TAO_AV_Callback
*&callback
)
93 ACE_NEW_RETURN (this->callback_
,
96 callback
= this->callback_
;
101 Client::parse_args (int argc
,
104 ACE_Get_Opt
opts (argc
, argv
, ACE_TEXT("f:l:a:p:s"));
110 while ((c
= opts ()) != -1)
115 this->filename_
= ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ()));
118 this->address_
= ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ()));
122 this->peer_addr_
= ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ()));
126 this->protocol_
= ACE_OS::strdup (ACE_TEXT_ALWAYS_CHAR (opts
.opt_arg ()));
132 ACE_DEBUG ((LM_DEBUG
,"Unknown option\n"));
139 ACE_OS::hostname (buf
,
148 ACE_OS::hostname (buf
,
151 peer_addr_
+= ":5050";
166 return this->flowname_
.c_str();
169 AVStreams::protocolSpec
172 AVStreams::protocolSpec
protocols (1);
173 protocols
.length (1);
175 ACE_OS::sprintf (buf
,"%s=%s",this->protocol_
,this->address_
.c_str ());
176 protocols
[0] = CORBA::string_dup (buf
);
190 return this->address_
.c_str ();
194 Client::streamctrl ()
196 return &this->streamctrl_
;
201 protocol_ (ACE_OS::strdup ("UDP")),
202 orb_ (TAO_AV_CORE::instance ()->orb ()),
203 poa_ (TAO_AV_CORE::instance ()->poa ())
205 endpoint_strategy_
.init (TAO_AV_CORE::instance ()->orb (),
206 TAO_AV_CORE::instance ()->poa ());
211 Client::init (int argc
, ACE_TCHAR
*argv
[])
216 CORBA::String_var ior
;
219 PortableServer::POAManager_var mgr
220 = TAO_AV_CORE::instance ()->poa ()->the_POAManager ();
224 this->parse_args (this->argc_
, this->argv_
);
226 ACE_NEW_RETURN (this->streamendpoint_a_
,
227 TAO_StreamEndPoint_A
, -1);
229 ACE_NEW_RETURN (this->fep_a_
, FTP_Client_Producer
, -1);
230 this->flowname_
= "Data";
232 sep_a_
= this->streamendpoint_a_
->_this();
234 fep_a_obj_
= this->fep_a_
->_this ();
236 CORBA::String_var s1
= sep_a_
->add_fep( fep_a_obj_
.in());
238 ACE_DEBUG ((LM_DEBUG
, "(%N,%l) Added flowendpoint named: %C\n", s1
.in() ));
240 this->fp_
= ACE_OS::fopen (this->filename_
,"r");
244 ACE_DEBUG ((LM_DEBUG
,"file opened successfully\n"));
248 ACE_ERROR_RETURN ((LM_ERROR
, "ERROR: file %C could not be opened\n",
249 this->filename_
), -1);
252 catch (const CORBA::Exception
& ex
)
254 ex
._tao_print_exception ("Client::init\n");
265 char flow_protocol_str
[BUFSIZ
];
267 ACE_OS::strcpy (flow_protocol_str
,"sfp:1.0");
269 ACE_OS::strcpy (flow_protocol_str
,"");
270 AVStreams::streamQoS_var
the_qos (new AVStreams::streamQoS
);
271 AVStreams::flowSpec
flow_spec (1);
272 flow_spec
.length (1);
274 ACE_INET_Addr
addr (this->address_
.c_str ());
276 TAO_Forward_FlowSpec_Entry
entry (this->flowname_
.c_str(),
283 ACE_INET_Addr
peer_addr (this->peer_addr_
.c_str ());
284 entry
.set_peer_addr (&peer_addr
);
286 flow_spec
[0] = CORBA::string_dup (entry
.entry_to_string ());
288 ACE_High_Res_Timer timer
;
289 ACE_Time_Value elapsed
;
293 CORBA::Object_var obj_b
= this->orb_
->string_to_object("corbaname:rir:#Server_StreamEndPoint_b");
294 AVStreams::StreamEndPoint_B_var sep_b
= AVStreams::StreamEndPoint_B::_narrow (obj_b
.in());
296 CORBA::Boolean result
=
297 this->streamctrl_
.bind (sep_a_
.in(),
303 timer
.elapsed_time (elapsed
);
306 ACE_ERROR_RETURN ((LM_ERROR
,"streamctrl::bind failed\n"),-1);
308 AVStreams::flowSpec
start_spec (1);
309 this->streamctrl_
.start (start_spec
);
311 // Schedule a timer for the flow handler.
312 ACE_Time_Value
tv (10000,0);
313 this->orb_
->run (tv
);
315 ACE_DEBUG ((LM_DEBUG
, "event loop finished\n"));
317 catch (const CORBA::Exception
& ex
)
319 ex
._tao_print_exception ("Client::run\n");
331 CORBA::ORB_var orb
= CORBA::ORB_init (argc
,
333 CORBA::Object_var obj
334 = orb
->resolve_initial_references ("RootPOA");
336 PortableServer::POA_var poa
337 = PortableServer::POA::_narrow (obj
.in ());
339 TAO_AV_CORE::instance ()->init (orb
.in (),
344 result
= CLIENT::instance ()->init (argc
, argv
);
346 ACE_ERROR_RETURN ((LM_ERROR
,"client::init failed\n"),1);
347 result
= CLIENT::instance ()->run ();
349 poa
->destroy (true, true);
354 ACE_ERROR_RETURN ((LM_ERROR
,"client::run failed\n"),1);
356 catch (const CORBA::Exception
& ex
)
359 ex
._tao_print_exception ("Client Failed\n");
363 CLIENT::close (); // Explicitly finalize the Unmanaged_Singleton.
368 #if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
369 template ACE_Unmanaged_Singleton
<Client
, ACE_Null_Mutex
> *ACE_Unmanaged_Singleton
<Client
, ACE_Null_Mutex
>::singleton_
;
370 #endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */