2 #include "orbsvcs/AV/Protocol_Factory.h"
3 #include "tao/PortableServer/PortableServer.h"
4 #include "tao/Strategies/advanced_resource.h"
7 #include "ace/Get_Opt.h"
8 #include "ace/High_Res_Timer.h"
10 #include "ace/Throughput_Stats.h"
12 const ACE_TCHAR
*ior_output_file
= ACE_TEXT ("pong.ior");
13 const char *protocol
= "RTP/UDP";
14 int milliseconds
= 100;
15 size_t message_size
= 64;
17 AVStreams::protocolSpec pong_protocols
;
18 AVStreams::protocolSpec ping_protocols
;
20 ACE_hrtime_t recv_throughput_base
= 0;
21 ACE_Throughput_Stats recv_latency
;
23 ACE_hrtime_t send_throughput_base
= 0;
24 ACE_Throughput_Stats send_latency
;
28 parse_args (int argc
, ACE_TCHAR
*argv
[])
30 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("xo:s:r:t:b:d"));
33 while ((c
= get_opts ()) != -1)
37 ior_output_file
= get_opts
.opt_arg ();
42 CORBA::ULong l
= ping_protocols
.length ();
43 ping_protocols
.length (l
+ 1);
44 ping_protocols
[l
] = CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (get_opts
.opt_arg ()));
50 CORBA::ULong l
= pong_protocols
.length ();
51 pong_protocols
.length (l
+ 1);
52 pong_protocols
[l
] = CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (get_opts
.opt_arg ()));
57 milliseconds
= ACE_OS::atoi (get_opts
.opt_arg ());
61 message_size
= ACE_OS::atoi (get_opts
.opt_arg ());
62 if (message_size
< sizeof(ACE_hrtime_t
))
64 ACE_DEBUG ((LM_DEBUG
, "Invalid message size\n"));
78 ACE_ERROR_RETURN ((LM_ERROR
,
90 // If no protocols are specified use the default...
91 if (pong_protocols
.length () == 0)
93 pong_protocols
.length (1);
94 pong_protocols
[0] = CORBA::string_dup ("UDP=localhost:23456");
97 if (ping_protocols
.length () == 0)
99 ping_protocols
.length (1);
100 ping_protocols
[0] = CORBA::string_dup ("UDP=localhost:12345");
103 // Indicates successful parsing of the command line
107 int ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
111 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
113 parse_args (argc
, argv
);
115 CORBA::Object_var obj
116 = orb
->resolve_initial_references ("RootPOA");
118 PortableServer::POA_var poa
119 = PortableServer::POA::_narrow (obj
.in ());
121 PortableServer::POAManager_var mgr
122 = poa
->the_POAManager ();
126 TAO_AV_CORE::instance ()->init (orb
.in (),
129 Reactive_Strategy
*reactive_strategy
;
130 ACE_NEW_RETURN (reactive_strategy
,
133 reactive_strategy
->init (orb
.in (), poa
.in ());
134 TAO_MMDevice
*mmdevice_impl
;
135 ACE_NEW_RETURN (mmdevice_impl
,
136 TAO_MMDevice (reactive_strategy
),
139 AVStreams::MMDevice_var mmdevice
=
140 mmdevice_impl
->_this ();
142 CORBA::String_var ior
=
143 orb
->object_to_string (mmdevice
.in ());
145 ACE_DEBUG ((LM_DEBUG
, "Activated as <%C>\n", ior
.in () ));
147 // If the ior_output_file exists, output the ior to it
148 if (ior_output_file
!= 0)
150 FILE *output_file
= ACE_OS::fopen (ior_output_file
, "w");
151 if (output_file
== 0)
152 ACE_ERROR_RETURN ((LM_ERROR
,
153 "Cannot open output file %s for writing IOR: %C",
157 ACE_OS::fprintf (output_file
, "%s", ior
.in ());
158 ACE_OS::fclose (output_file
);
161 Pong_Recv_FDev
* pong_fdev_impl
;
162 ACE_NEW_RETURN (pong_fdev_impl
,
163 Pong_Recv_FDev ("Pong"),
165 Ping_Send_FDev
* ping_fdev_impl
;
166 ACE_NEW_RETURN (ping_fdev_impl
,
167 Ping_Send_FDev ("Ping"),
170 AVStreams::FDev_var ping_fdev
=
171 ping_fdev_impl
->_this ();
172 AVStreams::FDev_var pong_fdev
=
173 pong_fdev_impl
->_this ();
175 mmdevice
->add_fdev (ping_fdev
.in ());
178 mmdevice
->add_fdev (pong_fdev
.in ());
183 ACE_DEBUG ((LM_DEBUG
, "event loop finished\n"));
185 ACE_DEBUG ((LM_DEBUG
, "Calibrating scale factory . . . "));
186 ACE_High_Res_Timer::global_scale_factor_type gsf
=
187 ACE_High_Res_Timer::global_scale_factor ();
188 ACE_DEBUG ((LM_DEBUG
, "done\n"));
190 recv_latency
.dump_results (ACE_TEXT("Receive"), gsf
);
192 send_latency
.dump_results (ACE_TEXT("Send"), gsf
);
194 catch (const CORBA::Exception
& ex
)
196 ex
._tao_print_exception ("Caught exception:");
203 // ****************************************************************
205 Pong_Recv::Pong_Recv ()
206 : TAO_FlowConsumer ("Pong",
213 Pong_Recv::get_callback (const char *,
214 TAO_AV_Callback
*&callback
)
216 // ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n"));
217 callback
= &this->callback_
;
222 Pong_Recv_Callback::handle_stop ()
224 // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop"));
225 TAO_AV_CORE::instance ()->orb ()->shutdown ();
230 Pong_Recv_Callback::receive_frame (ACE_Message_Block
*frame
,
234 // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n"));
236 ACE_hrtime_t now
= ACE_OS::gethrtime ();
237 for (const ACE_Message_Block
*i
= frame
;
243 if (frame
->length () < sizeof(buf
))
245 ACE_DEBUG ((LM_DEBUG
, "Unexpected message size\n"));
249 ACE_OS::memcpy (buf
, i
->rd_ptr (), sizeof(buf
));
251 if (recv_throughput_base
== 0)
253 recv_throughput_base
= now
;
255 recv_latency
.sample (now
- recv_throughput_base
,
262 Pong_Recv_Callback::handle_destroy ()
264 ACE_DEBUG ((LM_DEBUG
,"Pong_Recv_Callback::destroy\n"));
268 // ****************************************************************
270 Ping_Send::Ping_Send ()
271 : TAO_FlowProducer ("Ping",
278 Ping_Send::get_callback (const char *,
279 TAO_AV_Callback
*&callback
)
281 // ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n"));
282 callback
= &this->callback_
;
286 Ping_Send_Callback::Ping_Send_Callback ()
289 this->timeout_
= ACE_Time_Value (2);
291 this->frame_
.size (message_size
);
292 this->frame_
.wr_ptr (message_size
);
296 Ping_Send_Callback::get_timeout (ACE_Time_Value
*&tv
,
299 tv
= &this->timeout_
;
303 Ping_Send_Callback::handle_timeout (void *)
307 ACE_DEBUG ((LM_DEBUG
, "Ping timeout frame %d\n", this->count_
));
309 if (this->count_
> 10)
311 TAO_AV_CORE::instance ()->orb ()->shutdown ();
315 ACE_hrtime_t stamp
= ACE_OS::gethrtime ();
316 ACE_OS::memcpy (this->frame_
.rd_ptr (), &stamp
, sizeof(stamp
));
318 int result
= this->protocol_object_
->send_frame (&this->frame_
);
320 ACE_ERROR_RETURN ((LM_ERROR
,
321 "Ping_Send_Callback::handle_timeout - send_frame - %p\n",
325 if (send_throughput_base
== 0)
327 send_throughput_base
= stamp
;
329 ACE_hrtime_t now
= ACE_OS::gethrtime ();
330 send_latency
.sample (now
- send_throughput_base
,
337 Ping_Send_Callback::handle_end_stream ()