1 //=============================================================================
3 * @file Dev_Poll_Reactor_Test.cpp
5 * This test verifies that the Dev_Poll_Reactor is functioning
6 * properly, and demonstrates how "speculative reads" can be
7 * performed. "Speculative reads" are necessary when using event
8 * demultiplexing mechanisms that use a "state change" interface.
9 * Similarly, "speculative writes" may be necessary, i.e. keep
10 * writing until the connection is flow controlled. An example of a
11 * demuxing mechanism with such an interface is Linux's `/dev/epoll'
12 * character device. Mechanisms with state change interfaces are
13 * also said to be "edge triggered," versus "level triggered"
14 * mechanisms such as select().
16 * @author Ossama Othman <ossama@dre.vanderbilt.edu>
18 //=============================================================================
20 #include "test_config.h"
22 #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
24 #include "ace/OS_NS_signal.h"
25 #include "ace/Reactor.h"
26 #include "ace/Dev_Poll_Reactor.h"
28 #include "ace/Acceptor.h"
29 #include "ace/Connector.h"
31 #include "ace/SOCK_Acceptor.h"
32 #include "ace/SOCK_Connector.h"
33 #include "ace/SOCK_Stream.h"
35 #include "ace/OS_NS_unistd.h"
36 #include "ace/OS_NS_netdb.h"
39 using SVC_HANDLER
= ACE_Svc_Handler
<ACE_SOCK_Stream
, ACE_NULL_SYNCH
>;
41 // ----------------------------------------------------
// Client-side service handler derived from SVC_HANDLER.  It overrides
// open(), handle_output(), handle_timeout() and handle_close(), and carries
// a call_count_ counter that Client::handle_timeout() compares against 10.
// NOTE(review): the class braces and access specifiers are not present in
// this listing -- confirm against the pristine file.
43 class Client
: public SVC_HANDLER
// open(): the body is only partly visible in this listing; a fragment that
// registers the handler for WRITE_MASK events presumably belongs to it --
// TODO confirm.
48 //FUZZ: disable check_for_lack_ACE_OS
49 int open (void * = 0) override
;
50 //FUZZ: enable check_for_lack_ACE_OS
// handle_output(): formats and sends "test message N." lines to the peer.
52 int handle_output (ACE_HANDLE handle
) override
;
54 int handle_timeout (const ACE_Time_Value
¤t_time
,
55 const void *act
) override
;
// handle_close(): the implementation stops the client reactor event loop
// and then defers to SVC_HANDLER::handle_close (handle, mask).
57 int handle_close (ACE_HANDLE handle
,
58 ACE_Reactor_Mask mask
) override
;
// Counter compared against 10 in Client::handle_timeout().
61 unsigned int call_count_
;
// Server-side service handler derived from SVC_HANDLER.  It overrides
// handle_input(), handle_timeout() and handle_close(), and carries a
// call_count_ counter consulted by the latter two.
// NOTE(review): the class braces and access specifiers are not present in
// this listing -- confirm against the pristine file.
65 class Server
: public SVC_HANDLER
// handle_input(): reads from the peer in a loop ("speculative read") and
// logs the received text.
70 int handle_input (ACE_HANDLE handle
) override
;
72 int handle_timeout (const ACE_Time_Value
¤t_time
,
73 const void *act
) override
;
// handle_close(): the implementation emits its "closing handle" message
// only when call_count_ exceeds 4, then defers to
// SVC_HANDLER::handle_close (handle, mask).
75 int handle_close (ACE_HANDLE handle
,
76 ACE_Reactor_Mask mask
) override
;
// Counter compared against 10 in Server::handle_timeout() and against 4 in
// Server::handle_close().
79 unsigned int call_count_
;
82 // ----------------------------------------------------
// Fragment, presumably from Client::open() (the signature line is not part
// of this listing -- TODO confirm).  Registers this handler with its reactor
// for WRITE_MASK events and leaves through ACE_ERROR_RETURN, logging
// "unable to register client handler", when register_handler() yields -1.
92 // ACE_TEST_ASSERT (this->reactor () != 0);
95 && this->reactor ()->register_handler (
97 ACE_Event_Handler::WRITE_MASK
) == -1)
98 ACE_ERROR_RETURN ((LM_ERROR
,
99 ACE_TEXT ("(%t) %p\n"),
100 ACE_TEXT ("unable to register client handler")),
// Speculative write: formats and sends up to five "test message N." lines
// to the peer in one call.
//
// Outcomes visible in this listing:
//  - send() fails with EWOULDBLOCK: returns 0 (flow control kicked in).
//  - send() fails with EPIPE or ECONNRESET: logged as the server having
//    closed the handle.
//  - any other send() failure: reported through ACE_ERROR_RETURN.
//  - otherwise the sent text is logged at LM_INFO.
// The handle argument is unnamed and unused.
107 Client::handle_output (ACE_HANDLE
)
109 for (int i
= 1; i
<= 5; ++i
)
// Zero-initialised so the formatted text is always NUL-terminated.
111 char buffer
[BUFSIZ
] = { 0 };
113 ACE_OS::snprintf (buffer
, BUFSIZ
, "test message %d.\n", i
);
// Only strlen (buffer) bytes are sent, i.e. the terminator is not sent.
116 this->peer ().send (buffer
, ACE_OS::strlen (buffer
));
118 if (bytes_sent
== -1)
120 if (errno
== EWOULDBLOCK
)
121 return 0; // Flow control kicked in.
122 else if (errno
== EPIPE
|| errno
== ECONNRESET
)
124 ACE_DEBUG ((LM_DEBUG
,
125 ACE_TEXT ("(%t) Client::handle_output; server ")
126 ACE_TEXT ("closed handle %d\n"),
127 this->peer ().get_handle ()));
131 ACE_ERROR_RETURN ((LM_ERROR
,
132 ACE_TEXT ("(%t) %p\n"),
133 ACE_TEXT ("Client::handle_output")),
136 else if (bytes_sent
== 0)
139 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%t) Sent %s"), buffer
));
// Client timer callback.  Logs the timeout, then performs another
// speculative write via handle_output().  When that write returns -1 or
// call_count_ exceeds 10, the reactor event loop is asked to end and the
// outcome of that request is logged.  Both parameters are unnamed and unused.
146 Client::handle_timeout (const ACE_Time_Value
&, const void *)
149 ACE_TEXT ("(%t) Expected client timeout occurred at: %T\n")));
153 int status
= this->handle_output (this->get_handle ());
154 if (status
== -1 || this->call_count_
> 10)
// end_reactor_event_loop () == 0 is treated as a successful shutdown.
156 if (this->reactor ()->end_reactor_event_loop () == 0)
158 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
160 ACE_ERROR ((LM_ERROR
,
161 ACE_TEXT ("(%t) %p\n"),
162 ACE_TEXT ("Failed client reactor shutdown")));
164 // Force this service handler to be closed in either case.
// Client close hook.  Logs that the handler was closed (handle and reactor
// mask), asks the reactor event loop to end, logs whether that succeeded,
// and finally returns whatever SVC_HANDLER::handle_close (handle, mask)
// returns.
172 Client::handle_close (ACE_HANDLE handle
,
173 ACE_Reactor_Mask mask
)
176 ACE_TEXT ("(%t) Client Svc_Handler closed ")
177 ACE_TEXT ("handle <%d> with reactor mask <0x%x>.\n"),
181 // There is no point in running reactor after this client is closed.
182 if (this->reactor ()->end_reactor_event_loop () == 0)
184 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
186 ACE_ERROR ((LM_ERROR
,
187 ACE_TEXT ("(%t) %p\n"),
188 ACE_TEXT ("Failed client reactor shutdown")));
190 return SVC_HANDLER::handle_close (handle
, mask
);
193 // ----------------------------------------------------
// Speculative read: keeps calling recv() on the peer, appending into a local
// buffer, until the buffer is full or recv() stops delivering data.
//
// Outcomes visible in this listing:
//  - recv() returns -1 with EWOULDBLOCK: handled separately from other
//    errors (the statements of that branch are not part of this listing).
//  - recv() returns -1 otherwise: reported through ACE_ERROR_RETURN.
//  - recv() returns 0: handled in its own branch (statements not visible).
//  - otherwise the buffer contents are logged as "Message received".
201 Server::handle_input (ACE_HANDLE
/* handle */)
203 char buffer
[BUFSIZ
+1] = { 0 }; // Ensure a trailing NUL terminator
204 ssize_t bytes_read
= 0;
// [begin, end) spans the first BUFSIZ bytes, so the extra byte at the end of
// buffer is never written by recv() and stays zero.
206 char * const begin
= buffer
;
207 char * const end
= buffer
+ BUFSIZ
;
// buf advances by the size of each successful read.
209 for (char * buf
= begin
; buf
< end
; buf
+= bytes_read
)
211 // Keep reading until it is no longer possible to do so.
213 // This is done since the underlying event demultiplexing
214 // mechanism may have a "state change" interface (as opposed to
215 // "state monitoring"), in which case a "speculative" read is
// necessary.
217 bytes_read
= this->peer ().recv (buf
, end
- buf
);
219 ACE_DEBUG ((LM_DEBUG
,
220 ACE_TEXT ("****** bytes_read = %d\n"),
223 if (bytes_read
== -1)
225 if (errno
== EWOULDBLOCK
)
227 // ACE_HEX_DUMP ((LM_DEBUG,
230 // "BUFFER CONTENTS"));
237 ACE_ERROR_RETURN ((LM_ERROR
,
238 ACE_TEXT ("(%t) %p\n"),
239 ACE_TEXT ("Server::handle_input")),
242 else if (bytes_read
== 0)
246 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%t) Message received: %s\n"), buffer
));
// Server timer callback.  Logs the timeout; once call_count_ exceeds 10 it
// asks the reactor event loop to end and logs whether that succeeded.
// The commented-out block below is a disabled speculative-read experiment.
252 Server::handle_timeout (const ACE_Time_Value
&,
256 ACE_TEXT ("(%t) Expected server timeout occurred at: %T\n")));
258 // if (this->call_count_ == 0
259 // && this->handle_input (this->get_handle ()) != 0
260 // && errno != EWOULDBLOCK)
263 // ACE_DEBUG ((LM_INFO,
264 // "SERVER HANDLE = %d\n",
265 // this->get_handle ()));
270 if (this->call_count_
> 10)
// end_reactor_event_loop () == 0 is treated as a successful shutdown.
272 if (this->reactor ()->end_reactor_event_loop () == 0)
274 ACE_TEXT ("(%t) Successful server reactor shutdown.\n")));
276 ACE_ERROR ((LM_ERROR
,
277 ACE_TEXT ("(%t) %p\n"),
278 ACE_TEXT ("Failed server reactor shutdown")));
280 // Force this service handler to be closed in either case.
// Server close hook.  Emits the "closing handle" message only when
// call_count_ exceeds 4, then returns whatever
// SVC_HANDLER::handle_close (handle, mask) returns.
288 Server::handle_close (ACE_HANDLE handle
,
289 ACE_Reactor_Mask mask
)
291 if (this->call_count_
> 4)
294 ACE_TEXT ("(%t) Server Svc_Handler closing ")
295 ACE_TEXT ("handle <%d,%d> with reactor mask <0x%x>.\n"),
301 return SVC_HANDLER::handle_close (handle
, mask
);
304 // ----------------------------------------------------
306 using ACCEPTOR
= ACE_Acceptor
<Server
, ACE_SOCK_Acceptor
>;
307 using CONNECTOR
= ACE_Connector
<Client
, ACE_SOCK_Connector
>;
309 // ----------------------------------------------------
// Acceptor specialisation that wraps ACCEPTOR::accept_svc_handler().
//
// After delegating to the base class it:
//  - logs "Unable to accept connection" for failures other than EWOULDBLOCK,
//  - logs the stream handle of an accepted connection,
//  - schedules a timer on the new Server handler through the handler's
//    reactor, reporting a scheduling failure through ACE_ERROR_RETURN.
// NOTE(review): braces, return statements and the remaining schedule_timer()
// arguments are not present in this listing.
311 class TestAcceptor
: public ACCEPTOR
314 int accept_svc_handler (Server
* handler
) override
316 int result
= this->ACCEPTOR::accept_svc_handler (handler
);
// EWOULDBLOCK is not reported as an error here.
320 if (errno
!= EWOULDBLOCK
)
321 ACE_ERROR ((LM_ERROR
,
322 ACE_TEXT ("(%t) %p\n"),
323 ACE_TEXT ("Unable to accept connection")));
328 ACE_DEBUG ((LM_DEBUG
,
329 ACE_TEXT ("(%t) Accepted connection. ")
330 ACE_TEXT ("Stream handle: <%d>\n"),
331 handler
->get_handle ()));
333 // if (handler->handle_input (handler->get_handle ()) == -1
334 // && errno != EWOULDBLOCK)
// Timer parameters; presumably (seconds, microseconds) and presumably both
// passed to schedule_timer() below -- TODO confirm.
338 ACE_Time_Value
delay (2, 0);
339 ACE_Time_Value
restart (2, 0);
340 if (handler
->reactor ()->schedule_timer (handler
,
345 ACE_ERROR_RETURN ((LM_ERROR
,
346 ACE_TEXT ("(%t) %p\n"),
347 ACE_TEXT ("Unable to schedule server side ")
348 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
357 // ----------------------------------------------------
// Connector specialisation that wraps CONNECTOR::connect_svc_handler().
//
// First overload: delegates to the base class, resolves and logs the remote
// host name and port, then schedules a timer on the Client handler through
// the handler's reactor.  Host-name lookup and timer-scheduling failures are
// reported through ACE_ERROR_RETURN.
//
// Second overload (with sh_copy): forwards to the first overload; sh_copy is
// not among the forwarded arguments visible here.
// NOTE(review): several parameters, braces and return statements are not
// present in this listing.
359 class TestConnector
: public CONNECTOR
362 int connect_svc_handler (
363 CONNECTOR::handler_type
*& handler
,
364 const CONNECTOR::addr_type
&remote_addr
,
365 ACE_Time_Value
*timeout
,
366 const CONNECTOR::addr_type
&local_addr
,
371 const int result
= this->CONNECTOR::connect_svc_handler (handler
,
// NOTE(review): sizeof (hostname) is a byte count while hostname is an
// ACE_TCHAR array; confirm that get_host_name() expects bytes rather than
// an element count when ACE_TCHAR is wider than char.
382 ACE_TCHAR hostname
[MAXHOSTNAMELEN
];
383 if (remote_addr
.get_host_name (hostname
,
384 sizeof (hostname
)) != 0)
386 ACE_ERROR_RETURN ((LM_ERROR
,
387 ACE_TEXT ("(%t) %p\n"),
388 ACE_TEXT ("Unable to retrieve hostname")),
392 ACE_DEBUG ((LM_DEBUG
,
393 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
395 (int) remote_addr
.get_port_number ()));
// Timer parameters; presumably (seconds, microseconds) and presumably both
// passed to schedule_timer() below -- TODO confirm.
398 ACE_Time_Value
delay (4, 0);
399 ACE_Time_Value
restart (3, 0);
400 if (handler
->reactor ()->schedule_timer (handler
,
405 ACE_ERROR_RETURN ((LM_ERROR
,
406 ACE_TEXT ("(%t) %p\n"),
407 ACE_TEXT ("Unable to schedule client side ")
408 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
// Overload taking an additional sh_copy handler reference; it simply
// forwards to the overload above.
416 int connect_svc_handler (
417 CONNECTOR::handler_type
*& handler
,
418 CONNECTOR::handler_type
*& sh_copy
,
419 const CONNECTOR::addr_type
&remote_addr
,
420 ACE_Time_Value
*timeout
,
421 const CONNECTOR::addr_type
&local_addr
,
424 int perms
) override
{
426 return this->connect_svc_handler (handler
, remote_addr
, timeout
,
427 local_addr
, reuse_addr
, flags
,
432 // ----------------------------------------------------
// Blocks every signal number in the inclusive range [sigmin, sigmax].
//
// With ACE_LACKS_UNIX_SIGNALS undefined, the signals are collected into a
// signal set and blocked with SIG_BLOCK, using ACE_OS::sigprocmask() when
// ACE_LACKS_PTHREAD_THR_SIGSETMASK is defined and ACE_OS::thr_sigsetmask()
// otherwise; a failure there is reported through ACE_ERROR_RETURN.  A
// sigemptyset() failure is only logged.  When ACE_LACKS_UNIX_SIGNALS is
// defined, both parameters are merely marked unused.
435 disable_signal (int sigmin
, int sigmax
)
437 #if !defined (ACE_LACKS_UNIX_SIGNALS)
439 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
440 ACE_ERROR ((LM_ERROR
,
441 ACE_TEXT ("Error: (%P|%t):%p\n"),
442 ACE_TEXT ("sigemptyset failed")));
// The return value of sigaddset() is not checked.
444 for (int i
= sigmin
; i
<= sigmax
; i
++)
445 ACE_OS::sigaddset (&signal_set
, i
);
447 // Block every signal in <signal_set>.
448 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
449 // In multi-threaded application this is not POSIX compliant
450 // but let's leave it just in case.
451 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
453 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
454 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
455 ACE_ERROR_RETURN ((LM_ERROR
,
456 ACE_TEXT ("Error: (%P|%t): %p\n"),
457 ACE_TEXT ("SIG_BLOCK failed")),
460 ACE_UNUSED_ARG (sigmin
);
461 ACE_UNUSED_ARG (sigmax
);
462 #endif /* ACE_LACKS_UNIX_SIGNALS */
467 // ----------------------------------------------------
// Thread entry point for the server side of the test.
//
// p points to an unsigned short holding the port to listen on.  The function
// blocks SIGPIPE, builds a loopback address for that port, creates an
// ACE_Dev_Poll_Reactor wrapped in an ACE_Reactor, opens the server acceptor
// on it with ACE_NONBLOCK set in the flags, and runs the reactor event loop
// until it finishes.  Each failure is logged with ACE_ERROR.
470 server_worker (void *p
)
472 disable_signal (SIGPIPE
, SIGPIPE
);
// p must point to an unsigned short that is still alive at this point.
474 const unsigned short port
= *(static_cast<unsigned short *> (p
));
478 if (addr
.set (port
, INADDR_LOOPBACK
) != 0)
480 ACE_ERROR ((LM_ERROR
,
481 ACE_TEXT ("(%t) %p\n"),
482 ACE_TEXT ("server_worker - ACE_INET_Addr::set")));
487 ACE_Dev_Poll_Reactor dp_reactor
;
488 dp_reactor
.restart (1); // Restart on EINTR
489 ACE_Reactor
reactor (&dp_reactor
);
494 ACE_SET_BITS (flags
, ACE_NONBLOCK
); // Set ACE_NONBLOCK in the flags handed to server.open () below.
497 if (server
.open (addr
, &reactor
, flags
) != 0)
499 ACE_ERROR ((LM_ERROR
,
500 ACE_TEXT ("(%t) %p\n"),
501 ACE_TEXT ("Unable to open server service handler")));
506 if (reactor
.run_reactor_event_loop () != 0)
508 ACE_ERROR ((LM_ERROR
,
509 ACE_TEXT ("(%t) %p\n"),
510 ACE_TEXT ("Error when running server ")
511 ACE_TEXT ("reactor event loop")));
517 ACE_TEXT ("(%t) Reactor event loop finished ")
518 ACE_TEXT ("successfully.\n")));
523 // ----------------------------------------------------
527 // unsigned short port;
529 // ACE_Condition<ACE_SYNCH_MUTEX> * cv;
532 // ----------------------------------------------------
// Test entry point for platforms with ACE_HAS_DEV_POLL or
// ACE_HAS_EVENT_POLL.
//
// Sequence visible in this listing:
//  1. Block SIGPIPE and create an ACE_Dev_Poll_Reactor wrapped in an
//     ACE_Reactor.
//  2. Open the TestConnector on that reactor with ACE_NONBLOCK set.
//  3. Spawn server_worker() in its own thread, passing the address of the
//     local port variable (54678).
//  4. Sleep, then connect to the loopback address on that port.
//  5. Run the client reactor event loop, then wait for all threads.
// Every failing step leaves through ACE_ERROR_RETURN.  Both parameters are
// unnamed and unused.
535 run_main (int, ACE_TCHAR
*[])
537 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
539 // Block SIGPIPE (disable_signal () uses SIG_BLOCK).
540 disable_signal (SIGPIPE
, SIGPIPE
);
542 ACE_Dev_Poll_Reactor dp_reactor
;
543 dp_reactor
.restart (1); // Restart on EINTR
544 ACE_Reactor
reactor (&dp_reactor
);
546 TestConnector client
;
549 ACE_SET_BITS (flags
, ACE_NONBLOCK
); // Set ACE_NONBLOCK in the flags handed to client.open () below.
552 if (client
.open (&reactor
, flags
) != 0)
553 ACE_ERROR_RETURN ((LM_ERROR
,
554 ACE_TEXT ("(%t) %p\n"),
555 ACE_TEXT ("Unable to open client service handler")),
558 // ACE_SYNCH_MUTEX mutex;
559 // ACE_Condition<ACE_SYNCH_MUTEX> cv (mutex);
562 // arg.port = 54678; // Port the server will listen on.
// The server thread reads this variable through the pointer passed to
// spawn (), so it has to outlive that read.
565 unsigned short port
= 54678;
567 if (ACE_Thread_Manager::instance ()->spawn (server_worker
, &port
) == -1)
568 ACE_ERROR_RETURN ((LM_ERROR
,
569 ACE_TEXT ("(%t) %p\n"),
570 ACE_TEXT ("Unable to spawn server thread")),
// NOTE(review): a fixed sleep is a timing assumption rather than real
// synchronisation with the server thread; the commented-out mutex/condition
// lines above look like an abandoned alternative.
573 ACE_OS::sleep (5); // Wait for the listening endpoint to be set up.
576 if (addr
.set (port
, INADDR_LOOPBACK
) != 0)
577 ACE_ERROR_RETURN ((LM_ERROR
,
578 ACE_TEXT ("(%t) %p\n"),
579 ACE_TEXT ("ACE_INET_Addr::set")),
582 Client
*client_handler
= 0;
584 if (client
.connect (client_handler
, addr
) != 0)
585 ACE_ERROR_RETURN ((LM_ERROR
,
586 ACE_TEXT ("(%t) %p\n"),
587 ACE_TEXT ("Unable to connect to server")),
590 if (reactor
.run_reactor_event_loop () != 0)
591 ACE_ERROR_RETURN ((LM_ERROR
,
592 ACE_TEXT ("(%t) %p\n"),
593 ACE_TEXT ("Error when running client ")
594 ACE_TEXT ("reactor event loop")),
// Wait for the spawned server thread before finishing the test.
597 if (ACE_Thread_Manager::instance ()->wait () != 0)
598 ACE_ERROR_RETURN ((LM_ERROR
,
599 ACE_TEXT ("(%t) %p\n"),
600 ACE_TEXT ("Error waiting for threads to complete")),
// Fallback test entry point, presumably the #else branch used when neither
// ACE_HAS_DEV_POLL nor ACE_HAS_EVENT_POLL is defined (the #else line itself
// is not part of this listing).  It starts the test and only reports that
// Dev Poll and Event Poll are unsupported on this platform.
611 run_main (int, ACE_TCHAR
*[])
613 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
615 ACE_TEXT ("Dev Poll and Event Poll are not supported ")
616 ACE_TEXT ("on this platform\n")));
621 #endif /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */