2 //=============================================================================
6 * Measures TCP round-trip performance.
8 * @author Based on udp_test by Fred Kuhns and David L. LevineModified by Carlos O'Ryan and Nanbor Wang.
10 //=============================================================================
13 #include "ace/Reactor.h"
14 #include "ace/Select_Reactor.h"
15 #include "ace/TP_Reactor.h"
16 #include "ace/SOCK_Stream.h"
17 #include "ace/SOCK_Acceptor.h"
18 #include "ace/SOCK_Connector.h"
19 #include "ace/INET_Addr.h"
21 #include "ace/Get_Opt.h"
22 #include "ace/High_Res_Timer.h"
23 #include "ace/Thread_Manager.h"
24 #include "ace/Sched_Params.h"
25 #include "ace/Stats.h"
26 #include "ace/Throughput_Stats.h"
27 #include "ace/Sample_History.h"
28 #include "ace/OS_main.h"
29 #include "ace/OS_NS_arpa_inet.h"
30 #include "ace/OS_NS_ctype.h"
31 #include "ace/OS_NS_errno.h"
32 #include "ace/OS_NS_string.h"
33 #include "ace/OS_NS_unistd.h"
35 // FUZZ: disable check_for_math_include
38 // Global variables (evil).
39 static const u_short DEFPORT
= 5050;
40 static const int MAXPKTSZ
= 65536;
41 static const int DEFPKTSZ
= 64;
42 static const int DEFITERATIONS
= 1000;
43 static const int DEFINTERVAL
= 0;
44 static const int DEFAULT_THRNO
= 10;
46 static char sbuf
[MAXPKTSZ
];
47 static char rbuf
[MAXPKTSZ
];
49 static int usdelay
= DEFINTERVAL
;
50 static int bufsz
= DEFPKTSZ
;
51 static int VERBOSE
= 0;
52 static int dump_history
= 0;
53 static int svr_thrno
= DEFAULT_THRNO
;
54 static int server
= 0;
55 static int client
= 0;
56 static int nsamples
= DEFITERATIONS
;
57 static int so_bufsz
= 0;
58 static u_int use_reactor
= 0;
74 " [-h] (dump all the samples)\n"
75 " [-m message size]\n"
78 " [-b socket bufsz]\n"
82 // " [-x max_sample_allowed]\n"
83 " [-t number of threads]\n"
84 " [-a to use the ACE Select reactor]\n"
85 " [-x to use the ACE TP reactor]\n"
86 " [-w to use the ACE WFMO reactor]\n"
90 // ****************************************************************
92 class Client
: public ACE_Event_Handler
95 Client (const ACE_INET_Addr
&remote_addr
);
99 // = Override <ACE_Event_Handler> methods.
100 virtual ACE_HANDLE
get_handle () const;
101 virtual int handle_input (ACE_HANDLE
);
102 virtual int handle_close (ACE_HANDLE handle
,
103 ACE_Reactor_Mask close_mask
);
105 //FUZZ: disable check_for_lack_ACE_OS
106 /// Send the <buf> to the server.
107 int send (const char *buf
, size_t len
);
108 //FUZZ: enable check_for_lack_ACE_OS
110 /// Wait for the response.
111 int get_response (char *buf
, size_t len
);
113 /// Send messages to server and record statistics.
116 //FUZZ: disable check_for_lack_ACE_OS
117 /// Send shutdown message to server.
119 //FUZZ: enable check_for_lack_ACE_OS
122 /// To send messages and receive responses.
123 ACE_SOCK_Stream endpoint_
;
125 /// The address to send messages to.
126 ACE_INET_Addr remote_addr_
;
129 Client (const Client
&) = delete;
130 Client
&operator= (const Client
&) = delete;
133 Client::Client (const ACE_INET_Addr
&remote_addr
)
134 : remote_addr_ (remote_addr
)
136 ACE_SOCK_Connector connector
;
137 if (connector
.connect (this->endpoint_
, remote_addr
) == -1)
139 ACE_ERROR ((LM_ERROR
, "Client - %p\n",
145 if (ACE_Reactor::instance ()->register_handler
146 (this, ACE_Event_Handler::READ_MASK
) == -1)
147 ACE_ERROR ((LM_ERROR
,
148 "ACE_Reactor::register_handler: Client\n"));
157 Client::get_handle () const
159 return this->endpoint_
.get_handle ();
163 Client::handle_input (ACE_HANDLE
)
167 ssize_t n
= this->endpoint_
.recv (buf
, sizeof buf
);
170 ACE_ERROR ((LM_ERROR
,
174 ACE_DEBUG ((LM_DEBUG
,
175 "(%P|%t) buf of size %d = %*s\n",
184 Client::handle_close (ACE_HANDLE
,
187 this->endpoint_
.close ();
192 Client::send (const char *buf
, size_t len
)
194 return this->endpoint_
.send (buf
, len
);
198 Client::get_response (char *buf
, size_t len
)
200 return this->endpoint_
.recv (buf
, len
);
206 ACE_OS::memset (sbuf
, 0, bufsz
);
207 ACE_OS::memset (rbuf
, 0, bufsz
);
209 for (int j
= 0; j
!= 100; ++j
)
211 if (this->send (sbuf
, bufsz
) <= 0)
212 ACE_ERROR_RETURN ((LM_ERROR
, "(%P) %p\n", "send"), -1);
215 if ((this->get_response (rbuf
, bufsz
)) <= 0)
216 ACE_ERROR_RETURN ((LM_ERROR
, "(%P) %p\n", "get_response"), -1);
219 ACE_Sample_History
history (nsamples
);
221 ACE_hrtime_t test_start
= ACE_OS::gethrtime ();
222 for (int i
= 0; i
!= nsamples
; ++i
)
226 ACE_Time_Value
tv (0, usecs
);
230 ACE_hrtime_t start
= ACE_OS::gethrtime ();
231 if (this->send (sbuf
, bufsz
) <= 0)
232 ACE_ERROR_RETURN ((LM_ERROR
, "(%P) %p\n", "send"), -1);
235 if ((this->get_response (rbuf
, bufsz
)) <= 0)
236 ACE_ERROR_RETURN ((LM_ERROR
, "(%P) %p\n", "get_response"), -1);
238 ACE_hrtime_t end
= ACE_OS::gethrtime ();
240 history
.sample (end
- start
);
242 if (VERBOSE
&& i
% 500 == 0)
244 ACE_DEBUG ((LM_DEBUG
,
245 "Send %d / %d events\n", i
, nsamples
));
248 ACE_hrtime_t test_end
= ACE_OS::gethrtime ();
250 ACE_High_Res_Timer::global_scale_factor_type gsf
=
251 ACE_High_Res_Timer::global_scale_factor ();
255 history
.dump_samples (ACE_TEXT("HISTORY"), gsf
);
258 ACE_Basic_Stats latency
;
259 history
.collect_basic_stats (latency
);
260 latency
.dump_results (ACE_TEXT("Client"), gsf
);
261 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Client"),
263 test_end
- test_start
,
264 latency
.samples_count ());
273 const char buf
= 'S';
274 int n
= this->endpoint_
.send (&buf
, 1u);
278 if (ACE_Reactor::instance ()->remove_handler
279 (this, ACE_Event_Handler::READ_MASK
) == -1)
280 ACE_ERROR_RETURN ((LM_ERROR
,
281 "ACE_Reactor::remove_handler: Client\n"),
288 // ****************************************************************
290 class Server
: public ACE_Event_Handler
293 Server (const ACE_INET_Addr
&addr
);
297 // = Override <ACE_Event_Handler> methods.
298 virtual ACE_HANDLE
get_handle () const;
299 virtual int handle_input (ACE_HANDLE
);
300 virtual int handle_close (ACE_HANDLE handle
,
301 ACE_Reactor_Mask close_mask
);
304 /// Receives datagrams.
305 ACE_SOCK_Stream endpoint_
;
308 Server (const Server
&) = delete;
309 Server
&operator= (const Server
&) = delete;
312 Server::Server (const ACE_INET_Addr
&addr
)
314 ACE_SOCK_Acceptor acceptor
;
316 if (acceptor
.open (addr
, 1) == -1)
317 ACE_DEBUG ((LM_DEBUG
, "%p\n", "open"));
319 ACE_DEBUG ((LM_DEBUG
, "Listening on %s:%d\n",
320 addr
.get_host_name (),
321 addr
.get_port_number ()));
322 if (acceptor
.accept (this->endpoint_
) == -1)
323 ACE_ERROR ((LM_ERROR
, "Server::Server %p\n",
328 if (ACE_Reactor::instance ()->register_handler
330 ACE_Event_Handler::READ_MASK
) == -1)
331 ACE_ERROR ((LM_ERROR
,
332 "ACE_Reactor::register_handler: Server\n"));
335 #if !defined (ACE_LACKS_SO_SNDBUF)
338 if (this->endpoint_
.set_option (SOL_SOCKET
,
341 sizeof (so_bufsz
)) == -1
344 ACE_ERROR ((LM_ERROR
, "Server::Server: SO_SNDBUF %p\n",
345 "set_option failed"));
348 #endif /* ACE_LACKS_SO_SNDBUF */
349 #if !defined (ACE_LACKS_SO_RCVBUF)
352 if (this->endpoint_
.set_option (SOL_SOCKET
,
355 sizeof (so_bufsz
)) == -1
358 ACE_ERROR ((LM_ERROR
, "Server::Server: SO_RCVBUF %p\n",
359 "set_option failed"));
362 #endif /* !ACE_LACKS_SO_RCVBUF */
363 if (acceptor
.close () == -1)
364 ACE_ERROR ((LM_ERROR
, "Server::Server %p\n",
370 this->endpoint_
.close ();
374 Server::get_handle () const
376 return this->endpoint_
.get_handle ();
380 Server::handle_input (ACE_HANDLE
)
384 ssize_t n
= this->endpoint_
.recv (buf
, bufsz
);
387 ACE_DEBUG ((LM_ERROR
,
389 "handle_input: recv"));
391 // Send the message back as the response.
392 if (this->endpoint_
.send (buf
, n
) == n
)
394 if (n
== 1 && buf
[0] == 'S')
398 // Indicate done by returning 1.
402 if (ACE_Reactor::instance ()->remove_handler
403 (this, ACE_Event_Handler::READ_MASK
) == -1)
404 ACE_ERROR_RETURN ((LM_ERROR
,
405 "ACE_Reactor::remove_handler: server\n"),
408 ACE_Reactor::end_event_loop ();
414 ACE_DEBUG ((LM_ERROR
,
416 "handle_input: send"));
421 Server::handle_close (ACE_HANDLE
,
424 this->endpoint_
.close ();
429 static ACE_THR_FUNC_RETURN
430 thread_pool_worker (void *)
432 // Server thread function.
434 while (!ACE_Reactor::event_loop_done ())
436 if (ACE_Reactor::instance ()->handle_events () == -1)
437 ACE_ERROR ((LM_ERROR
,
438 ACE_TEXT ("(%t) %p\n"),
439 ACE_TEXT ("Error handling events")));
446 run_server (ACE_INET_Addr
&addr
)
450 ACE_Reactor
*new_reactor
= 0;
456 ACE_Select_Reactor
*sr
= new ACE_Select_Reactor ();
457 new_reactor
= new ACE_Reactor (sr
, 1);
462 ACE_TP_Reactor
*sr
= new ACE_TP_Reactor ();
463 new_reactor
= new ACE_Reactor (sr
, 1);
467 #if defined (ACE_WIN32)
471 #endif /* ACE_WIN32 */
473 ACE_ERROR_RETURN ((LM_ERROR
, "Invalid reactor type selected\n"), -1);
475 ACE_Reactor::instance (new_reactor
, 1);
478 Server
server (addr
);
482 // Handle input in the current thread.
483 // This is actually equivalent to thread-per-connection model.
484 while (server
.handle_input (0) != 1)
492 // Run the reactor event loop.
493 ACE_Reactor::run_event_loop ();
496 ACE_Thread_Manager::instance ()->spawn_n (svr_thrno
,
498 ACE_Thread_Manager::instance ()->wait ();
503 break; // won't happen here.
511 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
513 int c
, dstport
= DEFPORT
;
515 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
)
516 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO
)) / 2;
517 priority
= ACE_Sched_Params::next_priority (ACE_SCHED_FIFO
,
519 // Enable FIFO scheduling, e.g.
520 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
522 ACE_SCOPE_PROCESS
)) != 0)
524 if (ACE_OS::last_error () == EPERM
)
526 ACE_DEBUG ((LM_DEBUG
,
527 "server (%P|%t): user is not superuser, "
528 "test runs in time-shared class\n"));
531 ACE_ERROR ((LM_ERROR
,
532 "server (%P|%t): sched_params failed\n"));
535 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("hxwvb:I:p:sci:m:at:"));
537 while ((c
= get_opt ()) != -1)
550 bufsz
= ACE_OS::atoi (get_opt
.opt_arg ());
553 ACE_ERROR_RETURN ((LM_ERROR
,
554 "\nMessage size must be greater than 0!\n\n"),
556 else if (bufsz
> BUFSIZ
)
557 ACE_ERROR_RETURN ((LM_ERROR
,
558 "\nbufsz must be <= %d\n",
564 nsamples
= ACE_OS::atoi (get_opt
.opt_arg ());
566 ACE_ERROR_RETURN ((LM_ERROR
,
567 "\nIterations must be greater than 0!\n\n"),
572 use_reactor
= SELECT
;
580 #if defined (ACE_WIN32)
584 ACE_ERROR_RETURN ((LM_ERROR
, "WFMO_Reactor is not supported\n"), -1);
585 #endif /* ACE_WIN32 */
588 so_bufsz
= ACE_OS::atoi (get_opt
.opt_arg ());
591 ACE_ERROR_RETURN ((LM_ERROR
,
592 "\nInvalid socket buffer size!\n\n"),
597 usdelay
= ACE_OS::atoi (get_opt
.opt_arg ());
602 ACE_ERROR_RETURN ((LM_ERROR
,
603 "%s: bad usdelay: %s\n",
611 dstport
= ACE_OS::atoi (get_opt
.opt_arg ());
613 ACE_ERROR_RETURN ((LM_ERROR
,
614 "\nInvalid port number!\n\n"),
618 svr_thrno
= ACE_OS::atoi (get_opt
.opt_arg ());
621 ACE_ERROR_RETURN ((LM_ERROR
,
622 "\nInvalid server thread number!\n\n"),
640 if ((get_opt
.opt_ind () >= argc
&& client
!= 0) || argc
== 1)
646 ACE_INET_Addr
addr (dstport
);
650 return run_server (addr
);
653 if ((u_int
) bufsz
< sizeof (ACE_hrtime_t
))
655 ACE_ERROR_RETURN ((LM_ERROR
,
656 "\nbufsz must be >= %d\n",
657 sizeof (ACE_hrtime_t
)),
661 ACE_INET_Addr remote_addr
;
663 if (ACE_OS::ace_isdigit(argv
[get_opt
.opt_ind ()][0]))
665 if (remote_addr
.set (ACE_HTONS(dstport
),
666 (ACE_UINT32
) ACE_OS::inet_addr
667 (ACE_TEXT_ALWAYS_CHAR(argv
[get_opt
.opt_ind ()])), 0) == -1)
668 ACE_ERROR_RETURN ((LM_ERROR
,
669 "invalid IP address: %s\n",
670 argv
[get_opt
.opt_ind ()]),
675 if (remote_addr
.set (dstport
, argv
[get_opt
.opt_ind ()]) == -1)
676 ACE_ERROR_RETURN ((LM_ERROR
,
677 "invalid IP address: %s\n",
678 argv
[get_opt
.opt_ind ()]),
681 get_opt
.opt_ind ()++;
683 ACE_DEBUG ((LM_DEBUG
, "Connecting to %s:%d\n",
684 remote_addr
.get_host_name (),
685 remote_addr
.get_port_number ()));
687 Client
client (remote_addr
);
689 ACE_DEBUG ((LM_DEBUG
,
690 "\nSending %d byte packets to %s:%d "
691 "with so_bufsz = %d\n\n",
693 addr
.get_host_name (),