Use =default for skeleton copy constructor
[ACE_TAO.git] / ACE / tests / Proactor_UDP_Test.cpp
blob30457b30ac3a11c9029489649df6eea5865ce502
1 // ============================================================================
2 /**
3 * @file Proactor_UDP_Test.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that uses UDP/IP communications.
8 * @author Steve Huston <shuston@riverace.com>, based on Proactor_Test.cpp
9 */
10 // ============================================================================
12 #include "test_config.h"
14 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
15 // This only works on Win32 platforms and on Unix platforms
16 // supporting POSIX aio calls.
18 #include "ace/Signal.h"
20 #include "ace/Service_Config.h"
21 #include "ace/INET_Addr.h"
22 #include "ace/SOCK_CODgram.h"
23 #include "ace/SOCK_Dgram.h"
24 #include "ace/Object_Manager.h"
25 #include "ace/Get_Opt.h"
27 #include "ace/Proactor.h"
28 #include "ace/Task.h"
29 #include "ace/Thread_Semaphore.h"
30 #include "ace/OS_NS_ctype.h"
31 #include "ace/OS_NS_errno.h"
32 #include "ace/OS_NS_signal.h"
33 #include "ace/OS_NS_string.h"
34 #include "ace/OS_NS_unistd.h"
35 #include "ace/OS_NS_sys_socket.h"
36 #include "ace/Sock_Connect.h"
37 #include "ace/os_include/netinet/os_tcp.h"
39 #include "ace/Atomic_Op.h"
40 #include "ace/Synch_Traits.h"
42 #if defined (ACE_WIN32)
44 # include "ace/WIN32_Proactor.h"
46 #elif defined (ACE_HAS_AIO_CALLS)
48 # include "ace/POSIX_Proactor.h"
49 # include "ace/POSIX_CB_Proactor.h"
51 #endif /* ACE_WIN32 */
53 // Proactor Type (UNIX only, Win32 ignored)
54 using ProactorType = enum { DEFAULT = 0, AIOCB, SIG, CB };
55 static ProactorType proactor_type = DEFAULT;
57 // POSIX : > 0 max number aio operations proactor,
58 static size_t max_aio_operations = 0;
60 // both: 0 run client or server / depends on host
61 // != 0 run client and server
62 static int both = 0;
64 // Host that we're connecting to.
65 static const ACE_TCHAR *host = 0;
67 // number of Client instances
68 static int clients = 1;
69 const int MAX_CLIENTS = 1000;
70 const int MAX_SERVERS = 1000;
72 // duplex mode: == 0 half-duplex
73 // != 0 full duplex
74 static int duplex = 0;
76 // number threads in the Proactor thread pool
77 static int threads = 1;
79 // Port that we're receiving session initiations on.
80 static u_short port = ACE_DEFAULT_SERVER_PORT;
82 // Log options
83 static int loglevel; // 0 full , 1 only errors
85 static size_t xfer_limit; // Number of bytes for Client to send.
87 static char complete_message[] =
88 "GET / HTTP/1.1\r\n"
89 "Accept: */*\r\n"
90 "Accept-Language: C++\r\n"
91 "Accept-Encoding: gzip, deflate\r\n"
92 "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
93 "Connection: Keep-Alive\r\n"
94 "\r\n";
96 static char close_req_msg[] = "CLOSE";
97 static char close_ack_msg[] = "CLOSE-ACK";
99 class LogLocker
101 public:
102 LogLocker () { ACE_LOG_MSG->acquire (); }
103 virtual ~LogLocker () { ACE_LOG_MSG->release (); }
106 // Function to remove signals from the signal mask.
107 static int
108 disable_signal (int sigmin, int sigmax)
110 #if !defined (ACE_LACKS_UNIX_SIGNALS)
111 sigset_t signal_set;
112 if (ACE_OS::sigemptyset (&signal_set) == - 1)
113 ACE_ERROR ((LM_ERROR,
114 ACE_TEXT ("Error: (%P|%t):%p\n"),
115 ACE_TEXT ("sigemptyset failed")));
117 for (int i = sigmin; i <= sigmax; i++)
118 ACE_OS::sigaddset (&signal_set, i);
120 // Put the <signal_set>.
121 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
122 // In multi-threaded application this is not POSIX compliant
123 // but let's leave it just in case.
124 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
125 # else
126 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
127 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
128 ACE_ERROR_RETURN ((LM_ERROR,
129 ACE_TEXT ("Error: (%P|%t): %p\n"),
130 ACE_TEXT ("SIG_BLOCK failed")),
131 -1);
132 #else
133 ACE_UNUSED_ARG (sigmin);
134 ACE_UNUSED_ARG (sigmax);
135 #endif /* ACE_LACKS_UNIX_SIGNALS */
137 return 0;
140 // *************************************************************
141 // MyTask is ACE_Task resposible for :
142 // 1. creation and deletion of
143 // Proactor and Proactor thread pool
144 // 2. running Proactor event loop
145 // *************************************************************
148 * @class MyTask
150 * MyTask plays role for Proactor threads pool
152 * MyTask is ACE_Task resposible for:
153 * 1. Creation and deletion of Proactor and Proactor thread pool
154 * 2. Running Proactor event loop
156 class MyTask : public ACE_Task<ACE_MT_SYNCH>
158 public:
159 MyTask ():
160 lock_ (),
161 sem_ ((unsigned int) 0),
162 proactor_(0) {}
164 ~MyTask() override
166 (void) this->stop ();
167 (void) this->delete_proactor();
170 int svc () override;
172 int start (int num_threads,
173 ProactorType type_proactor,
174 size_t max_op );
175 int stop ();
177 private:
178 int create_proactor (ProactorType type_proactor,
179 size_t max_op);
180 int delete_proactor ();
182 ACE_SYNCH_RECURSIVE_MUTEX lock_;
183 ACE_Thread_Semaphore sem_;
184 ACE_Proactor * proactor_;
188 MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
190 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
191 monitor,
192 this->lock_,
193 -1);
195 ACE_TEST_ASSERT (this->proactor_ == 0);
197 #if defined (ACE_WIN32)
199 ACE_UNUSED_ARG (type_proactor);
200 ACE_UNUSED_ARG (max_op);
202 ACE_WIN32_Proactor *proactor_impl = 0;
204 ACE_NEW_RETURN (proactor_impl,
205 ACE_WIN32_Proactor,
206 -1);
208 ACE_DEBUG ((LM_DEBUG,
209 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
211 #elif defined (ACE_HAS_AIO_CALLS)
213 ACE_POSIX_Proactor * proactor_impl = 0;
215 switch (type_proactor)
217 case AIOCB:
218 ACE_NEW_RETURN (proactor_impl,
219 ACE_POSIX_AIOCB_Proactor (max_op),
220 -1);
221 ACE_DEBUG ((LM_DEBUG,
222 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
223 break;
225 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
226 case SIG:
227 ACE_NEW_RETURN (proactor_impl,
228 ACE_POSIX_SIG_Proactor (max_op),
229 -1);
230 ACE_DEBUG ((LM_DEBUG,
231 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
232 break;
233 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
235 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
236 case CB:
237 ACE_NEW_RETURN (proactor_impl,
238 ACE_POSIX_CB_Proactor (max_op),
239 -1);
240 ACE_DEBUG ((LM_DEBUG,
241 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
242 break;
243 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
245 default:
246 ACE_DEBUG ((LM_DEBUG,
247 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
248 break;
251 #endif /* ACE_WIN32 */
253 // always delete implementation 1 , not !(proactor_impl == 0)
254 ACE_NEW_RETURN (this->proactor_,
255 ACE_Proactor (proactor_impl, 1 ),
256 -1);
257 // Set new singleton and delete it in close_singleton()
258 ACE_Proactor::instance (this->proactor_, 1);
259 return 0;
263 MyTask::delete_proactor ()
265 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
266 monitor,
267 this->lock_,
268 -1);
270 ACE_DEBUG ((LM_DEBUG,
271 ACE_TEXT ("(%t) Delete Proactor\n")));
273 ACE_Proactor::close_singleton ();
274 this->proactor_ = 0;
276 return 0;
280 MyTask::start (int num_threads,
281 ProactorType type_proactor,
282 size_t max_op)
284 if (this->create_proactor (type_proactor, max_op) == -1)
285 ACE_ERROR_RETURN ((LM_ERROR,
286 ACE_TEXT ("%p.\n"),
287 ACE_TEXT ("unable to create proactor")),
288 -1);
290 if (this->activate (THR_NEW_LWP, num_threads) == -1)
291 ACE_ERROR_RETURN ((LM_ERROR,
292 ACE_TEXT ("%p.\n"),
293 ACE_TEXT ("unable to activate thread pool")),
294 -1);
296 for (; num_threads > 0; num_threads--)
298 sem_.acquire ();
301 return 0;
306 MyTask::stop ()
308 if (this->proactor_ != 0)
310 ACE_DEBUG ((LM_DEBUG,
311 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
313 this->proactor_->proactor_end_event_loop ();
316 if (this->wait () == -1)
317 ACE_ERROR ((LM_ERROR,
318 ACE_TEXT ("%p.\n"),
319 ACE_TEXT ("unable to stop thread pool")));
321 return 0;
325 MyTask::svc ()
327 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n")));
329 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
330 disable_signal (SIGPIPE, SIGPIPE);
332 // signal that we are ready
333 sem_.release (1);
335 this->proactor_->proactor_run_event_loop ();
337 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n")));
338 return 0;
341 // forward declaration
342 class TestData;
344 // "Server" is one side of a session. It's the same idea as in TCP, but
345 // there's no acceptor in UDP; sessions are started by the client sending
346 // a "start" datagram to a well-known UDP port. The start datagram tells
347 // which port number the client is receiving on. The server then sends an
348 // "ack" datagram to indicate the session is set up successfully and to say
349 // which port the server is listening on. Thus, a unique pairing of server
350 // and client port numbers is established. Each session will require a
351 // separate server-side socket as well as the client. Note that experienced
352 // UDP programmers will be quivering at this point knowing that there's no
353 // reason to have multiple server-side sockets, and no real reason to
354 // pre-register the client ports either since all the addressing info is
355 // available on normal UDP programming. However, this is all necessary in
356 // the POSIX case since the POSIX aio functions were not designed with UDP
357 // in mind, and the addressing information is not available in UDP receive
358 // completion callbacks; thus, each socket needs to be fully connected before
359 // running session data. The addressing information needed to run this
360 // use-case in the "normal" way is available on Windows, but this test runs
361 // across many platforms, so can't rely on that information.
362 class Server : public ACE_Handler
364 public:
365 Server ();
366 Server (TestData *tester, int id);
367 ~Server () override;
369 int id () { return this->id_; }
370 size_t get_total_snd () { return this->total_snd_; }
371 size_t get_total_rcv () { return this->total_rcv_; }
372 long get_total_w () { return this->total_w_; }
373 long get_total_r () { return this->total_r_; }
375 /// This is called after the new session has been established.
376 void go (ACE_HANDLE handle, const ACE_INET_Addr &client);
378 void cancel ();
380 protected:
382 * @name AIO callback handling
384 * These methods are called by the framework
386 /// This is called when asynchronous <read> operation from the
387 /// socket completes.
388 void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result) override;
390 /// This is called when an asynchronous <write> to the socket
391 /// completes.
392 void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result) override;
394 private:
395 int initiate_read ();
396 int initiate_write (ACE_Message_Block *mb, size_t nbytes);
398 TestData *tester_;
399 int id_;
401 ACE_INET_Addr client_addr_;
402 ACE_Asynch_Read_Dgram rs_;
403 ACE_Asynch_Write_Dgram ws_;
404 ACE_SYNCH_MUTEX lock_;
406 int io_count_; // Number of currently outstanding I/O requests
407 bool flg_cancel_;
408 bool flg_closing_;
409 size_t total_snd_; // Number of bytes successfully sent
410 size_t total_rcv_; // Number of bytes successfully received
411 int total_w_; // Number of write operations
412 int total_r_; // Number of read operations
415 // *******************************************
416 // Client
417 // *******************************************
419 class Client : public ACE_Handler
421 public:
422 Client ();
423 Client (TestData *tester, int id);
424 ~Client () override;
426 void go (ACE_HANDLE h, const ACE_INET_Addr &server);
427 int id () { return this->id_; }
428 size_t get_total_snd () { return this->total_snd_; }
429 size_t get_total_rcv () { return this->total_rcv_; }
430 int get_total_w () { return this->total_w_; }
431 int get_total_r () { return this->total_r_; }
433 // This is called when asynchronous reads from the socket complete
434 void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result) override;
436 // This is called when asynchronous writes from the socket complete
437 void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result) override;
439 void cancel ();
441 private:
442 int initiate_read ();
443 int initiate_write ();
444 // FUZZ: disable check_for_lack_ACE_OS
445 void close ();
446 // FUZZ: enable check_for_lack_ACE_OS
448 TestData *tester_;
449 int id_;
451 ACE_INET_Addr server_addr_;
452 ACE_Asynch_Read_Dgram rs_;
453 ACE_Asynch_Write_Dgram ws_;
455 ACE_SYNCH_MUTEX lock_;
457 int io_count_;
458 int stop_writing_; // Writes are shut down; just read.
459 bool flg_cancel_;
460 size_t total_snd_;
461 size_t total_rcv_;
462 int total_w_;
463 int total_r_;
466 // TestData collects and reports on test-related transfer and connection
467 // statistics.
468 class TestData
470 public:
471 TestData ();
472 bool testing_done ();
473 Server *server_up ();
474 Client *client_up ();
475 void server_done (Server *s);
476 void client_done (Client *c);
477 void stop_all ();
478 void report ();
480 private:
481 struct Local_Stats
483 // Track number of sessions that report start, and those that report
484 // their end (and stats).
485 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_up_;
486 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_down_;
488 // Total read and write bytes for all sessions.
489 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_cnt_;
490 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_cnt_;
491 // Total read and write operations issues for all sessions.
492 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_ops_;
493 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_ops_;
494 } servers_, clients_;
496 ACE_SYNCH_MUTEX list_lock_;
497 Server *server_list_[MAX_SERVERS];
498 Client *client_list_[MAX_CLIENTS];
501 TestData::TestData ()
503 int i;
504 for (i = 0; i < MAX_SERVERS; ++i)
505 this->server_list_[i] = 0;
506 for (i = 0; i < MAX_CLIENTS; ++i)
507 this->client_list_[i] = 0;
510 bool
511 TestData::testing_done ()
513 int svr_up = this->servers_.sessions_up_.value ();
514 int svr_dn = this->servers_.sessions_down_.value ();
515 int clt_up = this->clients_.sessions_up_.value ();
516 int clt_dn = this->clients_.sessions_down_.value ();
518 if (svr_up == 0 && clt_up == 0) // No connections up yet
519 return false;
521 return (svr_dn >= svr_up && clt_dn >= clt_up);
524 Server *
525 TestData::server_up ()
527 ++this->servers_.sessions_up_;
528 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
530 for (int i = 0; i < MAX_SERVERS; ++i)
532 if (this->server_list_[i] == 0)
534 ACE_NEW_RETURN (this->server_list_[i], Server (this, i), 0);
535 ACE_DEBUG ((LM_DEBUG,
536 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
538 this->servers_.sessions_up_.value (),
539 this->servers_.sessions_down_.value ()));
540 return this->server_list_[i];
543 return 0;
546 Client *
547 TestData::client_up ()
549 ++this->clients_.sessions_up_;
550 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
552 for (int i = 0; i < MAX_CLIENTS; ++i)
554 if (this->client_list_[i] == 0)
556 ACE_NEW_RETURN (this->client_list_[i], Client (this, i), 0);
557 ACE_DEBUG ((LM_DEBUG,
558 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
560 this->clients_.sessions_up_.value (),
561 this->clients_.sessions_down_.value ()));
562 return this->client_list_[i];
565 return 0;
568 void
569 TestData::server_done (Server *s)
571 this->servers_.w_cnt_ += s->get_total_snd ();
572 this->servers_.r_cnt_ += s->get_total_rcv ();
573 this->servers_.w_ops_ += s->get_total_w ();
574 this->servers_.r_ops_ += s->get_total_r ();
575 ++this->servers_.sessions_down_;
576 ACE_DEBUG ((LM_DEBUG,
577 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
578 s->id (),
579 this->servers_.sessions_up_.value (),
580 this->servers_.sessions_down_.value ()));
582 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
583 int i;
584 for (i = 0; i < MAX_SERVERS; ++i)
586 if (this->server_list_[i] == s)
588 if (s->id () != i)
589 ACE_ERROR ((LM_ERROR,
590 ACE_TEXT ("Server %d is pos %d in list\n"),
591 s->id (),
592 i));
593 this->server_list_[i] = 0;
594 break;
597 if (i >= MAX_SERVERS)
598 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Server %@ done but not listed\n"), s));
600 return;
603 void
604 TestData::client_done (Client *c)
606 this->clients_.w_cnt_ += c->get_total_snd ();
607 this->clients_.r_cnt_ += c->get_total_rcv ();
608 this->clients_.w_ops_ += c->get_total_w ();
609 this->clients_.r_ops_ += c->get_total_r ();
610 ++this->clients_.sessions_down_;
611 ACE_DEBUG ((LM_DEBUG,
612 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
613 c->id (),
614 this->clients_.sessions_up_.value (),
615 this->clients_.sessions_down_.value ()));
617 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
618 int i;
619 for (i = 0; i < MAX_CLIENTS; ++i)
621 if (this->client_list_[i] == c)
623 if (c->id () != i)
624 ACE_ERROR ((LM_ERROR,
625 ACE_TEXT ("Client %d is pos %d in list\n"),
626 c->id (),
627 i));
628 this->client_list_[i] = 0;
629 break;
632 if (i >= MAX_CLIENTS)
633 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Client %@ done but not listed\n"), c));
635 return;
638 void
639 TestData::stop_all ()
641 int i;
643 // Lock and cancel everything. Then release the lock, possibly allowing
644 // cleanups, then grab it again and delete all Servers and Clients.
646 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
647 for (i = 0; i < MAX_CLIENTS; ++i)
649 if (this->client_list_[i] != 0)
650 this->client_list_[i]->cancel ();
653 for (i = 0; i < MAX_SERVERS; ++i)
655 if (this->server_list_[i] != 0)
656 this->server_list_[i]->cancel ();
660 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
661 for (i = 0; i < MAX_CLIENTS; ++i)
663 if (this->client_list_[i] != 0)
664 delete this->client_list_[i];
667 for (i = 0; i < MAX_SERVERS; ++i)
669 if (this->server_list_[i] != 0)
670 delete this->server_list_[i];
675 void
676 TestData::report ()
678 // Print statistics
679 ACE_TCHAR bufs [256];
680 ACE_TCHAR bufr [256];
682 ACE_OS::snprintf (bufs, 256,
683 ACE_SIZE_T_FORMAT_SPECIFIER
684 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
685 this->clients_.w_cnt_.value (),
686 this->clients_.w_ops_.value ());
688 ACE_OS::snprintf (bufr, 256,
689 ACE_SIZE_T_FORMAT_SPECIFIER
690 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
691 this->clients_.r_cnt_.value (),
692 this->clients_.r_ops_.value ());
694 ACE_DEBUG ((LM_DEBUG,
695 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
696 bufs,
697 bufr));
699 ACE_OS::snprintf (bufs, 256,
700 ACE_SIZE_T_FORMAT_SPECIFIER
701 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
702 this->servers_.w_cnt_.value (),
703 this->servers_.w_ops_.value ());
705 ACE_OS::snprintf (bufr, 256,
706 ACE_SIZE_T_FORMAT_SPECIFIER
707 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
708 this->servers_.r_cnt_.value (),
709 this->servers_.r_ops_.value ());
711 ACE_DEBUG ((LM_DEBUG,
712 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
713 bufs,
714 bufr));
716 if (this->clients_.w_cnt_.value () == 0 ||
717 this->clients_.r_cnt_.value () == 0 ||
718 this->servers_.w_cnt_.value () == 0 ||
719 this->servers_.r_cnt_.value () == 0 )
720 ACE_ERROR ((LM_ERROR, ACE_TEXT ("It appears that this test didn't ")
721 ACE_TEXT ("really do anything. Something is very wrong.\n")));
724 // Session set-up struct.
725 struct Session_Data
727 ACE_INT32 direction_; // 0 == Start, 1 == Ack
728 ACE_INT32 addr_; // Network byte order, must be IPv4
729 ACE_UINT16 port_; // UDP port, network byte order
730 Session_Data() { ACE_OS::memset (this, 0, sizeof(*this)); }
733 // Master is the server-side receiver of session establishment requests.
734 // For each "start" dgram received, instantiates a new Server object,
735 // indicating the addressing info for the client.
736 // Master is initialized with a count of the number of expected sessions. After
737 // this number are set up, Master will stop listening for session requests.
738 // This is a bit fragile but is necessary because on HP-UX, et al., it
739 // is impossible to close/cancel a socket with an outstanding UDP receive
740 // So, this bit of messiness is necessary for portability.
741 // When the Master is destroyed, it will try to stop establishing sessions
742 // but this will only work on Windows.
743 class Master : public ACE_Handler
745 public:
746 Master (TestData *tester, const ACE_INET_Addr &recv_addr, int expected);
747 ~Master () override;
749 // Called when dgram receive operation completes.
750 void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result) override;
752 private:
753 void start_recv ();
755 TestData *tester_;
756 ACE_INET_Addr recv_addr_;
757 ACE_SOCK_Dgram sock_;
758 ACE_Asynch_Read_Dgram rd_;
759 ACE_Message_Block *mb_;
760 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_expected_;
761 volatile bool recv_in_progress_;
764 // *************************************************************
765 Master::Master (TestData *tester, const ACE_INET_Addr &recv_addr, int expected)
766 : tester_ (tester),
767 recv_addr_ (recv_addr),
768 mb_ (0),
769 sessions_expected_ (expected),
770 recv_in_progress_ (false)
772 if (this->sock_.open (recv_addr) == -1)
773 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Master socket %p\n"), ACE_TEXT ("open")));
774 else
776 if (this->rd_.open (*this, this->sock_.get_handle ()) == -1)
777 ACE_ERROR ((LM_ERROR,
778 ACE_TEXT ("Master reader %p\n"),
779 ACE_TEXT ("open")));
780 this->mb_ = new ACE_Message_Block (sizeof (Session_Data));
781 start_recv ();
785 Master::~Master ()
787 if (this->recv_in_progress_)
788 this->rd_.cancel ();
789 this->sock_.close ();
791 if (this->mb_ != 0)
793 this->mb_->release ();
794 this->mb_ = 0;
798 void
799 Master::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
801 // We should only receive Start datagrams with valid addresses to reply to.
802 if (result.success ())
804 if (result.bytes_transferred () != sizeof (Session_Data))
805 ACE_ERROR ((LM_ERROR,
806 ACE_TEXT ("(%t) Master session data expected %B bytes; ")
807 ACE_TEXT ("received %B\n"),
808 sizeof (Session_Data),
809 result.bytes_transferred ()));
810 else
812 ACE_Message_Block *mb = result.message_block ();
813 Session_Data *session =
814 reinterpret_cast<Session_Data*>(mb->rd_ptr ());
815 if (session->direction_ == 0)
817 ACE_INET_Addr client_addr, me_addr;
818 ACE_TCHAR client_str[80], me_str[80];
819 client_addr.set ((u_short)session->port_, session->addr_, 0);
820 client_addr.addr_to_string (client_str, 80);
822 // Set up the local and remote addresses - need fully-specified
823 // addresses to use UDP aio on Linux. This is the socket that
824 // the session will run over. The addressing info to be sent
825 // back to the Client will be sent over the receive socket
826 // to ensure it goes back to the client initiating the session.
827 ACE_SOCK_CODgram sock;
828 if (sock.open (client_addr) == -1)
830 ACE_ERROR ((LM_ERROR,
831 ACE_TEXT ("(%t) Master new socket for ")
832 ACE_TEXT ("client %s: %p\n"),
833 client_str,
834 ACE_TEXT ("open")));
836 else
838 sock.get_local_addr (me_addr);
839 me_addr.addr_to_string (me_str, 80);
840 ACE_DEBUG ((LM_DEBUG,
841 ACE_TEXT ("(%t) Master setting up server for ")
842 ACE_TEXT ("local %s, peer %s\n"),
843 me_str,
844 client_str));
846 Session_Data session;
847 session.direction_ = 1; // Ack
848 session.addr_ = ACE_HTONL (me_addr.get_ip_address ());
849 session.port_ = ACE_HTONS (me_addr.get_port_number ());
850 if (this->sock_.send (&session,
851 sizeof (session),
852 client_addr) == -1)
854 ACE_ERROR ((LM_ERROR,
855 ACE_TEXT ("(%t) Master reply %p\n"),
856 ACE_TEXT ("send")));
857 sock.close ();
859 else
861 Server *server = this->tester_->server_up ();
862 server->go (sock.get_handle (), client_addr);
865 if (--this->sessions_expected_ == 0)
867 ACE_DEBUG ((LM_DEBUG,
868 ACE_TEXT ("All expected sessions are up\n")));
871 else
873 ACE_ERROR ((LM_ERROR,
874 ACE_TEXT ("(%t) Badly formed Session request\n")));
878 else
880 ACE_Log_Priority prio = LM_ERROR;
881 #if defined (ACE_WIN32)
882 if (result.error () == ERROR_OPERATION_ABORTED)
883 prio = LM_DEBUG;
884 #else
885 if (result.error () == ECANCELED)
886 prio = LM_DEBUG;
887 #endif /* ACE_WIN32 */
888 // Multiple steps to log the error without squashing errno.
889 ACE_LOG_MSG->conditional_set (__FILE__,
890 __LINE__,
892 (int)(result.error ()));
893 ACE_LOG_MSG->log (prio,
894 ACE_TEXT ("(%t) Master %p\n"),
895 ACE_TEXT ("recv"));
896 // If canceled, don't try to restart.
897 if (prio == LM_DEBUG)
898 return;
900 this->start_recv ();
903 void
904 Master::start_recv ()
906 if (this->mb_ == 0)
907 return;
909 size_t unused = 0;
910 this->mb_->reset ();
911 if (this->rd_.recv (this->mb_, unused, 0) == -1)
912 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Master %p\n"), ACE_TEXT ("recv")));
913 else
914 this->recv_in_progress_ = true;
917 // ***************************************************
918 Server::Server ()
920 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
923 Server::Server (TestData *tester, int id)
924 : tester_ (tester),
925 id_ (id),
926 io_count_ (0),
927 flg_cancel_(false),
928 flg_closing_ (false),
929 total_snd_(0),
930 total_rcv_(0),
931 total_w_ (0),
932 total_r_ (0)
936 Server::~Server ()
938 ACE_DEBUG ((LM_DEBUG,
939 ACE_TEXT ("(%t) Server %d dtor; %d sends (%B bytes); ")
940 ACE_TEXT ("%d recvs (%B bytes)\n"),
941 this->id_,
942 this->total_w_, this->total_snd_,
943 this->total_r_, this->total_rcv_));
944 if (this->io_count_ != 0)
945 ACE_ERROR ((LM_WARNING,
946 ACE_TEXT ("(%t) Server %d deleted with ")
947 ACE_TEXT ("%d I/O outstanding\n"),
948 this->id_,
949 this->io_count_));
951 // This test bounces data back and forth between Clients and Servers.
952 // Therefore, if there was significantly more data in one direction, that's
953 // a problem. Remember, the byte counts are unsigned values.
954 int issue_data_warning = 0;
955 if (this->total_snd_ > this->total_rcv_)
957 if (this->total_rcv_ == 0)
958 issue_data_warning = 1;
959 else if (this->total_snd_ / this->total_rcv_ > 2)
960 issue_data_warning = 1;
962 else
964 if (this->total_snd_ == 0)
965 issue_data_warning = 1;
966 else if (this->total_rcv_ / this->total_snd_ > 2)
967 issue_data_warning = 1;
969 if (issue_data_warning)
970 ACE_DEBUG ((LM_WARNING,
971 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
973 if (this->tester_ != 0)
974 this->tester_->server_done (this);
976 if (this->handle () != ACE_INVALID_HANDLE)
977 ACE_OS::closesocket (this->handle ());
979 this->id_ = -1;
980 this->handle (ACE_INVALID_HANDLE);
983 void
984 Server::cancel ()
986 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
988 this->flg_cancel_ = true;
989 this->ws_.cancel ();
990 this->rs_.cancel ();
991 return;
994 void
995 Server::go (ACE_HANDLE handle, const ACE_INET_Addr &client)
997 this->handle (handle);
998 this->client_addr_.set (client);
1000 // Lock this before initiating I/O, else it may complete while we're
1001 // still setting up.
1003 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1005 if (this->ws_.open (*this, this->handle ()) == -1)
1006 ACE_ERROR ((LM_ERROR,
1007 ACE_TEXT ("(%t) %p\n"),
1008 ACE_TEXT ("Server::ACE_Asynch_Write_Dgram::open")));
1009 else if (this->rs_.open (*this, this->handle ()) == -1)
1010 ACE_ERROR ((LM_ERROR,
1011 ACE_TEXT ("(%t) %p\n"),
1012 ACE_TEXT ("Server::ACE_Asynch_Read_Dgram::open")));
1013 else
1014 this->initiate_read ();
1017 if (this->io_count_ > 0)
1018 return;
1020 delete this; // Error setting up I/O factories
1024 Server::initiate_read ()
1026 if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
1027 return -1;
1029 ACE_Message_Block *mb = 0;
1030 ACE_NEW_RETURN (mb,
1031 ACE_Message_Block (1024), //BUFSIZ + 1),
1032 -1);
1034 // Inititiate receive
1035 size_t unused = 0;
1036 if (this->rs_.recv (mb, unused, 0) == -1)
1038 mb->release ();
1039 ACE_ERROR_RETURN ((LM_ERROR,
1040 ACE_TEXT ("(%t) Server %d, %p\n"),
1041 this->id_,
1042 ACE_TEXT ("read")),
1043 -1);
1046 this->io_count_++;
1047 this->total_r_++;
1048 return 0;
1052 Server::initiate_write (ACE_Message_Block *mb, size_t nbytes)
1054 if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
1056 mb->release ();
1057 return -1;
1060 if (nbytes == 0)
1062 mb->release ();
1063 ACE_ERROR_RETURN((LM_ERROR,
1064 ACE_TEXT ("(%t) Server %d write nbytes == 0\n"),
1065 this->id_),
1066 -1);
1069 if (this->ws_.send (mb, nbytes, 0, this->client_addr_) == -1)
1071 mb->release ();
1072 ACE_ERROR_RETURN((LM_ERROR,
1073 ACE_TEXT ("(%t) Server %d, %p\n"),
1074 this->id_,
1075 ACE_TEXT ("write")),
1076 -1);
1079 this->io_count_++;
1080 this->total_w_++;
1081 return 0;
1084 void
1085 Server::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
1088 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
1090 ACE_Message_Block *mb = result.message_block ();
1092 // Reset pointers.
1093 mb->rd_ptr ()[result.bytes_transferred ()] = '\0';
1095 if (loglevel > 1)
1097 LogLocker log_lock;
1099 ACE_DEBUG ((LM_DEBUG,
1100 ACE_TEXT ("(%t) **** Server %d: handle_read_dgram() ****\n"),
1101 this->id_));
1102 ACE_DEBUG ((LM_DEBUG,
1103 ACE_TEXT ("%s = %B\n"),
1104 ACE_TEXT ("bytes_to_read"),
1105 result.bytes_to_read ()));
1106 ACE_DEBUG ((LM_DEBUG,
1107 ACE_TEXT ("%s = %d\n"),
1108 ACE_TEXT ("handle"),
1109 result.handle ()));
1110 ACE_DEBUG ((LM_DEBUG,
1111 ACE_TEXT ("%s = %B\n"),
1112 ACE_TEXT ("bytes_transfered"),
1113 result.bytes_transferred ()));
1114 ACE_DEBUG ((LM_DEBUG,
1115 ACE_TEXT ("%s = %@\n"),
1116 ACE_TEXT ("act"),
1117 result.act ()));
1118 ACE_DEBUG ((LM_DEBUG,
1119 ACE_TEXT ("%s = %d\n"),
1120 ACE_TEXT ("success"),
1121 result.success ()));
1122 ACE_DEBUG ((LM_DEBUG,
1123 ACE_TEXT ("%s = %@\n"),
1124 ACE_TEXT ("completion_key"),
1125 result.completion_key ()));
1126 ACE_DEBUG ((LM_DEBUG,
1127 ACE_TEXT ("%s = %d\n"),
1128 ACE_TEXT ("error"),
1129 result.error ()));
1130 ACE_DEBUG ((LM_DEBUG,
1131 ACE_TEXT ("%s = %s\n"),
1132 ACE_TEXT ("message_block"),
1133 mb->rd_ptr ()));
1134 ACE_DEBUG ((LM_DEBUG,
1135 ACE_TEXT ("**** end of message ****************\n")));
1137 else if (result.error () != 0)
1139 ACE_Log_Priority prio;
1140 #if defined (ACE_WIN32)
1141 if (result.error () == ERROR_OPERATION_ABORTED)
1142 prio = LM_DEBUG;
1143 #else
1144 if (result.error () == ECANCELED)
1145 prio = LM_DEBUG;
1146 #endif /* ACE_WIN32 */
1147 else
1148 prio = LM_ERROR;
1149 ACE_LOG_MSG->errnum (result.error ());
1150 ACE_LOG_MSG->log (prio,
1151 ACE_TEXT ("(%t) Server %d; %p\n"),
1152 this->id_,
1153 ACE_TEXT ("read"));
1155 else if (loglevel > 0)
1157 ACE_DEBUG ((LM_DEBUG,
1158 ACE_TEXT ("(%t) Server %d: read %B bytes\n"),
1159 this->id_,
1160 result.bytes_transferred ()));
1163 if (result.error () == 0 && result.bytes_transferred () > 0)
1165 this->total_rcv_ += result.bytes_transferred ();
1167 // If client says we're done, ack it; we're done reading.
1168 size_t to_send = result.bytes_transferred ();
1169 if (ACE_OS::strcmp (mb->rd_ptr (), close_req_msg) == 0)
1171 ACE_DEBUG ((LM_DEBUG,
1172 ACE_TEXT ("(%t) Server %d saw close request; ack\n"),
1173 this->id_));
1174 this->flg_closing_ = true;
1175 mb->reset ();
1176 mb->copy (close_ack_msg);
1177 to_send = mb->length ();
1179 if (this->initiate_write (mb, to_send) == 0)
1181 if (duplex != 0 && !this->flg_closing_)
1182 this->initiate_read ();
1185 else
1186 mb->release ();
1188 --this->io_count_;
1189 if (this->io_count_ > 0)
1190 return;
1192 delete this;
1195 void
1196 Server::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
1199 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1201 ACE_Message_Block *mb = result.message_block ();
1203 if (loglevel > 1)
1205 LogLocker log_lock;
1207 //mb.rd_ptr () [0] = '\0';
1208 mb->rd_ptr (mb->rd_ptr () - result.bytes_transferred ());
1210 ACE_DEBUG ((LM_DEBUG,
1211 ACE_TEXT ("(%t) **** Server %d: handle_write_dgram() ****\n"),
1212 this->id_));
1213 ACE_DEBUG ((LM_DEBUG,
1214 ACE_TEXT ("%s = %B\n"),
1215 ACE_TEXT ("bytes_to_write"),
1216 result.bytes_to_write ()));
1217 ACE_DEBUG ((LM_DEBUG,
1218 ACE_TEXT ("%s = %d\n"),
1219 ACE_TEXT ("handle"),
1220 result.handle ()));
1221 ACE_DEBUG ((LM_DEBUG,
1222 ACE_TEXT ("%s = %B\n"),
1223 ACE_TEXT ("bytes_transfered"),
1224 result.bytes_transferred ()));
1225 ACE_DEBUG ((LM_DEBUG,
1226 ACE_TEXT ("%s = %@\n"),
1227 ACE_TEXT ("act"),
1228 result.act ()));
1229 ACE_DEBUG ((LM_DEBUG,
1230 ACE_TEXT ("%s = %d\n"),
1231 ACE_TEXT ("success"),
1232 result.success ()));
1233 ACE_DEBUG ((LM_DEBUG,
1234 ACE_TEXT ("%s = %@\n"),
1235 ACE_TEXT ("completion_key"),
1236 result.completion_key ()));
1237 ACE_DEBUG ((LM_DEBUG,
1238 ACE_TEXT ("%s = %d\n"),
1239 ACE_TEXT ("error"),
1240 result.error ()));
1241 ACE_DEBUG ((LM_DEBUG,
1242 ACE_TEXT ("%s = %s\n"),
1243 ACE_TEXT ("message_block"),
1244 mb->rd_ptr ()));
1245 ACE_DEBUG ((LM_DEBUG,
1246 ACE_TEXT ("**** end of message ****************\n")));
1248 else if (result.error () != 0)
1250 ACE_Log_Priority prio;
1251 #if defined (ACE_WIN32)
1252 if (result.error () == ERROR_OPERATION_ABORTED)
1253 prio = LM_DEBUG;
1254 #else
1255 if (result.error () == ECANCELED)
1256 prio = LM_DEBUG;
1257 #endif /* ACE_WIN32 */
1258 else
1259 prio = LM_ERROR;
1260 ACE_LOG_MSG->errnum (result.error ());
1261 ACE_LOG_MSG->log (prio,
1262 ACE_TEXT ("(%t) Server %d; %p\n"),
1263 this->id_,
1264 ACE_TEXT ("write"));
1266 else if (loglevel > 0)
1268 ACE_DEBUG ((LM_DEBUG,
1269 ACE_TEXT ("(%t) Server %d: wrote %B bytes ok\n"),
1270 this->id_,
1271 result.bytes_transferred ()));
1274 mb->release ();
1276 if (result.error () == 0 && result.bytes_transferred () > 0)
1278 this->total_snd_ += result.bytes_transferred ();
1280 if (duplex == 0 && !this->flg_closing_)
1281 this->initiate_read ();
1284 --this->io_count_;
1285 if (this->io_count_ > 0)
1286 return;
1288 delete this;
1291 // *******************************************
1292 // Connector
1294 // Connector creates the proper number of Clients and initiates
1295 // sessions on them.
1296 // *******************************************
1298 class Connector
1300 public:
1301 Connector (TestData *tester);
1303 int start (const ACE_INET_Addr &addr, int num);
1305 private:
1306 TestData *tester_;
1309 // *************************************************************
1311 Connector::Connector (TestData *tester)
1312 : tester_ (tester)
1317 Connector::start (const ACE_INET_Addr& addr, int num)
1319 ACE_OS::sleep(3); // Let Master get going
1320 if (num > MAX_CLIENTS)
1321 num = MAX_CLIENTS;
1323 if (num < 0)
1324 num = 1;
1326 int rc = 0;
1328 for (; rc < num; rc++)
1330 ACE_SOCK_CODgram sock;
1331 if (sock.open (addr) == -1)
1332 ACE_ERROR_BREAK ((LM_ERROR,
1333 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1335 ACE_TEXT ("open")));
1336 ACE_INET_Addr me;
1337 sock.get_local_addr (me);
1338 u_short my_port = ACE_HTONS (me.get_port_number ());
1339 ACE_INT32 my_addr = ACE_HTONL (me.get_ip_address ());
1340 Session_Data session;
1341 session.direction_ = 0; // Start
1342 session.addr_ = my_addr;
1343 session.port_ = my_port;
1344 if (sock.send (&session, sizeof (session)) == -1)
1345 ACE_ERROR_BREAK ((LM_ERROR,
1346 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1348 ACE_TEXT ("send")));
1349 if (sock.recv (&session, sizeof (session)) == -1)
1350 ACE_ERROR_BREAK ((LM_ERROR,
1351 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1353 ACE_TEXT ("recv")));
1354 ACE_INET_Addr server;
1355 server.set (session.port_, session.addr_, 0);
1356 Client *client = this->tester_->client_up ();
1357 ACE_TCHAR me_str[80], server_str[80];
1358 me.addr_to_string (me_str, 80);
1359 server.addr_to_string (server_str, 80);
1360 ACE_DEBUG ((LM_DEBUG,
1361 ACE_TEXT ("(%t) Client %d setting up local %s, peer %s\n"),
1362 client->id(),
1363 me_str,
1364 server_str));
1365 sock.close ();
1366 if (sock.open (server, me) == -1)
1367 ACE_ERROR_BREAK ((LM_ERROR,
1368 ACE_TEXT ("(%t) Re-opening %p\n"),
1369 ACE_TEXT ("client")));
1370 client->go (sock.get_handle (), server);
1371 sock.set_handle (ACE_INVALID_HANDLE);
1373 return rc;
1377 Client::Client ()
1379 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
1382 Client::Client (TestData *tester, int id)
1383 : tester_ (tester),
1384 id_ (id),
1385 io_count_ (0),
1386 stop_writing_ (0),
1387 flg_cancel_ (false),
1388 total_snd_ (0),
1389 total_rcv_ (0),
1390 total_w_ (0),
1391 total_r_ (0)
1395 Client::~Client ()
1397 ACE_DEBUG ((LM_DEBUG,
1398 ACE_TEXT ("(%t) Client %d dtor; %d sends (%B bytes); ")
1399 ACE_TEXT ("%d recvs (%B bytes)\n"),
1400 this->id_,
1401 this->total_w_, this->total_snd_,
1402 this->total_r_, this->total_rcv_));
1403 if (this->io_count_ != 0)
1404 ACE_ERROR ((LM_WARNING,
1405 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1406 this->id_,
1407 this->io_count_));
1409 // This test bounces data back and forth between Clients and Servers.
1410 // Therefore, if there was significantly more data in one direction, that's
1411 // a problem. Remember, the byte counts are unsigned values.
1412 int issue_data_warning = 0;
1413 if (this->total_snd_ > this->total_rcv_)
1415 if (this->total_rcv_ == 0)
1416 issue_data_warning = 1;
1417 else if (this->total_snd_ / this->total_rcv_ > 2)
1418 issue_data_warning = 1;
1420 else
1422 if (this->total_snd_ == 0)
1423 issue_data_warning = 1;
1424 else if (this->total_rcv_ / this->total_snd_ > 2)
1425 issue_data_warning = 1;
1427 if (issue_data_warning)
1428 ACE_DEBUG ((LM_WARNING,
1429 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1431 if (this->tester_ != 0)
1432 this->tester_->client_done (this);
1434 this->id_ = -1;
1435 if (this->handle () != ACE_INVALID_HANDLE)
1437 ACE_OS::closesocket (this->handle ());
1439 this->handle (ACE_INVALID_HANDLE);
1442 void
1443 Client::cancel ()
1445 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1447 this->flg_cancel_ = true;
1448 this->ws_.cancel ();
1449 this->rs_.cancel ();
1450 return;
1453 void
1454 Client::close ()
1456 // This must be called with the lock_ held.
1457 ++this->stop_writing_;
1458 ACE_DEBUG ((LM_DEBUG,
1459 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1460 this->id_, this->io_count_));
1461 return;
1464 void
1465 Client::go (ACE_HANDLE handle, const ACE_INET_Addr &server)
1468 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1470 this->handle (handle);
1471 this->server_addr_.set (server);
1473 // Open send and receive factories.
1474 if (this->ws_.open (*this, handle) == -1)
1475 ACE_ERROR ((LM_ERROR,
1476 ACE_TEXT ("(%t) Client %d: %p\n"),
1477 this->id_,
1478 ACE_TEXT ("ACE_Asynch_Write_Dgram::open")));
1479 else if (this->rs_.open (*this, handle) == -1)
1480 ACE_ERROR ((LM_ERROR,
1481 ACE_TEXT ("(%t) Client %d: %p\n"),
1482 this->id_,
1483 ACE_TEXT ("ACE_Asynch_Read_Dgram::open")));
1484 else if (this->initiate_write () == 0)
1486 if (duplex != 0) // Start an asynchronous read
1487 this->initiate_read ();
1490 if (this->io_count_ > 0)
1491 return;
1493 delete this;
1497 Client::initiate_write ()
1499 if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
1500 return -1;
1502 // stop_writing_ is set to 1 to say "stop". To avoid repeating the
1503 // close datagram for every echo, only send it once. Sure, there's a risk
1504 // it will get lost, but since this is most often intra-host, don't
1505 // worry about that very small risk.
1506 if (this->stop_writing_ > 0) // Need to tell server to "close"
1508 if (this->stop_writing_ > 1) // Already told server to close
1509 return 0;
1511 ++this->stop_writing_;
1512 ACE_DEBUG ((LM_DEBUG,
1513 ACE_TEXT ("(%t) Client %d requesting close\n"),
1514 this->id_));
1515 ACE_Message_Block *mb =
1516 new ACE_Message_Block (ACE_OS::strlen (close_req_msg) + 1);
1517 mb->copy (close_req_msg);
1518 size_t unused; // Number of bytes sent
1519 if (this->ws_.send (mb, unused, 0, this->server_addr_) == -1)
1521 mb->release ();
1522 ACE_ERROR_RETURN ((LM_ERROR,
1523 ACE_TEXT ("(%t) Client %d, %p\n"),
1524 this->id_,
1525 ACE_TEXT ("initiating closing send")),
1526 -1);
1529 this->io_count_++;
1530 this->total_w_++;
1531 return 0;
1534 static const size_t complete_message_length =
1535 ACE_OS::strlen (complete_message);
1537 #if defined (ACE_WIN32)
1539 ACE_Message_Block *mb1 = 0,
1540 *mb2 = 0,
1541 *mb3 = 0;
1543 // No need to allocate +1 for proper printing - the memory includes it already
1544 ACE_NEW_RETURN (mb1,
1545 ACE_Message_Block ((char *)complete_message,
1546 complete_message_length),
1547 -1);
1549 ACE_NEW_RETURN (mb2,
1550 ACE_Message_Block ((char *)complete_message,
1551 complete_message_length),
1552 -1);
1554 ACE_NEW_RETURN (mb3,
1555 ACE_Message_Block ((char *)complete_message,
1556 complete_message_length),
1557 -1);
1559 mb1->wr_ptr (complete_message_length);
1560 mb2->wr_ptr (complete_message_length);
1561 mb3->wr_ptr (complete_message_length);
1563 // chain them together
1564 mb1->cont (mb2);
1565 mb2->cont (mb3);
1567 size_t unused; // Number of bytes sent
1568 if (this->ws_.send (mb1, unused, 0, this->server_addr_) == -1)
1570 mb1->release ();
1571 ACE_ERROR_RETURN((LM_ERROR,
1572 ACE_TEXT ("(%t) %p\n"),
1573 ACE_TEXT ("Client::ACE_Asynch_Write_Dgram::send")),
1574 -1);
1576 #else /* ACE_WIN32 */
1578 ACE_Message_Block *mb = 0;
1580 // No need to allocate +1 for proper printing - the memory includes
1581 // it already
1582 ACE_NEW_RETURN (mb,
1583 ACE_Message_Block (complete_message,
1584 complete_message_length),
1585 -1);
1586 mb->wr_ptr (complete_message_length);
1587 size_t unused; // Number of bytes sent
1588 if (this->ws_.send (mb, unused, 0, this->server_addr_) == -1)
1590 mb->release ();
1591 ACE_ERROR_RETURN((LM_ERROR,
1592 ACE_TEXT ("(%t) Client %d, %p\n"),
1593 this->id_,
1594 ACE_TEXT ("send")),
1595 -1);
1597 #endif /* ACE_WIN32 */
1599 this->io_count_++;
1600 this->total_w_++;
1601 return 0;
1605 Client::initiate_read ()
1607 if (this->flg_cancel_ || this->handle_ == ACE_INVALID_HANDLE)
1608 return -1;
1610 static const size_t complete_message_length =
1611 ACE_OS::strlen (complete_message);
1613 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
1614 ACE_Message_Block *mb1 = 0,
1615 *mb2 = 0,
1616 *mb3 = 0,
1617 *mb4 = 0,
1618 *mb5 = 0,
1619 *mb6 = 0;
1621 // We allocate +1 only for proper printing - we can just set the last byte
1622 // to '\0' before printing out
1623 ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1);
1624 ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1);
1625 ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1);
1627 // Let allocate memory for one more triplet,
1628 // This improves performance
1629 // as we can receive more the than one block at once
1630 // Generally, we can receive more triplets ....
1631 ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1);
1632 ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1);
1633 ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1);
1635 mb1->cont (mb2);
1636 mb2->cont (mb3);
1638 mb3->cont (mb4);
1639 mb4->cont (mb5);
1640 mb5->cont (mb6);
1643 // hide last byte in each message block, reserving it for later to set '\0'
1644 // for proper printouts
1645 mb1->size (mb1->size () - 1);
1646 mb2->size (mb2->size () - 1);
1647 mb3->size (mb3->size () - 1);
1649 mb4->size (mb4->size () - 1);
1650 mb5->size (mb5->size () - 1);
1651 mb6->size (mb6->size () - 1);
1653 // Inititiate read
1654 size_t unused = 0;
1655 if (this->rs_.recv (mb1, unused, 0) == -1)
1657 mb1->release ();
1658 ACE_ERROR_RETURN ((LM_ERROR,
1659 ACE_TEXT ("(%t) %p\n"),
1660 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1661 -1);
1663 #else /* ACE_HAS_WIN32_OVERLAPPED_IO */
1665 // Try to read more chunks
1666 size_t blksize = ( complete_message_length > BUFSIZ ) ?
1667 complete_message_length : BUFSIZ;
1669 ACE_Message_Block *mb = 0;
1671 // We allocate +1 only for proper printing - we can just set the last byte
1672 // to '\0' before printing out
1673 ACE_NEW_RETURN (mb,
1674 ACE_Message_Block (blksize + 1),
1675 -1);
1677 // Inititiate read
1678 size_t unused = 0;
1679 if (this->rs_.recv (mb, unused, 0) == -1)
1681 mb->release ();
1682 ACE_ERROR_RETURN ((LM_ERROR,
1683 ACE_TEXT ("(%t) Client %d, %p\n"),
1684 this->id_,
1685 ACE_TEXT ("read")),
1686 -1);
1688 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
1690 this->io_count_++;
1691 this->total_r_++;
1692 return 0;
1695 void
1696 Client::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
1699 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1701 ACE_Message_Block *mb = result.message_block ();
1703 if (loglevel > 1)
1705 LogLocker log_lock;
1707 ACE_DEBUG ((LM_DEBUG,
1708 ACE_TEXT ("(%t) **** Client %d: handle_write_dgram() ****\n"),
1709 this->id_));
1710 ACE_DEBUG ((LM_DEBUG,
1711 ACE_TEXT ("%s = %B\n"),
1712 ACE_TEXT ("bytes_to_write"),
1713 result.bytes_to_write ()));
1714 ACE_DEBUG ((LM_DEBUG,
1715 ACE_TEXT ("%s = %d\n"),
1716 ACE_TEXT ("handle"),
1717 result.handle ()));
1718 ACE_DEBUG ((LM_DEBUG,
1719 ACE_TEXT ("%s = %B\n"),
1720 ACE_TEXT ("bytes_transfered"),
1721 result.bytes_transferred ()));
1722 ACE_DEBUG ((LM_DEBUG,
1723 ACE_TEXT ("%s = %@\n"),
1724 ACE_TEXT ("act"),
1725 result.act ()));
1726 ACE_DEBUG ((LM_DEBUG,
1727 ACE_TEXT ("%s = %d\n"),
1728 ACE_TEXT ("success"),
1729 result.success ()));
1730 ACE_DEBUG ((LM_DEBUG,
1731 ACE_TEXT ("%s = %@\n"),
1732 ACE_TEXT ("completion_key"),
1733 result.completion_key ()));
1734 ACE_DEBUG ((LM_DEBUG,
1735 ACE_TEXT ("%s = %d\n"),
1736 ACE_TEXT ("error"),
1737 result.error ()));
1739 #if defined (ACE_WIN32)
1740 size_t bytes_transferred = result.bytes_transferred ();
1741 char index = 0;
1742 for (ACE_Message_Block* mb_i = mb;
1743 (mb_i != 0) && (bytes_transferred > 0);
1744 mb_i = mb_i->cont ())
1746 // write 0 at string end for proper printout (if end of mb,
1747 // it's 0 already)
1748 mb_i->rd_ptr()[0] = '\0';
1750 size_t len = mb_i->rd_ptr () - mb_i->base ();
1752 // move rd_ptr backwards as required for printout
1753 if (len >= bytes_transferred)
1755 mb_i->rd_ptr (0 - bytes_transferred);
1756 bytes_transferred = 0;
1758 else
1760 mb_i->rd_ptr (0 - len);
1761 bytes_transferred -= len;
1764 ++index;
1765 ACE_DEBUG ((LM_DEBUG,
1766 ACE_TEXT ("%s%d = %s\n"),
1767 ACE_TEXT ("message_block, part "),
1768 index,
1769 mb_i->rd_ptr ()));
1771 #else /* ACE_WIN32 */
1772 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1773 mb->rd_ptr()[0] = '\0';
1774 // move rd_ptr backwards as required for printout
1775 mb->rd_ptr (- result.bytes_transferred ());
1776 ACE_DEBUG ((LM_DEBUG,
1777 ACE_TEXT ("%s = %s\n"),
1778 ACE_TEXT ("message_block"),
1779 mb->rd_ptr ()));
1780 #endif /* ACE_WIN32 */
1782 ACE_DEBUG ((LM_DEBUG,
1783 ACE_TEXT ("**** end of message ****************\n")));
1785 else if (result.error () != 0)
1787 ACE_Log_Priority prio;
1788 #if defined (ACE_WIN32)
1789 if (result.error () == ERROR_OPERATION_ABORTED)
1790 prio = LM_DEBUG;
1791 #else
1792 if (result.error () == ECANCELED)
1793 prio = LM_DEBUG;
1794 #endif /* ACE_WIN32 */
1795 else
1796 prio = LM_ERROR;
1797 ACE_LOG_MSG->errnum (result.error ());
1798 ACE_LOG_MSG->log (prio,
1799 ACE_TEXT ("(%t) Client %d; %p\n"),
1800 this->id_,
1801 ACE_TEXT ("write"));
1803 else if (loglevel > 0)
1805 ACE_DEBUG ((LM_DEBUG,
1806 ACE_TEXT ("(%t) Client %d: wrote %B bytes ok\n"),
1807 this->id_,
1808 result.bytes_transferred ()));
1811 mb->release ();
1813 if (result.error () == 0 && result.bytes_transferred () > 0)
1815 this->total_snd_ += result.bytes_transferred ();
1816 if (this->total_snd_ >= xfer_limit)
1818 ACE_DEBUG ((LM_DEBUG,
1819 ACE_TEXT ("(%t) Client %d sent %B, limit %B\n"),
1820 this->id_, this->total_snd_, xfer_limit));
1821 this->close ();
1823 if (duplex != 0) // full duplex, continue write
1825 if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
1826 this->initiate_write ();
1828 else // half-duplex read reply, after read we will start write
1829 this->initiate_read ();
1832 --this->io_count_;
1833 if (this->io_count_ > 0)
1834 return;
1836 delete this;
1839 void
1840 Client::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
1843 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1845 ACE_Message_Block *mb = result.message_block ();
1847 if (loglevel > 1)
1849 LogLocker log_lock;
1851 ACE_DEBUG ((LM_DEBUG,
1852 ACE_TEXT ("(%t) **** Client %d: handle_read_dgram() ****\n"),
1853 this->id_));
1854 ACE_DEBUG ((LM_DEBUG,
1855 ACE_TEXT ("%s = %B\n"),
1856 ACE_TEXT ("bytes_to_read"),
1857 result.bytes_to_read ()));
1858 ACE_DEBUG ((LM_DEBUG,
1859 ACE_TEXT ("%s = %d\n"),
1860 ACE_TEXT ("handle"),
1861 result.handle ()));
1862 ACE_DEBUG ((LM_DEBUG,
1863 ACE_TEXT ("%s = %B\n"),
1864 ACE_TEXT ("bytes_transfered"),
1865 result.bytes_transferred ()));
1866 ACE_DEBUG ((LM_DEBUG,
1867 ACE_TEXT ("%s = %@\n"),
1868 ACE_TEXT ("act"),
1869 result.act ()));
1870 ACE_DEBUG ((LM_DEBUG,
1871 ACE_TEXT ("%s = %d\n"),
1872 ACE_TEXT ("success"),
1873 result.success ()));
1874 ACE_DEBUG ((LM_DEBUG,
1875 ACE_TEXT ("%s = %@\n"),
1876 ACE_TEXT ("completion_key"),
1877 result.completion_key ()));
1878 ACE_DEBUG ((LM_DEBUG,
1879 ACE_TEXT ("%s = %d\n"),
1880 ACE_TEXT ("error"),
1881 result.error ()));
1883 #if defined (ACE_WIN32)
1884 char index = 0;
1885 for (ACE_Message_Block* mb_i = mb;
1886 mb_i != 0;
1887 mb_i = mb_i->cont ())
1889 ++index;
1890 // write 0 at string end for proper printout
1891 mb_i->wr_ptr()[0] = '\0';
1893 ACE_DEBUG ((LM_DEBUG,
1894 ACE_TEXT ("%s%d = %s\n"),
1895 ACE_TEXT ("message_block, part "),
1896 index,
1897 mb_i->rd_ptr ()));
1899 #else /* ACE_WIN32 */
1900 // write 0 at string end for proper printout
1901 mb->rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
1902 ACE_DEBUG ((LM_DEBUG,
1903 ACE_TEXT ("%s = %s\n"),
1904 ACE_TEXT ("message_block"),
1905 mb->rd_ptr ()));
1906 #endif /* ACE_WIN32 */
1908 ACE_DEBUG ((LM_DEBUG,
1909 ACE_TEXT ("**** end of message ****************\n")));
1911 else if (result.error () != 0)
1913 ACE_Log_Priority prio;
1914 #if defined (ACE_WIN32)
1915 if (result.error () == ERROR_OPERATION_ABORTED)
1916 prio = LM_DEBUG;
1917 #else
1918 if (result.error () == ECANCELED)
1919 prio = LM_DEBUG;
1920 #endif /* ACE_WIN32 */
1921 else
1922 prio = LM_ERROR;
1923 ACE_Log_Msg::instance ()->errnum (result.error ());
1924 ACE_Log_Msg::instance ()->log (prio,
1925 ACE_TEXT ("(%t) Client %d; %p\n"),
1926 this->id_,
1927 ACE_TEXT ("read"));
1929 else if (loglevel > 0)
1931 ACE_DEBUG ((LM_DEBUG,
1932 ACE_TEXT ("(%t) Client %d: read %B bytes ok\n"),
1933 this->id_,
1934 result.bytes_transferred ()));
1937 if (result.error () == 0 && result.bytes_transferred () > 0)
1939 this->total_rcv_ += result.bytes_transferred ();
1941 // If we've closed and the server acked, we're done.
1942 if (this->stop_writing_ &&
1943 ACE_OS::strcmp (mb->rd_ptr (), close_ack_msg) == 0)
1945 ACE_DEBUG ((LM_DEBUG,
1946 ACE_TEXT ("(%t) Client %d recvd close-ack\n"),
1947 this->id_));
1949 else
1951 if (duplex != 0)
1952 this->initiate_read ();
1953 else // half-duplex write, after write we will start read
1954 this->initiate_write ();
1958 mb->release ();
1959 --this->io_count_;
1960 if (this->io_count_ > 0)
1961 return;
1963 delete this;
1966 // *************************************************************
1967 // Configuration helpers
1968 // *************************************************************
1970 print_usage (int /* argc */, ACE_TCHAR *argv[])
1972 ACE_ERROR
1973 ((LM_ERROR,
1974 ACE_TEXT ("\nusage: %s")
1975 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1976 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1977 ACE_TEXT ("\n a AIOCB")
1978 ACE_TEXT ("\n i SIG")
1979 ACE_TEXT ("\n c CB")
1980 ACE_TEXT ("\n d default")
1981 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1982 ACE_TEXT ("\n-h <host> for Client mode")
1983 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1984 ACE_TEXT ("\n-p <port to listen/connect>")
1985 ACE_TEXT ("\n-c <number of client instances>")
1986 ACE_TEXT ("\n-b run client and server at the same time")
1987 ACE_TEXT ("\n f file")
1988 ACE_TEXT ("\n c console")
1989 ACE_TEXT ("\n-v log level")
1990 ACE_TEXT ("\n 0 - log errors and highlights")
1991 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1992 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1993 ACE_TEXT ("\n-x max transfer byte count per Client")
1994 ACE_TEXT ("\n-u show this message")
1995 ACE_TEXT ("\n"),
1996 argv[0]
1998 return -1;
2001 static int
2002 set_proactor_type (const ACE_TCHAR *ptype)
2004 if (!ptype)
2005 return 0;
2007 switch (ACE_OS::ace_toupper (*ptype))
2009 case 'D':
2010 proactor_type = DEFAULT;
2011 return 1;
2012 case 'A':
2013 proactor_type = AIOCB;
2014 return 1;
2015 case 'I':
2016 proactor_type = SIG;
2017 return 1;
2018 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
2019 case 'C':
2020 proactor_type = CB;
2021 return 1;
2022 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
2023 default:
2024 break;
2026 return 0;
2029 static int
2030 parse_args (int argc, ACE_TCHAR *argv[])
2032 // First, set up all the defaults then let any args change them.
2033 both = 1; // client and server simultaneosly
2034 duplex = 1; // full duplex is on
2035 host = ACE_LOCALHOST; // server to connect
2036 port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
2037 max_aio_operations = 512; // POSIX Proactor params
2038 proactor_type = DEFAULT; // Proactor type = default
2039 threads = 3; // size of Proactor thread pool
2040 clients = 10; // number of clients
2041 loglevel = 0; // log level : only errors and highlights
2042 // Default transfer limit 50 messages per Sender
2043 xfer_limit = 50 * ACE_OS::strlen (complete_message);
2045 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
2046 # if defined (ACE_LINUX)
2047 duplex = 0;
2048 #endif
2050 if (argc == 1) // no arguments , so one button test
2051 return 0;
2053 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
2054 int c;
2056 while ((c = get_opt ()) != EOF)
2058 switch (c)
2060 case 'x': // xfer limit
2061 xfer_limit = static_cast<size_t> (ACE_OS::atoi (get_opt.opt_arg ()));
2062 if (xfer_limit == 0)
2063 xfer_limit = 1; // Bare minimum.
2064 break;
2065 case 'b': // both client and server
2066 both = 1;
2067 break;
2068 case 'v': // log level
2069 loglevel = ACE_OS::atoi (get_opt.opt_arg ());
2070 break;
2071 case 'd': // duplex
2072 duplex = ACE_OS::atoi (get_opt.opt_arg ());
2073 break;
2074 case 'h': // host for sender
2075 host = get_opt.opt_arg ();
2076 break;
2077 case 'p': // port number
2078 port = ACE_OS::atoi (get_opt.opt_arg ());
2079 break;
2080 case 'n': // thread pool size
2081 threads = ACE_OS::atoi (get_opt.opt_arg ());
2082 break;
2083 case 'c': // number of clients
2084 clients = ACE_OS::atoi (get_opt.opt_arg ());
2085 if (clients > MAX_CLIENTS)
2086 clients = MAX_CLIENTS;
2087 break;
2088 case 'o': // max number of aio for proactor
2089 max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
2090 break;
2091 case 't': // Proactor Type
2092 if (set_proactor_type (get_opt.opt_arg ()))
2093 break;
2094 return print_usage (argc, argv);
2095 case 'u':
2096 default:
2097 return print_usage (argc, argv);
2098 } // switch
2099 } // while
2101 return 0;
2105 run_main (int argc, ACE_TCHAR *argv[])
2107 ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
2109 if (::parse_args (argc, argv) == -1)
2110 return -1;
2112 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
2113 disable_signal (SIGPIPE, SIGPIPE);
2115 MyTask task1;
2116 TestData test;
2118 if (task1.start (threads, proactor_type, max_aio_operations) == 0)
2120 // NOTE - there's no real reason this test is limited to IPv4 other
2121 // than the way Session_Data is set up - to expand this test to work
2122 // on IPv6 as well as IPv4, you need to do some work on passing the
2123 // Session_Data address differently.
2124 ACE_INET_Addr addr (port, ACE_LOCALHOST, AF_INET);
2125 Master master (&test, addr, clients);
2126 Connector connector (&test);
2127 int rc = 0;
2129 if (both != 0 || host == 0) // Acceptor
2131 // Already running; if not needed will be deleted soon.
2132 rc = 1;
2135 if (both != 0 || host != 0)
2137 if (host == 0)
2138 host = ACE_LOCALHOST;
2140 if (addr.set (port, host, 1, addr.get_type ()) == -1)
2141 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
2142 else
2143 rc += connector.start (addr, clients);
2146 // Wait a few seconds to let things get going, then poll til
2147 // all sessions are done. Note that when we exit this scope, the
2148 // Acceptor and Connector will be destroyed, which should prevent
2149 // further connections and also test how well destroyed handlers
2150 // are handled.
2151 ACE_OS::sleep (3);
2153 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
2154 while (!test.testing_done ())
2155 ACE_OS::sleep (1);
2157 test.stop_all ();
2159 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
2160 task1.stop ();
2162 ACE_END_TEST;
2164 return 0;
2167 #else
2170 run_main (int, ACE_TCHAR *[])
2172 ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
2174 ACE_DEBUG ((LM_INFO,
2175 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
2176 ACE_TEXT ("Proactor_UDP_Test will not be run.\n")));
2178 ACE_END_TEST;
2180 return 0;
2183 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */