Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / performance-tests / UDP / udp_test.cpp
blob48936920de4f30434895fc7be93a1a54c4608420
2 //=============================================================================
3 /**
4 * @file udp_test.cpp
6 * Measures UDP round-trip performance.
8 * @author Fred Kuhns and David L. Levine
9 */
10 //=============================================================================
13 #include "ace/OS_main.h"
14 #include "ace/Reactor.h"
15 #include "ace/SOCK_Dgram.h"
16 #include "ace/INET_Addr.h"
17 #include "ace/ACE.h"
18 #include "ace/Get_Opt.h"
19 #include "ace/High_Res_Timer.h"
20 #include "ace/Log_Msg.h"
21 #include "ace/OS_NS_stdio.h"
22 #include "ace/OS_NS_ctype.h"
23 #include "ace/OS_NS_arpa_inet.h"
24 #include "ace/OS_NS_string.h"
25 #include "ace/os_include/os_netdb.h"
26 #include "ace/OS_NS_unistd.h"
28 // FUZZ: disable check_for_math_include
29 #include <math.h>
31 // Global variables (evil).
32 static const u_short DEFPORT = 5050;
33 static const int MAXPKTSZ = 65536;
34 static const int DEFPKTSZ = 64;
35 static const int DEFITERATIONS = 1000;
36 static const int DEFINTERVAL = 1000; // 1000 usecs.
37 static const int DEFWINDOWSZ = 10; // 10 microsecond.
38 static char SendBuf[MAXPKTSZ];
39 static char RxBuf[MAXPKTSZ];
40 static ACE_TCHAR **cmd;
41 static ACE_TCHAR datafile[MAXHOSTNAMELEN];
43 static ACE_UINT32 nsamples = DEFITERATIONS;
44 static int usdelay = DEFINTERVAL;
45 static int bufsz = DEFPKTSZ;
46 static int window = DEFWINDOWSZ;
47 static int VERBOSE = 0;
48 static int logfile = 0;
49 static int server = 0;
50 static int client = 0;
51 static u_int use_reactor = 0;
52 ACE_hrtime_t max_allow = 0;
53 ACE_hrtime_t total_ltime;
54 ACE_hrtime_t ltime;
56 static void
57 usage ()
59 ACE_ERROR ((LM_ERROR,
60 "%s\n"
61 "[-w window_size]\n"
62 " [-f datafile] (creates datafile.samp and datafile.dist)\n"
63 " [-v] (Verbose)\n"
64 " [-b send_bufsz]\n"
65 " [-n nsamples]\n"
66 " [-I usdelay]\n"
67 " [-s so_bufsz]\n"
68 " [-p port]\n"
69 " [-t]\n"
70 " [-r]\n"
71 " [-x max_sample_allowed]\n"
72 " [-a to use the ACE reactor]\n"
73 " targethost\n",
74 *cmd));
77 static ACE_hrtime_t *Samples;
78 static u_int *Dist;
79 static ACE_TCHAR sumfile[30];
80 static ACE_TCHAR distfile[30];
81 static ACE_TCHAR sampfile[30];
83 class Client : public ACE_Event_Handler
85 public:
86 Client (const ACE_INET_Addr &addr,
87 const ACE_INET_Addr &remote_addr);
89 virtual ~Client ();
91 // = Override <ACE_Event_Handler> methods.
92 virtual ACE_HANDLE get_handle () const;
93 virtual int handle_input (ACE_HANDLE);
94 virtual int handle_close (ACE_HANDLE handle,
95 ACE_Reactor_Mask close_mask);
97 //FUZZ: disable check_for_lack_ACE_OS
98 /// Send the <buf> to the server.
99 int send (const char *buf, size_t len);
100 //FUZZ: enable check_for_lack_ACE_OS
102 /// Wait for the response.
103 int get_response (char *buf, size_t len);
105 /// Send messages to server and record statistics.
106 int run ();
108 //FUZZ: disable check_for_lack_ACE_OS
109 /// Send shutdown message to server.
110 int shutdown ();
111 //FUZZ: enable check_for_lack_ACE_OS
113 private:
114 /// To send messages and receive responses.
115 ACE_SOCK_Dgram endpoint_;
117 /// The address to send messages to.
118 ACE_INET_Addr remote_addr_;
120 Client () = delete;
121 Client (const Client &) = delete;
122 Client &operator= (const Client &) = delete;
125 Client::Client (const ACE_INET_Addr &addr,
126 const ACE_INET_Addr &remote_addr)
127 : endpoint_ (addr),
128 remote_addr_ (remote_addr)
130 if (use_reactor)
132 if (ACE_Reactor::instance ()->register_handler
133 (this, ACE_Event_Handler::READ_MASK) == -1)
134 ACE_ERROR ((LM_ERROR,
135 "ACE_Reactor::register_handler: Client\n"));
139 Client::~Client ()
143 ACE_HANDLE
144 Client::get_handle () const
146 return endpoint_.get_handle ();
150 Client::handle_input (ACE_HANDLE)
152 char buf[BUFSIZ];
153 ACE_INET_Addr from_addr;
155 ssize_t n = endpoint_.recv (buf, sizeof buf, from_addr);
157 if (n == -1)
158 ACE_ERROR ((LM_ERROR,
159 "%p\n",
160 "handle_input"));
161 else
162 ACE_DEBUG ((LM_DEBUG,
163 "(%P|%t) buf of size %d = %*s\n",
166 buf));
168 return 0;
172 Client::handle_close (ACE_HANDLE,
173 ACE_Reactor_Mask)
175 this->endpoint_.close ();
176 return 0;
180 Client::send (const char *buf, size_t len)
182 return this->endpoint_.send (buf, len, remote_addr_);
186 Client::get_response (char *buf, size_t len)
188 ACE_INET_Addr addr;
189 return this->endpoint_.recv (buf, len, addr);
193 Client::run ()
195 int ndist = 0;
196 int i;
197 int j;
198 int n;
199 int maxindx = 0;
200 int minindx = 0;
201 char *sbuf = SendBuf;
202 char *rbuf = RxBuf;
204 ACE_High_Res_Timer timer;
205 ACE_hrtime_t sample;
207 int d;
208 double std_dev = 0.0;
209 double std_err = 0.0;
210 double sample_mean = 0.0;
212 int tracking_last_over = 0;
213 ACE_High_Res_Timer since_over;
214 ACE_hrtime_t psum = 0;
215 ACE_hrtime_t sum = 0;
216 ACE_hrtime_t max = 0;
217 ACE_hrtime_t min = (ACE_hrtime_t) (u_int) -1;
218 FILE *sumfp = 0;
219 FILE *distfp = 0;
220 FILE *sampfp = 0;
221 pid_t *pid = (pid_t *) sbuf;
222 int *seq = (int *) (sbuf + sizeof (int));
224 ACE_OS::memset (sbuf, 0, bufsz);
225 ACE_OS::memset (rbuf, 0, bufsz);
227 *pid = ACE_OS::getpid ();
228 *seq = 0;
230 ACE_DEBUG ((LM_DEBUG,
231 "PID = %d, Starting SEQ = %d\n",
232 *pid,
233 *seq));
235 // Allocate memory to hold samples.
236 Samples = (ACE_hrtime_t *) ACE_OS::calloc (nsamples,
237 sizeof (ACE_hrtime_t));
239 for (i = -1, *seq = 0, j = 0;
240 i < (ACE_INT32) nsamples;
241 (*seq)++, i++, j++, timer.reset ())
243 timer.start ();
244 if (this->send (sbuf, bufsz) <= 0)
245 ACE_ERROR_RETURN ((LM_ERROR, "(%P) %p\n", "send"), -1);
247 if ((n = get_response (rbuf, bufsz)) <= 0)
248 ACE_ERROR_RETURN ((LM_ERROR, "(%P) %p\n", "get_response"), -1);
250 timer.stop ();
252 if (n <= 0)
253 ACE_ERROR_RETURN ((LM_ERROR,
254 "\nTrouble receiving from socket!\n\n"),
255 -1);
257 timer.elapsed_time (sample); // in nanoseconds.
259 if (i < 0 )
261 ACE_DEBUG ((LM_DEBUG,
262 "Ignoring first sample of %u usecs\n",
263 (ACE_UINT32) (sample)));
264 continue;
266 else if (max_allow > 0 && sample > max_allow)
268 ACE_DEBUG ((LM_DEBUG, "Sample # %i = "
269 "%u msec is over the limit (%u)!\n",
271 (ACE_UINT32) (sample / (ACE_UINT32) 1000000),
272 (ACE_UINT32) (max_allow / (ACE_UINT32) 1000000)));
274 if (tracking_last_over)
276 since_over.stop ();
277 ACE_Time_Value over_time;
278 since_over.elapsed_time (over_time);
279 ACE_DEBUG ((LM_DEBUG,
280 "\tTime since last over = %u msec!\n",
281 over_time.msec ()));
282 since_over.reset ();
285 tracking_last_over = 1;
286 since_over.start ();
287 i--;
288 continue;
291 Samples[i] = sample;
292 sum += sample;
294 if (min == (ACE_hrtime_t) (u_int) -1)
296 min = sample;
297 minindx = i;
299 if (sample > max)
301 max = sample;
302 maxindx = i;
304 if (sample < min)
306 min = sample;
307 minindx = i;
310 if (VERBOSE)
312 psum += sample;
313 if (j == 500)
315 ACE_DEBUG ((LM_DEBUG,
316 "(%i) Partial (running) mean %u usecs\n",
317 i + 1,
318 (ACE_UINT32) (psum / (ACE_UINT32) (1000 * 500))));
319 j = 0;
320 psum = 0;
325 sample_mean = ((double) ACE_U64_TO_U32 (sum)) / (double) nsamples;
327 if (logfile)
329 ACE_OS::sprintf (sumfile, ACE_TEXT("%s.sum"), datafile);
330 ACE_OS::sprintf (distfile, ACE_TEXT("%s.dist"), datafile);
331 ACE_OS::sprintf (sampfile, ACE_TEXT("%s.samp"), datafile);
333 distfp = ACE_OS::fopen(distfile, ACE_TEXT("w"));
335 if (distfp == 0)
337 ACE_DEBUG ((LM_DEBUG,
338 "Unable to open dist file!\n\n"));
339 logfile = 0;
341 if (logfile && (sampfp = ACE_OS::fopen (sampfile, ACE_TEXT("w"))) == 0)
343 ACE_OS::fclose (distfp);
344 ACE_DEBUG ((LM_DEBUG,
345 "Unable to open sample file!\n\n"));
346 logfile = 0;
348 if (logfile && (sumfp = ACE_OS::fopen (sumfile, ACE_TEXT("w"))) == 0)
350 ACE_OS::fclose (distfp);
351 ACE_OS::fclose (sampfp);
352 ACE_DEBUG ((LM_DEBUG,
353 "Unable to open sample file!\n\n"));
354 logfile = 0;
358 window = window * 1000; // convert to nsec.
359 ndist = (int)((max-min) / window) + 1;
360 Dist = (u_int *) ACE_OS::calloc (ndist,
361 sizeof (u_int));
363 for (i = 0; i < (ACE_INT32) nsamples; i++)
365 std_dev += ((double) ACE_U64_TO_U32 (Samples[i]) - sample_mean) *
366 ((double) ACE_U64_TO_U32 (Samples[i]) - sample_mean);
367 d = (int)((Samples[i] - min)/window);
369 if (d < 0 || d > ndist)
371 ACE_DEBUG ((LM_DEBUG,
372 "\nError indexing into dist array %d (%d)\n\n",
374 ndist));
375 ACE_OS::exit (1);
377 Dist[d] += 1;
378 if (logfile)
379 ACE_OS::fprintf (sampfp,
380 "%u\n",
381 ACE_U64_TO_U32 (Samples[i]));
384 if (logfile)
386 ACE_hrtime_t tmp;
387 tmp = min + (window / 2);
389 for (i = 0; i < ndist; i++)
391 ACE_OS::fprintf (distfp,
392 "%u %d\n",
393 (ACE_UINT32) (tmp / (ACE_UINT32) 1000),
394 Dist[i]);
395 tmp += window;
399 std_dev = (double) sqrt (std_dev / (double) (nsamples - 1.0));
400 std_err = (double) std_dev / sqrt ((double) nsamples);
402 ACE_DEBUG ((LM_DEBUG,
403 "\nResults for %i samples (usec):\n"
404 "\tSample Mean = %f,\n"
405 "\tSample Max = %u, Max index = %d,\n"
406 "\tSample Min = %u, Min index = %d,\n"
407 "\tStandard Deviation = %f,\n"
408 "\tStandard Error = %f\n",
409 nsamples,
410 sample_mean / 1000.0,
411 (u_int) (max / (ACE_UINT32) 1000),
412 maxindx,
413 (u_int) (min / (ACE_UINT32) 1000),
414 minindx,
415 std_dev / 1000.0,
416 std_err / 1000.0));
418 if (logfile)
420 ACE_OS::fprintf (sumfp,
421 "Command executed:\n");
422 for (; *cmd; cmd++)
423 ACE_OS::fprintf (sumfp,
424 "%s ",
425 ACE_TEXT_ALWAYS_CHAR (*cmd));
426 ACE_OS::fprintf (sumfp,
427 "\n");
429 ACE_OS::fprintf (sumfp,
430 "\nResults for %i samples (usec):"
431 "\tSample Mean = %f,\n"
432 "\tSample Max = %u, Max index = %d,\n"
433 "\tSample Min = %u, Min index = %d,\n"
434 "\tStandard Deviation = %f,\n"
435 "\tStandard Error = %f\n",
436 nsamples,
437 sample_mean / 1000.0,
438 (ACE_UINT32) (max / (ACE_UINT32) 1000),
439 maxindx,
440 (ACE_UINT32) (min / (ACE_UINT32) 1000),
441 minindx,
442 std_dev / 1000.0,
443 std_err / 1000.0);
446 return 0;
450 Client::shutdown ()
452 const char buf = 'S';
453 const int n = endpoint_.send (&buf, 1u, remote_addr_);
455 if (use_reactor)
457 if (ACE_Reactor::instance ()->remove_handler
458 (this, ACE_Event_Handler::READ_MASK) == -1)
459 ACE_ERROR_RETURN ((LM_ERROR,
460 "ACE_Reactor::remove_handler: Client\n"),
461 -1);
464 return n;
467 class Server : public ACE_Event_Handler
469 public:
470 Server (const ACE_INET_Addr &addr);
472 virtual ~Server ();
474 // = Override <ACE_Event_Handler> methods.
475 virtual ACE_HANDLE get_handle () const;
476 virtual int handle_input (ACE_HANDLE);
477 virtual int handle_close (ACE_HANDLE handle,
478 ACE_Reactor_Mask close_mask);
480 private:
481 /// Receives datagrams.
482 ACE_SOCK_Dgram endpoint_;
484 Server () = delete;
485 Server (const Server &) = delete;
486 Server &operator= (const Server &) = delete;
489 Server::Server (const ACE_INET_Addr &addr)
490 : endpoint_ (addr)
492 if (use_reactor)
494 if (ACE_Reactor::instance ()->register_handler
495 (this,
496 ACE_Event_Handler::READ_MASK) == -1)
497 ACE_ERROR ((LM_ERROR,
498 "ACE_Reactor::register_handler: Server\n"));
502 Server::~Server ()
506 ACE_HANDLE
507 Server::get_handle () const
509 return endpoint_.get_handle ();
513 Server::handle_input (ACE_HANDLE)
515 char buf[BUFSIZ];
516 ACE_INET_Addr from_addr;
518 ssize_t n = endpoint_.recv (buf, sizeof buf, from_addr);
520 if (n == -1)
521 ACE_DEBUG ((LM_ERROR,
522 "%p\n",
523 "handle_input: recv"));
525 // Send the message back as the response.
526 if (endpoint_.send (buf, n, from_addr) == n)
528 if (n == 1 && buf[0] == 'S')
530 if (use_reactor)
532 if (ACE_Reactor::instance ()->remove_handler
533 (this, ACE_Event_Handler::READ_MASK) == -1)
534 ACE_ERROR_RETURN ((LM_ERROR,
535 "ACE_Reactor::remove_handler: server\n"),
536 -1);
538 ACE_Reactor::end_event_loop ();
540 else
542 // Indicate done by returning 1.
543 return 1;
547 return 0;
549 else
551 ACE_DEBUG ((LM_ERROR,
552 "%p\n",
553 "handle_input: send"));
554 return -1;
559 Server::handle_close (ACE_HANDLE,
560 ACE_Reactor_Mask)
562 endpoint_.close ();
564 return 0;
568 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
570 int c, dstport = DEFPORT;
571 int so_bufsz = 0;
573 cmd = argv;
575 //FUZZ: disable check_for_lack_ACE_OS
576 ACE_Get_Opt getopt (argc, argv, ACE_TEXT("x:w:f:vs:I:p:rtn:b:a"));
578 while ((c = getopt ()) != -1)
580 //FUZZ: enable check_for_lack_ACE_OS
581 switch ((char) c)
583 case 'x':
584 max_allow = ACE_OS::atoi (getopt.opt_arg ());
585 break;
586 case 'w':
587 window = ACE_OS::atoi (getopt.opt_arg ());
588 if (window < 2)
589 ACE_ERROR_RETURN ((LM_ERROR,
590 "Invalid window!\n\n"),
592 break;
593 case 'f':
594 ACE_OS::strcpy (datafile, getopt.opt_arg ());
595 logfile = 1;
596 break;
597 case 'v':
598 VERBOSE = 1;
599 break;
600 case 'b':
601 bufsz = ACE_OS::atoi (getopt.opt_arg ());
603 if (bufsz <= 0)
604 ACE_ERROR_RETURN ((LM_ERROR,
605 "\nBuffer size must be greater than 0!\n\n"),
607 break;
608 case 'n':
609 nsamples = ACE_OS::atoi (getopt.opt_arg ());
610 if (nsamples <= 0)
611 ACE_ERROR_RETURN ((LM_ERROR,
612 "\nIterations must be greater than 0!\n\n"),
614 break;
615 case 'a':
616 use_reactor = 1;
617 break;
618 case 's':
619 so_bufsz = ACE_OS::atoi (getopt.opt_arg ());
621 if (so_bufsz <= 0)
622 ACE_ERROR_RETURN ((LM_ERROR,
623 "\nInvalid socket buffer size!\n\n"),
625 break;
626 case 'I':
627 usdelay = ACE_OS::atoi (getopt.opt_arg ());
629 if (usdelay == 0)
630 ACE_ERROR_RETURN ((LM_ERROR,
631 "%s: bad usdelay: %s\n",
632 argv[0],
633 getopt.opt_arg ()),
635 break;
636 case 'p':
637 dstport = ACE_OS::atoi (getopt.opt_arg ());
638 if (dstport <= 0)
639 ACE_ERROR_RETURN ((LM_ERROR,
640 "\nInvalid port number!\n\n"),
642 break;
643 case 't':
644 server = 0;
645 client = 1;
646 break;
647 case 'r':
648 client = 0;
649 server = 1;
650 break;
651 default:
652 usage ();
653 return 1;
657 if ((getopt.opt_ind () >= argc && client != 0) || argc == 1)
659 usage ();
660 return 1;
663 ACE_INET_Addr addr (server ? dstport : dstport + 1);
665 if (server)
667 Server server (addr);
669 if (use_reactor)
671 ACE_Reactor::run_event_loop ();
673 else
675 // Handle input in the current thread.
676 while (server.handle_input (0) != 1)
678 continue;
682 else
684 if ((u_int) bufsz < sizeof (ACE_hrtime_t))
685 ACE_ERROR_RETURN ((LM_ERROR,
686 "\nbufsz must be >= %d\n",
687 sizeof (ACE_hrtime_t)),
689 ACE_INET_Addr remote_addr;
691 if (ACE_OS::ace_isdigit(argv[getopt.opt_ind ()][0]))
693 if (remote_addr.set (dstport,
694 (ACE_UINT32) ACE_OS::inet_addr
695 (ACE_TEXT_ALWAYS_CHAR(argv[getopt.opt_ind ()]))) == -1)
696 ACE_ERROR_RETURN ((LM_ERROR,
697 "invalid IP address: %s\n",
698 argv[getopt.opt_ind ()]),
701 else
703 if (remote_addr.set (dstport, argv[getopt.opt_ind ()]) == -1)
704 ACE_ERROR_RETURN ((LM_ERROR,
705 "invalid IP address: %s\n",
706 argv[getopt.opt_ind ()]),
709 getopt.opt_ind ()++;
711 Client client (addr, remote_addr);
713 ACE_DEBUG ((LM_DEBUG,
714 "\nSending %d byte packets to %s:%d "
715 "with so_bufsz = %d\n\n",
716 bufsz,
717 addr.get_host_name (),
718 dstport,
719 so_bufsz));
721 client.run ();
722 client.shutdown ();
725 return 0;