1 //=============================================================================
3 * @file Dev_Poll_Reactor_Echo_Test.cpp
5 * This test implements a simple echo server using the
6 * Dev_Poll_Reactor. This forces the reactor to behave like a
7 * reactor would in a typical client/server application, i.e.,
8 * receive a message then send a messages.
9 * @author Justin Wilson <wilsonj@ociweb.com>
11 //=============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
17 #include "ace/OS_NS_signal.h"
18 #include "ace/Reactor.h"
19 #include "ace/Dev_Poll_Reactor.h"
21 #include "ace/Acceptor.h"
22 #include "ace/Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/SOCK_Connector.h"
26 #include "ace/SOCK_Stream.h"
28 #include "ace/OS_NS_unistd.h"
29 #include "ace/OS_NS_netdb.h"
33 using SVC_HANDLER
= ACE_Svc_Handler
<ACE_SOCK_Stream
, ACE_NULL_SYNCH
>;
35 // ----------------------------------------------------
37 class Client
: public SVC_HANDLER
42 //FUZZ: disable check_for_lack_ACE_OS
43 int open (void * = 0) override
;
44 //FUZZ: enable check_for_lack_ACE_OS
46 int handle_output (ACE_HANDLE handle
) override
;
48 int handle_input (ACE_HANDLE handle
) override
;
50 int handle_timeout (const ACE_Time_Value
¤t_time
,
51 const void *act
) override
;
53 int handle_close (ACE_HANDLE handle
,
54 ACE_Reactor_Mask mask
) override
;
60 unsigned int call_count_
;
64 class Server
: public SVC_HANDLER
69 int handle_input (ACE_HANDLE handle
) override
;
71 int handle_output (ACE_HANDLE handle
) override
;
73 int handle_close (ACE_HANDLE handle
,
74 ACE_Reactor_Mask mask
) override
;
77 int send_i (const char* buffer
,
80 std::queue
<std::string
*> buffer_list_
;
84 // ----------------------------------------------------
94 // Trigger writes on a timer.
95 ACE_Time_Value
delay (1, 0);
96 ACE_Time_Value
restart (1, 0);
97 if (this->reactor ()->schedule_timer (this,
102 ACE_ERROR_RETURN ((LM_ERROR
,
103 ACE_TEXT ("(%t) %p\n"),
104 ACE_TEXT ("Unable to schedule client side ")
105 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
109 if (this->reactor ()->register_handler (this, ACE_Event_Handler::READ_MASK
) == -1)
111 ACE_ERROR_RETURN ((LM_ERROR
,
112 ACE_TEXT ("(%t) %p\n"),
113 ACE_TEXT ("Unable to register for reading ")
114 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
122 Client::handle_output (ACE_HANDLE handle
)
124 std::string buffer
= "Hello, world!";
125 ssize_t bytes_sent
= this->peer ().send (buffer
.data (), buffer
.size ());
127 ACE_DEBUG ((LM_DEBUG
,
128 ACE_TEXT ("(%t) Client::handle_output; handle = %d")
129 ACE_TEXT (" bytes sent %d\n"),
133 if (bytes_sent
== -1)
135 if (errno
== EWOULDBLOCK
)
136 return 0; // Flow control kicked in.
137 else if (errno
== EPIPE
|| errno
== ECONNRESET
)
139 ACE_DEBUG ((LM_DEBUG
,
140 ACE_TEXT ("(%t) Client::handle_output; server ")
141 ACE_TEXT ("closed handle %d\n"),
142 this->peer ().get_handle ()));
146 ACE_ERROR_RETURN ((LM_ERROR
,
147 ACE_TEXT ("(%t) %p\n"),
148 ACE_TEXT ("Client::handle_output")),
151 else if (bytes_sent
== 0)
154 this->sent
.append (buffer
.substr (0, bytes_sent
));
160 Client::handle_input (ACE_HANDLE handle
)
165 ssize_t bytes_read
= this->peer ().recv (buffer
, BUFSIZ
);
166 ACE_DEBUG ((LM_DEBUG
,
167 ACE_TEXT ("(%t) Client::handle_input handle = %d bytes_read = %d\n"),
168 handle
, bytes_read
));
170 if (bytes_read
== -1 && errno
== EWOULDBLOCK
)
174 else if (bytes_read
<= 0)
181 this->received
.append (buffer
, bytes_read
);
187 Client::handle_timeout (const ACE_Time_Value
&, const void *)
190 ACE_TEXT ("(%t) Expected client timeout occurred at: %T\n")));
192 if (this->call_count_
!= 10)
194 // Register for write.
195 if (this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK
) == -1)
197 ACE_ERROR_RETURN ((LM_ERROR
,
198 ACE_TEXT ("(%t) %p\n"),
199 ACE_TEXT ("Unable to register for writing ")
200 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
209 if (this->reactor ()->end_reactor_event_loop () == 0)
211 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
213 ACE_ERROR ((LM_ERROR
,
214 ACE_TEXT ("(%t) %p\n"),
215 ACE_TEXT ("Failed client reactor shutdown")));
217 // Force this service handler to be closed in either case.
223 Client::handle_close (ACE_HANDLE handle
,
224 ACE_Reactor_Mask mask
)
226 ACE_DEBUG ((LM_DEBUG
,
227 ACE_TEXT ("(%t) Client::handle_close handle = %d mask = %xd\n"), handle
, mask
));
229 //return SVC_HANDLER::handle_close (handle, mask);
232 // ----------------------------------------------------
240 Server::handle_input (ACE_HANDLE handle
)
245 ssize_t bytes_read
= this->peer ().recv (buffer
, BUFSIZ
);
246 ACE_DEBUG ((LM_DEBUG
,
247 ACE_TEXT ("(%t) Server::handle_input handle = %d bytes_read = %d\n"),
248 handle
, bytes_read
));
250 if (bytes_read
== -1 && errno
== EWOULDBLOCK
)
252 ACE_DEBUG ((LM_DEBUG
,
253 ACE_TEXT ("(%t) Server::handle_input handle = %d EWOULDBLOCK\n"),
257 else if (bytes_read
== 0)
260 ACE_DEBUG ((LM_DEBUG
,
261 ACE_TEXT ("(%t) Server::handle_input handle = %d CLOSED\n"),
267 if (send_i (buffer
, bytes_read
) == -1)
274 Server::send_i (const char* buffer
,
282 if (buffer_list_
.empty ())
284 // Register for write.
285 if (this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK
) == -1)
287 ACE_ERROR_RETURN ((LM_ERROR
,
288 ACE_TEXT ("(%t) %p\n"),
289 ACE_TEXT ("Unable to register for writing ")
290 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
295 buffer_list_
.push (new std::string (buffer
, size
));
300 Server::handle_output (ACE_HANDLE handle
)
302 while (!buffer_list_
.empty ())
304 size_t bytes_to_send
= buffer_list_
.front ()->size () - offset_
;
305 ssize_t bytes_sent
= this->peer ().send (buffer_list_
.front ()->data () + offset_
, bytes_to_send
);
307 ACE_DEBUG ((LM_DEBUG
,
308 ACE_TEXT ("(%t) Server::handle_output; handle = %d")
309 ACE_TEXT (" bytes sent %d\n"),
310 handle
, bytes_sent
));
312 if (bytes_sent
== -1)
314 if (errno
== EWOULDBLOCK
)
316 else if (errno
== EPIPE
|| errno
== ECONNRESET
)
318 ACE_DEBUG ((LM_DEBUG
,
319 ACE_TEXT ("(%t) Client::handle_output; server ")
320 ACE_TEXT ("closed handle %d\n"),
321 this->peer ().get_handle ()));
325 ACE_ERROR_RETURN ((LM_ERROR
,
326 ACE_TEXT ("(%t) %p\n"),
327 ACE_TEXT ("Client::handle_output")),
330 else if (bytes_sent
== 0)
334 if (bytes_sent
== static_cast<ssize_t
> (bytes_to_send
))
336 delete buffer_list_
.front ();
342 offset_
+= bytes_sent
;
351 Server::handle_close (ACE_HANDLE handle
,
352 ACE_Reactor_Mask mask
)
354 ACE_DEBUG ((LM_DEBUG
,
355 ACE_TEXT ("(%t) Server::handle_close handle = %d mask = %xd\n"), handle
, mask
));
357 //return SVC_HANDLER::handle_close (handle, mask);
360 // ----------------------------------------------------
362 using ACCEPTOR
= ACE_Acceptor
<Server
, ACE_SOCK_Acceptor
>;
363 using CONNECTOR
= ACE_Connector
<Client
, ACE_SOCK_Connector
>;
365 // ----------------------------------------------------
367 class TestAcceptor
: public ACCEPTOR
370 int accept_svc_handler (Server
* handler
) override
372 int result
= this->ACCEPTOR::accept_svc_handler (handler
);
376 if (errno
!= EWOULDBLOCK
)
377 ACE_ERROR ((LM_ERROR
,
378 ACE_TEXT ("(%t) %p\n"),
379 ACE_TEXT ("Unable to accept connection")));
384 ACE_DEBUG ((LM_DEBUG
,
385 ACE_TEXT ("(%t) Accepted connection. ")
386 ACE_TEXT ("Stream handle: <%d>\n"),
387 handler
->get_handle ()));
393 // ----------------------------------------------------
395 class TestConnector
: public CONNECTOR
398 int connect_svc_handler (
399 CONNECTOR::handler_type
*& handler
,
400 const CONNECTOR::addr_type
&remote_addr
,
401 ACE_Time_Value
*timeout
,
402 const CONNECTOR::addr_type
&local_addr
,
407 const int result
= this->CONNECTOR::connect_svc_handler (handler
,
418 ACE_TCHAR hostname
[MAXHOSTNAMELEN
];
419 if (remote_addr
.get_host_name (hostname
,
420 sizeof (hostname
)) != 0)
422 ACE_ERROR_RETURN ((LM_ERROR
,
423 ACE_TEXT ("(%t) %p\n"),
424 ACE_TEXT ("Unable to retrieve hostname")),
428 ACE_DEBUG ((LM_DEBUG
,
429 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
431 (int) remote_addr
.get_port_number ()));
436 int connect_svc_handler (
437 CONNECTOR::handler_type
*& handler
,
438 CONNECTOR::handler_type
*& sh_copy
,
439 const CONNECTOR::addr_type
&remote_addr
,
440 ACE_Time_Value
*timeout
,
441 const CONNECTOR::addr_type
&local_addr
,
444 int perms
) override
{
446 return this->connect_svc_handler (handler
, remote_addr
, timeout
,
447 local_addr
, reuse_addr
, flags
,
452 // ----------------------------------------------------
455 disable_signal (int sigmin
, int sigmax
)
457 #if !defined (ACE_LACKS_UNIX_SIGNALS)
459 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
460 ACE_ERROR ((LM_ERROR
,
461 ACE_TEXT ("Error: (%P|%t):%p\n"),
462 ACE_TEXT ("sigemptyset failed")));
464 for (int i
= sigmin
; i
<= sigmax
; i
++)
465 ACE_OS::sigaddset (&signal_set
, i
);
467 // Put the <signal_set>.
468 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
469 // In multi-threaded application this is not POSIX compliant
470 // but let's leave it just in case.
471 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
473 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
474 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
475 ACE_ERROR_RETURN ((LM_ERROR
,
476 ACE_TEXT ("Error: (%P|%t): %p\n"),
477 ACE_TEXT ("SIG_BLOCK failed")),
480 ACE_UNUSED_ARG (sigmin
);
481 ACE_UNUSED_ARG (sigmax
);
482 #endif /* ACE_LACKS_UNIX_SIGNALS */
487 // ----------------------------------------------------
490 run_main (int, ACE_TCHAR
*[])
492 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Echo_Test"));
494 // Make sure we ignore SIGPIPE
495 disable_signal (SIGPIPE
, SIGPIPE
);
497 ACE_Dev_Poll_Reactor dp_reactor
;
498 dp_reactor
.restart (1); // Restart on EINTR
499 ACE_Reactor
reactor (&dp_reactor
);
501 TestConnector client
;
504 ACE_SET_BITS (flags
, ACE_NONBLOCK
); // Enable non-blocking in the
507 if (client
.open (&reactor
, flags
) != 0)
508 ACE_ERROR_RETURN ((LM_ERROR
,
509 ACE_TEXT ("(%t) %p\n"),
510 ACE_TEXT ("Unable to open client service handler")),
513 unsigned short port
= 54678;
517 if (addr
.set (port
, INADDR_LOOPBACK
) != 0)
518 ACE_ERROR_RETURN ((LM_ERROR
,
519 ACE_TEXT ("(%t) %p\n"),
520 ACE_TEXT ("server_worker - ACE_INET_Addr::set")),
525 if (server
.open (addr
, &reactor
, flags
) != 0)
526 ACE_ERROR_RETURN ((LM_ERROR
,
527 ACE_TEXT ("(%t) %p\n"),
528 ACE_TEXT ("Unable to open server service handler")),
531 Client
*client_handler
= 0;
533 if (client
.connect (client_handler
, addr
) != 0)
534 ACE_ERROR_RETURN ((LM_ERROR
,
535 ACE_TEXT ("(%t) %p\n"),
536 ACE_TEXT ("Unable to connect to server")),
539 if (reactor
.run_reactor_event_loop () != 0)
540 ACE_ERROR_RETURN ((LM_ERROR
,
541 ACE_TEXT ("(%t) %p\n"),
542 ACE_TEXT ("Error when running client ")
543 ACE_TEXT ("reactor event loop")),
546 ACE_DEBUG((LM_DEBUG
, "sent: %C\n", client_handler
->sent
.c_str ()));
547 ACE_DEBUG((LM_DEBUG
, "received: %C\n", client_handler
->received
.c_str ()));
549 ACE_TEST_ASSERT (client_handler
->sent
== client_handler
->received
);
559 run_main (int, ACE_TCHAR
*[])
561 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Echo_Test"));
563 ACE_TEXT ("Dev Poll and Event Poll are not supported ")
564 ACE_TEXT ("on this platform\n")));
569 #endif /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */