Doxygen changes
[ACE_TAO.git] / ACE / tests / Proactor_UDP_Test.cpp
blobbb0ea4863cb7a3274c3085b7a09ed5b9e31068b5
1 // ============================================================================
2 /**
3 * @file Proactor_UDP_Test.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that uses UDP/IP communications.
8 * @author Steve Huston <shuston@riverace.com>, based on Proactor_Test.cpp
9 */
10 // ============================================================================
12 #include "test_config.h"
14 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
15 // This only works on Win32 platforms and on Unix platforms
16 // supporting POSIX aio calls.
18 #include "ace/Signal.h"
20 #include "ace/Service_Config.h"
21 #include "ace/INET_Addr.h"
22 #include "ace/SOCK_CODgram.h"
23 #include "ace/SOCK_Dgram.h"
24 #include "ace/Object_Manager.h"
25 #include "ace/Get_Opt.h"
27 #include "ace/Proactor.h"
28 #include "ace/Task.h"
29 #include "ace/Thread_Semaphore.h"
30 #include "ace/OS_NS_ctype.h"
31 #include "ace/OS_NS_errno.h"
32 #include "ace/OS_NS_signal.h"
33 #include "ace/OS_NS_string.h"
34 #include "ace/OS_NS_unistd.h"
35 #include "ace/OS_NS_sys_socket.h"
36 #include "ace/Sock_Connect.h"
37 #include "ace/os_include/netinet/os_tcp.h"
39 #include "ace/Atomic_Op.h"
40 #include "ace/Synch_Traits.h"
42 #if defined (ACE_WIN32)
44 # include "ace/WIN32_Proactor.h"
46 #elif defined (ACE_HAS_AIO_CALLS)
48 # include "ace/POSIX_Proactor.h"
49 # include "ace/POSIX_CB_Proactor.h"
50 # include "ace/SUN_Proactor.h"
52 #endif /* ACE_WIN32 */
54 // Proactor Type (UNIX only, Win32 ignored)
55 typedef enum { DEFAULT = 0, AIOCB, SIG, SUN, CB } ProactorType;
56 static ProactorType proactor_type = DEFAULT;
58 // POSIX : > 0 max number aio operations proactor,
59 static size_t max_aio_operations = 0;
61 // both: 0 run client or server / depends on host
62 // != 0 run client and server
63 static int both = 0;
65 // Host that we're connecting to.
66 static const ACE_TCHAR *host = 0;
68 // number of Client instances
69 static int clients = 1;
70 const int MAX_CLIENTS = 1000;
71 const int MAX_SERVERS = 1000;
73 // duplex mode: == 0 half-duplex
74 // != 0 full duplex
75 static int duplex = 0;
77 // number threads in the Proactor thread pool
78 static int threads = 1;
80 // Port that we're receiving session initiations on.
81 static u_short port = ACE_DEFAULT_SERVER_PORT;
83 // Log options
84 static int loglevel; // 0 full , 1 only errors
86 static size_t xfer_limit; // Number of bytes for Client to send.
88 static char complete_message[] =
89 "GET / HTTP/1.1\r\n"
90 "Accept: */*\r\n"
91 "Accept-Language: C++\r\n"
92 "Accept-Encoding: gzip, deflate\r\n"
93 "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
94 "Connection: Keep-Alive\r\n"
95 "\r\n";
97 static char close_req_msg[] = "CLOSE";
98 static char close_ack_msg[] = "CLOSE-ACK";
100 class LogLocker
102 public:
104 LogLocker () { ACE_LOG_MSG->acquire (); }
105 virtual ~LogLocker () { ACE_LOG_MSG->release (); }
108 // Function to remove signals from the signal mask.
109 static int
110 disable_signal (int sigmin, int sigmax)
112 #if !defined (ACE_LACKS_UNIX_SIGNALS)
113 sigset_t signal_set;
114 if (ACE_OS::sigemptyset (&signal_set) == - 1)
115 ACE_ERROR ((LM_ERROR,
116 ACE_TEXT ("Error: (%P|%t):%p\n"),
117 ACE_TEXT ("sigemptyset failed")));
119 for (int i = sigmin; i <= sigmax; i++)
120 ACE_OS::sigaddset (&signal_set, i);
122 // Put the <signal_set>.
123 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
124 // In multi-threaded application this is not POSIX compliant
125 // but let's leave it just in case.
126 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
127 # else
128 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
129 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
130 ACE_ERROR_RETURN ((LM_ERROR,
131 ACE_TEXT ("Error: (%P|%t): %p\n"),
132 ACE_TEXT ("SIG_BLOCK failed")),
133 -1);
134 #else
135 ACE_UNUSED_ARG (sigmin);
136 ACE_UNUSED_ARG (sigmax);
137 #endif /* ACE_LACKS_UNIX_SIGNALS */
139 return 0;
142 // *************************************************************
143 // MyTask is ACE_Task responsible for :
144 // 1. creation and deletion of
145 // Proactor and Proactor thread pool
146 // 2. running Proactor event loop
147 // *************************************************************
150 * @class MyTask
152 * MyTask plays role for Proactor threads pool
154 * MyTask is ACE_Task resposible for:
155 * 1. Creation and deletion of Proactor and Proactor thread pool
156 * 2. Running Proactor event loop
158 class MyTask : public ACE_Task<ACE_MT_SYNCH>
160 public:
161 MyTask (void):
162 lock_ (),
163 sem_ ((unsigned int) 0),
164 proactor_(0) {}
166 virtual ~MyTask()
168 (void) this->stop ();
169 (void) this->delete_proactor();
172 virtual int svc (void);
174 int start (int num_threads,
175 ProactorType type_proactor,
176 size_t max_op );
177 int stop (void);
179 private:
180 int create_proactor (ProactorType type_proactor,
181 size_t max_op);
182 int delete_proactor (void);
184 ACE_SYNCH_RECURSIVE_MUTEX lock_;
185 ACE_Thread_Semaphore sem_;
186 ACE_Proactor * proactor_;
191 MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
193 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
194 monitor,
195 this->lock_,
196 -1);
198 ACE_TEST_ASSERT (this->proactor_ == 0);
200 #if defined (ACE_WIN32)
202 ACE_UNUSED_ARG (type_proactor);
203 ACE_UNUSED_ARG (max_op);
205 ACE_WIN32_Proactor *proactor_impl = 0;
207 ACE_NEW_RETURN (proactor_impl,
208 ACE_WIN32_Proactor,
209 -1);
211 ACE_DEBUG ((LM_DEBUG,
212 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
214 #elif defined (ACE_HAS_AIO_CALLS)
216 ACE_POSIX_Proactor * proactor_impl = 0;
218 switch (type_proactor)
220 case AIOCB:
221 ACE_NEW_RETURN (proactor_impl,
222 ACE_POSIX_AIOCB_Proactor (max_op),
223 -1);
224 ACE_DEBUG ((LM_DEBUG,
225 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
226 break;
228 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
229 case SIG:
230 ACE_NEW_RETURN (proactor_impl,
231 ACE_POSIX_SIG_Proactor (max_op),
232 -1);
233 ACE_DEBUG ((LM_DEBUG,
234 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
235 break;
236 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
238 # if defined (sun)
239 case SUN:
240 ACE_NEW_RETURN (proactor_impl,
241 ACE_SUN_Proactor (max_op),
242 -1);
243 ACE_DEBUG ((LM_DEBUG,
244 ACE_TEXT("(%t) Create Proactor Type = SUN\n")));
245 break;
246 # endif /* sun */
248 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
249 case CB:
250 ACE_NEW_RETURN (proactor_impl,
251 ACE_POSIX_CB_Proactor (max_op),
252 -1);
253 ACE_DEBUG ((LM_DEBUG,
254 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
255 break;
256 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
258 default:
259 ACE_DEBUG ((LM_DEBUG,
260 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
261 break;
264 #endif /* ACE_WIN32 */
266 // always delete implementation 1 , not !(proactor_impl == 0)
267 ACE_NEW_RETURN (this->proactor_,
268 ACE_Proactor (proactor_impl, 1 ),
269 -1);
270 // Set new singleton and delete it in close_singleton()
271 ACE_Proactor::instance (this->proactor_, 1);
272 return 0;
276 MyTask::delete_proactor (void)
278 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
279 monitor,
280 this->lock_,
281 -1);
283 ACE_DEBUG ((LM_DEBUG,
284 ACE_TEXT ("(%t) Delete Proactor\n")));
286 ACE_Proactor::close_singleton ();
287 this->proactor_ = 0;
289 return 0;
293 MyTask::start (int num_threads,
294 ProactorType type_proactor,
295 size_t max_op)
297 if (this->create_proactor (type_proactor, max_op) == -1)
298 ACE_ERROR_RETURN ((LM_ERROR,
299 ACE_TEXT ("%p.\n"),
300 ACE_TEXT ("unable to create proactor")),
301 -1);
303 if (this->activate (THR_NEW_LWP, num_threads) == -1)
304 ACE_ERROR_RETURN ((LM_ERROR,
305 ACE_TEXT ("%p.\n"),
306 ACE_TEXT ("unable to activate thread pool")),
307 -1);
309 for (; num_threads > 0; num_threads--)
311 sem_.acquire ();
314 return 0;
319 MyTask::stop ()
321 if (this->proactor_ != 0)
323 ACE_DEBUG ((LM_DEBUG,
324 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
326 this->proactor_->proactor_end_event_loop ();
329 if (this->wait () == -1)
330 ACE_ERROR ((LM_ERROR,
331 ACE_TEXT ("%p.\n"),
332 ACE_TEXT ("unable to stop thread pool")));
334 return 0;
338 MyTask::svc (void)
340 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n")));
342 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
343 disable_signal (SIGPIPE, SIGPIPE);
345 // signal that we are ready
346 sem_.release (1);
348 this->proactor_->proactor_run_event_loop ();
350 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n")));
351 return 0;
354 // forward declaration
355 class TestData;
357 // "Server" is one side of a session. It's the same idea as in TCP, but
358 // there's no acceptor in UDP; sessions are started by the client sending
359 // a "start" datagram to a well-known UDP port. The start datagram tells
360 // which port number the client is receiving on. The server then sends an
361 // "ack" datagram to indicate the session is set up successfully and to say
362 // which port the server is listening on. Thus, a unique pairing of server
363 // and client port numbers is established. Each session will require a
364 // separate server-side socket as well as the client. Note that experienced
365 // UDP programmers will be quivering at this point knowing that there's no
366 // reason to have multiple server-side sockets, and no real reason to
367 // pre-register the client ports either since all the addressing info is
368 // available on normal UDP programming. However, this is all necessary in
369 // the POSIX case since the POSIX aio functions were not designed with UDP
370 // in mind, and the addressing information is not available in UDP receive
371 // completion callbacks; thus, each socket needs to be fully connected before
372 // running session data. The addressing information needed to run this
373 // use-case in the "normal" way is available on Windows, but this test runs
374 // across many platforms, so can't rely on that information.
375 class Server : public ACE_Handler
377 public:
378 Server ();
379 Server (TestData *tester, int id);
380 ~Server (void);
382 int id (void) { return this->id_; }
383 size_t get_total_snd (void) { return this->total_snd_; }
384 size_t get_total_rcv (void) { return this->total_rcv_; }
385 long get_total_w (void) { return this->total_w_; }
386 long get_total_r (void) { return this->total_r_; }
388 /// This is called after the new session has been established.
389 void go (ACE_HANDLE handle, const ACE_INET_Addr &client);
391 void cancel ();
393 protected:
395 * @name AIO callback handling
397 * These methods are called by the framework
399 /// This is called when asynchronous <read> operation from the
400 /// socket completes.
401 virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
403 /// This is called when an asynchronous <write> to the socket
404 /// completes.
405 virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
407 private:
408 int initiate_read (void);
409 int initiate_write (ACE_Message_Block *mb, size_t nbytes);
411 TestData *tester_;
412 int id_;
414 ACE_INET_Addr client_addr_;
415 ACE_Asynch_Read_Dgram rs_;
416 ACE_Asynch_Write_Dgram ws_;
417 ACE_SYNCH_MUTEX lock_;
419 int io_count_; // Number of currently outstanding I/O requests
420 bool flg_cancel_;
421 bool flg_closing_;
422 size_t total_snd_; // Number of bytes successfully sent
423 size_t total_rcv_; // Number of bytes successfully received
424 int total_w_; // Number of write operations
425 int total_r_; // Number of read operations
428 // *******************************************
429 // Client
430 // *******************************************
432 class Client : public ACE_Handler
434 public:
435 Client ();
436 Client (TestData *tester, int id);
437 ~Client (void);
439 void go (ACE_HANDLE h, const ACE_INET_Addr &server);
440 int id (void) { return this->id_; }
441 size_t get_total_snd (void) { return this->total_snd_; }
442 size_t get_total_rcv (void) { return this->total_rcv_; }
443 int get_total_w (void) { return this->total_w_; }
444 int get_total_r (void) { return this->total_r_; }
446 // This is called when asynchronous reads from the socket complete
447 virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
449 // This is called when asynchronous writes from the socket complete
450 virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
452 void cancel (void);
454 private:
455 int initiate_read (void);
456 int initiate_write (void);
457 // FUZZ: disable check_for_lack_ACE_OS
458 void close (void);
459 // FUZZ: enable check_for_lack_ACE_OS
461 TestData *tester_;
462 int id_;
464 ACE_INET_Addr server_addr_;
465 ACE_Asynch_Read_Dgram rs_;
466 ACE_Asynch_Write_Dgram ws_;
468 ACE_SYNCH_MUTEX lock_;
470 int io_count_;
471 int stop_writing_; // Writes are shut down; just read.
472 bool flg_cancel_;
473 size_t total_snd_;
474 size_t total_rcv_;
475 int total_w_;
476 int total_r_;
479 // TestData collects and reports on test-related transfer and connection
480 // statistics.
481 class TestData
483 public:
484 TestData ();
485 bool testing_done (void);
486 Server *server_up (void);
487 Client *client_up (void);
488 void server_done (Server *s);
489 void client_done (Client *c);
490 void stop_all (void);
491 void report (void);
493 private:
494 struct Local_Stats
496 // Track number of sessions that report start, and those that report
497 // their end (and stats).
498 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_up_;
499 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_down_;
501 // Total read and write bytes for all sessions.
502 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_cnt_;
503 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_cnt_;
504 // Total read and write operations issues for all sessions.
505 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_ops_;
506 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_ops_;
507 } servers_, clients_;
509 ACE_SYNCH_MUTEX list_lock_;
510 Server *server_list_[MAX_SERVERS];
511 Client *client_list_[MAX_CLIENTS];
514 TestData::TestData ()
516 int i;
517 for (i = 0; i < MAX_SERVERS; ++i)
518 this->server_list_[i] = 0;
519 for (i = 0; i < MAX_CLIENTS; ++i)
520 this->client_list_[i] = 0;
523 bool
524 TestData::testing_done (void)
526 int svr_up = this->servers_.sessions_up_.value ();
527 int svr_dn = this->servers_.sessions_down_.value ();
528 int clt_up = this->clients_.sessions_up_.value ();
529 int clt_dn = this->clients_.sessions_down_.value ();
531 if (svr_up == 0 && clt_up == 0) // No connections up yet
532 return false;
534 return (svr_dn >= svr_up && clt_dn >= clt_up);
537 Server *
538 TestData::server_up (void)
540 ++this->servers_.sessions_up_;
541 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
543 for (int i = 0; i < MAX_SERVERS; ++i)
545 if (this->server_list_[i] == 0)
547 ACE_NEW_RETURN (this->server_list_[i], Server (this, i), 0);
548 ACE_DEBUG ((LM_DEBUG,
549 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
551 this->servers_.sessions_up_.value (),
552 this->servers_.sessions_down_.value ()));
553 return this->server_list_[i];
556 return 0;
559 Client *
560 TestData::client_up (void)
562 ++this->clients_.sessions_up_;
563 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
565 for (int i = 0; i < MAX_CLIENTS; ++i)
567 if (this->client_list_[i] == 0)
569 ACE_NEW_RETURN (this->client_list_[i], Client (this, i), 0);
570 ACE_DEBUG ((LM_DEBUG,
571 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
573 this->clients_.sessions_up_.value (),
574 this->clients_.sessions_down_.value ()));
575 return this->client_list_[i];
578 return 0;
581 void
582 TestData::server_done (Server *s)
584 this->servers_.w_cnt_ += s->get_total_snd ();
585 this->servers_.r_cnt_ += s->get_total_rcv ();
586 this->servers_.w_ops_ += s->get_total_w ();
587 this->servers_.r_ops_ += s->get_total_r ();
588 ++this->servers_.sessions_down_;
589 ACE_DEBUG ((LM_DEBUG,
590 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
591 s->id (),
592 this->servers_.sessions_up_.value (),
593 this->servers_.sessions_down_.value ()));
595 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
596 int i;
597 for (i = 0; i < MAX_SERVERS; ++i)
599 if (this->server_list_[i] == s)
601 if (s->id () != i)
602 ACE_ERROR ((LM_ERROR,
603 ACE_TEXT ("Server %d is pos %d in list\n"),
604 s->id (),
605 i));
606 this->server_list_[i] = 0;
607 break;
610 if (i >= MAX_SERVERS)
611 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Server %@ done but not listed\n"), s));
613 return;
616 void
617 TestData::client_done (Client *c)
619 this->clients_.w_cnt_ += c->get_total_snd ();
620 this->clients_.r_cnt_ += c->get_total_rcv ();
621 this->clients_.w_ops_ += c->get_total_w ();
622 this->clients_.r_ops_ += c->get_total_r ();
623 ++this->clients_.sessions_down_;
624 ACE_DEBUG ((LM_DEBUG,
625 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
626 c->id (),
627 this->clients_.sessions_up_.value (),
628 this->clients_.sessions_down_.value ()));
630 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
631 int i;
632 for (i = 0; i < MAX_CLIENTS; ++i)
634 if (this->client_list_[i] == c)
636 if (c->id () != i)
637 ACE_ERROR ((LM_ERROR,
638 ACE_TEXT ("Client %d is pos %d in list\n"),
639 c->id (),
640 i));
641 this->client_list_[i] = 0;
642 break;
645 if (i >= MAX_CLIENTS)
646 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Client %@ done but not listed\n"), c));
648 return;
651 void
652 TestData::stop_all (void)
654 int i;
656 // Lock and cancel everything. Then release the lock, possibly allowing
657 // cleanups, then grab it again and delete all Servers and Clients.
659 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
660 for (i = 0; i < MAX_CLIENTS; ++i)
662 if (this->client_list_[i] != 0)
663 this->client_list_[i]->cancel ();
666 for (i = 0; i < MAX_SERVERS; ++i)
668 if (this->server_list_[i] != 0)
669 this->server_list_[i]->cancel ();
673 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
674 for (i = 0; i < MAX_CLIENTS; ++i)
676 if (this->client_list_[i] != 0)
677 delete this->client_list_[i];
680 for (i = 0; i < MAX_SERVERS; ++i)
682 if (this->server_list_[i] != 0)
683 delete this->server_list_[i];
688 void
689 TestData::report (void)
691 // Print statistics
692 ACE_TCHAR bufs [256];
693 ACE_TCHAR bufr [256];
695 ACE_OS::snprintf (bufs, 256,
696 ACE_SIZE_T_FORMAT_SPECIFIER
697 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
698 this->clients_.w_cnt_.value (),
699 this->clients_.w_ops_.value ());
701 ACE_OS::snprintf (bufr, 256,
702 ACE_SIZE_T_FORMAT_SPECIFIER
703 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
704 this->clients_.r_cnt_.value (),
705 this->clients_.r_ops_.value ());
707 ACE_DEBUG ((LM_DEBUG,
708 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
709 bufs,
710 bufr));
712 ACE_OS::snprintf (bufs, 256,
713 ACE_SIZE_T_FORMAT_SPECIFIER
714 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
715 this->servers_.w_cnt_.value (),
716 this->servers_.w_ops_.value ());
718 ACE_OS::snprintf (bufr, 256,
719 ACE_SIZE_T_FORMAT_SPECIFIER
720 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
721 this->servers_.r_cnt_.value (),
722 this->servers_.r_ops_.value ());
724 ACE_DEBUG ((LM_DEBUG,
725 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
726 bufs,
727 bufr));
729 if (this->clients_.w_cnt_.value () == 0 ||
730 this->clients_.r_cnt_.value () == 0 ||
731 this->servers_.w_cnt_.value () == 0 ||
732 this->servers_.r_cnt_.value () == 0 )
733 ACE_ERROR ((LM_ERROR, ACE_TEXT ("It appears that this test didn't ")
734 ACE_TEXT ("really do anything. Something is very wrong.\n")));
737 // Session set-up struct.
738 struct Session_Data
740 ACE_INT32 direction_; // 0 == Start, 1 == Ack
741 ACE_INT32 addr_; // Network byte order, must be IPv4
742 ACE_UINT16 port_; // UDP port, network byte order
743 Session_Data() { ACE_OS::memset (this, 0, sizeof(*this)); }
746 // Master is the server-side receiver of session establishment requests.
747 // For each "start" dgram received, instantiates a new Server object,
748 // indicating the addressing info for the client.
749 // Master is initialized with a count of the number of expected sessions. After
750 // this number are set up, Master will stop listening for session requests.
751 // This is a bit fragile but is necessary because on HP-UX, AIX, et al., it
752 // is impossible to close/cancel a socket with an outstanding UDP recieve
753 // (on AIX the process is so wedged the machine needs to be rebooted to
754 // clear it!). So, this bit of messiness is necessary for portability.
755 // When the Master is destroyed, it will try to stop establishing sessions
756 // but this will only work on Windows.
757 class Master : public ACE_Handler
759 public:
760 Master (TestData *tester, const ACE_INET_Addr &recv_addr, int expected);
761 ~Master (void);
763 // Called when dgram receive operation completes.
764 virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
766 private:
767 void start_recv (void);
769 TestData *tester_;
770 ACE_INET_Addr recv_addr_;
771 ACE_SOCK_Dgram sock_;
772 ACE_Asynch_Read_Dgram rd_;
773 ACE_Message_Block *mb_;
774 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_expected_;
775 volatile bool recv_in_progress_;
778 // *************************************************************
779 Master::Master (TestData *tester, const ACE_INET_Addr &recv_addr, int expected)
780 : tester_ (tester),
781 recv_addr_ (recv_addr),
782 mb_ (0),
783 sessions_expected_ (expected),
784 recv_in_progress_ (false)
786 if (this->sock_.open (recv_addr) == -1)
787 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Master socket %p\n"), ACE_TEXT ("open")));
788 else
790 if (this->rd_.open (*this, this->sock_.get_handle ()) == -1)
791 ACE_ERROR ((LM_ERROR,
792 ACE_TEXT ("Master reader %p\n"),
793 ACE_TEXT ("open")));
794 this->mb_ = new ACE_Message_Block (sizeof (Session_Data));
795 start_recv ();
799 Master::~Master (void)
801 if (this->recv_in_progress_)
802 this->rd_.cancel ();
803 this->sock_.close ();
805 if (this->mb_ != 0)
807 this->mb_->release ();
808 this->mb_ = 0;
812 void
813 Master::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
815 // We should only receive Start datagrams with valid addresses to reply to.
816 if (result.success ())
818 if (result.bytes_transferred () != sizeof (Session_Data))
819 ACE_ERROR ((LM_ERROR,
820 ACE_TEXT ("(%t) Master session data expected %B bytes; ")
821 ACE_TEXT ("received %B\n"),
822 sizeof (Session_Data),
823 result.bytes_transferred ()));
824 else
826 ACE_Message_Block *mb = result.message_block ();
827 Session_Data *session =
828 reinterpret_cast<Session_Data*>(mb->rd_ptr ());
829 if (session->direction_ == 0)
831 ACE_INET_Addr client_addr, me_addr;
832 ACE_TCHAR client_str[80], me_str[80];
833 client_addr.set ((u_short)session->port_, session->addr_, 0);
834 client_addr.addr_to_string (client_str, 80);
836 // Set up the local and remote addresses - need fully-specified
837 // addresses to use UDP aio on Linux. This is the socket that
838 // the session will run over. The addressing info to be sent
839 // back to the Client will be sent over the receive socket
840 // to ensure it goes back to the client initiating the session.
841 ACE_SOCK_CODgram sock;
842 if (sock.open (client_addr) == -1)
844 ACE_ERROR ((LM_ERROR,
845 ACE_TEXT ("(%t) Master new socket for ")
846 ACE_TEXT ("client %s: %p\n"),
847 client_str,
848 ACE_TEXT ("open")));
850 else
852 sock.get_local_addr (me_addr);
853 me_addr.addr_to_string (me_str, 80);
854 ACE_DEBUG ((LM_DEBUG,
855 ACE_TEXT ("(%t) Master setting up server for ")
856 ACE_TEXT ("local %s, peer %s\n"),
857 me_str,
858 client_str));
860 Session_Data session;
861 session.direction_ = 1; // Ack
862 session.addr_ = ACE_HTONL (me_addr.get_ip_address ());
863 session.port_ = ACE_HTONS (me_addr.get_port_number ());
864 if (this->sock_.send (&session,
865 sizeof (session),
866 client_addr) == -1)
868 ACE_ERROR ((LM_ERROR,
869 ACE_TEXT ("(%t) Master reply %p\n"),
870 ACE_TEXT ("send")));
871 sock.close ();
873 else
875 Server *server = this->tester_->server_up ();
876 server->go (sock.get_handle (), client_addr);
879 if (--this->sessions_expected_ == 0)
881 ACE_DEBUG ((LM_DEBUG,
882 ACE_TEXT ("All expected sessions are up\n")));
885 else
887 ACE_ERROR ((LM_ERROR,
888 ACE_TEXT ("(%t) Badly formed Session request\n")));
892 else
894 ACE_Log_Priority prio = LM_ERROR;
895 #if defined (ACE_WIN32)
896 if (result.error () == ERROR_OPERATION_ABORTED)
897 prio = LM_DEBUG;
898 #else
899 if (result.error () == ECANCELED)
900 prio = LM_DEBUG;
901 #endif /* ACE_WIN32 */
902 // Multiple steps to log the error without squashing errno.
903 ACE_LOG_MSG->conditional_set (__FILE__,
904 __LINE__,
906 (int)(result.error ()));
907 ACE_LOG_MSG->log (prio,
908 ACE_TEXT ("(%t) Master %p\n"),
909 ACE_TEXT ("recv"));
910 // If canceled, don't try to restart.
911 if (prio == LM_DEBUG)
912 return;
914 this->start_recv ();
917 void
918 Master::start_recv (void)
920 if (this->mb_ == 0)
921 return;
923 size_t unused = 0;
924 this->mb_->reset ();
925 if (this->rd_.recv (this->mb_, unused, 0) == -1)
926 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Master %p\n"), ACE_TEXT ("recv")));
927 else
928 this->recv_in_progress_ = true;
931 // ***************************************************
932 Server::Server ()
934 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
937 Server::Server (TestData *tester, int id)
938 : tester_ (tester),
939 id_ (id),
940 io_count_ (0),
941 flg_cancel_(false),
942 flg_closing_ (false),
943 total_snd_(0),
944 total_rcv_(0),
945 total_w_ (0),
946 total_r_ (0)
950 Server::~Server (void)
952 ACE_DEBUG ((LM_DEBUG,
953 ACE_TEXT ("(%t) Server %d dtor; %d sends (%B bytes); ")
954 ACE_TEXT ("%d recvs (%B bytes)\n"),
955 this->id_,
956 this->total_w_, this->total_snd_,
957 this->total_r_, this->total_rcv_));
958 if (this->io_count_ != 0)
959 ACE_ERROR ((LM_WARNING,
960 ACE_TEXT ("(%t) Server %d deleted with ")
961 ACE_TEXT ("%d I/O outstanding\n"),
962 this->id_,
963 this->io_count_));
965 // This test bounces data back and forth between Clients and Servers.
966 // Therefore, if there was significantly more data in one direction, that's
967 // a problem. Remember, the byte counts are unsigned values.
968 int issue_data_warning = 0;
969 if (this->total_snd_ > this->total_rcv_)
971 if (this->total_rcv_ == 0)
972 issue_data_warning = 1;
973 else if (this->total_snd_ / this->total_rcv_ > 2)
974 issue_data_warning = 1;
976 else
978 if (this->total_snd_ == 0)
979 issue_data_warning = 1;
980 else if (this->total_rcv_ / this->total_snd_ > 2)
981 issue_data_warning = 1;
983 if (issue_data_warning)
984 ACE_DEBUG ((LM_WARNING,
985 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
987 if (this->tester_ != 0)
988 this->tester_->server_done (this);
990 if (this->handle () != ACE_INVALID_HANDLE)
991 ACE_OS::closesocket (this->handle ());
993 this->id_ = -1;
994 this->handle (ACE_INVALID_HANDLE);
997 void
998 Server::cancel ()
1000 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1002 this->flg_cancel_ = true;
1003 this->ws_.cancel ();
1004 this->rs_.cancel ();
1005 return;
1008 void
1009 Server::go (ACE_HANDLE handle, const ACE_INET_Addr &client)
1011 this->handle (handle);
1012 this->client_addr_.set (client);
1014 // Lock this before initiating I/O, else it may complete while we're
1015 // still setting up.
1017 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1019 if (this->ws_.open (*this, this->handle ()) == -1)
1020 ACE_ERROR ((LM_ERROR,
1021 ACE_TEXT ("(%t) %p\n"),
1022 ACE_TEXT ("Server::ACE_Asynch_Write_Dgram::open")));
1023 else if (this->rs_.open (*this, this->handle ()) == -1)
1024 ACE_ERROR ((LM_ERROR,
1025 ACE_TEXT ("(%t) %p\n"),
1026 ACE_TEXT ("Server::ACE_Asynch_Read_Dgram::open")));
1027 else
1028 this->initiate_read ();
1031 if (this->io_count_ > 0)
1032 return;
1034 delete this; // Error setting up I/O factories
1038 Server::initiate_read (void)
1040 if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
1041 return -1;
1043 ACE_Message_Block *mb = 0;
1044 ACE_NEW_RETURN (mb,
1045 ACE_Message_Block (1024), //BUFSIZ + 1),
1046 -1);
1048 // Inititiate receive
1049 size_t unused = 0;
1050 if (this->rs_.recv (mb, unused, 0) == -1)
1052 mb->release ();
1053 ACE_ERROR_RETURN ((LM_ERROR,
1054 ACE_TEXT ("(%t) Server %d, %p\n"),
1055 this->id_,
1056 ACE_TEXT ("read")),
1057 -1);
1060 this->io_count_++;
1061 this->total_r_++;
1062 return 0;
1066 Server::initiate_write (ACE_Message_Block *mb, size_t nbytes)
1068 if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
1070 mb->release ();
1071 return -1;
1074 if (nbytes == 0)
1076 mb->release ();
1077 ACE_ERROR_RETURN((LM_ERROR,
1078 ACE_TEXT ("(%t) Server %d write nbytes == 0\n"),
1079 this->id_),
1080 -1);
1083 if (this->ws_.send (mb, nbytes, 0, this->client_addr_) == -1)
1085 mb->release ();
1086 ACE_ERROR_RETURN((LM_ERROR,
1087 ACE_TEXT ("(%t) Server %d, %p\n"),
1088 this->id_,
1089 ACE_TEXT ("write")),
1090 -1);
1093 this->io_count_++;
1094 this->total_w_++;
1095 return 0;
1098 void
1099 Server::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
1102 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
1104 ACE_Message_Block *mb = result.message_block ();
1106 // Reset pointers.
1107 mb->rd_ptr ()[result.bytes_transferred ()] = '\0';
1109 if (loglevel > 1)
1111 LogLocker log_lock;
1113 ACE_DEBUG ((LM_DEBUG,
1114 ACE_TEXT ("(%t) **** Server %d: handle_read_dgram() ****\n"),
1115 this->id_));
1116 ACE_DEBUG ((LM_DEBUG,
1117 ACE_TEXT ("%s = %B\n"),
1118 ACE_TEXT ("bytes_to_read"),
1119 result.bytes_to_read ()));
1120 ACE_DEBUG ((LM_DEBUG,
1121 ACE_TEXT ("%s = %d\n"),
1122 ACE_TEXT ("handle"),
1123 result.handle ()));
1124 ACE_DEBUG ((LM_DEBUG,
1125 ACE_TEXT ("%s = %B\n"),
1126 ACE_TEXT ("bytes_transfered"),
1127 result.bytes_transferred ()));
1128 ACE_DEBUG ((LM_DEBUG,
1129 ACE_TEXT ("%s = %@\n"),
1130 ACE_TEXT ("act"),
1131 result.act ()));
1132 ACE_DEBUG ((LM_DEBUG,
1133 ACE_TEXT ("%s = %d\n"),
1134 ACE_TEXT ("success"),
1135 result.success ()));
1136 ACE_DEBUG ((LM_DEBUG,
1137 ACE_TEXT ("%s = %@\n"),
1138 ACE_TEXT ("completion_key"),
1139 result.completion_key ()));
1140 ACE_DEBUG ((LM_DEBUG,
1141 ACE_TEXT ("%s = %d\n"),
1142 ACE_TEXT ("error"),
1143 result.error ()));
1144 ACE_DEBUG ((LM_DEBUG,
1145 ACE_TEXT ("%s = %s\n"),
1146 ACE_TEXT ("message_block"),
1147 mb->rd_ptr ()));
1148 ACE_DEBUG ((LM_DEBUG,
1149 ACE_TEXT ("**** end of message ****************\n")));
1151 else if (result.error () != 0)
1153 ACE_Log_Priority prio;
1154 #if defined (ACE_WIN32)
1155 if (result.error () == ERROR_OPERATION_ABORTED)
1156 prio = LM_DEBUG;
1157 #else
1158 if (result.error () == ECANCELED)
1159 prio = LM_DEBUG;
1160 #endif /* ACE_WIN32 */
1161 else
1162 prio = LM_ERROR;
1163 ACE_LOG_MSG->errnum (result.error ());
1164 ACE_LOG_MSG->log (prio,
1165 ACE_TEXT ("(%t) Server %d; %p\n"),
1166 this->id_,
1167 ACE_TEXT ("read"));
1169 else if (loglevel > 0)
1171 ACE_DEBUG ((LM_DEBUG,
1172 ACE_TEXT ("(%t) Server %d: read %B bytes\n"),
1173 this->id_,
1174 result.bytes_transferred ()));
1177 if (result.error () == 0 && result.bytes_transferred () > 0)
1179 this->total_rcv_ += result.bytes_transferred ();
1181 // If client says we're done, ack it; we're done reading.
1182 size_t to_send = result.bytes_transferred ();
1183 if (ACE_OS::strcmp (mb->rd_ptr (), close_req_msg) == 0)
1185 ACE_DEBUG ((LM_DEBUG,
1186 ACE_TEXT ("(%t) Server %d saw close request; ack\n"),
1187 this->id_));
1188 this->flg_closing_ = true;
1189 mb->reset ();
1190 mb->copy (close_ack_msg);
1191 to_send = mb->length ();
1193 if (this->initiate_write (mb, to_send) == 0)
1195 if (duplex != 0 && !this->flg_closing_)
1196 this->initiate_read ();
1199 else
1200 mb->release ();
1202 --this->io_count_;
1203 if (this->io_count_ > 0)
1204 return;
1206 delete this;
1209 void
1210 Server::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
1213 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1215 ACE_Message_Block *mb = result.message_block ();
1217 if (loglevel > 1)
1219 LogLocker log_lock;
1221 //mb.rd_ptr () [0] = '\0';
1222 mb->rd_ptr (mb->rd_ptr () - result.bytes_transferred ());
1224 ACE_DEBUG ((LM_DEBUG,
1225 ACE_TEXT ("(%t) **** Server %d: handle_write_dgram() ****\n"),
1226 this->id_));
1227 ACE_DEBUG ((LM_DEBUG,
1228 ACE_TEXT ("%s = %B\n"),
1229 ACE_TEXT ("bytes_to_write"),
1230 result.bytes_to_write ()));
1231 ACE_DEBUG ((LM_DEBUG,
1232 ACE_TEXT ("%s = %d\n"),
1233 ACE_TEXT ("handle"),
1234 result.handle ()));
1235 ACE_DEBUG ((LM_DEBUG,
1236 ACE_TEXT ("%s = %B\n"),
1237 ACE_TEXT ("bytes_transfered"),
1238 result.bytes_transferred ()));
1239 ACE_DEBUG ((LM_DEBUG,
1240 ACE_TEXT ("%s = %@\n"),
1241 ACE_TEXT ("act"),
1242 result.act ()));
1243 ACE_DEBUG ((LM_DEBUG,
1244 ACE_TEXT ("%s = %d\n"),
1245 ACE_TEXT ("success"),
1246 result.success ()));
1247 ACE_DEBUG ((LM_DEBUG,
1248 ACE_TEXT ("%s = %@\n"),
1249 ACE_TEXT ("completion_key"),
1250 result.completion_key ()));
1251 ACE_DEBUG ((LM_DEBUG,
1252 ACE_TEXT ("%s = %d\n"),
1253 ACE_TEXT ("error"),
1254 result.error ()));
1255 ACE_DEBUG ((LM_DEBUG,
1256 ACE_TEXT ("%s = %s\n"),
1257 ACE_TEXT ("message_block"),
1258 mb->rd_ptr ()));
1259 ACE_DEBUG ((LM_DEBUG,
1260 ACE_TEXT ("**** end of message ****************\n")));
1262 else if (result.error () != 0)
1264 ACE_Log_Priority prio;
1265 #if defined (ACE_WIN32)
1266 if (result.error () == ERROR_OPERATION_ABORTED)
1267 prio = LM_DEBUG;
1268 #else
1269 if (result.error () == ECANCELED)
1270 prio = LM_DEBUG;
1271 #endif /* ACE_WIN32 */
1272 else
1273 prio = LM_ERROR;
1274 ACE_LOG_MSG->errnum (result.error ());
1275 ACE_LOG_MSG->log (prio,
1276 ACE_TEXT ("(%t) Server %d; %p\n"),
1277 this->id_,
1278 ACE_TEXT ("write"));
1280 else if (loglevel > 0)
1282 ACE_DEBUG ((LM_DEBUG,
1283 ACE_TEXT ("(%t) Server %d: wrote %B bytes ok\n"),
1284 this->id_,
1285 result.bytes_transferred ()));
1288 mb->release ();
1290 if (result.error () == 0 && result.bytes_transferred () > 0)
1292 this->total_snd_ += result.bytes_transferred ();
1294 if (duplex == 0 && !this->flg_closing_)
1295 this->initiate_read ();
1298 --this->io_count_;
1299 if (this->io_count_ > 0)
1300 return;
1302 delete this;
1305 // *******************************************
1306 // Connector
1308 // Connector creates the proper number of Clients and initiates
1309 // sessions on them.
1310 // *******************************************
1312 class Connector
1314 public:
1315 Connector (TestData *tester);
1317 int start (const ACE_INET_Addr &addr, int num);
1319 private:
1320 TestData *tester_;
1323 // *************************************************************
1325 Connector::Connector (TestData *tester)
1326 : tester_ (tester)
1331 Connector::start (const ACE_INET_Addr& addr, int num)
1333 ACE_OS::sleep(3); // Let Master get going
1334 if (num > MAX_CLIENTS)
1335 num = MAX_CLIENTS;
1337 if (num < 0)
1338 num = 1;
1340 int rc = 0;
1342 for (; rc < num; rc++)
1344 ACE_SOCK_CODgram sock;
1345 if (sock.open (addr) == -1)
1346 ACE_ERROR_BREAK ((LM_ERROR,
1347 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1349 ACE_TEXT ("open")));
1350 ACE_INET_Addr me;
1351 sock.get_local_addr (me);
1352 u_short my_port = ACE_HTONS (me.get_port_number ());
1353 ACE_INT32 my_addr = ACE_HTONL (me.get_ip_address ());
1354 Session_Data session;
1355 session.direction_ = 0; // Start
1356 session.addr_ = my_addr;
1357 session.port_ = my_port;
1358 if (sock.send (&session, sizeof (session)) == -1)
1359 ACE_ERROR_BREAK ((LM_ERROR,
1360 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1362 ACE_TEXT ("send")));
1363 if (sock.recv (&session, sizeof (session)) == -1)
1364 ACE_ERROR_BREAK ((LM_ERROR,
1365 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1367 ACE_TEXT ("recv")));
1368 ACE_INET_Addr server;
1369 server.set (session.port_, session.addr_, 0);
1370 Client *client = this->tester_->client_up ();
1371 ACE_TCHAR me_str[80], server_str[80];
1372 me.addr_to_string (me_str, 80);
1373 server.addr_to_string (server_str, 80);
1374 ACE_DEBUG ((LM_DEBUG,
1375 ACE_TEXT ("(%t) Client %d setting up local %s, peer %s\n"),
1376 client->id(),
1377 me_str,
1378 server_str));
1379 sock.close ();
1380 if (sock.open (server, me) == -1)
1381 ACE_ERROR_BREAK ((LM_ERROR,
1382 ACE_TEXT ("(%t) Re-opening %p\n"),
1383 ACE_TEXT ("client")));
1384 client->go (sock.get_handle (), server);
1385 sock.set_handle (ACE_INVALID_HANDLE);
1387 return rc;
1391 Client::Client ()
1393 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
1396 Client::Client (TestData *tester, int id)
1397 : tester_ (tester),
1398 id_ (id),
1399 io_count_ (0),
1400 stop_writing_ (0),
1401 flg_cancel_ (false),
1402 total_snd_ (0),
1403 total_rcv_ (0),
1404 total_w_ (0),
1405 total_r_ (0)
1409 Client::~Client (void)
1411 ACE_DEBUG ((LM_DEBUG,
1412 ACE_TEXT ("(%t) Client %d dtor; %d sends (%B bytes); ")
1413 ACE_TEXT ("%d recvs (%B bytes)\n"),
1414 this->id_,
1415 this->total_w_, this->total_snd_,
1416 this->total_r_, this->total_rcv_));
1417 if (this->io_count_ != 0)
1418 ACE_ERROR ((LM_WARNING,
1419 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1420 this->id_,
1421 this->io_count_));
1423 // This test bounces data back and forth between Clients and Servers.
1424 // Therefore, if there was significantly more data in one direction, that's
1425 // a problem. Remember, the byte counts are unsigned values.
1426 int issue_data_warning = 0;
1427 if (this->total_snd_ > this->total_rcv_)
1429 if (this->total_rcv_ == 0)
1430 issue_data_warning = 1;
1431 else if (this->total_snd_ / this->total_rcv_ > 2)
1432 issue_data_warning = 1;
1434 else
1436 if (this->total_snd_ == 0)
1437 issue_data_warning = 1;
1438 else if (this->total_rcv_ / this->total_snd_ > 2)
1439 issue_data_warning = 1;
1441 if (issue_data_warning)
1442 ACE_DEBUG ((LM_WARNING,
1443 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1445 if (this->tester_ != 0)
1446 this->tester_->client_done (this);
1448 this->id_ = -1;
1449 if (this->handle () != ACE_INVALID_HANDLE)
1451 ACE_OS::closesocket (this->handle ());
1453 this->handle (ACE_INVALID_HANDLE);
1456 void
1457 Client::cancel ()
1459 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1461 this->flg_cancel_ = true;
1462 this->ws_.cancel ();
1463 this->rs_.cancel ();
1464 return;
1467 void
1468 Client::close ()
1470 // This must be called with the lock_ held.
1471 ++this->stop_writing_;
1472 ACE_DEBUG ((LM_DEBUG,
1473 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1474 this->id_, this->io_count_));
1475 return;
1478 void
1479 Client::go (ACE_HANDLE handle, const ACE_INET_Addr &server)
1482 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1484 this->handle (handle);
1485 this->server_addr_.set (server);
1487 // Open send and receive factories.
1488 if (this->ws_.open (*this, handle) == -1)
1489 ACE_ERROR ((LM_ERROR,
1490 ACE_TEXT ("(%t) Client %d: %p\n"),
1491 this->id_,
1492 ACE_TEXT ("ACE_Asynch_Write_Dgram::open")));
1493 else if (this->rs_.open (*this, handle) == -1)
1494 ACE_ERROR ((LM_ERROR,
1495 ACE_TEXT ("(%t) Client %d: %p\n"),
1496 this->id_,
1497 ACE_TEXT ("ACE_Asynch_Read_Dgram::open")));
1498 else if (this->initiate_write () == 0)
1500 if (duplex != 0) // Start an asynchronous read
1501 this->initiate_read ();
1504 if (this->io_count_ > 0)
1505 return;
1507 delete this;
1511 Client::initiate_write (void)
1513 if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
1514 return -1;
1516 // stop_writing_ is set to 1 to say "stop". To avoid repeating the
1517 // close datagram for every echo, only send it once. Sure, there's a risk
1518 // it will get lost, but since this is most often intra-host, don't
1519 // worry about that very small risk.
1520 if (this->stop_writing_ > 0) // Need to tell server to "close"
1522 if (this->stop_writing_ > 1) // Already told server to close
1523 return 0;
1525 ++this->stop_writing_;
1526 ACE_DEBUG ((LM_DEBUG,
1527 ACE_TEXT ("(%t) Client %d requesting close\n"),
1528 this->id_));
1529 ACE_Message_Block *mb =
1530 new ACE_Message_Block (ACE_OS::strlen (close_req_msg) + 1);
1531 mb->copy (close_req_msg);
1532 size_t unused; // Number of bytes sent
1533 if (this->ws_.send (mb, unused, 0, this->server_addr_) == -1)
1535 mb->release ();
1536 ACE_ERROR_RETURN ((LM_ERROR,
1537 ACE_TEXT ("(%t) Client %d, %p\n"),
1538 this->id_,
1539 ACE_TEXT ("initiating closing send")),
1540 -1);
1543 this->io_count_++;
1544 this->total_w_++;
1545 return 0;
1548 static const size_t complete_message_length =
1549 ACE_OS::strlen (complete_message);
1551 #if defined (ACE_WIN32)
1553 ACE_Message_Block *mb1 = 0,
1554 *mb2 = 0,
1555 *mb3 = 0;
1557 // No need to allocate +1 for proper printing - the memory includes it already
1558 ACE_NEW_RETURN (mb1,
1559 ACE_Message_Block ((char *)complete_message,
1560 complete_message_length),
1561 -1);
1563 ACE_NEW_RETURN (mb2,
1564 ACE_Message_Block ((char *)complete_message,
1565 complete_message_length),
1566 -1);
1568 ACE_NEW_RETURN (mb3,
1569 ACE_Message_Block ((char *)complete_message,
1570 complete_message_length),
1571 -1);
1573 mb1->wr_ptr (complete_message_length);
1574 mb2->wr_ptr (complete_message_length);
1575 mb3->wr_ptr (complete_message_length);
1577 // chain them together
1578 mb1->cont (mb2);
1579 mb2->cont (mb3);
1581 size_t unused; // Number of bytes sent
1582 if (this->ws_.send (mb1, unused, 0, this->server_addr_) == -1)
1584 mb1->release ();
1585 ACE_ERROR_RETURN((LM_ERROR,
1586 ACE_TEXT ("(%t) %p\n"),
1587 ACE_TEXT ("Client::ACE_Asynch_Write_Dgram::send")),
1588 -1);
1590 #else /* ACE_WIN32 */
1592 ACE_Message_Block *mb = 0;
1594 // No need to allocate +1 for proper printing - the memory includes
1595 // it already
1596 ACE_NEW_RETURN (mb,
1597 ACE_Message_Block (complete_message,
1598 complete_message_length),
1599 -1);
1600 mb->wr_ptr (complete_message_length);
1601 size_t unused; // Number of bytes sent
1602 if (this->ws_.send (mb, unused, 0, this->server_addr_) == -1)
1604 mb->release ();
1605 ACE_ERROR_RETURN((LM_ERROR,
1606 ACE_TEXT ("(%t) Client %d, %p\n"),
1607 this->id_,
1608 ACE_TEXT ("send")),
1609 -1);
1611 #endif /* ACE_WIN32 */
1613 this->io_count_++;
1614 this->total_w_++;
1615 return 0;
1619 Client::initiate_read (void)
1621 if (this->flg_cancel_ || this->handle_ == ACE_INVALID_HANDLE)
1622 return -1;
1624 static const size_t complete_message_length =
1625 ACE_OS::strlen (complete_message);
1627 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
1628 ACE_Message_Block *mb1 = 0,
1629 *mb2 = 0,
1630 *mb3 = 0,
1631 *mb4 = 0,
1632 *mb5 = 0,
1633 *mb6 = 0;
1635 // We allocate +1 only for proper printing - we can just set the last byte
1636 // to '\0' before printing out
1637 ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1);
1638 ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1);
1639 ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1);
1641 // Let allocate memory for one more triplet,
1642 // This improves performance
1643 // as we can receive more the than one block at once
1644 // Generally, we can receive more triplets ....
1645 ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1);
1646 ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1);
1647 ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1);
1649 mb1->cont (mb2);
1650 mb2->cont (mb3);
1652 mb3->cont (mb4);
1653 mb4->cont (mb5);
1654 mb5->cont (mb6);
1657 // hide last byte in each message block, reserving it for later to set '\0'
1658 // for proper printouts
1659 mb1->size (mb1->size () - 1);
1660 mb2->size (mb2->size () - 1);
1661 mb3->size (mb3->size () - 1);
1663 mb4->size (mb4->size () - 1);
1664 mb5->size (mb5->size () - 1);
1665 mb6->size (mb6->size () - 1);
1667 // Inititiate read
1668 size_t unused = 0;
1669 if (this->rs_.recv (mb1, unused, 0) == -1)
1671 mb1->release ();
1672 ACE_ERROR_RETURN ((LM_ERROR,
1673 ACE_TEXT ("(%t) %p\n"),
1674 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1675 -1);
1677 #else /* ACE_HAS_WIN32_OVERLAPPED_IO */
1679 // Try to read more chunks
1680 size_t blksize = ( complete_message_length > BUFSIZ ) ?
1681 complete_message_length : BUFSIZ;
1683 ACE_Message_Block *mb = 0;
1685 // We allocate +1 only for proper printing - we can just set the last byte
1686 // to '\0' before printing out
1687 ACE_NEW_RETURN (mb,
1688 ACE_Message_Block (blksize + 1),
1689 -1);
1691 // Inititiate read
1692 size_t unused = 0;
1693 if (this->rs_.recv (mb, unused, 0) == -1)
1695 mb->release ();
1696 ACE_ERROR_RETURN ((LM_ERROR,
1697 ACE_TEXT ("(%t) Client %d, %p\n"),
1698 this->id_,
1699 ACE_TEXT ("read")),
1700 -1);
1702 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
1704 this->io_count_++;
1705 this->total_r_++;
1706 return 0;
1709 void
1710 Client::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
1713 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1715 ACE_Message_Block *mb = result.message_block ();
1717 if (loglevel > 1)
1719 LogLocker log_lock;
1721 ACE_DEBUG ((LM_DEBUG,
1722 ACE_TEXT ("(%t) **** Client %d: handle_write_dgram() ****\n"),
1723 this->id_));
1724 ACE_DEBUG ((LM_DEBUG,
1725 ACE_TEXT ("%s = %B\n"),
1726 ACE_TEXT ("bytes_to_write"),
1727 result.bytes_to_write ()));
1728 ACE_DEBUG ((LM_DEBUG,
1729 ACE_TEXT ("%s = %d\n"),
1730 ACE_TEXT ("handle"),
1731 result.handle ()));
1732 ACE_DEBUG ((LM_DEBUG,
1733 ACE_TEXT ("%s = %B\n"),
1734 ACE_TEXT ("bytes_transfered"),
1735 result.bytes_transferred ()));
1736 ACE_DEBUG ((LM_DEBUG,
1737 ACE_TEXT ("%s = %@\n"),
1738 ACE_TEXT ("act"),
1739 result.act ()));
1740 ACE_DEBUG ((LM_DEBUG,
1741 ACE_TEXT ("%s = %d\n"),
1742 ACE_TEXT ("success"),
1743 result.success ()));
1744 ACE_DEBUG ((LM_DEBUG,
1745 ACE_TEXT ("%s = %@\n"),
1746 ACE_TEXT ("completion_key"),
1747 result.completion_key ()));
1748 ACE_DEBUG ((LM_DEBUG,
1749 ACE_TEXT ("%s = %d\n"),
1750 ACE_TEXT ("error"),
1751 result.error ()));
1753 #if defined (ACE_WIN32)
1754 size_t bytes_transferred = result.bytes_transferred ();
1755 char index = 0;
1756 for (ACE_Message_Block* mb_i = mb;
1757 (mb_i != 0) && (bytes_transferred > 0);
1758 mb_i = mb_i->cont ())
1760 // write 0 at string end for proper printout (if end of mb,
1761 // it's 0 already)
1762 mb_i->rd_ptr()[0] = '\0';
1764 size_t len = mb_i->rd_ptr () - mb_i->base ();
1766 // move rd_ptr backwards as required for printout
1767 if (len >= bytes_transferred)
1769 mb_i->rd_ptr (0 - bytes_transferred);
1770 bytes_transferred = 0;
1772 else
1774 mb_i->rd_ptr (0 - len);
1775 bytes_transferred -= len;
1778 ++index;
1779 ACE_DEBUG ((LM_DEBUG,
1780 ACE_TEXT ("%s%d = %s\n"),
1781 ACE_TEXT ("message_block, part "),
1782 index,
1783 mb_i->rd_ptr ()));
1785 #else /* ACE_WIN32 */
1786 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1787 mb->rd_ptr()[0] = '\0';
1788 // move rd_ptr backwards as required for printout
1789 mb->rd_ptr (- result.bytes_transferred ());
1790 ACE_DEBUG ((LM_DEBUG,
1791 ACE_TEXT ("%s = %s\n"),
1792 ACE_TEXT ("message_block"),
1793 mb->rd_ptr ()));
1794 #endif /* ACE_WIN32 */
1796 ACE_DEBUG ((LM_DEBUG,
1797 ACE_TEXT ("**** end of message ****************\n")));
1799 else if (result.error () != 0)
1801 ACE_Log_Priority prio;
1802 #if defined (ACE_WIN32)
1803 if (result.error () == ERROR_OPERATION_ABORTED)
1804 prio = LM_DEBUG;
1805 #else
1806 if (result.error () == ECANCELED)
1807 prio = LM_DEBUG;
1808 #endif /* ACE_WIN32 */
1809 else
1810 prio = LM_ERROR;
1811 ACE_LOG_MSG->errnum (result.error ());
1812 ACE_LOG_MSG->log (prio,
1813 ACE_TEXT ("(%t) Client %d; %p\n"),
1814 this->id_,
1815 ACE_TEXT ("write"));
1817 else if (loglevel > 0)
1819 ACE_DEBUG ((LM_DEBUG,
1820 ACE_TEXT ("(%t) Client %d: wrote %B bytes ok\n"),
1821 this->id_,
1822 result.bytes_transferred ()));
1825 mb->release ();
1827 if (result.error () == 0 && result.bytes_transferred () > 0)
1829 this->total_snd_ += result.bytes_transferred ();
1830 if (this->total_snd_ >= xfer_limit)
1832 ACE_DEBUG ((LM_DEBUG,
1833 ACE_TEXT ("(%t) Client %d sent %B, limit %B\n"),
1834 this->id_, this->total_snd_, xfer_limit));
1835 this->close ();
1837 if (duplex != 0) // full duplex, continue write
1839 if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
1840 this->initiate_write ();
1842 else // half-duplex read reply, after read we will start write
1843 this->initiate_read ();
1846 --this->io_count_;
1847 if (this->io_count_ > 0)
1848 return;
1850 delete this;
1853 void
1854 Client::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
1857 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1859 ACE_Message_Block *mb = result.message_block ();
1861 if (loglevel > 1)
1863 LogLocker log_lock;
1865 ACE_DEBUG ((LM_DEBUG,
1866 ACE_TEXT ("(%t) **** Client %d: handle_read_dgram() ****\n"),
1867 this->id_));
1868 ACE_DEBUG ((LM_DEBUG,
1869 ACE_TEXT ("%s = %B\n"),
1870 ACE_TEXT ("bytes_to_read"),
1871 result.bytes_to_read ()));
1872 ACE_DEBUG ((LM_DEBUG,
1873 ACE_TEXT ("%s = %d\n"),
1874 ACE_TEXT ("handle"),
1875 result.handle ()));
1876 ACE_DEBUG ((LM_DEBUG,
1877 ACE_TEXT ("%s = %B\n"),
1878 ACE_TEXT ("bytes_transfered"),
1879 result.bytes_transferred ()));
1880 ACE_DEBUG ((LM_DEBUG,
1881 ACE_TEXT ("%s = %@\n"),
1882 ACE_TEXT ("act"),
1883 result.act ()));
1884 ACE_DEBUG ((LM_DEBUG,
1885 ACE_TEXT ("%s = %d\n"),
1886 ACE_TEXT ("success"),
1887 result.success ()));
1888 ACE_DEBUG ((LM_DEBUG,
1889 ACE_TEXT ("%s = %@\n"),
1890 ACE_TEXT ("completion_key"),
1891 result.completion_key ()));
1892 ACE_DEBUG ((LM_DEBUG,
1893 ACE_TEXT ("%s = %d\n"),
1894 ACE_TEXT ("error"),
1895 result.error ()));
1897 #if defined (ACE_WIN32)
1898 char index = 0;
1899 for (ACE_Message_Block* mb_i = mb;
1900 mb_i != 0;
1901 mb_i = mb_i->cont ())
1903 ++index;
1904 // write 0 at string end for proper printout
1905 mb_i->wr_ptr()[0] = '\0';
1907 ACE_DEBUG ((LM_DEBUG,
1908 ACE_TEXT ("%s%d = %s\n"),
1909 ACE_TEXT ("message_block, part "),
1910 index,
1911 mb_i->rd_ptr ()));
1913 #else /* ACE_WIN32 */
1914 // write 0 at string end for proper printout
1915 mb->rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
1916 ACE_DEBUG ((LM_DEBUG,
1917 ACE_TEXT ("%s = %s\n"),
1918 ACE_TEXT ("message_block"),
1919 mb->rd_ptr ()));
1920 #endif /* ACE_WIN32 */
1922 ACE_DEBUG ((LM_DEBUG,
1923 ACE_TEXT ("**** end of message ****************\n")));
1925 else if (result.error () != 0)
1927 ACE_Log_Priority prio;
1928 #if defined (ACE_WIN32)
1929 if (result.error () == ERROR_OPERATION_ABORTED)
1930 prio = LM_DEBUG;
1931 #else
1932 if (result.error () == ECANCELED)
1933 prio = LM_DEBUG;
1934 #endif /* ACE_WIN32 */
1935 else
1936 prio = LM_ERROR;
1937 ACE_Log_Msg::instance ()->errnum (result.error ());
1938 ACE_Log_Msg::instance ()->log (prio,
1939 ACE_TEXT ("(%t) Client %d; %p\n"),
1940 this->id_,
1941 ACE_TEXT ("read"));
1943 else if (loglevel > 0)
1945 ACE_DEBUG ((LM_DEBUG,
1946 ACE_TEXT ("(%t) Client %d: read %B bytes ok\n"),
1947 this->id_,
1948 result.bytes_transferred ()));
1951 if (result.error () == 0 && result.bytes_transferred () > 0)
1953 this->total_rcv_ += result.bytes_transferred ();
1955 // If we've closed and the server acked, we're done.
1956 if (this->stop_writing_ &&
1957 ACE_OS::strcmp (mb->rd_ptr (), close_ack_msg) == 0)
1959 ACE_DEBUG ((LM_DEBUG,
1960 ACE_TEXT ("(%t) Client %d recvd close-ack\n"),
1961 this->id_));
1963 else
1965 if (duplex != 0)
1966 this->initiate_read ();
1967 else // half-duplex write, after write we will start read
1968 this->initiate_write ();
1972 mb->release ();
1973 --this->io_count_;
1974 if (this->io_count_ > 0)
1975 return;
1977 delete this;
1980 // *************************************************************
1981 // Configuration helpers
1982 // *************************************************************
1984 print_usage (int /* argc */, ACE_TCHAR *argv[])
1986 ACE_ERROR
1987 ((LM_ERROR,
1988 ACE_TEXT ("\nusage: %s")
1989 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1990 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1991 ACE_TEXT ("\n a AIOCB")
1992 ACE_TEXT ("\n i SIG")
1993 ACE_TEXT ("\n c CB")
1994 ACE_TEXT ("\n s SUN")
1995 ACE_TEXT ("\n d default")
1996 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1997 ACE_TEXT ("\n-h <host> for Client mode")
1998 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1999 ACE_TEXT ("\n-p <port to listen/connect>")
2000 ACE_TEXT ("\n-c <number of client instances>")
2001 ACE_TEXT ("\n-b run client and server at the same time")
2002 ACE_TEXT ("\n f file")
2003 ACE_TEXT ("\n c console")
2004 ACE_TEXT ("\n-v log level")
2005 ACE_TEXT ("\n 0 - log errors and highlights")
2006 ACE_TEXT ("\n 1 - log level 0 plus progress information")
2007 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
2008 ACE_TEXT ("\n-x max transfer byte count per Client")
2009 ACE_TEXT ("\n-u show this message")
2010 ACE_TEXT ("\n"),
2011 argv[0]
2013 return -1;
2016 static int
2017 set_proactor_type (const ACE_TCHAR *ptype)
2019 if (!ptype)
2020 return 0;
2022 switch (ACE_OS::ace_toupper (*ptype))
2024 case 'D':
2025 proactor_type = DEFAULT;
2026 return 1;
2027 case 'A':
2028 proactor_type = AIOCB;
2029 return 1;
2030 case 'I':
2031 proactor_type = SIG;
2032 return 1;
2033 #if defined (sun)
2034 case 'S':
2035 proactor_type = SUN;
2036 return 1;
2037 #endif /* sun */
2038 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
2039 case 'C':
2040 proactor_type = CB;
2041 return 1;
2042 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
2043 default:
2044 break;
2046 return 0;
2049 static int
2050 parse_args (int argc, ACE_TCHAR *argv[])
2052 // First, set up all the defaults then let any args change them.
2053 both = 1; // client and server simultaneosly
2054 duplex = 1; // full duplex is on
2055 host = ACE_LOCALHOST; // server to connect
2056 port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
2057 max_aio_operations = 512; // POSIX Proactor params
2058 proactor_type = DEFAULT; // Proactor type = default
2059 threads = 3; // size of Proactor thread pool
2060 clients = 10; // number of clients
2061 loglevel = 0; // log level : only errors and highlights
2062 // Default transfer limit 50 messages per Sender
2063 xfer_limit = 50 * ACE_OS::strlen (complete_message);
2065 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
2066 # if defined (ACE_LINUX)
2067 duplex = 0;
2068 #endif
2070 if (argc == 1) // no arguments , so one button test
2071 return 0;
2073 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
2074 int c;
2076 while ((c = get_opt ()) != EOF)
2078 switch (c)
2080 case 'x': // xfer limit
2081 xfer_limit = static_cast<size_t> (ACE_OS::atoi (get_opt.opt_arg ()));
2082 if (xfer_limit == 0)
2083 xfer_limit = 1; // Bare minimum.
2084 break;
2085 case 'b': // both client and server
2086 both = 1;
2087 break;
2088 case 'v': // log level
2089 loglevel = ACE_OS::atoi (get_opt.opt_arg ());
2090 break;
2091 case 'd': // duplex
2092 duplex = ACE_OS::atoi (get_opt.opt_arg ());
2093 break;
2094 case 'h': // host for sender
2095 host = get_opt.opt_arg ();
2096 break;
2097 case 'p': // port number
2098 port = ACE_OS::atoi (get_opt.opt_arg ());
2099 break;
2100 case 'n': // thread pool size
2101 threads = ACE_OS::atoi (get_opt.opt_arg ());
2102 break;
2103 case 'c': // number of clients
2104 clients = ACE_OS::atoi (get_opt.opt_arg ());
2105 if (clients > MAX_CLIENTS)
2106 clients = MAX_CLIENTS;
2107 break;
2108 case 'o': // max number of aio for proactor
2109 max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
2110 break;
2111 case 't': // Proactor Type
2112 if (set_proactor_type (get_opt.opt_arg ()))
2113 break;
2114 return print_usage (argc, argv);
2115 case 'u':
2116 default:
2117 return print_usage (argc, argv);
2118 } // switch
2119 } // while
2121 if (proactor_type == SUN && threads > 1)
2123 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sun aiowait is not thread-safe; ")
2124 ACE_TEXT ("changing to 1 thread\n")));
2125 threads = 1;
2128 return 0;
2132 run_main (int argc, ACE_TCHAR *argv[])
2134 ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
2136 if (::parse_args (argc, argv) == -1)
2137 return -1;
2139 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
2140 disable_signal (SIGPIPE, SIGPIPE);
2142 MyTask task1;
2143 TestData test;
2145 if (task1.start (threads, proactor_type, max_aio_operations) == 0)
2147 // NOTE - there's no real reason this test is limited to IPv4 other
2148 // than the way Session_Data is set up - to expand this test to work
2149 // on IPv6 as well as IPv4, you need to do some work on passing the
2150 // Session_Data address differently.
2151 ACE_INET_Addr addr (port, ACE_LOCALHOST, AF_INET);
2152 Master master (&test, addr, clients);
2153 Connector connector (&test);
2154 int rc = 0;
2156 if (both != 0 || host == 0) // Acceptor
2158 // Already running; if not needed will be deleted soon.
2159 rc = 1;
2162 if (both != 0 || host != 0)
2164 if (host == 0)
2165 host = ACE_LOCALHOST;
2167 if (addr.set (port, host, 1, addr.get_type ()) == -1)
2168 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
2169 else
2170 rc += connector.start (addr, clients);
2173 // Wait a few seconds to let things get going, then poll til
2174 // all sessions are done. Note that when we exit this scope, the
2175 // Acceptor and Connector will be destroyed, which should prevent
2176 // further connections and also test how well destroyed handlers
2177 // are handled.
2178 ACE_OS::sleep (3);
2180 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
2181 while (!test.testing_done ())
2182 ACE_OS::sleep (1);
2184 test.stop_all ();
2186 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
2187 task1.stop ();
2189 ACE_END_TEST;
2191 return 0;
2194 #else
2197 run_main (int, ACE_TCHAR *[])
2199 ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
2201 ACE_DEBUG ((LM_INFO,
2202 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
2203 ACE_TEXT ("Proactor_UDP_Test will not be run.\n")));
2205 ACE_END_TEST;
2207 return 0;
2210 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */