Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / tests / Dev_Poll_Reactor_Echo_Test.cpp
blobb634d93a08148dffac72e08e9b7622266c5bdfb5
1 //=============================================================================
2 /**
3 * @file Dev_Poll_Reactor_Echo_Test.cpp
5 * This test implements a simple echo server using the
6 * Dev_Poll_Reactor. This forces the reactor to behave like a
7 * reactor would in a typical client/server application, i.e.,
8 * receive a message then send a messages.
9 * @author Justin Wilson <wilsonj@ociweb.com>
11 //=============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
17 #include "ace/OS_NS_signal.h"
18 #include "ace/Reactor.h"
19 #include "ace/Dev_Poll_Reactor.h"
21 #include "ace/Acceptor.h"
22 #include "ace/Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/SOCK_Connector.h"
26 #include "ace/SOCK_Stream.h"
28 #include "ace/OS_NS_unistd.h"
29 #include "ace/OS_NS_netdb.h"
31 #include <queue>
33 using SVC_HANDLER = ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH>;
35 // ----------------------------------------------------
37 class Client : public SVC_HANDLER
39 public:
40 Client ();
42 //FUZZ: disable check_for_lack_ACE_OS
43 int open (void * = 0) override;
44 //FUZZ: enable check_for_lack_ACE_OS
46 int handle_output (ACE_HANDLE handle) override;
48 int handle_input (ACE_HANDLE handle) override;
50 int handle_timeout (const ACE_Time_Value &current_time,
51 const void *act) override;
53 int handle_close (ACE_HANDLE handle,
54 ACE_Reactor_Mask mask) override;
56 std::string sent;
57 std::string received;
59 private:
60 unsigned int call_count_;
64 class Server : public SVC_HANDLER
66 public:
67 Server ();
69 int handle_input (ACE_HANDLE handle) override;
71 int handle_output (ACE_HANDLE handle) override;
73 int handle_close (ACE_HANDLE handle,
74 ACE_Reactor_Mask mask) override;
76 private:
77 int send_i (const char* buffer,
78 size_t size);
80 std::queue<std::string*> buffer_list_;
81 size_t offset_;
84 // ----------------------------------------------------
86 Client::Client ()
87 : call_count_ (0)
91 int
92 Client::open (void *)
94 // Trigger writes on a timer.
95 ACE_Time_Value delay (1, 0);
96 ACE_Time_Value restart (1, 0);
97 if (this->reactor ()->schedule_timer (this,
99 delay,
100 restart) == -1)
102 ACE_ERROR_RETURN ((LM_ERROR,
103 ACE_TEXT ("(%t) %p\n"),
104 ACE_TEXT ("Unable to schedule client side ")
105 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
106 -1);
109 if (this->reactor ()->register_handler (this, ACE_Event_Handler::READ_MASK) == -1)
111 ACE_ERROR_RETURN ((LM_ERROR,
112 ACE_TEXT ("(%t) %p\n"),
113 ACE_TEXT ("Unable to register for reading ")
114 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
115 -1);
118 return 0;
122 Client::handle_output (ACE_HANDLE handle)
124 std::string buffer = "Hello, world!";
125 ssize_t bytes_sent = this->peer ().send (buffer.data (), buffer.size ());
127 ACE_DEBUG ((LM_DEBUG,
128 ACE_TEXT ("(%t) Client::handle_output; handle = %d")
129 ACE_TEXT (" bytes sent %d\n"),
130 handle,
131 bytes_sent));
133 if (bytes_sent == -1)
135 if (errno == EWOULDBLOCK)
136 return 0; // Flow control kicked in.
137 else if (errno == EPIPE || errno == ECONNRESET)
139 ACE_DEBUG ((LM_DEBUG,
140 ACE_TEXT ("(%t) Client::handle_output; server ")
141 ACE_TEXT ("closed handle %d\n"),
142 this->peer ().get_handle ()));
143 return -1;
145 else
146 ACE_ERROR_RETURN ((LM_ERROR,
147 ACE_TEXT ("(%t) %p\n"),
148 ACE_TEXT ("Client::handle_output")),
149 -1);
151 else if (bytes_sent == 0)
152 return -1;
153 else
154 this->sent.append (buffer.substr (0, bytes_sent));
156 return -1;
160 Client::handle_input (ACE_HANDLE handle)
162 for (;;)
164 char buffer[BUFSIZ];
165 ssize_t bytes_read = this->peer ().recv (buffer, BUFSIZ);
166 ACE_DEBUG ((LM_DEBUG,
167 ACE_TEXT ("(%t) Client::handle_input handle = %d bytes_read = %d\n"),
168 handle, bytes_read));
170 if (bytes_read == -1 && errno == EWOULDBLOCK)
172 return 0;
174 else if (bytes_read <= 0)
176 // Closed.
177 return -1;
179 else
181 this->received.append (buffer, bytes_read);
187 Client::handle_timeout (const ACE_Time_Value &, const void *)
189 ACE_DEBUG ((LM_INFO,
190 ACE_TEXT ("(%t) Expected client timeout occurred at: %T\n")));
192 if (this->call_count_ != 10)
194 // Register for write.
195 if (this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK) == -1)
197 ACE_ERROR_RETURN ((LM_ERROR,
198 ACE_TEXT ("(%t) %p\n"),
199 ACE_TEXT ("Unable to register for writing ")
200 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
201 -1);
203 this->call_count_++;
204 return 0;
206 else
208 // Shutdown.
209 if (this->reactor ()->end_reactor_event_loop () == 0)
210 ACE_DEBUG ((LM_INFO,
211 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
212 else
213 ACE_ERROR ((LM_ERROR,
214 ACE_TEXT ("(%t) %p\n"),
215 ACE_TEXT ("Failed client reactor shutdown")));
217 // Force this service handler to be closed in either case.
218 return -1;
223 Client::handle_close (ACE_HANDLE handle,
224 ACE_Reactor_Mask mask)
226 ACE_DEBUG ((LM_DEBUG,
227 ACE_TEXT ("(%t) Client::handle_close handle = %d mask = %xd\n"), handle, mask));
228 return 0;
229 //return SVC_HANDLER::handle_close (handle, mask);
232 // ----------------------------------------------------
234 Server::Server ()
235 : offset_ (0)
240 Server::handle_input (ACE_HANDLE handle)
242 for (;;)
244 char buffer[BUFSIZ];
245 ssize_t bytes_read = this->peer ().recv (buffer, BUFSIZ);
246 ACE_DEBUG ((LM_DEBUG,
247 ACE_TEXT ("(%t) Server::handle_input handle = %d bytes_read = %d\n"),
248 handle, bytes_read));
250 if (bytes_read == -1 && errno == EWOULDBLOCK)
252 ACE_DEBUG ((LM_DEBUG,
253 ACE_TEXT ("(%t) Server::handle_input handle = %d EWOULDBLOCK\n"),
254 handle));
255 return 0;
257 else if (bytes_read == 0)
259 // Closed.
260 ACE_DEBUG ((LM_DEBUG,
261 ACE_TEXT ("(%t) Server::handle_input handle = %d CLOSED\n"),
262 handle));
263 return -1;
265 else
267 if (send_i (buffer, bytes_read) == -1)
268 return -1;
274 Server::send_i (const char* buffer,
275 size_t size)
277 if (size == 0)
279 return 0;
282 if (buffer_list_.empty ())
284 // Register for write.
285 if (this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK) == -1)
287 ACE_ERROR_RETURN ((LM_ERROR,
288 ACE_TEXT ("(%t) %p\n"),
289 ACE_TEXT ("Unable to register for writing ")
290 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
291 -1);
295 buffer_list_.push (new std::string (buffer, size));
296 return 0;
300 Server::handle_output (ACE_HANDLE handle)
302 while (!buffer_list_.empty ())
304 size_t bytes_to_send = buffer_list_.front ()->size () - offset_;
305 ssize_t bytes_sent = this->peer ().send (buffer_list_.front ()->data () + offset_, bytes_to_send);
307 ACE_DEBUG ((LM_DEBUG,
308 ACE_TEXT ("(%t) Server::handle_output; handle = %d")
309 ACE_TEXT (" bytes sent %d\n"),
310 handle, bytes_sent));
312 if (bytes_sent == -1)
314 if (errno == EWOULDBLOCK)
315 return 0;
316 else if (errno == EPIPE || errno == ECONNRESET)
318 ACE_DEBUG ((LM_DEBUG,
319 ACE_TEXT ("(%t) Client::handle_output; server ")
320 ACE_TEXT ("closed handle %d\n"),
321 this->peer ().get_handle ()));
322 return -1;
324 else
325 ACE_ERROR_RETURN ((LM_ERROR,
326 ACE_TEXT ("(%t) %p\n"),
327 ACE_TEXT ("Client::handle_output")),
328 -1);
330 else if (bytes_sent == 0)
331 return -1;
332 else
334 if (bytes_sent == static_cast<ssize_t> (bytes_to_send))
336 delete buffer_list_.front ();
337 buffer_list_.pop ();
338 offset_ = 0;
340 else
342 offset_ += bytes_sent;
347 return -1;
351 Server::handle_close (ACE_HANDLE handle,
352 ACE_Reactor_Mask mask)
354 ACE_DEBUG ((LM_DEBUG,
355 ACE_TEXT ("(%t) Server::handle_close handle = %d mask = %xd\n"), handle, mask));
356 return 0;
357 //return SVC_HANDLER::handle_close (handle, mask);
360 // ----------------------------------------------------
362 using ACCEPTOR = ACE_Acceptor<Server, ACE_SOCK_Acceptor>;
363 using CONNECTOR = ACE_Connector<Client, ACE_SOCK_Connector>;
365 // ----------------------------------------------------
367 class TestAcceptor : public ACCEPTOR
369 public:
370 int accept_svc_handler (Server * handler) override
372 int result = this->ACCEPTOR::accept_svc_handler (handler);
374 if (result != 0)
376 if (errno != EWOULDBLOCK)
377 ACE_ERROR ((LM_ERROR,
378 ACE_TEXT ("(%t) %p\n"),
379 ACE_TEXT ("Unable to accept connection")));
381 return result;
384 ACE_DEBUG ((LM_DEBUG,
385 ACE_TEXT ("(%t) Accepted connection. ")
386 ACE_TEXT ("Stream handle: <%d>\n"),
387 handler->get_handle ()));
389 return result;
393 // ----------------------------------------------------
395 class TestConnector : public CONNECTOR
397 public:
398 int connect_svc_handler (
399 CONNECTOR::handler_type *& handler,
400 const CONNECTOR::addr_type &remote_addr,
401 ACE_Time_Value *timeout,
402 const CONNECTOR::addr_type &local_addr,
403 int reuse_addr,
404 int flags,
405 int perms) override
407 const int result = this->CONNECTOR::connect_svc_handler (handler,
408 remote_addr,
409 timeout,
410 local_addr,
411 reuse_addr,
412 flags,
413 perms);
415 if (result != 0)
416 return result;
418 ACE_TCHAR hostname[MAXHOSTNAMELEN];
419 if (remote_addr.get_host_name (hostname,
420 sizeof (hostname)) != 0)
422 ACE_ERROR_RETURN ((LM_ERROR,
423 ACE_TEXT ("(%t) %p\n"),
424 ACE_TEXT ("Unable to retrieve hostname")),
425 -1);
428 ACE_DEBUG ((LM_DEBUG,
429 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
430 hostname,
431 (int) remote_addr.get_port_number ()));
433 return result;
436 int connect_svc_handler (
437 CONNECTOR::handler_type *& handler,
438 CONNECTOR::handler_type *& sh_copy,
439 const CONNECTOR::addr_type &remote_addr,
440 ACE_Time_Value *timeout,
441 const CONNECTOR::addr_type &local_addr,
442 int reuse_addr,
443 int flags,
444 int perms) override {
445 sh_copy = handler;
446 return this->connect_svc_handler (handler, remote_addr, timeout,
447 local_addr, reuse_addr, flags,
448 perms);
452 // ----------------------------------------------------
454 static int
455 disable_signal (int sigmin, int sigmax)
457 #if !defined (ACE_LACKS_UNIX_SIGNALS)
458 sigset_t signal_set;
459 if (ACE_OS::sigemptyset (&signal_set) == - 1)
460 ACE_ERROR ((LM_ERROR,
461 ACE_TEXT ("Error: (%P|%t):%p\n"),
462 ACE_TEXT ("sigemptyset failed")));
464 for (int i = sigmin; i <= sigmax; i++)
465 ACE_OS::sigaddset (&signal_set, i);
467 // Put the <signal_set>.
468 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
469 // In multi-threaded application this is not POSIX compliant
470 // but let's leave it just in case.
471 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
472 # else
473 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
474 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
475 ACE_ERROR_RETURN ((LM_ERROR,
476 ACE_TEXT ("Error: (%P|%t): %p\n"),
477 ACE_TEXT ("SIG_BLOCK failed")),
478 -1);
479 #else
480 ACE_UNUSED_ARG (sigmin);
481 ACE_UNUSED_ARG (sigmax);
482 #endif /* ACE_LACKS_UNIX_SIGNALS */
484 return 0;
487 // ----------------------------------------------------
490 run_main (int, ACE_TCHAR *[])
492 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Echo_Test"));
494 // Make sure we ignore SIGPIPE
495 disable_signal (SIGPIPE, SIGPIPE);
497 ACE_Dev_Poll_Reactor dp_reactor;
498 dp_reactor.restart (1); // Restart on EINTR
499 ACE_Reactor reactor (&dp_reactor);
501 TestConnector client;
503 int flags = 0;
504 ACE_SET_BITS (flags, ACE_NONBLOCK); // Enable non-blocking in the
505 // Svc_Handlers.
507 if (client.open (&reactor, flags) != 0)
508 ACE_ERROR_RETURN ((LM_ERROR,
509 ACE_TEXT ("(%t) %p\n"),
510 ACE_TEXT ("Unable to open client service handler")),
511 -1);
513 unsigned short port = 54678;
515 ACE_INET_Addr addr;
517 if (addr.set (port, INADDR_LOOPBACK) != 0)
518 ACE_ERROR_RETURN ((LM_ERROR,
519 ACE_TEXT ("(%t) %p\n"),
520 ACE_TEXT ("server_worker - ACE_INET_Addr::set")),
521 -1);
523 TestAcceptor server;
525 if (server.open (addr, &reactor, flags) != 0)
526 ACE_ERROR_RETURN ((LM_ERROR,
527 ACE_TEXT ("(%t) %p\n"),
528 ACE_TEXT ("Unable to open server service handler")),
529 -1);
531 Client *client_handler = 0;
533 if (client.connect (client_handler, addr) != 0)
534 ACE_ERROR_RETURN ((LM_ERROR,
535 ACE_TEXT ("(%t) %p\n"),
536 ACE_TEXT ("Unable to connect to server")),
537 -1);
539 if (reactor.run_reactor_event_loop () != 0)
540 ACE_ERROR_RETURN ((LM_ERROR,
541 ACE_TEXT ("(%t) %p\n"),
542 ACE_TEXT ("Error when running client ")
543 ACE_TEXT ("reactor event loop")),
544 -1);
546 ACE_DEBUG((LM_DEBUG, "sent: %C\n", client_handler->sent.c_str ()));
547 ACE_DEBUG((LM_DEBUG, "received: %C\n", client_handler->received.c_str ()));
549 ACE_TEST_ASSERT (client_handler->sent == client_handler->received);
551 ACE_END_TEST;
553 return 0;
556 #else
559 run_main (int, ACE_TCHAR *[])
561 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Echo_Test"));
562 ACE_ERROR ((LM_INFO,
563 ACE_TEXT ("Dev Poll and Event Poll are not supported ")
564 ACE_TEXT ("on this platform\n")));
565 ACE_END_TEST;
566 return 0;
569 #endif /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */