1 //=============================================================================
3 * @file Dev_Poll_Reactor_Echo_Test.cpp
5 * This test implements a simple echo server using the
6 * Dev_Poll_Reactor. This forces the reactor to behave like a
7 * reactor would in a typical client/server application, i.e.,
8 * receive a message then send a messages.
9 * @author Justin Wilson <wilsonj@ociweb.com>
11 //=============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
17 #include "ace/OS_NS_signal.h"
18 #include "ace/Reactor.h"
19 #include "ace/Dev_Poll_Reactor.h"
21 #include "ace/Acceptor.h"
22 #include "ace/Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/SOCK_Connector.h"
26 #include "ace/SOCK_Stream.h"
28 #include "ace/OS_NS_unistd.h"
29 #include "ace/OS_NS_netdb.h"
33 typedef ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
> SVC_HANDLER
;
35 // ----------------------------------------------------
37 class Client
: public SVC_HANDLER
43 //FUZZ: disable check_for_lack_ACE_OS
44 virtual int open (void * = 0);
45 //FUZZ: enable check_for_lack_ACE_OS
47 virtual int handle_output (ACE_HANDLE handle
);
49 virtual int handle_input (ACE_HANDLE handle
);
51 virtual int handle_timeout (const ACE_Time_Value
¤t_time
,
54 virtual int handle_close (ACE_HANDLE handle
,
55 ACE_Reactor_Mask mask
);
61 unsigned int call_count_
;
65 class Server
: public SVC_HANDLER
71 virtual int handle_input (ACE_HANDLE handle
);
73 virtual int handle_output (ACE_HANDLE handle
);
75 virtual int handle_close (ACE_HANDLE handle
,
76 ACE_Reactor_Mask mask
);
79 int send_i (const char* buffer
,
82 std::queue
<std::string
*> buffer_list_
;
86 // ----------------------------------------------------
96 // Trigger writes on a timer.
97 ACE_Time_Value
delay (1, 0);
98 ACE_Time_Value
restart (1, 0);
99 if (this->reactor ()->schedule_timer (this,
104 ACE_ERROR_RETURN ((LM_ERROR
,
105 ACE_TEXT ("(%t) %p\n"),
106 ACE_TEXT ("Unable to schedule client side ")
107 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
111 if (this->reactor ()->register_handler (this, ACE_Event_Handler::READ_MASK
) == -1)
113 ACE_ERROR_RETURN ((LM_ERROR
,
114 ACE_TEXT ("(%t) %p\n"),
115 ACE_TEXT ("Unable to register for reading ")
116 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
124 Client::handle_output (ACE_HANDLE handle
)
126 std::string buffer
= "Hello, world!";
127 ssize_t bytes_sent
= this->peer ().send (buffer
.data (), buffer
.size ());
129 ACE_DEBUG ((LM_DEBUG
,
130 ACE_TEXT ("(%t) Client::handle_output; handle = %d")
131 ACE_TEXT (" bytes sent %d\n"),
135 if (bytes_sent
== -1)
137 if (errno
== EWOULDBLOCK
)
138 return 0; // Flow control kicked in.
139 else if (errno
== EPIPE
|| errno
== ECONNRESET
)
141 ACE_DEBUG ((LM_DEBUG
,
142 ACE_TEXT ("(%t) Client::handle_output; server ")
143 ACE_TEXT ("closed handle %d\n"),
144 this->peer ().get_handle ()));
148 ACE_ERROR_RETURN ((LM_ERROR
,
149 ACE_TEXT ("(%t) %p\n"),
150 ACE_TEXT ("Client::handle_output")),
153 else if (bytes_sent
== 0)
156 this->sent
.append (buffer
.substr (0, bytes_sent
));
162 Client::handle_input (ACE_HANDLE handle
)
167 ssize_t bytes_read
= this->peer ().recv (buffer
, BUFSIZ
);
168 ACE_DEBUG ((LM_DEBUG
,
169 ACE_TEXT ("(%t) Client::handle_input handle = %d bytes_read = %d\n"),
170 handle
, bytes_read
));
172 if (bytes_read
== -1 && errno
== EWOULDBLOCK
)
176 else if (bytes_read
<= 0)
183 this->received
.append (buffer
, bytes_read
);
189 Client::handle_timeout (const ACE_Time_Value
&, const void *)
192 ACE_TEXT ("(%t) Expected client timeout occurred at: %T\n")));
194 if (this->call_count_
!= 10)
196 // Register for write.
197 if (this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK
) == -1)
199 ACE_ERROR_RETURN ((LM_ERROR
,
200 ACE_TEXT ("(%t) %p\n"),
201 ACE_TEXT ("Unable to register for writing ")
202 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
211 if (this->reactor ()->end_reactor_event_loop () == 0)
213 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
215 ACE_ERROR ((LM_ERROR
,
216 ACE_TEXT ("(%t) %p\n"),
217 ACE_TEXT ("Failed client reactor shutdown")));
219 // Force this service handler to be closed in either case.
225 Client::handle_close (ACE_HANDLE handle
,
226 ACE_Reactor_Mask mask
)
228 ACE_DEBUG ((LM_DEBUG
,
229 ACE_TEXT ("(%t) Client::handle_close handle = %d mask = %xd\n"), handle
, mask
));
231 //return SVC_HANDLER::handle_close (handle, mask);
234 // ----------------------------------------------------
236 Server::Server (void)
242 Server::handle_input (ACE_HANDLE handle
)
247 ssize_t bytes_read
= this->peer ().recv (buffer
, BUFSIZ
);
248 ACE_DEBUG ((LM_DEBUG
,
249 ACE_TEXT ("(%t) Server::handle_input handle = %d bytes_read = %d\n"),
250 handle
, bytes_read
));
252 if (bytes_read
== -1 && errno
== EWOULDBLOCK
)
254 ACE_DEBUG ((LM_DEBUG
,
255 ACE_TEXT ("(%t) Server::handle_input handle = %d EWOULDBLOCK\n"),
259 else if (bytes_read
== 0)
262 ACE_DEBUG ((LM_DEBUG
,
263 ACE_TEXT ("(%t) Server::handle_input handle = %d CLOSED\n"),
269 if (send_i (buffer
, bytes_read
) == -1)
276 Server::send_i (const char* buffer
,
284 if (buffer_list_
.empty ())
286 // Register for write.
287 if (this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK
) == -1)
289 ACE_ERROR_RETURN ((LM_ERROR
,
290 ACE_TEXT ("(%t) %p\n"),
291 ACE_TEXT ("Unable to register for writing ")
292 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
297 buffer_list_
.push (new std::string (buffer
, size
));
302 Server::handle_output (ACE_HANDLE handle
)
304 while (!buffer_list_
.empty ())
306 size_t bytes_to_send
= buffer_list_
.front ()->size () - offset_
;
307 ssize_t bytes_sent
= this->peer ().send (buffer_list_
.front ()->data () + offset_
, bytes_to_send
);
309 ACE_DEBUG ((LM_DEBUG
,
310 ACE_TEXT ("(%t) Server::handle_output; handle = %d")
311 ACE_TEXT (" bytes sent %d\n"),
312 handle
, bytes_sent
));
314 if (bytes_sent
== -1)
316 if (errno
== EWOULDBLOCK
)
318 else if (errno
== EPIPE
|| errno
== ECONNRESET
)
320 ACE_DEBUG ((LM_DEBUG
,
321 ACE_TEXT ("(%t) Client::handle_output; server ")
322 ACE_TEXT ("closed handle %d\n"),
323 this->peer ().get_handle ()));
327 ACE_ERROR_RETURN ((LM_ERROR
,
328 ACE_TEXT ("(%t) %p\n"),
329 ACE_TEXT ("Client::handle_output")),
332 else if (bytes_sent
== 0)
336 if (bytes_sent
== static_cast<ssize_t
> (bytes_to_send
))
338 delete buffer_list_
.front ();
344 offset_
+= bytes_sent
;
353 Server::handle_close (ACE_HANDLE handle
,
354 ACE_Reactor_Mask mask
)
356 ACE_DEBUG ((LM_DEBUG
,
357 ACE_TEXT ("(%t) Server::handle_close handle = %d mask = %xd\n"), handle
, mask
));
359 //return SVC_HANDLER::handle_close (handle, mask);
362 // ----------------------------------------------------
364 typedef ACE_Acceptor
<Server
, ACE_SOCK_ACCEPTOR
> ACCEPTOR
;
365 typedef ACE_Connector
<Client
, ACE_SOCK_CONNECTOR
> CONNECTOR
;
367 // ----------------------------------------------------
369 class TestAcceptor
: public ACCEPTOR
373 virtual int accept_svc_handler (Server
* handler
)
375 int result
= this->ACCEPTOR::accept_svc_handler (handler
);
379 if (errno
!= EWOULDBLOCK
)
380 ACE_ERROR ((LM_ERROR
,
381 ACE_TEXT ("(%t) %p\n"),
382 ACE_TEXT ("Unable to accept connection")));
387 ACE_DEBUG ((LM_DEBUG
,
388 ACE_TEXT ("(%t) Accepted connection. ")
389 ACE_TEXT ("Stream handle: <%d>\n"),
390 handler
->get_handle ()));
397 // ----------------------------------------------------
399 class TestConnector
: public CONNECTOR
403 virtual int connect_svc_handler (
404 CONNECTOR::handler_type
*& handler
,
405 const CONNECTOR::addr_type
&remote_addr
,
406 ACE_Time_Value
*timeout
,
407 const CONNECTOR::addr_type
&local_addr
,
412 const int result
= this->CONNECTOR::connect_svc_handler (handler
,
423 ACE_TCHAR hostname
[MAXHOSTNAMELEN
];
424 if (remote_addr
.get_host_name (hostname
,
425 sizeof (hostname
)) != 0)
427 ACE_ERROR_RETURN ((LM_ERROR
,
428 ACE_TEXT ("(%t) %p\n"),
429 ACE_TEXT ("Unable to retrieve hostname")),
433 ACE_DEBUG ((LM_DEBUG
,
434 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
436 (int) remote_addr
.get_port_number ()));
441 virtual int connect_svc_handler (
442 CONNECTOR::handler_type
*& handler
,
443 CONNECTOR::handler_type
*& sh_copy
,
444 const CONNECTOR::addr_type
&remote_addr
,
445 ACE_Time_Value
*timeout
,
446 const CONNECTOR::addr_type
&local_addr
,
451 return this->connect_svc_handler (handler
, remote_addr
, timeout
,
452 local_addr
, reuse_addr
, flags
,
457 // ----------------------------------------------------
460 disable_signal (int sigmin
, int sigmax
)
462 #if !defined (ACE_LACKS_UNIX_SIGNALS)
464 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
465 ACE_ERROR ((LM_ERROR
,
466 ACE_TEXT ("Error: (%P|%t):%p\n"),
467 ACE_TEXT ("sigemptyset failed")));
469 for (int i
= sigmin
; i
<= sigmax
; i
++)
470 ACE_OS::sigaddset (&signal_set
, i
);
472 // Put the <signal_set>.
473 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
474 // In multi-threaded application this is not POSIX compliant
475 // but let's leave it just in case.
476 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
478 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
479 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
480 ACE_ERROR_RETURN ((LM_ERROR
,
481 ACE_TEXT ("Error: (%P|%t): %p\n"),
482 ACE_TEXT ("SIG_BLOCK failed")),
485 ACE_UNUSED_ARG (sigmin
);
486 ACE_UNUSED_ARG (sigmax
);
487 #endif /* ACE_LACKS_UNIX_SIGNALS */
492 // ----------------------------------------------------
495 run_main (int, ACE_TCHAR
*[])
497 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Echo_Test"));
499 // Make sure we ignore SIGPIPE
500 disable_signal (SIGPIPE
, SIGPIPE
);
502 ACE_Dev_Poll_Reactor dp_reactor
;
503 dp_reactor
.restart (1); // Restart on EINTR
504 ACE_Reactor
reactor (&dp_reactor
);
506 TestConnector client
;
509 ACE_SET_BITS (flags
, ACE_NONBLOCK
); // Enable non-blocking in the
512 if (client
.open (&reactor
, flags
) != 0)
513 ACE_ERROR_RETURN ((LM_ERROR
,
514 ACE_TEXT ("(%t) %p\n"),
515 ACE_TEXT ("Unable to open client service handler")),
518 unsigned short port
= 54678;
522 if (addr
.set (port
, INADDR_LOOPBACK
) != 0)
523 ACE_ERROR_RETURN ((LM_ERROR
,
524 ACE_TEXT ("(%t) %p\n"),
525 ACE_TEXT ("server_worker - ACE_INET_Addr::set")),
530 if (server
.open (addr
, &reactor
, flags
) != 0)
531 ACE_ERROR_RETURN ((LM_ERROR
,
532 ACE_TEXT ("(%t) %p\n"),
533 ACE_TEXT ("Unable to open server service handler")),
536 Client
*client_handler
= 0;
538 if (client
.connect (client_handler
, addr
) != 0)
539 ACE_ERROR_RETURN ((LM_ERROR
,
540 ACE_TEXT ("(%t) %p\n"),
541 ACE_TEXT ("Unable to connect to server")),
544 if (reactor
.run_reactor_event_loop () != 0)
545 ACE_ERROR_RETURN ((LM_ERROR
,
546 ACE_TEXT ("(%t) %p\n"),
547 ACE_TEXT ("Error when running client ")
548 ACE_TEXT ("reactor event loop")),
551 ACE_DEBUG((LM_DEBUG
, "sent: %C\n", client_handler
->sent
.c_str ()));
552 ACE_DEBUG((LM_DEBUG
, "received: %C\n", client_handler
->received
.c_str ()));
554 ACE_TEST_ASSERT (client_handler
->sent
== client_handler
->received
);
564 run_main (int, ACE_TCHAR
*[])
566 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Echo_Test"));
568 ACE_TEXT ("Dev Poll and Event Poll are not supported ")
569 ACE_TEXT ("on this platform\n")));
574 #endif /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */