Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / performance-tests / TCP / tcp_test.cpp
blob375cb51d3e31fe52f651b5a4bd43614a80ed873b
2 //=============================================================================
3 /**
4 * @file tcp_test.cpp
6 * Measures TCP round-trip performance.
8 * @author Based on udp_test by Fred Kuhns and David L. Levine. Modified by Carlos O'Ryan and Nanbor Wang.
9 */
10 //=============================================================================
13 #include "ace/Reactor.h"
14 #include "ace/Select_Reactor.h"
15 #include "ace/TP_Reactor.h"
16 #include "ace/SOCK_Stream.h"
17 #include "ace/SOCK_Acceptor.h"
18 #include "ace/SOCK_Connector.h"
19 #include "ace/INET_Addr.h"
20 #include "ace/ACE.h"
21 #include "ace/Get_Opt.h"
22 #include "ace/High_Res_Timer.h"
23 #include "ace/Thread_Manager.h"
24 #include "ace/Sched_Params.h"
25 #include "ace/Stats.h"
26 #include "ace/Throughput_Stats.h"
27 #include "ace/Sample_History.h"
28 #include "ace/OS_main.h"
29 #include "ace/OS_NS_arpa_inet.h"
30 #include "ace/OS_NS_ctype.h"
31 #include "ace/OS_NS_errno.h"
32 #include "ace/OS_NS_string.h"
33 #include "ace/OS_NS_unistd.h"
35 // FUZZ: disable check_for_math_include
36 #include <math.h>
38 // Global variables (evil).
39 static const u_short DEFPORT = 5050;
40 static const int MAXPKTSZ = 65536;
41 static const int DEFPKTSZ = 64;
42 static const int DEFITERATIONS = 1000;
43 static const int DEFINTERVAL = 0;
44 static const int DEFAULT_THRNO = 10;
46 static char sbuf[MAXPKTSZ];
47 static char rbuf[MAXPKTSZ];
49 static int usdelay = DEFINTERVAL;
50 static int bufsz = DEFPKTSZ;
51 static int VERBOSE = 0;
52 static int dump_history = 0;
53 static int svr_thrno = DEFAULT_THRNO;
54 static int server = 0;
55 static int client = 0;
56 static int nsamples = DEFITERATIONS;
57 static int so_bufsz = 0;
58 static u_int use_reactor = 0;
59 static int usecs = 0;
61 enum {
62 SELECT = 1,
63 TP,
64 WFMO
68 static void
69 usage ()
71 ACE_ERROR ((LM_ERROR,
72 "tcp_test\n"
73 " [-v] (Verbose)\n"
74 " [-h] (dump all the samples)\n"
75 " [-m message size]\n"
76 " [-i iterations]\n"
77 " [-I usdelay]\n"
78 " [-b socket bufsz]\n"
79 " [-p port]\n"
80 " [-s]\n"
81 " [-c]\n"
82 // " [-x max_sample_allowed]\n"
83 " [-t number of threads]\n"
84 " [-a to use the ACE Select reactor]\n"
85 " [-x to use the ACE TP reactor]\n"
86 " [-w to use the ACE WFMO reactor]\n"
87 " targethost\n"));
90 // ****************************************************************
92 class Client : public ACE_Event_Handler
94 public:
95 Client (const ACE_INET_Addr &remote_addr);
97 virtual ~Client ();
99 // = Override <ACE_Event_Handler> methods.
100 virtual ACE_HANDLE get_handle () const;
101 virtual int handle_input (ACE_HANDLE);
102 virtual int handle_close (ACE_HANDLE handle,
103 ACE_Reactor_Mask close_mask);
105 //FUZZ: disable check_for_lack_ACE_OS
106 /// Send the <buf> to the server.
107 int send (const char *buf, size_t len);
108 //FUZZ: enable check_for_lack_ACE_OS
110 /// Wait for the response.
111 int get_response (char *buf, size_t len);
113 /// Send messages to server and record statistics.
114 int run ();
116 //FUZZ: disable check_for_lack_ACE_OS
117 /// Send shutdown message to server.
118 int shutdown ();
119 //FUZZ: enable check_for_lack_ACE_OS
121 private:
122 /// To send messages and receive responses.
123 ACE_SOCK_Stream endpoint_;
125 /// The address to send messages to.
126 ACE_INET_Addr remote_addr_;
128 Client () = delete;
129 Client (const Client &) = delete;
130 Client &operator= (const Client &) = delete;
133 Client::Client (const ACE_INET_Addr &remote_addr)
134 : remote_addr_ (remote_addr)
136 ACE_SOCK_Connector connector;
137 if (connector.connect (this->endpoint_, remote_addr) == -1)
139 ACE_ERROR ((LM_ERROR, "Client - %p\n",
140 "connect failed"));
143 if (use_reactor)
145 if (ACE_Reactor::instance ()->register_handler
146 (this, ACE_Event_Handler::READ_MASK) == -1)
147 ACE_ERROR ((LM_ERROR,
148 "ACE_Reactor::register_handler: Client\n"));
152 Client::~Client ()
156 ACE_HANDLE
157 Client::get_handle () const
159 return this->endpoint_.get_handle ();
163 Client::handle_input (ACE_HANDLE)
165 char buf[BUFSIZ];
167 ssize_t n = this->endpoint_.recv (buf, sizeof buf);
169 if (n == -1)
170 ACE_ERROR ((LM_ERROR,
171 "%p\n",
172 "handle_input"));
173 else
174 ACE_DEBUG ((LM_DEBUG,
175 "(%P|%t) buf of size %d = %*s\n",
178 buf));
180 return 0;
184 Client::handle_close (ACE_HANDLE,
185 ACE_Reactor_Mask)
187 this->endpoint_.close ();
188 return 0;
192 Client::send (const char *buf, size_t len)
194 return this->endpoint_.send (buf, len);
198 Client::get_response (char *buf, size_t len)
200 return this->endpoint_.recv (buf, len);
204 Client::run ()
206 ACE_OS::memset (sbuf, 0, bufsz);
207 ACE_OS::memset (rbuf, 0, bufsz);
209 for (int j = 0; j != 100; ++j)
211 if (this->send (sbuf, bufsz) <= 0)
212 ACE_ERROR_RETURN ((LM_ERROR, "(%P) %p\n", "send"), -1);
214 // ssize_t n;
215 if ((this->get_response (rbuf, bufsz)) <= 0)
216 ACE_ERROR_RETURN ((LM_ERROR, "(%P) %p\n", "get_response"), -1);
219 ACE_Sample_History history (nsamples);
221 ACE_hrtime_t test_start = ACE_OS::gethrtime ();
222 for (int i = 0; i != nsamples; ++i)
224 if (usecs != 0)
226 ACE_Time_Value tv (0, usecs);
227 ACE_OS::sleep (tv);
230 ACE_hrtime_t start = ACE_OS::gethrtime ();
231 if (this->send (sbuf, bufsz) <= 0)
232 ACE_ERROR_RETURN ((LM_ERROR, "(%P) %p\n", "send"), -1);
234 // ssize_t n;
235 if ((this->get_response (rbuf, bufsz)) <= 0)
236 ACE_ERROR_RETURN ((LM_ERROR, "(%P) %p\n", "get_response"), -1);
238 ACE_hrtime_t end = ACE_OS::gethrtime ();
240 history.sample (end - start);
242 if (VERBOSE && i % 500 == 0)
244 ACE_DEBUG ((LM_DEBUG,
245 "Send %d / %d events\n", i, nsamples));
248 ACE_hrtime_t test_end = ACE_OS::gethrtime ();
250 ACE_High_Res_Timer::global_scale_factor_type gsf =
251 ACE_High_Res_Timer::global_scale_factor ();
253 if (dump_history)
255 history.dump_samples (ACE_TEXT("HISTORY"), gsf);
258 ACE_Basic_Stats latency;
259 history.collect_basic_stats (latency);
260 latency.dump_results (ACE_TEXT("Client"), gsf);
261 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Client"),
262 gsf,
263 test_end - test_start,
264 latency.samples_count ());
267 return 0;
271 Client::shutdown ()
273 const char buf = 'S';
274 int n = this->endpoint_.send (&buf, 1u);
276 if (use_reactor)
278 if (ACE_Reactor::instance ()->remove_handler
279 (this, ACE_Event_Handler::READ_MASK) == -1)
280 ACE_ERROR_RETURN ((LM_ERROR,
281 "ACE_Reactor::remove_handler: Client\n"),
282 -1);
285 return n;
288 // ****************************************************************
290 class Server : public ACE_Event_Handler
292 public:
293 Server (const ACE_INET_Addr &addr);
295 virtual ~Server ();
297 // = Override <ACE_Event_Handler> methods.
298 virtual ACE_HANDLE get_handle () const;
299 virtual int handle_input (ACE_HANDLE);
300 virtual int handle_close (ACE_HANDLE handle,
301 ACE_Reactor_Mask close_mask);
303 private:
304 /// Receives datagrams.
305 ACE_SOCK_Stream endpoint_;
307 Server () = delete;
308 Server (const Server &) = delete;
309 Server &operator= (const Server &) = delete;
312 Server::Server (const ACE_INET_Addr &addr)
314 ACE_SOCK_Acceptor acceptor;
316 if (acceptor.open (addr, 1) == -1)
317 ACE_DEBUG ((LM_DEBUG, "%p\n", "open"));
319 ACE_DEBUG ((LM_DEBUG, "Listening on %s:%d\n",
320 addr.get_host_name (),
321 addr.get_port_number ()));
322 if (acceptor.accept (this->endpoint_) == -1)
323 ACE_ERROR ((LM_ERROR, "Server::Server %p\n",
324 "accept failed"));
326 if (use_reactor)
328 if (ACE_Reactor::instance ()->register_handler
329 (this,
330 ACE_Event_Handler::READ_MASK) == -1)
331 ACE_ERROR ((LM_ERROR,
332 "ACE_Reactor::register_handler: Server\n"));
335 #if !defined (ACE_LACKS_SO_SNDBUF)
336 if (so_bufsz != 0)
338 if (this->endpoint_.set_option (SOL_SOCKET,
339 SO_SNDBUF,
340 (void *) &so_bufsz,
341 sizeof (so_bufsz)) == -1
342 && errno != ENOTSUP)
344 ACE_ERROR ((LM_ERROR, "Server::Server: SO_SNDBUF %p\n",
345 "set_option failed"));
348 #endif /* ACE_LACKS_SO_SNDBUF */
349 #if !defined (ACE_LACKS_SO_RCVBUF)
350 if (so_bufsz != 0)
352 if (this->endpoint_.set_option (SOL_SOCKET,
353 SO_RCVBUF,
354 (void *) &so_bufsz,
355 sizeof (so_bufsz)) == -1
356 && errno != ENOTSUP)
358 ACE_ERROR ((LM_ERROR, "Server::Server: SO_RCVBUF %p\n",
359 "set_option failed"));
362 #endif /* !ACE_LACKS_SO_RCVBUF */
363 if (acceptor.close () == -1)
364 ACE_ERROR ((LM_ERROR, "Server::Server %p\n",
365 "close failed"));
368 Server::~Server ()
370 this->endpoint_.close ();
373 ACE_HANDLE
374 Server::get_handle () const
376 return this->endpoint_.get_handle ();
380 Server::handle_input (ACE_HANDLE)
382 char buf[BUFSIZ];
384 ssize_t n = this->endpoint_.recv (buf, bufsz);
386 if (n == -1)
387 ACE_DEBUG ((LM_ERROR,
388 "%p\n",
389 "handle_input: recv"));
391 // Send the message back as the response.
392 if (this->endpoint_.send (buf, n) == n)
394 if (n == 1 && buf[0] == 'S')
396 if (!use_reactor)
398 // Indicate done by returning 1.
399 return 1;
402 if (ACE_Reactor::instance ()->remove_handler
403 (this, ACE_Event_Handler::READ_MASK) == -1)
404 ACE_ERROR_RETURN ((LM_ERROR,
405 "ACE_Reactor::remove_handler: server\n"),
406 -1);
408 ACE_Reactor::end_event_loop ();
411 return 0;
414 ACE_DEBUG ((LM_ERROR,
415 "%p\n",
416 "handle_input: send"));
417 return -1;
421 Server::handle_close (ACE_HANDLE,
422 ACE_Reactor_Mask)
424 this->endpoint_.close ();
426 return 0;
429 static ACE_THR_FUNC_RETURN
430 thread_pool_worker (void *)
432 // Server thread function.
434 while (!ACE_Reactor::event_loop_done ())
436 if (ACE_Reactor::instance ()->handle_events () == -1)
437 ACE_ERROR ((LM_ERROR,
438 ACE_TEXT ("(%t) %p\n"),
439 ACE_TEXT ("Error handling events")));
442 return 0;
446 run_server (ACE_INET_Addr &addr)
448 if (use_reactor)
450 ACE_Reactor *new_reactor = 0;
452 switch (use_reactor)
454 case SELECT:
456 ACE_Select_Reactor *sr = new ACE_Select_Reactor ();
457 new_reactor = new ACE_Reactor (sr, 1);
459 break;
460 case TP:
462 ACE_TP_Reactor *sr = new ACE_TP_Reactor ();
463 new_reactor = new ACE_Reactor (sr, 1);
465 break;
466 case WFMO:
467 #if defined (ACE_WIN32)
469 #else
471 #endif /* ACE_WIN32 */
472 default:
473 ACE_ERROR_RETURN ((LM_ERROR, "Invalid reactor type selected\n"), -1);
475 ACE_Reactor::instance (new_reactor, 1);
478 Server server (addr);
480 if (!use_reactor)
482 // Handle input in the current thread.
483 // This is actually equivalent to thread-per-connection model.
484 while (server.handle_input (0) != 1)
485 continue;
487 else
489 switch (use_reactor)
491 case SELECT:
492 // Run the reactor event loop.
493 ACE_Reactor::run_event_loop ();
494 break;
495 case TP:
496 ACE_Thread_Manager::instance ()->spawn_n (svr_thrno,
497 thread_pool_worker);
498 ACE_Thread_Manager::instance ()->wait ();
499 break;
500 case WFMO:
501 break;
502 default:
503 break; // won't happen here.
507 return 0;
511 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
513 int c, dstport = DEFPORT;
514 int priority =
515 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
516 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
517 priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO,
518 priority);
519 // Enable FIFO scheduling, e.g.
520 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
521 priority,
522 ACE_SCOPE_PROCESS)) != 0)
524 if (ACE_OS::last_error () == EPERM)
526 ACE_DEBUG ((LM_DEBUG,
527 "server (%P|%t): user is not superuser, "
528 "test runs in time-shared class\n"));
530 else
531 ACE_ERROR ((LM_ERROR,
532 "server (%P|%t): sched_params failed\n"));
535 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("hxwvb:I:p:sci:m:at:"));
537 while ((c = get_opt ()) != -1)
539 switch ((char) c)
541 case 'v':
542 VERBOSE = 1;
543 break;
545 case 'h':
546 dump_history = 1;
547 break;
549 case 'm':
550 bufsz = ACE_OS::atoi (get_opt.opt_arg ());
552 if (bufsz <= 0)
553 ACE_ERROR_RETURN ((LM_ERROR,
554 "\nMessage size must be greater than 0!\n\n"),
556 else if (bufsz > BUFSIZ)
557 ACE_ERROR_RETURN ((LM_ERROR,
558 "\nbufsz must be <= %d\n",
559 BUFSIZ),
561 break;
563 case 'i':
564 nsamples = ACE_OS::atoi (get_opt.opt_arg ());
565 if (nsamples <= 0)
566 ACE_ERROR_RETURN ((LM_ERROR,
567 "\nIterations must be greater than 0!\n\n"),
569 break;
571 case 'a':
572 use_reactor = SELECT;
573 break;
575 case 'x':
576 use_reactor = TP;
577 break;
579 case 'w':
580 #if defined (ACE_WIN32)
581 use_reactor = WFMO;
582 break;
583 #else
584 ACE_ERROR_RETURN ((LM_ERROR, "WFMO_Reactor is not supported\n"), -1);
585 #endif /* ACE_WIN32 */
587 case 'b':
588 so_bufsz = ACE_OS::atoi (get_opt.opt_arg ());
590 if (so_bufsz <= 0)
591 ACE_ERROR_RETURN ((LM_ERROR,
592 "\nInvalid socket buffer size!\n\n"),
594 break;
596 case 'I':
597 usdelay = ACE_OS::atoi (get_opt.opt_arg ());
599 if (usdelay < 0)
601 usdelay = 0;
602 ACE_ERROR_RETURN ((LM_ERROR,
603 "%s: bad usdelay: %s\n",
604 argv[0],
605 get_opt.opt_arg ()),
608 break;
610 case 'p':
611 dstport = ACE_OS::atoi (get_opt.opt_arg ());
612 if (dstport <= 0)
613 ACE_ERROR_RETURN ((LM_ERROR,
614 "\nInvalid port number!\n\n"),
616 break;
617 case 't':
618 svr_thrno = ACE_OS::atoi (get_opt.opt_arg ());
620 if (svr_thrno <= 0)
621 ACE_ERROR_RETURN ((LM_ERROR,
622 "\nInvalid server thread number!\n\n"),
624 break;
626 case 'c':
627 server = 0;
628 client = 1;
629 break;
630 case 's':
631 client = 0;
632 server = 1;
633 break;
634 default:
635 usage ();
636 return 1;
640 if ((get_opt.opt_ind () >= argc && client != 0) || argc == 1)
642 usage ();
643 return 1;
646 ACE_INET_Addr addr (dstport);
648 if (server)
650 return run_server (addr);
653 if ((u_int) bufsz < sizeof (ACE_hrtime_t))
655 ACE_ERROR_RETURN ((LM_ERROR,
656 "\nbufsz must be >= %d\n",
657 sizeof (ACE_hrtime_t)),
661 ACE_INET_Addr remote_addr;
663 if (ACE_OS::ace_isdigit(argv[get_opt.opt_ind ()][0]))
665 if (remote_addr.set (ACE_HTONS(dstport),
666 (ACE_UINT32) ACE_OS::inet_addr
667 (ACE_TEXT_ALWAYS_CHAR(argv[get_opt.opt_ind ()])), 0) == -1)
668 ACE_ERROR_RETURN ((LM_ERROR,
669 "invalid IP address: %s\n",
670 argv[get_opt.opt_ind ()]),
673 else
675 if (remote_addr.set (dstport, argv[get_opt.opt_ind ()]) == -1)
676 ACE_ERROR_RETURN ((LM_ERROR,
677 "invalid IP address: %s\n",
678 argv[get_opt.opt_ind ()]),
681 get_opt.opt_ind ()++;
683 ACE_DEBUG ((LM_DEBUG, "Connecting to %s:%d\n",
684 remote_addr.get_host_name (),
685 remote_addr.get_port_number ()));
687 Client client (remote_addr);
689 ACE_DEBUG ((LM_DEBUG,
690 "\nSending %d byte packets to %s:%d "
691 "with so_bufsz = %d\n\n",
692 bufsz,
693 addr.get_host_name (),
694 dstport,
695 so_bufsz));
697 client.run ();
698 client.shutdown ();
700 return 0;