Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / ACE / tests / Dev_Poll_Reactor_Test.cpp
blob8dffa050bf385f00831c20bc3cedab721b64f21b
1 //=============================================================================
2 /**
3 * @file Dev_Poll_Reactor_Test.cpp
5 * This test verifies that the Dev_Poll_Reactor is functioning
6 * properly, and demonstrates how "speculative reads" can be
7 * performed. "Speculative reads" are necessary when using event
8 * demultiplexing mechanisms that use a "state change" interface.
9 * Similarly, "speculative writes" may be necessary, i.e. keep
10 * writing until the connection is flow controlled. An example of a
11 * demuxing mechanism with such an interface is Linux's `/dev/epoll'
12 character device. Mechanisms with state change interfaces are
13 * also said to be "edge triggered," versus "level triggered"
14 * mechanisms such as select().
16 * @author Ossama Othman <ossama@dre.vanderbilt.edu>
18 //=============================================================================
20 #include "test_config.h"
22 #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
24 #include "ace/OS_NS_signal.h"
25 #include "ace/Reactor.h"
26 #include "ace/Dev_Poll_Reactor.h"
28 #include "ace/Acceptor.h"
29 #include "ace/Connector.h"
31 #include "ace/SOCK_Acceptor.h"
32 #include "ace/SOCK_Connector.h"
33 #include "ace/SOCK_Stream.h"
35 #include "ace/OS_NS_unistd.h"
36 #include "ace/OS_NS_netdb.h"
// Common base class of the Client and Server handlers in this test:
// ACE_Svc_Handler parameterized with ACE_SOCK_Stream and ACE_NULL_SYNCH.
using SVC_HANDLER = ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH>;
41 // ----------------------------------------------------
43 class Client : public SVC_HANDLER
45 public:
46 Client ();
48 //FUZZ: disable check_for_lack_ACE_OS
49 int open (void * = 0) override;
50 //FUZZ: enable check_for_lack_ACE_OS
52 int handle_output (ACE_HANDLE handle) override;
54 int handle_timeout (const ACE_Time_Value &current_time,
55 const void *act) override;
57 int handle_close (ACE_HANDLE handle,
58 ACE_Reactor_Mask mask) override;
60 private:
61 unsigned int call_count_;
65 class Server : public SVC_HANDLER
67 public:
68 Server ();
70 int handle_input (ACE_HANDLE handle) override;
72 int handle_timeout (const ACE_Time_Value &current_time,
73 const void *act) override;
75 int handle_close (ACE_HANDLE handle,
76 ACE_Reactor_Mask mask) override;
78 private:
79 unsigned int call_count_;
82 // ----------------------------------------------------
84 Client::Client ()
85 : call_count_ (0)
89 int
90 Client::open (void *)
92 // ACE_TEST_ASSERT (this->reactor () != 0);
94 if (this->reactor ()
95 && this->reactor ()->register_handler (
96 this,
97 ACE_Event_Handler::WRITE_MASK) == -1)
98 ACE_ERROR_RETURN ((LM_ERROR,
99 ACE_TEXT ("(%t) %p\n"),
100 ACE_TEXT ("unable to register client handler")),
101 -1);
103 return 0;
107 Client::handle_output (ACE_HANDLE)
109 for (int i = 1; i <= 5; ++i)
111 char buffer[BUFSIZ] = { 0 };
113 ACE_OS::snprintf (buffer, BUFSIZ, "test message %d.\n", i);
115 ssize_t bytes_sent =
116 this->peer ().send (buffer, ACE_OS::strlen (buffer));
118 if (bytes_sent == -1)
120 if (errno == EWOULDBLOCK)
121 return 0; // Flow control kicked in.
122 else if (errno == EPIPE || errno == ECONNRESET)
124 ACE_DEBUG ((LM_DEBUG,
125 ACE_TEXT ("(%t) Client::handle_output; server ")
126 ACE_TEXT ("closed handle %d\n"),
127 this->peer ().get_handle ()));
128 return -1;
130 else
131 ACE_ERROR_RETURN ((LM_ERROR,
132 ACE_TEXT ("(%t) %p\n"),
133 ACE_TEXT ("Client::handle_output")),
134 -1);
136 else if (bytes_sent == 0)
137 return -1;
138 else
139 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Sent %s"), buffer));
142 return 0;
146 Client::handle_timeout (const ACE_Time_Value &, const void *)
148 ACE_DEBUG ((LM_INFO,
149 ACE_TEXT ("(%t) Expected client timeout occurred at: %T\n")));
151 this->call_count_++;
153 int status = this->handle_output (this->get_handle ());
154 if (status == -1 || this->call_count_ > 10)
156 if (this->reactor ()->end_reactor_event_loop () == 0)
157 ACE_DEBUG ((LM_INFO,
158 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
159 else
160 ACE_ERROR ((LM_ERROR,
161 ACE_TEXT ("(%t) %p\n"),
162 ACE_TEXT ("Failed client reactor shutdown")));
164 // Force this service handler to be closed in either case.
165 return -1;
168 return 0;
172 Client::handle_close (ACE_HANDLE handle,
173 ACE_Reactor_Mask mask)
175 ACE_DEBUG ((LM_INFO,
176 ACE_TEXT ("(%t) Client Svc_Handler closed ")
177 ACE_TEXT ("handle <%d> with reactor mask <0x%x>.\n"),
178 handle,
179 mask));
181 // There is no point in running reactor after this client is closed.
182 if (this->reactor ()->end_reactor_event_loop () == 0)
183 ACE_DEBUG ((LM_INFO,
184 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
185 else
186 ACE_ERROR ((LM_ERROR,
187 ACE_TEXT ("(%t) %p\n"),
188 ACE_TEXT ("Failed client reactor shutdown")));
190 return SVC_HANDLER::handle_close (handle, mask);
193 // ----------------------------------------------------
195 Server::Server ()
196 : call_count_ (0)
201 Server::handle_input (ACE_HANDLE /* handle */)
203 char buffer[BUFSIZ+1] = { 0 }; // Insure a trailing nul
204 ssize_t bytes_read = 0;
206 char * const begin = buffer;
207 char * const end = buffer + BUFSIZ;
209 for (char * buf = begin; buf < end; buf += bytes_read)
211 // Keep reading until it is no longer possible to do so.
213 // This is done since the underlying event demultiplexing
214 // mechanism may have a "state change" interface (as opposed to
215 // "state monitoring"), in which case a "speculative" read is
216 // done.
217 bytes_read = this->peer ().recv (buf, end - buf);
219 ACE_DEBUG ((LM_DEBUG,
220 ACE_TEXT ("****** bytes_read = %d\n"),
221 bytes_read));
223 if (bytes_read == -1)
225 if (errno == EWOULDBLOCK)
227 // ACE_HEX_DUMP ((LM_DEBUG,
228 // buf,
229 // 80,
230 // "BUFFER CONTENTS"));
231 if (buf == buffer)
232 return 0;
233 else
234 break;
236 else
237 ACE_ERROR_RETURN ((LM_ERROR,
238 ACE_TEXT ("(%t) %p\n"),
239 ACE_TEXT ("Server::handle_input")),
240 -1);
242 else if (bytes_read == 0)
243 return -1;
246 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Message received: %s\n"), buffer));
248 return 0;
252 Server::handle_timeout (const ACE_Time_Value &,
253 const void *)
255 ACE_DEBUG ((LM_INFO,
256 ACE_TEXT ("(%t) Expected server timeout occurred at: %T\n")));
258 // if (this->call_count_ == 0
259 // && this->handle_input (this->get_handle ()) != 0
260 // && errno != EWOULDBLOCK)
261 // return -1;
263 // ACE_DEBUG ((LM_INFO,
264 // "SERVER HANDLE = %d\n",
265 // this->get_handle ()));
268 this->call_count_++;
270 if (this->call_count_ > 10)
272 if (this->reactor ()->end_reactor_event_loop () == 0)
273 ACE_DEBUG ((LM_INFO,
274 ACE_TEXT ("(%t) Successful server reactor shutdown.\n")));
275 else
276 ACE_ERROR ((LM_ERROR,
277 ACE_TEXT ("(%t) %p\n"),
278 ACE_TEXT ("Failed server reactor shutdown")));
280 // Force this service handler to be closed in either case.
281 return -1;
284 return 0;
288 Server::handle_close (ACE_HANDLE handle,
289 ACE_Reactor_Mask mask)
291 if (this->call_count_ > 4)
293 ACE_DEBUG ((LM_INFO,
294 ACE_TEXT ("(%t) Server Svc_Handler closing ")
295 ACE_TEXT ("handle <%d,%d> with reactor mask <0x%x>.\n"),
296 handle,
297 this->get_handle (),
298 mask));
301 return SVC_HANDLER::handle_close (handle, mask);
304 // ----------------------------------------------------
// Acceptor that creates Server handlers over an ACE_SOCK_Acceptor.
using ACCEPTOR = ACE_Acceptor<Server, ACE_SOCK_Acceptor>;
// Connector that creates Client handlers over an ACE_SOCK_Connector.
using CONNECTOR = ACE_Connector<Client, ACE_SOCK_Connector>;
309 // ----------------------------------------------------
311 class TestAcceptor : public ACCEPTOR
313 public:
314 int accept_svc_handler (Server * handler) override
316 int result = this->ACCEPTOR::accept_svc_handler (handler);
318 if (result != 0)
320 if (errno != EWOULDBLOCK)
321 ACE_ERROR ((LM_ERROR,
322 ACE_TEXT ("(%t) %p\n"),
323 ACE_TEXT ("Unable to accept connection")));
325 return result;
328 ACE_DEBUG ((LM_DEBUG,
329 ACE_TEXT ("(%t) Accepted connection. ")
330 ACE_TEXT ("Stream handle: <%d>\n"),
331 handler->get_handle ()));
333 // if (handler->handle_input (handler->get_handle ()) == -1
334 // && errno != EWOULDBLOCK)
335 // return -1;
337 // #if 0
338 ACE_Time_Value delay (2, 0);
339 ACE_Time_Value restart (2, 0);
340 if (handler->reactor ()->schedule_timer (handler,
342 delay,
343 restart) == -1)
345 ACE_ERROR_RETURN ((LM_ERROR,
346 ACE_TEXT ("(%t) %p\n"),
347 ACE_TEXT ("Unable to schedule server side ")
348 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
349 -1);
351 // #endif /* 0 */
353 return result;
357 // ----------------------------------------------------
359 class TestConnector : public CONNECTOR
361 public:
362 int connect_svc_handler (
363 CONNECTOR::handler_type *& handler,
364 const CONNECTOR::addr_type &remote_addr,
365 ACE_Time_Value *timeout,
366 const CONNECTOR::addr_type &local_addr,
367 int reuse_addr,
368 int flags,
369 int perms) override
371 const int result = this->CONNECTOR::connect_svc_handler (handler,
372 remote_addr,
373 timeout,
374 local_addr,
375 reuse_addr,
376 flags,
377 perms);
379 if (result != 0)
380 return result;
382 ACE_TCHAR hostname[MAXHOSTNAMELEN];
383 if (remote_addr.get_host_name (hostname,
384 sizeof (hostname)) != 0)
386 ACE_ERROR_RETURN ((LM_ERROR,
387 ACE_TEXT ("(%t) %p\n"),
388 ACE_TEXT ("Unable to retrieve hostname")),
389 -1);
392 ACE_DEBUG ((LM_DEBUG,
393 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
394 hostname,
395 (int) remote_addr.get_port_number ()));
397 // #if 0
398 ACE_Time_Value delay (4, 0);
399 ACE_Time_Value restart (3, 0);
400 if (handler->reactor ()->schedule_timer (handler,
402 delay,
403 restart) == -1)
405 ACE_ERROR_RETURN ((LM_ERROR,
406 ACE_TEXT ("(%t) %p\n"),
407 ACE_TEXT ("Unable to schedule client side ")
408 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
409 -1);
411 // #endif /* 0 */
413 return result;
416 int connect_svc_handler (
417 CONNECTOR::handler_type *& handler,
418 CONNECTOR::handler_type *& sh_copy,
419 const CONNECTOR::addr_type &remote_addr,
420 ACE_Time_Value *timeout,
421 const CONNECTOR::addr_type &local_addr,
422 int reuse_addr,
423 int flags,
424 int perms) override {
425 sh_copy = handler;
426 return this->connect_svc_handler (handler, remote_addr, timeout,
427 local_addr, reuse_addr, flags,
428 perms);
432 // ----------------------------------------------------
434 static int
435 disable_signal (int sigmin, int sigmax)
437 #if !defined (ACE_LACKS_UNIX_SIGNALS)
438 sigset_t signal_set;
439 if (ACE_OS::sigemptyset (&signal_set) == - 1)
440 ACE_ERROR ((LM_ERROR,
441 ACE_TEXT ("Error: (%P|%t):%p\n"),
442 ACE_TEXT ("sigemptyset failed")));
444 for (int i = sigmin; i <= sigmax; i++)
445 ACE_OS::sigaddset (&signal_set, i);
447 // Put the <signal_set>.
448 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
449 // In multi-threaded application this is not POSIX compliant
450 // but let's leave it just in case.
451 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
452 # else
453 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
454 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
455 ACE_ERROR_RETURN ((LM_ERROR,
456 ACE_TEXT ("Error: (%P|%t): %p\n"),
457 ACE_TEXT ("SIG_BLOCK failed")),
458 -1);
459 #else
460 ACE_UNUSED_ARG (sigmin);
461 ACE_UNUSED_ARG (sigmax);
462 #endif /* ACE_LACKS_UNIX_SIGNALS */
464 return 0;
467 // ----------------------------------------------------
469 ACE_THR_FUNC_RETURN
470 server_worker (void *p)
472 disable_signal (SIGPIPE, SIGPIPE);
474 const unsigned short port = *(static_cast<unsigned short *> (p));
476 ACE_INET_Addr addr;
478 if (addr.set (port, INADDR_LOOPBACK) != 0)
480 ACE_ERROR ((LM_ERROR,
481 ACE_TEXT ("(%t) %p\n"),
482 ACE_TEXT ("server_worker - ACE_INET_Addr::set")));
484 return (void *) -1;
487 ACE_Dev_Poll_Reactor dp_reactor;
488 dp_reactor.restart (1); // Restart on EINTR
489 ACE_Reactor reactor (&dp_reactor);
491 TestAcceptor server;
493 int flags = 0;
494 ACE_SET_BITS (flags, ACE_NONBLOCK); // Enable non-blocking in the
495 // Svc_Handlers.
497 if (server.open (addr, &reactor, flags) != 0)
499 ACE_ERROR ((LM_ERROR,
500 ACE_TEXT ("(%t) %p\n"),
501 ACE_TEXT ("Unable to open server service handler")));
503 return (void *) -1;
506 if (reactor.run_reactor_event_loop () != 0)
508 ACE_ERROR ((LM_ERROR,
509 ACE_TEXT ("(%t) %p\n"),
510 ACE_TEXT ("Error when running server ")
511 ACE_TEXT ("reactor event loop")));
513 return (void *) -1;
516 ACE_DEBUG ((LM_INFO,
517 ACE_TEXT ("(%t) Reactor event loop finished ")
518 ACE_TEXT ("successfully.\n")));
520 return 0;
523 // ----------------------------------------------------
525 // struct server_arg
526 // {
527 // unsigned short port;
529 // ACE_Condition<ACE_SYNCH_MUTEX> * cv;
530 // };
532 // ----------------------------------------------------
535 run_main (int, ACE_TCHAR *[])
537 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
539 // Make sure we ignore SIGPIPE
540 disable_signal (SIGPIPE, SIGPIPE);
542 ACE_Dev_Poll_Reactor dp_reactor;
543 dp_reactor.restart (1); // Restart on EINTR
544 ACE_Reactor reactor (&dp_reactor);
546 TestConnector client;
548 int flags = 0;
549 ACE_SET_BITS (flags, ACE_NONBLOCK); // Enable non-blocking in the
550 // Svc_Handlers.
552 if (client.open (&reactor, flags) != 0)
553 ACE_ERROR_RETURN ((LM_ERROR,
554 ACE_TEXT ("(%t) %p\n"),
555 ACE_TEXT ("Unable to open client service handler")),
556 -1);
558 // ACE_SYNCH_MUTEX mutex;
559 // ACE_Condition<ACE_SYNCH_MUTEX> cv (mutex);
561 // server_arg arg;
562 // arg.port = 54678; // Port the server will listen on.
563 // arg.cv = &cv;
565 unsigned short port = 54678;
567 if (ACE_Thread_Manager::instance ()->spawn (server_worker, &port) == -1)
568 ACE_ERROR_RETURN ((LM_ERROR,
569 ACE_TEXT ("(%t) %p\n"),
570 ACE_TEXT ("Unable to spawn server thread")),
571 -1);
573 ACE_OS::sleep (5); // Wait for the listening endpoint to be set up.
575 ACE_INET_Addr addr;
576 if (addr.set (port, INADDR_LOOPBACK) != 0)
577 ACE_ERROR_RETURN ((LM_ERROR,
578 ACE_TEXT ("(%t) %p\n"),
579 ACE_TEXT ("ACE_INET_Addr::set")),
580 -1);
582 Client *client_handler = 0;
584 if (client.connect (client_handler, addr) != 0)
585 ACE_ERROR_RETURN ((LM_ERROR,
586 ACE_TEXT ("(%t) %p\n"),
587 ACE_TEXT ("Unable to connect to server")),
588 -1);
590 if (reactor.run_reactor_event_loop () != 0)
591 ACE_ERROR_RETURN ((LM_ERROR,
592 ACE_TEXT ("(%t) %p\n"),
593 ACE_TEXT ("Error when running client ")
594 ACE_TEXT ("reactor event loop")),
595 -1);
597 if (ACE_Thread_Manager::instance ()->wait () != 0)
598 ACE_ERROR_RETURN ((LM_ERROR,
599 ACE_TEXT ("(%t) %p\n"),
600 ACE_TEXT ("Error waiting for threads to complete")),
601 -1);
603 ACE_END_TEST;
605 return 0;
608 #else
611 run_main (int, ACE_TCHAR *[])
613 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
614 ACE_ERROR ((LM_INFO,
615 ACE_TEXT ("Dev Poll and Event Poll are not supported ")
616 ACE_TEXT ("on this platform\n")));
617 ACE_END_TEST;
618 return 0;
621 #endif /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */