Doxygen changes
[ACE_TAO.git] / ACE / tests / Dev_Poll_Reactor_Test.cpp
blob3d0230593343591c3f260a3a347bded519aa2c2b
1 //=============================================================================
2 /**
3 * @file Dev_Poll_Reactor_Test.cpp
5 * This test verifies that the Dev_Poll_Reactor is functioning
6 * properly, and demonstrates how "speculative reads" can be
7 * performed. "Speculative reads" are necessary when using event
8 * demultiplexing mechanisms that use a "state change" interface.
9 * Similarly, "speculative writes" may be necessary, i.e. keep
10 * writing until the connection is flow controlled. An example of a
11 * demuxing mechanism with such an interface is Linux's `/dev/epoll'
12 * character device. Mechansims with state change interfaces are
13 * also said to be "edge triggered," versus "level triggered"
14 * mechanisms such as select().
16 * @author Ossama Othman <ossama@dre.vanderbilt.edu>
18 //=============================================================================
20 #include "test_config.h"
22 #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
24 #include "ace/OS_NS_signal.h"
25 #include "ace/Reactor.h"
26 #include "ace/Dev_Poll_Reactor.h"
28 #include "ace/Acceptor.h"
29 #include "ace/Connector.h"
31 #include "ace/SOCK_Acceptor.h"
32 #include "ace/SOCK_Connector.h"
33 #include "ace/SOCK_Stream.h"
35 #include "ace/OS_NS_unistd.h"
36 #include "ace/OS_NS_netdb.h"
39 typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> SVC_HANDLER;
41 // ----------------------------------------------------
43 class Client : public SVC_HANDLER
45 public:
47 Client (void);
49 //FUZZ: disable check_for_lack_ACE_OS
50 virtual int open (void * = 0);
51 //FUZZ: enable check_for_lack_ACE_OS
53 virtual int handle_output (ACE_HANDLE handle);
55 virtual int handle_timeout (const ACE_Time_Value &current_time,
56 const void *act);
58 virtual int handle_close (ACE_HANDLE handle,
59 ACE_Reactor_Mask mask);
61 private:
63 unsigned int call_count_;
68 class Server : public SVC_HANDLER
70 public:
72 Server (void);
74 virtual int handle_input (ACE_HANDLE handle);
76 virtual int handle_timeout (const ACE_Time_Value &current_time,
77 const void *act);
79 virtual int handle_close (ACE_HANDLE handle,
80 ACE_Reactor_Mask mask);
82 private:
84 unsigned int call_count_;
88 // ----------------------------------------------------
90 Client::Client (void)
91 : call_count_ (0)
95 int
96 Client::open (void *)
98 // ACE_TEST_ASSERT (this->reactor () != 0);
100 if (this->reactor ()
101 && this->reactor ()->register_handler (
102 this,
103 ACE_Event_Handler::WRITE_MASK) == -1)
104 ACE_ERROR_RETURN ((LM_ERROR,
105 ACE_TEXT ("(%t) %p\n"),
106 ACE_TEXT ("unable to register client handler")),
107 -1);
109 return 0;
113 Client::handle_output (ACE_HANDLE)
115 for (int i = 1; i <= 5; ++i)
117 char buffer[BUFSIZ] = { 0 };
119 ACE_OS::snprintf (buffer, BUFSIZ, "test message %d.\n", i);
121 ssize_t bytes_sent =
122 this->peer ().send (buffer, ACE_OS::strlen (buffer));
124 if (bytes_sent == -1)
126 if (errno == EWOULDBLOCK)
127 return 0; // Flow control kicked in.
128 else if (errno == EPIPE || errno == ECONNRESET)
130 ACE_DEBUG ((LM_DEBUG,
131 ACE_TEXT ("(%t) Client::handle_output; server ")
132 ACE_TEXT ("closed handle %d\n"),
133 this->peer ().get_handle ()));
134 return -1;
136 else
137 ACE_ERROR_RETURN ((LM_ERROR,
138 ACE_TEXT ("(%t) %p\n"),
139 ACE_TEXT ("Client::handle_output")),
140 -1);
142 else if (bytes_sent == 0)
143 return -1;
144 else
145 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Sent %s"), buffer));
148 return 0;
152 Client::handle_timeout (const ACE_Time_Value &, const void *)
154 ACE_DEBUG ((LM_INFO,
155 ACE_TEXT ("(%t) Expected client timeout occurred at: %T\n")));
157 this->call_count_++;
159 int status = this->handle_output (this->get_handle ());
160 if (status == -1 || this->call_count_ > 10)
162 if (this->reactor ()->end_reactor_event_loop () == 0)
163 ACE_DEBUG ((LM_INFO,
164 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
165 else
166 ACE_ERROR ((LM_ERROR,
167 ACE_TEXT ("(%t) %p\n"),
168 ACE_TEXT ("Failed client reactor shutdown")));
170 // Force this service handler to be closed in either case.
171 return -1;
174 return 0;
178 Client::handle_close (ACE_HANDLE handle,
179 ACE_Reactor_Mask mask)
181 ACE_DEBUG ((LM_INFO,
182 ACE_TEXT ("(%t) Client Svc_Handler closed ")
183 ACE_TEXT ("handle <%d> with reactor mask <0x%x>.\n"),
184 handle,
185 mask));
187 // There is no point in running reactor after this client is closed.
188 if (this->reactor ()->end_reactor_event_loop () == 0)
189 ACE_DEBUG ((LM_INFO,
190 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
191 else
192 ACE_ERROR ((LM_ERROR,
193 ACE_TEXT ("(%t) %p\n"),
194 ACE_TEXT ("Failed client reactor shutdown")));
196 return SVC_HANDLER::handle_close (handle, mask);
199 // ----------------------------------------------------
201 Server::Server (void)
202 : call_count_ (0)
207 Server::handle_input (ACE_HANDLE /* handle */)
209 char buffer[BUFSIZ+1] = { 0 }; // Insure a trailing nul
210 ssize_t bytes_read = 0;
212 char * const begin = buffer;
213 char * const end = buffer + BUFSIZ;
215 for (char * buf = begin; buf < end; buf += bytes_read)
217 // Keep reading until it is no longer possible to do so.
219 // This is done since the underlying event demultiplexing
220 // mechanism may have a "state change" interface (as opposed to
221 // "state monitoring"), in which case a "speculative" read is
222 // done.
223 bytes_read = this->peer ().recv (buf, end - buf);
225 ACE_DEBUG ((LM_DEBUG,
226 ACE_TEXT ("****** bytes_read = %d\n"),
227 bytes_read));
229 if (bytes_read == -1)
231 if (errno == EWOULDBLOCK)
234 // ACE_HEX_DUMP ((LM_DEBUG,
235 // buf,
236 // 80,
237 // "BUFFER CONTENTS"));
238 if (buf == buffer)
239 return 0;
240 else
241 break;
243 else
244 ACE_ERROR_RETURN ((LM_ERROR,
245 ACE_TEXT ("(%t) %p\n"),
246 ACE_TEXT ("Server::handle_input")),
247 -1);
249 else if (bytes_read == 0)
250 return -1;
253 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Message received: %s\n"), buffer));
255 return 0;
259 Server::handle_timeout (const ACE_Time_Value &,
260 const void *)
262 ACE_DEBUG ((LM_INFO,
263 ACE_TEXT ("(%t) Expected server timeout occurred at: %T\n")));
265 // if (this->call_count_ == 0
266 // && this->handle_input (this->get_handle ()) != 0
267 // && errno != EWOULDBLOCK)
268 // return -1;
270 // ACE_DEBUG ((LM_INFO,
271 // "SERVER HANDLE = %d\n",
272 // this->get_handle ()));
275 this->call_count_++;
277 if (this->call_count_ > 10)
279 if (this->reactor ()->end_reactor_event_loop () == 0)
280 ACE_DEBUG ((LM_INFO,
281 ACE_TEXT ("(%t) Successful server reactor shutdown.\n")));
282 else
283 ACE_ERROR ((LM_ERROR,
284 ACE_TEXT ("(%t) %p\n"),
285 ACE_TEXT ("Failed server reactor shutdown")));
287 // Force this service handler to be closed in either case.
288 return -1;
291 return 0;
295 Server::handle_close (ACE_HANDLE handle,
296 ACE_Reactor_Mask mask)
298 if (this->call_count_ > 4)
300 ACE_DEBUG ((LM_INFO,
301 ACE_TEXT ("(%t) Server Svc_Handler closing ")
302 ACE_TEXT ("handle <%d,%d> with reactor mask <0x%x>.\n"),
303 handle,
304 this->get_handle (),
305 mask));
308 return SVC_HANDLER::handle_close (handle, mask);
311 // ----------------------------------------------------
313 typedef ACE_Acceptor<Server, ACE_SOCK_ACCEPTOR> ACCEPTOR;
314 typedef ACE_Connector<Client, ACE_SOCK_CONNECTOR> CONNECTOR;
316 // ----------------------------------------------------
318 class TestAcceptor : public ACCEPTOR
320 public:
322 virtual int accept_svc_handler (Server * handler)
324 int result = this->ACCEPTOR::accept_svc_handler (handler);
326 if (result != 0)
328 if (errno != EWOULDBLOCK)
329 ACE_ERROR ((LM_ERROR,
330 ACE_TEXT ("(%t) %p\n"),
331 ACE_TEXT ("Unable to accept connection")));
333 return result;
336 ACE_DEBUG ((LM_DEBUG,
337 ACE_TEXT ("(%t) Accepted connection. ")
338 ACE_TEXT ("Stream handle: <%d>\n"),
339 handler->get_handle ()));
341 // if (handler->handle_input (handler->get_handle ()) == -1
342 // && errno != EWOULDBLOCK)
343 // return -1;
345 // #if 0
346 ACE_Time_Value delay (2, 0);
347 ACE_Time_Value restart (2, 0);
348 if (handler->reactor ()->schedule_timer (handler,
350 delay,
351 restart) == -1)
353 ACE_ERROR_RETURN ((LM_ERROR,
354 ACE_TEXT ("(%t) %p\n"),
355 ACE_TEXT ("Unable to schedule server side ")
356 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
357 -1);
359 // #endif /* 0 */
361 return result;
366 // ----------------------------------------------------
368 class TestConnector : public CONNECTOR
370 public:
372 virtual int connect_svc_handler (
373 CONNECTOR::handler_type *& handler,
374 const CONNECTOR::addr_type &remote_addr,
375 ACE_Time_Value *timeout,
376 const CONNECTOR::addr_type &local_addr,
377 int reuse_addr,
378 int flags,
379 int perms)
381 const int result = this->CONNECTOR::connect_svc_handler (handler,
382 remote_addr,
383 timeout,
384 local_addr,
385 reuse_addr,
386 flags,
387 perms);
389 if (result != 0)
390 return result;
392 ACE_TCHAR hostname[MAXHOSTNAMELEN];
393 if (remote_addr.get_host_name (hostname,
394 sizeof (hostname)) != 0)
396 ACE_ERROR_RETURN ((LM_ERROR,
397 ACE_TEXT ("(%t) %p\n"),
398 ACE_TEXT ("Unable to retrieve hostname")),
399 -1);
402 ACE_DEBUG ((LM_DEBUG,
403 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
404 hostname,
405 (int) remote_addr.get_port_number ()));
407 // #if 0
408 ACE_Time_Value delay (4, 0);
409 ACE_Time_Value restart (3, 0);
410 if (handler->reactor ()->schedule_timer (handler,
412 delay,
413 restart) == -1)
415 ACE_ERROR_RETURN ((LM_ERROR,
416 ACE_TEXT ("(%t) %p\n"),
417 ACE_TEXT ("Unable to schedule client side ")
418 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
419 -1);
421 // #endif /* 0 */
423 return result;
426 virtual int connect_svc_handler (
427 CONNECTOR::handler_type *& handler,
428 CONNECTOR::handler_type *& sh_copy,
429 const CONNECTOR::addr_type &remote_addr,
430 ACE_Time_Value *timeout,
431 const CONNECTOR::addr_type &local_addr,
432 int reuse_addr,
433 int flags,
434 int perms) {
435 sh_copy = handler;
436 return this->connect_svc_handler (handler, remote_addr, timeout,
437 local_addr, reuse_addr, flags,
438 perms);
442 // ----------------------------------------------------
444 static int
445 disable_signal (int sigmin, int sigmax)
447 #if !defined (ACE_LACKS_UNIX_SIGNALS)
448 sigset_t signal_set;
449 if (ACE_OS::sigemptyset (&signal_set) == - 1)
450 ACE_ERROR ((LM_ERROR,
451 ACE_TEXT ("Error: (%P|%t):%p\n"),
452 ACE_TEXT ("sigemptyset failed")));
454 for (int i = sigmin; i <= sigmax; i++)
455 ACE_OS::sigaddset (&signal_set, i);
457 // Put the <signal_set>.
458 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
459 // In multi-threaded application this is not POSIX compliant
460 // but let's leave it just in case.
461 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
462 # else
463 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
464 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
465 ACE_ERROR_RETURN ((LM_ERROR,
466 ACE_TEXT ("Error: (%P|%t): %p\n"),
467 ACE_TEXT ("SIG_BLOCK failed")),
468 -1);
469 #else
470 ACE_UNUSED_ARG (sigmin);
471 ACE_UNUSED_ARG (sigmax);
472 #endif /* ACE_LACKS_UNIX_SIGNALS */
474 return 0;
477 // ----------------------------------------------------
479 ACE_THR_FUNC_RETURN
480 server_worker (void *p)
482 disable_signal (SIGPIPE, SIGPIPE);
484 const unsigned short port = *(static_cast<unsigned short *> (p));
486 ACE_INET_Addr addr;
488 if (addr.set (port, INADDR_LOOPBACK) != 0)
490 ACE_ERROR ((LM_ERROR,
491 ACE_TEXT ("(%t) %p\n"),
492 ACE_TEXT ("server_worker - ACE_INET_Addr::set")));
494 return (void *) -1;
497 ACE_Dev_Poll_Reactor dp_reactor;
498 dp_reactor.restart (1); // Restart on EINTR
499 ACE_Reactor reactor (&dp_reactor);
501 TestAcceptor server;
503 int flags = 0;
504 ACE_SET_BITS (flags, ACE_NONBLOCK); // Enable non-blocking in the
505 // Svc_Handlers.
507 if (server.open (addr, &reactor, flags) != 0)
509 ACE_ERROR ((LM_ERROR,
510 ACE_TEXT ("(%t) %p\n"),
511 ACE_TEXT ("Unable to open server service handler")));
513 return (void *) -1;
516 if (reactor.run_reactor_event_loop () != 0)
518 ACE_ERROR ((LM_ERROR,
519 ACE_TEXT ("(%t) %p\n"),
520 ACE_TEXT ("Error when running server ")
521 ACE_TEXT ("reactor event loop")));
523 return (void *) -1;
526 ACE_DEBUG ((LM_INFO,
527 ACE_TEXT ("(%t) Reactor event loop finished ")
528 ACE_TEXT ("successfully.\n")));
530 return 0;
533 // ----------------------------------------------------
535 // struct server_arg
536 // {
537 // unsigned short port;
539 // ACE_Condition<ACE_SYNCH_MUTEX> * cv;
540 // };
542 // ----------------------------------------------------
545 run_main (int, ACE_TCHAR *[])
547 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
549 // Make sure we ignore SIGPIPE
550 disable_signal (SIGPIPE, SIGPIPE);
552 ACE_Dev_Poll_Reactor dp_reactor;
553 dp_reactor.restart (1); // Restart on EINTR
554 ACE_Reactor reactor (&dp_reactor);
556 TestConnector client;
558 int flags = 0;
559 ACE_SET_BITS (flags, ACE_NONBLOCK); // Enable non-blocking in the
560 // Svc_Handlers.
562 if (client.open (&reactor, flags) != 0)
563 ACE_ERROR_RETURN ((LM_ERROR,
564 ACE_TEXT ("(%t) %p\n"),
565 ACE_TEXT ("Unable to open client service handler")),
566 -1);
568 // ACE_SYNCH_MUTEX mutex;
569 // ACE_Condition<ACE_SYNCH_MUTEX> cv (mutex);
571 // server_arg arg;
572 // arg.port = 54678; // Port the server will listen on.
573 // arg.cv = &cv;
575 unsigned short port = 54678;
577 if (ACE_Thread_Manager::instance ()->spawn (server_worker, &port) == -1)
578 ACE_ERROR_RETURN ((LM_ERROR,
579 ACE_TEXT ("(%t) %p\n"),
580 ACE_TEXT ("Unable to spawn server thread")),
581 -1);
583 ACE_OS::sleep (5); // Wait for the listening endpoint to be set up.
585 ACE_INET_Addr addr;
586 if (addr.set (port, INADDR_LOOPBACK) != 0)
587 ACE_ERROR_RETURN ((LM_ERROR,
588 ACE_TEXT ("(%t) %p\n"),
589 ACE_TEXT ("ACE_INET_Addr::set")),
590 -1);
592 Client *client_handler = 0;
594 if (client.connect (client_handler, addr) != 0)
595 ACE_ERROR_RETURN ((LM_ERROR,
596 ACE_TEXT ("(%t) %p\n"),
597 ACE_TEXT ("Unable to connect to server")),
598 -1);
600 if (reactor.run_reactor_event_loop () != 0)
601 ACE_ERROR_RETURN ((LM_ERROR,
602 ACE_TEXT ("(%t) %p\n"),
603 ACE_TEXT ("Error when running client ")
604 ACE_TEXT ("reactor event loop")),
605 -1);
607 if (ACE_Thread_Manager::instance ()->wait () != 0)
608 ACE_ERROR_RETURN ((LM_ERROR,
609 ACE_TEXT ("(%t) %p\n"),
610 ACE_TEXT ("Error waiting for threads to complete")),
611 -1);
613 ACE_END_TEST;
615 return 0;
618 #else
621 run_main (int, ACE_TCHAR *[])
623 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
624 ACE_ERROR ((LM_INFO,
625 ACE_TEXT ("Dev Poll and Event Poll are not supported ")
626 ACE_TEXT ("on this platform\n")));
627 ACE_END_TEST;
628 return 0;
631 #endif /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */