Doxygen changes
[ACE_TAO.git] / ACE / tests / Dev_Poll_Reactor_Echo_Test.cpp
blobe5d231dd9035eab741914587871a9cde7d270e86
1 //=============================================================================
2 /**
3 * @file Dev_Poll_Reactor_Echo_Test.cpp
5 * This test implements a simple echo server using the
6 * Dev_Poll_Reactor. This forces the reactor to behave like a
7 * reactor would in a typical client/server application, i.e.,
8 * receive a message then send a messages.
9 * @author Justin Wilson <wilsonj@ociweb.com>
11 //=============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_DEV_POLL) || defined (ACE_HAS_EVENT_POLL)
17 #include "ace/OS_NS_signal.h"
18 #include "ace/Reactor.h"
19 #include "ace/Dev_Poll_Reactor.h"
21 #include "ace/Acceptor.h"
22 #include "ace/Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/SOCK_Connector.h"
26 #include "ace/SOCK_Stream.h"
28 #include "ace/OS_NS_unistd.h"
29 #include "ace/OS_NS_netdb.h"
31 #include <queue>
33 typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> SVC_HANDLER;
35 // ----------------------------------------------------
37 class Client : public SVC_HANDLER
39 public:
41 Client (void);
43 //FUZZ: disable check_for_lack_ACE_OS
44 virtual int open (void * = 0);
45 //FUZZ: enable check_for_lack_ACE_OS
47 virtual int handle_output (ACE_HANDLE handle);
49 virtual int handle_input (ACE_HANDLE handle);
51 virtual int handle_timeout (const ACE_Time_Value &current_time,
52 const void *act);
54 virtual int handle_close (ACE_HANDLE handle,
55 ACE_Reactor_Mask mask);
57 std::string sent;
58 std::string received;
60 private:
61 unsigned int call_count_;
65 class Server : public SVC_HANDLER
67 public:
69 Server (void);
71 virtual int handle_input (ACE_HANDLE handle);
73 virtual int handle_output (ACE_HANDLE handle);
75 virtual int handle_close (ACE_HANDLE handle,
76 ACE_Reactor_Mask mask);
78 private:
79 int send_i (const char* buffer,
80 size_t size);
82 std::queue<std::string*> buffer_list_;
83 size_t offset_;
86 // ----------------------------------------------------
88 Client::Client (void)
89 : call_count_ (0)
93 int
94 Client::open (void *)
96 // Trigger writes on a timer.
97 ACE_Time_Value delay (1, 0);
98 ACE_Time_Value restart (1, 0);
99 if (this->reactor ()->schedule_timer (this,
101 delay,
102 restart) == -1)
104 ACE_ERROR_RETURN ((LM_ERROR,
105 ACE_TEXT ("(%t) %p\n"),
106 ACE_TEXT ("Unable to schedule client side ")
107 ACE_TEXT ("timer in ACE_Dev_Poll_Reactor")),
108 -1);
111 if (this->reactor ()->register_handler (this, ACE_Event_Handler::READ_MASK) == -1)
113 ACE_ERROR_RETURN ((LM_ERROR,
114 ACE_TEXT ("(%t) %p\n"),
115 ACE_TEXT ("Unable to register for reading ")
116 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
117 -1);
120 return 0;
124 Client::handle_output (ACE_HANDLE handle)
126 std::string buffer = "Hello, world!";
127 ssize_t bytes_sent = this->peer ().send (buffer.data (), buffer.size ());
129 ACE_DEBUG ((LM_DEBUG,
130 ACE_TEXT ("(%t) Client::handle_output; handle = %d")
131 ACE_TEXT (" bytes sent %d\n"),
132 handle,
133 bytes_sent));
135 if (bytes_sent == -1)
137 if (errno == EWOULDBLOCK)
138 return 0; // Flow control kicked in.
139 else if (errno == EPIPE || errno == ECONNRESET)
141 ACE_DEBUG ((LM_DEBUG,
142 ACE_TEXT ("(%t) Client::handle_output; server ")
143 ACE_TEXT ("closed handle %d\n"),
144 this->peer ().get_handle ()));
145 return -1;
147 else
148 ACE_ERROR_RETURN ((LM_ERROR,
149 ACE_TEXT ("(%t) %p\n"),
150 ACE_TEXT ("Client::handle_output")),
151 -1);
153 else if (bytes_sent == 0)
154 return -1;
155 else
156 this->sent.append (buffer.substr (0, bytes_sent));
158 return -1;
162 Client::handle_input (ACE_HANDLE handle)
164 for (;;)
166 char buffer[BUFSIZ];
167 ssize_t bytes_read = this->peer ().recv (buffer, BUFSIZ);
168 ACE_DEBUG ((LM_DEBUG,
169 ACE_TEXT ("(%t) Client::handle_input handle = %d bytes_read = %d\n"),
170 handle, bytes_read));
172 if (bytes_read == -1 && errno == EWOULDBLOCK)
174 return 0;
176 else if (bytes_read <= 0)
178 // Closed.
179 return -1;
181 else
183 this->received.append (buffer, bytes_read);
189 Client::handle_timeout (const ACE_Time_Value &, const void *)
191 ACE_DEBUG ((LM_INFO,
192 ACE_TEXT ("(%t) Expected client timeout occurred at: %T\n")));
194 if (this->call_count_ != 10)
196 // Register for write.
197 if (this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK) == -1)
199 ACE_ERROR_RETURN ((LM_ERROR,
200 ACE_TEXT ("(%t) %p\n"),
201 ACE_TEXT ("Unable to register for writing ")
202 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
203 -1);
205 this->call_count_++;
206 return 0;
208 else
210 // Shutdown.
211 if (this->reactor ()->end_reactor_event_loop () == 0)
212 ACE_DEBUG ((LM_INFO,
213 ACE_TEXT ("(%t) Successful client reactor shutdown.\n")));
214 else
215 ACE_ERROR ((LM_ERROR,
216 ACE_TEXT ("(%t) %p\n"),
217 ACE_TEXT ("Failed client reactor shutdown")));
219 // Force this service handler to be closed in either case.
220 return -1;
225 Client::handle_close (ACE_HANDLE handle,
226 ACE_Reactor_Mask mask)
228 ACE_DEBUG ((LM_DEBUG,
229 ACE_TEXT ("(%t) Client::handle_close handle = %d mask = %xd\n"), handle, mask));
230 return 0;
231 //return SVC_HANDLER::handle_close (handle, mask);
234 // ----------------------------------------------------
236 Server::Server (void)
237 : offset_ (0)
242 Server::handle_input (ACE_HANDLE handle)
244 for (;;)
246 char buffer[BUFSIZ];
247 ssize_t bytes_read = this->peer ().recv (buffer, BUFSIZ);
248 ACE_DEBUG ((LM_DEBUG,
249 ACE_TEXT ("(%t) Server::handle_input handle = %d bytes_read = %d\n"),
250 handle, bytes_read));
252 if (bytes_read == -1 && errno == EWOULDBLOCK)
254 ACE_DEBUG ((LM_DEBUG,
255 ACE_TEXT ("(%t) Server::handle_input handle = %d EWOULDBLOCK\n"),
256 handle));
257 return 0;
259 else if (bytes_read == 0)
261 // Closed.
262 ACE_DEBUG ((LM_DEBUG,
263 ACE_TEXT ("(%t) Server::handle_input handle = %d CLOSED\n"),
264 handle));
265 return -1;
267 else
269 if (send_i (buffer, bytes_read) == -1)
270 return -1;
276 Server::send_i (const char* buffer,
277 size_t size)
279 if (size == 0)
281 return 0;
284 if (buffer_list_.empty ())
286 // Register for write.
287 if (this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK) == -1)
289 ACE_ERROR_RETURN ((LM_ERROR,
290 ACE_TEXT ("(%t) %p\n"),
291 ACE_TEXT ("Unable to register for writing ")
292 ACE_TEXT ("in ACE_Dev_Poll_Reactor")),
293 -1);
297 buffer_list_.push (new std::string (buffer, size));
298 return 0;
302 Server::handle_output (ACE_HANDLE handle)
304 while (!buffer_list_.empty ())
306 size_t bytes_to_send = buffer_list_.front ()->size () - offset_;
307 ssize_t bytes_sent = this->peer ().send (buffer_list_.front ()->data () + offset_, bytes_to_send);
309 ACE_DEBUG ((LM_DEBUG,
310 ACE_TEXT ("(%t) Server::handle_output; handle = %d")
311 ACE_TEXT (" bytes sent %d\n"),
312 handle, bytes_sent));
314 if (bytes_sent == -1)
316 if (errno == EWOULDBLOCK)
317 return 0;
318 else if (errno == EPIPE || errno == ECONNRESET)
320 ACE_DEBUG ((LM_DEBUG,
321 ACE_TEXT ("(%t) Client::handle_output; server ")
322 ACE_TEXT ("closed handle %d\n"),
323 this->peer ().get_handle ()));
324 return -1;
326 else
327 ACE_ERROR_RETURN ((LM_ERROR,
328 ACE_TEXT ("(%t) %p\n"),
329 ACE_TEXT ("Client::handle_output")),
330 -1);
332 else if (bytes_sent == 0)
333 return -1;
334 else
336 if (bytes_sent == static_cast<ssize_t> (bytes_to_send))
338 delete buffer_list_.front ();
339 buffer_list_.pop ();
340 offset_ = 0;
342 else
344 offset_ += bytes_sent;
349 return -1;
353 Server::handle_close (ACE_HANDLE handle,
354 ACE_Reactor_Mask mask)
356 ACE_DEBUG ((LM_DEBUG,
357 ACE_TEXT ("(%t) Server::handle_close handle = %d mask = %xd\n"), handle, mask));
358 return 0;
359 //return SVC_HANDLER::handle_close (handle, mask);
362 // ----------------------------------------------------
364 typedef ACE_Acceptor<Server, ACE_SOCK_ACCEPTOR> ACCEPTOR;
365 typedef ACE_Connector<Client, ACE_SOCK_CONNECTOR> CONNECTOR;
367 // ----------------------------------------------------
369 class TestAcceptor : public ACCEPTOR
371 public:
373 virtual int accept_svc_handler (Server * handler)
375 int result = this->ACCEPTOR::accept_svc_handler (handler);
377 if (result != 0)
379 if (errno != EWOULDBLOCK)
380 ACE_ERROR ((LM_ERROR,
381 ACE_TEXT ("(%t) %p\n"),
382 ACE_TEXT ("Unable to accept connection")));
384 return result;
387 ACE_DEBUG ((LM_DEBUG,
388 ACE_TEXT ("(%t) Accepted connection. ")
389 ACE_TEXT ("Stream handle: <%d>\n"),
390 handler->get_handle ()));
392 return result;
397 // ----------------------------------------------------
399 class TestConnector : public CONNECTOR
401 public:
403 virtual int connect_svc_handler (
404 CONNECTOR::handler_type *& handler,
405 const CONNECTOR::addr_type &remote_addr,
406 ACE_Time_Value *timeout,
407 const CONNECTOR::addr_type &local_addr,
408 int reuse_addr,
409 int flags,
410 int perms)
412 const int result = this->CONNECTOR::connect_svc_handler (handler,
413 remote_addr,
414 timeout,
415 local_addr,
416 reuse_addr,
417 flags,
418 perms);
420 if (result != 0)
421 return result;
423 ACE_TCHAR hostname[MAXHOSTNAMELEN];
424 if (remote_addr.get_host_name (hostname,
425 sizeof (hostname)) != 0)
427 ACE_ERROR_RETURN ((LM_ERROR,
428 ACE_TEXT ("(%t) %p\n"),
429 ACE_TEXT ("Unable to retrieve hostname")),
430 -1);
433 ACE_DEBUG ((LM_DEBUG,
434 ACE_TEXT ("(%t) Connected to <%s:%d>.\n"),
435 hostname,
436 (int) remote_addr.get_port_number ()));
438 return result;
441 virtual int connect_svc_handler (
442 CONNECTOR::handler_type *& handler,
443 CONNECTOR::handler_type *& sh_copy,
444 const CONNECTOR::addr_type &remote_addr,
445 ACE_Time_Value *timeout,
446 const CONNECTOR::addr_type &local_addr,
447 int reuse_addr,
448 int flags,
449 int perms) {
450 sh_copy = handler;
451 return this->connect_svc_handler (handler, remote_addr, timeout,
452 local_addr, reuse_addr, flags,
453 perms);
457 // ----------------------------------------------------
459 static int
460 disable_signal (int sigmin, int sigmax)
462 #if !defined (ACE_LACKS_UNIX_SIGNALS)
463 sigset_t signal_set;
464 if (ACE_OS::sigemptyset (&signal_set) == - 1)
465 ACE_ERROR ((LM_ERROR,
466 ACE_TEXT ("Error: (%P|%t):%p\n"),
467 ACE_TEXT ("sigemptyset failed")));
469 for (int i = sigmin; i <= sigmax; i++)
470 ACE_OS::sigaddset (&signal_set, i);
472 // Put the <signal_set>.
473 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
474 // In multi-threaded application this is not POSIX compliant
475 // but let's leave it just in case.
476 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
477 # else
478 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
479 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
480 ACE_ERROR_RETURN ((LM_ERROR,
481 ACE_TEXT ("Error: (%P|%t): %p\n"),
482 ACE_TEXT ("SIG_BLOCK failed")),
483 -1);
484 #else
485 ACE_UNUSED_ARG (sigmin);
486 ACE_UNUSED_ARG (sigmax);
487 #endif /* ACE_LACKS_UNIX_SIGNALS */
489 return 0;
492 // ----------------------------------------------------
495 run_main (int, ACE_TCHAR *[])
497 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Echo_Test"));
499 // Make sure we ignore SIGPIPE
500 disable_signal (SIGPIPE, SIGPIPE);
502 ACE_Dev_Poll_Reactor dp_reactor;
503 dp_reactor.restart (1); // Restart on EINTR
504 ACE_Reactor reactor (&dp_reactor);
506 TestConnector client;
508 int flags = 0;
509 ACE_SET_BITS (flags, ACE_NONBLOCK); // Enable non-blocking in the
510 // Svc_Handlers.
512 if (client.open (&reactor, flags) != 0)
513 ACE_ERROR_RETURN ((LM_ERROR,
514 ACE_TEXT ("(%t) %p\n"),
515 ACE_TEXT ("Unable to open client service handler")),
516 -1);
518 unsigned short port = 54678;
520 ACE_INET_Addr addr;
522 if (addr.set (port, INADDR_LOOPBACK) != 0)
523 ACE_ERROR_RETURN ((LM_ERROR,
524 ACE_TEXT ("(%t) %p\n"),
525 ACE_TEXT ("server_worker - ACE_INET_Addr::set")),
526 -1);
528 TestAcceptor server;
530 if (server.open (addr, &reactor, flags) != 0)
531 ACE_ERROR_RETURN ((LM_ERROR,
532 ACE_TEXT ("(%t) %p\n"),
533 ACE_TEXT ("Unable to open server service handler")),
534 -1);
536 Client *client_handler = 0;
538 if (client.connect (client_handler, addr) != 0)
539 ACE_ERROR_RETURN ((LM_ERROR,
540 ACE_TEXT ("(%t) %p\n"),
541 ACE_TEXT ("Unable to connect to server")),
542 -1);
544 if (reactor.run_reactor_event_loop () != 0)
545 ACE_ERROR_RETURN ((LM_ERROR,
546 ACE_TEXT ("(%t) %p\n"),
547 ACE_TEXT ("Error when running client ")
548 ACE_TEXT ("reactor event loop")),
549 -1);
551 ACE_DEBUG((LM_DEBUG, "sent: %C\n", client_handler->sent.c_str ()));
552 ACE_DEBUG((LM_DEBUG, "received: %C\n", client_handler->received.c_str ()));
554 ACE_TEST_ASSERT (client_handler->sent == client_handler->received);
556 ACE_END_TEST;
558 return 0;
561 #else
564 run_main (int, ACE_TCHAR *[])
566 ACE_START_TEST (ACE_TEXT ("Dev_Poll_Reactor_Echo_Test"));
567 ACE_ERROR ((LM_INFO,
568 ACE_TEXT ("Dev Poll and Event Poll are not supported ")
569 ACE_TEXT ("on this platform\n")));
570 ACE_END_TEST;
571 return 0;
574 #endif /* ACE_HAS_DEV_POLL || ACE_HAS_EVENT_POLL */