2 #include "orbsvcs/AV/Protocol_Factory.h"
4 #include "tao/Strategies/advanced_resource.h"
5 #include "ace/Get_Opt.h"
6 #include "ace/High_Res_Timer.h"
8 #include "ace/Throughput_Stats.h"
10 const ACE_TCHAR
*ior_output_file
= ACE_TEXT ("ping.ior");
11 const char *protocol
= "RTP/UDP";
12 int milliseconds
= 100;
14 AVStreams::protocolSpec ping_protocols
;
15 AVStreams::protocolSpec pong_protocols
;
17 Pong_Send_Callback pong_callback
;
19 ACE_hrtime_t recv_base
= 0;
20 ACE_Throughput_Stats recv_latency
;
23 parse_args (int argc
, ACE_TCHAR
*argv
[])
25 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("xo:s:r:t:"));
28 while ((c
= get_opts ()) != -1)
32 ior_output_file
= get_opts
.opt_arg ();
37 CORBA::ULong l
= ping_protocols
.length ();
38 ping_protocols
.length (l
+ 1);
39 ping_protocols
[l
] = CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (get_opts
.opt_arg ()));
45 CORBA::ULong l
= pong_protocols
.length ();
46 pong_protocols
.length (l
+ 1);
47 pong_protocols
[l
] = CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (get_opts
.opt_arg ()));
52 milliseconds
= ACE_OS::atoi (get_opts
.opt_arg ());
61 ACE_ERROR_RETURN ((LM_ERROR
,
73 // If no protocols are specified use the default...
74 if (ping_protocols
.length () == 0)
76 ping_protocols
.length (1);
77 ping_protocols
[0] = CORBA::string_dup ("UDP=localhost:12345");
80 if (pong_protocols
.length () == 0)
82 pong_protocols
.length (1);
83 pong_protocols
[0] = CORBA::string_dup ("UDP=localhost:23456");
86 // Indicates successful parsing of the command line
90 int ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
94 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
96 parse_args (argc
, argv
);
99 = orb
->resolve_initial_references ("RootPOA");
101 PortableServer::POA_var poa
102 = PortableServer::POA::_narrow (obj
.in ());
104 PortableServer::POAManager_var mgr
105 = poa
->the_POAManager ();
109 TAO_AV_CORE::instance ()->init (orb
.in (),
112 // Register the video mmdevice object with the ORB
113 Reactive_Strategy
*reactive_strategy
;
114 ACE_NEW_RETURN (reactive_strategy
,
117 reactive_strategy
->init (orb
.in (), poa
.in ());
118 TAO_MMDevice
*mmdevice_impl
;
119 ACE_NEW_RETURN (mmdevice_impl
,
120 TAO_MMDevice (reactive_strategy
),
123 AVStreams::MMDevice_var mmdevice
=
124 mmdevice_impl
->_this ();
126 CORBA::String_var ior
=
127 orb
->object_to_string (mmdevice
.in ());
129 ACE_DEBUG ((LM_DEBUG
, "Activated as <%C>\n", ior
.in ()));
131 // If the ior_output_file exists, output the ior to it
132 if (ior_output_file
!= 0)
134 FILE *output_file
= ACE_OS::fopen (ior_output_file
, "w");
135 if (output_file
== 0)
136 ACE_ERROR_RETURN ((LM_ERROR
,
137 "Cannot open output file %s for writing IOR: %C",
141 ACE_OS::fprintf (output_file
, "%s", ior
.in ());
142 ACE_OS::fclose (output_file
);
145 Ping_Recv_FDev
* ping_fdev_impl
;
146 ACE_NEW_RETURN (ping_fdev_impl
,
147 Ping_Recv_FDev ("Ping"),
149 Pong_Send_FDev
* pong_fdev_impl
;
150 ACE_NEW_RETURN (pong_fdev_impl
,
151 Pong_Send_FDev ("Pong"),
154 AVStreams::FDev_var ping_fdev
=
155 ping_fdev_impl
->_this ();
156 AVStreams::FDev_var pong_fdev
=
157 pong_fdev_impl
->_this ();
159 mmdevice
->add_fdev (ping_fdev
.in ());
163 mmdevice
->add_fdev (pong_fdev
.in ());
169 ACE_DEBUG ((LM_DEBUG
, "Calibrating scale factory . . . "));
170 ACE_High_Res_Timer::global_scale_factor_type gsf
=
171 ACE_High_Res_Timer::global_scale_factor ();
172 ACE_DEBUG ((LM_DEBUG
, "done %d\n", gsf
));
174 recv_latency
.dump_results (ACE_TEXT("Receive"), gsf
);
176 catch (const CORBA::Exception
& ex
)
178 ex
._tao_print_exception ("Caught exception:");
185 // ****************************************************************
187 Ping_Recv::Ping_Recv ()
188 : TAO_FlowConsumer ("Ping",
195 Ping_Recv::get_callback (const char *,
196 TAO_AV_Callback
*&callback
)
198 ACE_DEBUG ((LM_DEBUG
,"Ping_Recv::get_callback\n"));
199 callback
= &this->callback_
;
203 Ping_Recv_Callback::Ping_Recv_Callback ()
209 Ping_Recv_Callback::handle_stop ()
211 ACE_DEBUG ((LM_DEBUG
,"Ping_Recv_Callback::stop"));
212 TAO_AV_CORE::instance ()->orb ()->shutdown ();
218 Ping_Recv_Callback::receive_frame (ACE_Message_Block
*frame
,
224 ACE_DEBUG ((LM_DEBUG
,"Ping_Recv_Callback::receive_frame %d\n", this->count_
));
226 if (this->count_
< 10)
228 for (const ACE_Message_Block
*i
= frame
;
234 if (i
->length () < sizeof(stamp
))
237 ACE_OS::memcpy (&stamp
, i
->rd_ptr (), sizeof(stamp
));
239 ACE_hrtime_t now
= ACE_OS::gethrtime ();
246 recv_latency
.sample (now
- recv_base
,
251 pong_callback
.send_response (stamp
);
255 TAO_AV_CORE::instance ()->orb ()->shutdown ();
260 Ping_Recv_Callback::handle_destroy ()
262 ACE_DEBUG ((LM_DEBUG
,"Ping_Recv_Callback::destroy\n"));
266 // ****************************************************************
268 Pong_Send::Pong_Send ()
269 : TAO_FlowProducer ("Pong",
276 Pong_Send::get_callback (const char *,
277 TAO_AV_Callback
*&callback
)
279 ACE_DEBUG ((LM_DEBUG
,"Pong_Send::get_callback\n"));
280 callback
= &pong_callback
;
285 Pong_Send_Callback::get_timeout (ACE_Time_Value
*&tv
,
288 // @@ ACE_NEW (tv, ACE_Time_Value (0, milliseconds * 1000));
289 ACE_DEBUG ((LM_DEBUG
,"Pong_Send_Callback::get_timeout\n"));
294 Pong_Send_Callback::handle_timeout (void *)
296 // ACE_DEBUG ((LM_DEBUG, "pong timeout (ignored)\n"));
301 Pong_Send_Callback::handle_end_stream ()
307 Pong_Send_Callback::send_response (ACE_hrtime_t stamp
)
309 ACE_DEBUG ((LM_DEBUG
, "pong send response)\n"));
313 ACE_Message_Block
mb (reinterpret_cast<char*> (buf
),
317 buf
[1] = ACE_OS::gethrtime ();
318 mb
.wr_ptr (sizeof(buf
));
320 int result
= this->protocol_object_
->send_frame (&mb
);
322 ACE_ERROR_RETURN ((LM_ERROR
,
323 "Pong_Send_Callback::send - %p\n",