1 //=============================================================================
3 * @file Dev_Poll_Reactor_Test.cpp
5 * This test verifies that the Dev_Poll_Reactor is functioning
6 * properly, and demonstrates how "speculative reads" can be
7 * performed. "Speculative reads" are necessary when using event
8 * demultiplexing mechanisms that use a "state change" interface.
9 * Similarly, "speculative writes" may be necessary, i.e. keep
10 * writing until the connection is flow controlled. An example of a
11 * demuxing mechanism with such an interface is Linux's `/dev/epoll'
12 * character device. Mechanisms with state change interfaces are
13 * also said to be "edge triggered," versus "level triggered"
14 * mechanisms such as select().
16 * @author Ossama Othman <ossama@dre.vanderbilt.edu>
18 //=============================================================================
20 #include "test_config.h"
22 #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
24 #include "ace/OS_NS_signal.h"
25 #include "ace/Reactor.h"
26 #include "ace/Dev_Poll_Reactor.h"
28 #include "ace/Acceptor.h"
29 #include "ace/Connector.h"
31 #include "ace/SOCK_Acceptor.h"
32 #include "ace/SOCK_Connector.h"
33 #include "ace/SOCK_Stream.h"
35 #include "ace/OS_NS_unistd.h"
36 #include "ace/OS_NS_netdb.h"
39 typedef ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
> SVC_HANDLER
;
41 // ----------------------------------------------------
43 class Client
: public SVC_HANDLER
49 //FUZZ: disable check_for_lack_ACE_OS
50 virtual int open (void * = 0);
51 //FUZZ: enable check_for_lack_ACE_OS
53 virtual int handle_output (ACE_HANDLE handle
);
55 virtual int handle_timeout (const ACE_Time_Value
¤t_time
,
58 virtual int handle_close (ACE_HANDLE handle
,
59 ACE_Reactor_Mask mask
);
63 unsigned int call_count_
;
68 class Server
: public SVC_HANDLER
74 virtual int handle_input (ACE_HANDLE handle
);
76 virtual int handle_timeout (const ACE_Time_Value
¤t_time
,
79 virtual int handle_close (ACE_HANDLE handle
,
80 ACE_Reactor_Mask mask
);
84 unsigned int call_count_
;
88 // ----------------------------------------------------
98 // ACE_TEST_ASSERT (this->reactor () != 0);
101 && this->reactor ()->register_handler (
103 ACE_Event_Handler::WRITE_MASK
) == -1)
104 ACE_ERROR_RETURN ((LM_ERROR
,
105 ACE_TEXT ("(%t) %p\n"),
106 ACE_TEXT ("unable to register client handler")),
113 Client::handle_output (ACE_HANDLE
)
115 for (int i
= 1; i
<= 5; ++i
)
117 char buffer
[BUFSIZ
] = { 0 };
119 ACE_OS::snprintf (buffer
, BUFSIZ
, "test message %d.\n", i
);
122 this->peer ().send (buffer
, ACE_OS::strlen (buffer
));
124 if (bytes_sent
== -1)
126 if (errno
== EWOULDBLOCK
)
127 return 0; // Flow control kicked in.
128 else if (errno
== EPIPE
|| errno
== ECONNRESET
)
130 ACE_DEBUG ((LM_DEBUG
,
131 ACE_TEXT ("(%t) Client::handle_output; server ")
132 ACE_TEXT ("closed handle %d\n"),
133 this->peer ().get_handle ()));
137 ACE_ERROR_RETURN ((LM_ERROR
,
138 ACE_TEXT ("(%t) %p\n"),
139 ACE_TEXT ("Client::handle_output")),
142 else if (bytes_sent
== 0)
145 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%t) Sent %s"), buffer
));
152 Client::handle_timeout (const ACE_Time_Value
&, const void *)
155 ACE_TEXT ("(%t) Expected client timeout occurred at: %T\n")));
159 int status
= this->handle_output (this->get_handle ());
160 if (status
== -1 || this->call_count_
> 10)
162 if (this->reactor ()->end_reactor_event_loop () == 0)
164 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
166 ACE_ERROR ((LM_ERROR
,
167 ACE_TEXT ("(%t) %p\n"),
168 ACE_TEXT ("Failed client reactor shutdown")));
170 // Force this service handler to be closed in either case.
178 Client::handle_close (ACE_HANDLE handle
,
179 ACE_Reactor_Mask mask
)
182 ACE_TEXT ("(%t) Client Svc_Handler closed ")
183 ACE_TEXT ("handle <%d> with reactor mask <0x%x>.\n"),
187 // There is no point in running reactor after this client is closed.
188 if (this->reactor ()->end_reactor_event_loop () == 0)
190 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
192 ACE_ERROR ((LM_ERROR
,
193 ACE_TEXT ("(%t) %p\n"),
194 ACE_TEXT ("Failed client reactor shutdown")));
196 return SVC_HANDLER::handle_close (handle
, mask
);
199 // ----------------------------------------------------
201 Server::Server (void)
207 Server::handle_input (ACE_HANDLE
/* handle */)
209 char buffer
[BUFSIZ
+1] = { 0 }; // Insure a trailing nul
210 ssize_t bytes_read
= 0;
212 char * const begin
= buffer
;
213 char * const end
= buffer
+ BUFSIZ
;
215 for (char * buf
= begin
; buf
< end
; buf
+= bytes_read
)
217 // Keep reading until it is no longer possible to do so.
219 // This is done since the underlying event demultiplexing
220 // mechanism may have a "state change" interface (as opposed to
221 // "state monitoring"), in which case a "speculative" read is
223 bytes_read
= this->peer ().recv (buf
, end
- buf
);
225 ACE_DEBUG ((LM_DEBUG
,
226 ACE_TEXT ("****** bytes_read = %d\n"),
229 if (bytes_read
== -1)
231 if (errno
== EWOULDBLOCK
)
234 // ACE_HEX_DUMP ((LM_DEBUG,
237 // "BUFFER CONTENTS"));
244 ACE_ERROR_RETURN ((LM_ERROR
,
245 ACE_TEXT ("(%t) %p\n"),
246 ACE_TEXT ("Server::handle_input")),
249 else if (bytes_read
== 0)
253 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%t) Message received: %s\n"), buffer
));
259 Server::handle_timeout (const ACE_Time_Value
&,
263 ACE_TEXT ("(%t) Expected server timeout occurred at: %T\n")));
265 // if (this->call_count_ == 0
266 // && this->handle_input (this->get_handle ()) != 0
267 // && errno != EWOULDBLOCK)
270 // ACE_DEBUG ((LM_INFO,
271 // "SERVER HANDLE = %d\n",
272 // this->get_handle ()));
277 if (this->call_count_
> 10)
279 if (this->reactor ()->end_reactor_event_loop () == 0)
281 ACE_TEXT ("(%t) Successful server reactor shutdown.\n")));
283 ACE_ERROR ((LM_ERROR
,
284 ACE_TEXT ("(%t) %p\n"),
285 ACE_TEXT ("Failed server reactor shutdown")));
287 // Force this service handler to be closed in either case.
295 Server::handle_close (ACE_HANDLE handle
,
296 ACE_Reactor_Mask mask
)
298 if (this->call_count_
> 4)
301 ACE_TEXT ("(%t) Server Svc_Handler closing ")
302 ACE_TEXT ("handle <%d,%d> with reactor mask <0x%x>.\n"),
308 return SVC_HANDLER::handle_close (handle
, mask
);
311 // ----------------------------------------------------
313 typedef ACE_Acceptor
<Server
, ACE_SOCK_ACCEPTOR
> ACCEPTOR
;
314 typedef ACE_Connector
<Client
, ACE_SOCK_CONNECTOR
> CONNECTOR
;
316 // ----------------------------------------------------
318 class TestAcceptor
: public ACCEPTOR
322 virtual int accept_svc_handler (Server
* handler
)
324 int result
= this->ACCEPTOR::accept_svc_handler (handler
);
328 if (errno
!= EWOULDBLOCK
)
329 ACE_ERROR ((LM_ERROR
,
330 ACE_TEXT ("(%t) %p\n"),
331 ACE_TEXT ("Unable to accept connection")));
336 ACE_DEBUG ((LM_DEBUG
,
337 ACE_TEXT ("(%t) Accepted connection. ")
338 ACE_TEXT ("Stream handle: <%d>\n"),
339 handler
->get_handle ()));
341 // if (handler->handle_input (handler->get_handle ()) == -1
342 // && errno != EWOULDBLOCK)
346 ACE_Time_Value
delay (2, 0);
347 ACE_Time_Value
restart (2, 0);
348 if (handler
->reactor ()->schedule_timer (handler
,
353 ACE_ERROR_RETURN ((LM_ERROR
,
354 ACE_TEXT ("(%t) %p\n"),
355 ACE_TEXT ("Unable to schedule server side ")
356 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
366 // ----------------------------------------------------
368 class TestConnector
: public CONNECTOR
372 virtual int connect_svc_handler (
373 CONNECTOR::handler_type
*& handler
,
374 const CONNECTOR::addr_type
&remote_addr
,
375 ACE_Time_Value
*timeout
,
376 const CONNECTOR::addr_type
&local_addr
,
381 const int result
= this->CONNECTOR::connect_svc_handler (handler
,
392 ACE_TCHAR hostname
[MAXHOSTNAMELEN
];
393 if (remote_addr
.get_host_name (hostname
,
394 sizeof (hostname
)) != 0)
396 ACE_ERROR_RETURN ((LM_ERROR
,
397 ACE_TEXT ("(%t) %p\n"),
398 ACE_TEXT ("Unable to retrieve hostname")),
402 ACE_DEBUG ((LM_DEBUG
,
403 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
405 (int) remote_addr
.get_port_number ()));
408 ACE_Time_Value
delay (4, 0);
409 ACE_Time_Value
restart (3, 0);
410 if (handler
->reactor ()->schedule_timer (handler
,
415 ACE_ERROR_RETURN ((LM_ERROR
,
416 ACE_TEXT ("(%t) %p\n"),
417 ACE_TEXT ("Unable to schedule client side ")
418 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
426 virtual int connect_svc_handler (
427 CONNECTOR::handler_type
*& handler
,
428 CONNECTOR::handler_type
*& sh_copy
,
429 const CONNECTOR::addr_type
&remote_addr
,
430 ACE_Time_Value
*timeout
,
431 const CONNECTOR::addr_type
&local_addr
,
436 return this->connect_svc_handler (handler
, remote_addr
, timeout
,
437 local_addr
, reuse_addr
, flags
,
442 // ----------------------------------------------------
445 disable_signal (int sigmin
, int sigmax
)
447 #if !defined (ACE_LACKS_UNIX_SIGNALS)
449 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
450 ACE_ERROR ((LM_ERROR
,
451 ACE_TEXT ("Error: (%P|%t):%p\n"),
452 ACE_TEXT ("sigemptyset failed")));
454 for (int i
= sigmin
; i
<= sigmax
; i
++)
455 ACE_OS::sigaddset (&signal_set
, i
);
457 // Put the <signal_set>.
458 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
459 // In multi-threaded application this is not POSIX compliant
460 // but let's leave it just in case.
461 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
463 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
464 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
465 ACE_ERROR_RETURN ((LM_ERROR
,
466 ACE_TEXT ("Error: (%P|%t): %p\n"),
467 ACE_TEXT ("SIG_BLOCK failed")),
470 ACE_UNUSED_ARG (sigmin
);
471 ACE_UNUSED_ARG (sigmax
);
472 #endif /* ACE_LACKS_UNIX_SIGNALS */
477 // ----------------------------------------------------
480 server_worker (void *p
)
482 disable_signal (SIGPIPE
, SIGPIPE
);
484 const unsigned short port
= *(static_cast<unsigned short *> (p
));
488 if (addr
.set (port
, INADDR_LOOPBACK
) != 0)
490 ACE_ERROR ((LM_ERROR
,
491 ACE_TEXT ("(%t) %p\n"),
492 ACE_TEXT ("server_worker - ACE_INET_Addr::set")));
497 ACE_Dev_Poll_Reactor dp_reactor
;
498 dp_reactor
.restart (1); // Restart on EINTR
499 ACE_Reactor
reactor (&dp_reactor
);
504 ACE_SET_BITS (flags
, ACE_NONBLOCK
); // Enable non-blocking in the
507 if (server
.open (addr
, &reactor
, flags
) != 0)
509 ACE_ERROR ((LM_ERROR
,
510 ACE_TEXT ("(%t) %p\n"),
511 ACE_TEXT ("Unable to open server service handler")));
516 if (reactor
.run_reactor_event_loop () != 0)
518 ACE_ERROR ((LM_ERROR
,
519 ACE_TEXT ("(%t) %p\n"),
520 ACE_TEXT ("Error when running server ")
521 ACE_TEXT ("reactor event loop")));
527 ACE_TEXT ("(%t) Reactor event loop finished ")
528 ACE_TEXT ("successfully.\n")));
533 // ----------------------------------------------------
537 // unsigned short port;
539 // ACE_Condition<ACE_SYNCH_MUTEX> * cv;
542 // ----------------------------------------------------
545 run_main (int, ACE_TCHAR
*[])
547 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
549 // Make sure we ignore SIGPIPE
550 disable_signal (SIGPIPE
, SIGPIPE
);
552 ACE_Dev_Poll_Reactor dp_reactor
;
553 dp_reactor
.restart (1); // Restart on EINTR
554 ACE_Reactor
reactor (&dp_reactor
);
556 TestConnector client
;
559 ACE_SET_BITS (flags
, ACE_NONBLOCK
); // Enable non-blocking in the
562 if (client
.open (&reactor
, flags
) != 0)
563 ACE_ERROR_RETURN ((LM_ERROR
,
564 ACE_TEXT ("(%t) %p\n"),
565 ACE_TEXT ("Unable to open client service handler")),
568 // ACE_SYNCH_MUTEX mutex;
569 // ACE_Condition<ACE_SYNCH_MUTEX> cv (mutex);
572 // arg.port = 54678; // Port the server will listen on.
575 unsigned short port
= 54678;
577 if (ACE_Thread_Manager::instance ()->spawn (server_worker
, &port
) == -1)
578 ACE_ERROR_RETURN ((LM_ERROR
,
579 ACE_TEXT ("(%t) %p\n"),
580 ACE_TEXT ("Unable to spawn server thread")),
583 ACE_OS::sleep (5); // Wait for the listening endpoint to be set up.
586 if (addr
.set (port
, INADDR_LOOPBACK
) != 0)
587 ACE_ERROR_RETURN ((LM_ERROR
,
588 ACE_TEXT ("(%t) %p\n"),
589 ACE_TEXT ("ACE_INET_Addr::set")),
592 Client
*client_handler
= 0;
594 if (client
.connect (client_handler
, addr
) != 0)
595 ACE_ERROR_RETURN ((LM_ERROR
,
596 ACE_TEXT ("(%t) %p\n"),
597 ACE_TEXT ("Unable to connect to server")),
600 if (reactor
.run_reactor_event_loop () != 0)
601 ACE_ERROR_RETURN ((LM_ERROR
,
602 ACE_TEXT ("(%t) %p\n"),
603 ACE_TEXT ("Error when running client ")
604 ACE_TEXT ("reactor event loop")),
607 if (ACE_Thread_Manager::instance ()->wait () != 0)
608 ACE_ERROR_RETURN ((LM_ERROR
,
609 ACE_TEXT ("(%t) %p\n"),
610 ACE_TEXT ("Error waiting for threads to complete")),
621 run_main (int, ACE_TCHAR
*[])
623 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Test"));
625 ACE_TEXT ("Dev Poll and Event Poll are not supported ")
626 ACE_TEXT ("on this platform\n")));
631 #endif /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */