Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / AVStreams / Latency / pong.cpp
blob42d6dddf6b4045bf6d80d7d9b8000f7ddcfc1cf6
1 #include "pong.h"
2 #include "orbsvcs/AV/Protocol_Factory.h"
3 #include "tao/PortableServer/PortableServer.h"
4 #include "tao/Strategies/advanced_resource.h"
5 #include "tao/ORB.h"
6 #include "tao/debug.h"
7 #include "ace/Get_Opt.h"
8 #include "ace/High_Res_Timer.h"
9 #include "ace/Stats.h"
10 #include "ace/Throughput_Stats.h"
12 const ACE_TCHAR *ior_output_file = ACE_TEXT ("pong.ior");
13 const char *protocol = "RTP/UDP";
14 int milliseconds = 100;
15 size_t message_size = 64;
16 int respond = 1;
17 AVStreams::protocolSpec pong_protocols;
18 AVStreams::protocolSpec ping_protocols;
20 ACE_hrtime_t recv_throughput_base = 0;
21 ACE_Throughput_Stats recv_latency;
23 ACE_hrtime_t send_throughput_base = 0;
24 ACE_Throughput_Stats send_latency;
27 int
28 parse_args (int argc, ACE_TCHAR *argv[])
30 ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("xo:s:r:t:b:d"));
31 int c;
33 while ((c = get_opts ()) != -1)
34 switch (c)
36 case 'o':
37 ior_output_file = get_opts.opt_arg ();
38 break;
40 case 'r':
42 CORBA::ULong l = ping_protocols.length ();
43 ping_protocols.length (l + 1);
44 ping_protocols[l] = CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (get_opts.opt_arg ()));
46 break;
48 case 's':
50 CORBA::ULong l = pong_protocols.length ();
51 pong_protocols.length (l + 1);
52 pong_protocols[l] = CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (get_opts.opt_arg ()));
54 break;
56 case 't':
57 milliseconds = ACE_OS::atoi (get_opts.opt_arg ());
58 break;
60 case 'b':
61 message_size = ACE_OS::atoi (get_opts.opt_arg ());
62 if (message_size < sizeof(ACE_hrtime_t))
64 ACE_DEBUG ((LM_DEBUG, "Invalid message size\n"));
65 message_size = 64;
67 break;
69 case 'x':
70 respond = 0;
71 break;
72 case 'd':
73 TAO_debug_level++;
74 break;
76 case '?':
77 default:
78 ACE_ERROR_RETURN ((LM_ERROR,
79 "usage: %s "
80 "-o <iorfile> "
81 "-r <protocol=addr> "
82 "-s <protocol=addr> "
83 "-t <milliseconds> "
84 "\n",
85 argv [0]),
86 -1);
90 // If no protocols are specified use the default...
91 if (pong_protocols.length () == 0)
93 pong_protocols.length (1);
94 pong_protocols[0] = CORBA::string_dup ("UDP=localhost:23456");
97 if (ping_protocols.length () == 0)
99 ping_protocols.length (1);
100 ping_protocols[0] = CORBA::string_dup ("UDP=localhost:12345");
103 // Indicates successful parsing of the command line
104 return 0;
107 int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
111 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
113 parse_args (argc, argv);
115 CORBA::Object_var obj
116 = orb->resolve_initial_references ("RootPOA");
118 PortableServer::POA_var poa
119 = PortableServer::POA::_narrow (obj.in ());
121 PortableServer::POAManager_var mgr
122 = poa->the_POAManager ();
124 mgr->activate ();
126 TAO_AV_CORE::instance ()->init (orb.in (),
127 poa.in ());
129 Reactive_Strategy *reactive_strategy;
130 ACE_NEW_RETURN (reactive_strategy,
131 Reactive_Strategy,
133 reactive_strategy->init (orb.in (), poa.in ());
134 TAO_MMDevice *mmdevice_impl;
135 ACE_NEW_RETURN (mmdevice_impl,
136 TAO_MMDevice (reactive_strategy),
139 AVStreams::MMDevice_var mmdevice =
140 mmdevice_impl->_this ();
142 CORBA::String_var ior =
143 orb->object_to_string (mmdevice.in ());
145 ACE_DEBUG ((LM_DEBUG, "Activated as <%C>\n", ior.in () ));
147 // If the ior_output_file exists, output the ior to it
148 if (ior_output_file != 0)
150 FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
151 if (output_file == 0)
152 ACE_ERROR_RETURN ((LM_ERROR,
153 "Cannot open output file %s for writing IOR: %C",
154 ior_output_file,
155 ior.in ()),
157 ACE_OS::fprintf (output_file, "%s", ior.in ());
158 ACE_OS::fclose (output_file);
161 Pong_Recv_FDev* pong_fdev_impl;
162 ACE_NEW_RETURN (pong_fdev_impl,
163 Pong_Recv_FDev ("Pong"),
165 Ping_Send_FDev* ping_fdev_impl;
166 ACE_NEW_RETURN (ping_fdev_impl,
167 Ping_Send_FDev ("Ping"),
170 AVStreams::FDev_var ping_fdev =
171 ping_fdev_impl->_this ();
172 AVStreams::FDev_var pong_fdev =
173 pong_fdev_impl->_this ();
175 mmdevice->add_fdev (ping_fdev.in ());
176 if (respond == 1)
178 mmdevice->add_fdev (pong_fdev.in ());
181 orb->run ();
183 ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
185 ACE_DEBUG ((LM_DEBUG, "Calibrating scale factory . . . "));
186 ACE_High_Res_Timer::global_scale_factor_type gsf =
187 ACE_High_Res_Timer::global_scale_factor ();
188 ACE_DEBUG ((LM_DEBUG, "done\n"));
190 recv_latency.dump_results (ACE_TEXT("Receive"), gsf);
192 send_latency.dump_results (ACE_TEXT("Send"), gsf);
194 catch (const CORBA::Exception& ex)
196 ex._tao_print_exception ("Caught exception:");
197 return 1;
200 return 0;
203 // ****************************************************************
205 Pong_Recv::Pong_Recv ()
206 : TAO_FlowConsumer ("Pong",
207 pong_protocols,
208 "UNS:pong")
213 Pong_Recv::get_callback (const char *,
214 TAO_AV_Callback *&callback)
216 // ACE_DEBUG ((LM_DEBUG,"Pong_Recv::get_callback\n"));
217 callback = &this->callback_;
218 return 0;
222 Pong_Recv_Callback::handle_stop ()
224 // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::stop"));
225 TAO_AV_CORE::instance ()->orb ()->shutdown ();
226 return 0;
230 Pong_Recv_Callback::receive_frame (ACE_Message_Block *frame,
231 TAO_AV_frame_info *,
232 const ACE_Addr &)
234 // ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::receive_frame\n"));
236 ACE_hrtime_t now = ACE_OS::gethrtime ();
237 for (const ACE_Message_Block *i = frame;
238 i != 0;
239 i = frame->cont ())
241 ACE_hrtime_t buf[2];
243 if (frame->length () < sizeof(buf))
245 ACE_DEBUG ((LM_DEBUG, "Unexpected message size\n"));
246 return 0;
249 ACE_OS::memcpy (buf, i->rd_ptr (), sizeof(buf));
251 if (recv_throughput_base == 0)
253 recv_throughput_base = now;
255 recv_latency.sample (now - recv_throughput_base,
256 now - buf[0]);
258 return 0;
262 Pong_Recv_Callback::handle_destroy ()
264 ACE_DEBUG ((LM_DEBUG,"Pong_Recv_Callback::destroy\n"));
265 return 0;
268 // ****************************************************************
270 Ping_Send::Ping_Send ()
271 : TAO_FlowProducer ("Ping",
272 ping_protocols,
273 "UNS:ping")
278 Ping_Send::get_callback (const char *,
279 TAO_AV_Callback *&callback)
281 // ACE_DEBUG ((LM_DEBUG,"Ping_Send::get_callback\n"));
282 callback = &this->callback_;
283 return 0;
286 Ping_Send_Callback::Ping_Send_Callback ()
287 :count_ (0)
289 this->timeout_ = ACE_Time_Value (2);
291 this->frame_.size (message_size);
292 this->frame_.wr_ptr (message_size);
295 void
296 Ping_Send_Callback::get_timeout (ACE_Time_Value *&tv,
297 void *&)
299 tv = &this->timeout_;
303 Ping_Send_Callback::handle_timeout (void *)
305 this->count_++;
307 ACE_DEBUG ((LM_DEBUG, "Ping timeout frame %d\n", this->count_));
309 if (this->count_ > 10)
311 TAO_AV_CORE::instance ()->orb ()->shutdown ();
312 return 0;
315 ACE_hrtime_t stamp = ACE_OS::gethrtime ();
316 ACE_OS::memcpy (this->frame_.rd_ptr (), &stamp, sizeof(stamp));
318 int result = this->protocol_object_->send_frame (&this->frame_);
319 if (result < 0)
320 ACE_ERROR_RETURN ((LM_ERROR,
321 "Ping_Send_Callback::handle_timeout - send_frame - %p\n",
322 ""),
323 -1);
325 if (send_throughput_base == 0)
327 send_throughput_base = stamp;
329 ACE_hrtime_t now = ACE_OS::gethrtime ();
330 send_latency.sample (now - send_throughput_base,
331 now - stamp);
333 return 0;
337 Ping_Send_Callback::handle_end_stream ()
339 return 0;