1 // ============================================================================
3 * @file Proactor_UDP_Test.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that uses UDP/IP communications.
8 * @author Steve Huston <shuston@riverace.com>, based on Proactor_Test.cpp
10 // ============================================================================
12 #include "test_config.h"
14 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
15 // This only works on Win32 platforms and on Unix platforms
16 // supporting POSIX aio calls.
18 #include "ace/Signal.h"
20 #include "ace/Service_Config.h"
21 #include "ace/INET_Addr.h"
22 #include "ace/SOCK_CODgram.h"
23 #include "ace/SOCK_Dgram.h"
24 #include "ace/Object_Manager.h"
25 #include "ace/Get_Opt.h"
27 #include "ace/Proactor.h"
29 #include "ace/Thread_Semaphore.h"
30 #include "ace/OS_NS_ctype.h"
31 #include "ace/OS_NS_errno.h"
32 #include "ace/OS_NS_signal.h"
33 #include "ace/OS_NS_string.h"
34 #include "ace/OS_NS_unistd.h"
35 #include "ace/OS_NS_sys_socket.h"
36 #include "ace/Sock_Connect.h"
37 #include "ace/os_include/netinet/os_tcp.h"
39 #include "ace/Atomic_Op.h"
40 #include "ace/Synch_Traits.h"
42 #if defined (ACE_WIN32)
44 # include "ace/WIN32_Proactor.h"
46 #elif defined (ACE_HAS_AIO_CALLS)
48 # include "ace/POSIX_Proactor.h"
49 # include "ace/POSIX_CB_Proactor.h"
50 # include "ace/SUN_Proactor.h"
52 #endif /* ACE_WIN32 */
54 // Proactor Type (UNIX only, Win32 ignored)
// Enumerates the proactor flavours this test can request. DEFAULT is
// explicitly 0; AIOCB, SIG, SUN and CB take the following values 1..4.
// NOTE(review): the names line up with the "Create Proactor Type = ..."
// debug messages in MyTask::create_proactor -- confirm the mapping there.
55 typedef enum { DEFAULT
= 0, AIOCB
, SIG
, SUN
, CB
} ProactorType
;
// Proactor flavour selected for this run; initialised to DEFAULT.
56 static ProactorType proactor_type
= DEFAULT
;
58 // POSIX : > 0 max number aio operations proactor,
59 static size_t max_aio_operations
= 0;
61 // both: 0 run client or server / depends on host
62 // != 0 run client and server
65 // Host that we're connecting to.
66 static const ACE_TCHAR
*host
= 0;
68 // number of Client instances
69 static int clients
= 1;
70 const int MAX_CLIENTS
= 1000;
71 const int MAX_SERVERS
= 1000;
73 // duplex mode: == 0 half-duplex
75 static int duplex
= 0;
77 // number threads in the Proactor thread pool
78 static int threads
= 1;
80 // Port that we're receiving session initiations on.
81 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
84 static int loglevel
; // 0 full , 1 only errors
86 static size_t xfer_limit
; // Number of bytes for Client to send.
88 static char complete_message
[] =
91 "Accept-Language: C++\r\n"
92 "Accept-Encoding: gzip, deflate\r\n"
93 "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
94 "Connection: Keep-Alive\r\n"
97 static char close_req_msg
[] = "CLOSE";
98 static char close_ack_msg
[] = "CLOSE-ACK";
104 LogLocker () { ACE_LOG_MSG
->acquire (); }
105 virtual ~LogLocker () { ACE_LOG_MSG
->release (); }
108 // Function to remove signals from the signal mask.
110 disable_signal (int sigmin
, int sigmax
)
112 #if !defined (ACE_LACKS_UNIX_SIGNALS)
114 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
115 ACE_ERROR ((LM_ERROR
,
116 ACE_TEXT ("Error: (%P|%t):%p\n"),
117 ACE_TEXT ("sigemptyset failed")));
119 for (int i
= sigmin
; i
<= sigmax
; i
++)
120 ACE_OS::sigaddset (&signal_set
, i
);
122 // Put the <signal_set>.
123 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
124 // In multi-threaded application this is not POSIX compliant
125 // but let's leave it just in case.
126 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
128 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
129 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
130 ACE_ERROR_RETURN ((LM_ERROR
,
131 ACE_TEXT ("Error: (%P|%t): %p\n"),
132 ACE_TEXT ("SIG_BLOCK failed")),
135 ACE_UNUSED_ARG (sigmin
);
136 ACE_UNUSED_ARG (sigmax
);
137 #endif /* ACE_LACKS_UNIX_SIGNALS */
142 // *************************************************************
143 // MyTask is ACE_Task responsible for :
144 // 1. creation and deletion of
145 // Proactor and Proactor thread pool
146 // 2. running Proactor event loop
147 // *************************************************************
152 * MyTask plays role for Proactor threads pool
154 * MyTask is ACE_Task responsible for:
155 * 1. Creation and deletion of Proactor and Proactor thread pool
156 * 2. Running Proactor event loop
158 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
163 sem_ ((unsigned int) 0),
168 (void) this->stop ();
169 (void) this->delete_proactor();
172 virtual int svc (void);
174 int start (int num_threads
,
175 ProactorType type_proactor
,
180 int create_proactor (ProactorType type_proactor
,
182 int delete_proactor (void);
184 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
185 ACE_Thread_Semaphore sem_
;
186 ACE_Proactor
* proactor_
;
191 MyTask::create_proactor (ProactorType type_proactor
, size_t max_op
)
193 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
198 ACE_TEST_ASSERT (this->proactor_
== 0);
200 #if defined (ACE_WIN32)
202 ACE_UNUSED_ARG (type_proactor
);
203 ACE_UNUSED_ARG (max_op
);
205 ACE_WIN32_Proactor
*proactor_impl
= 0;
207 ACE_NEW_RETURN (proactor_impl
,
211 ACE_DEBUG ((LM_DEBUG
,
212 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
214 #elif defined (ACE_HAS_AIO_CALLS)
216 ACE_POSIX_Proactor
* proactor_impl
= 0;
218 switch (type_proactor
)
221 ACE_NEW_RETURN (proactor_impl
,
222 ACE_POSIX_AIOCB_Proactor (max_op
),
224 ACE_DEBUG ((LM_DEBUG
,
225 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
228 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
230 ACE_NEW_RETURN (proactor_impl
,
231 ACE_POSIX_SIG_Proactor (max_op
),
233 ACE_DEBUG ((LM_DEBUG
,
234 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
236 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
240 ACE_NEW_RETURN (proactor_impl
,
241 ACE_SUN_Proactor (max_op
),
243 ACE_DEBUG ((LM_DEBUG
,
244 ACE_TEXT("(%t) Create Proactor Type = SUN\n")));
248 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
250 ACE_NEW_RETURN (proactor_impl
,
251 ACE_POSIX_CB_Proactor (max_op
),
253 ACE_DEBUG ((LM_DEBUG
,
254 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
256 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
259 ACE_DEBUG ((LM_DEBUG
,
260 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
264 #endif /* ACE_WIN32 */
266 // always delete implementation 1 , not !(proactor_impl == 0)
267 ACE_NEW_RETURN (this->proactor_
,
268 ACE_Proactor (proactor_impl
, 1 ),
270 // Set new singleton and delete it in close_singleton()
271 ACE_Proactor::instance (this->proactor_
, 1);
276 MyTask::delete_proactor (void)
278 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
283 ACE_DEBUG ((LM_DEBUG
,
284 ACE_TEXT ("(%t) Delete Proactor\n")));
286 ACE_Proactor::close_singleton ();
293 MyTask::start (int num_threads
,
294 ProactorType type_proactor
,
297 if (this->create_proactor (type_proactor
, max_op
) == -1)
298 ACE_ERROR_RETURN ((LM_ERROR
,
300 ACE_TEXT ("unable to create proactor")),
303 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
304 ACE_ERROR_RETURN ((LM_ERROR
,
306 ACE_TEXT ("unable to activate thread pool")),
309 for (; num_threads
> 0; num_threads
--)
321 if (this->proactor_
!= 0)
323 ACE_DEBUG ((LM_DEBUG
,
324 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
326 this->proactor_
->proactor_end_event_loop ();
329 if (this->wait () == -1)
330 ACE_ERROR ((LM_ERROR
,
332 ACE_TEXT ("unable to stop thread pool")));
340 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask started\n")));
342 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
343 disable_signal (SIGPIPE
, SIGPIPE
);
345 // signal that we are ready
348 this->proactor_
->proactor_run_event_loop ();
350 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask finished\n")));
354 // forward declaration
357 // "Server" is one side of a session. It's the same idea as in TCP, but
358 // there's no acceptor in UDP; sessions are started by the client sending
359 // a "start" datagram to a well-known UDP port. The start datagram tells
360 // which port number the client is receiving on. The server then sends an
361 // "ack" datagram to indicate the session is set up successfully and to say
362 // which port the server is listening on. Thus, a unique pairing of server
363 // and client port numbers is established. Each session will require a
364 // separate server-side socket as well as the client. Note that experienced
365 // UDP programmers will be quivering at this point knowing that there's no
366 // reason to have multiple server-side sockets, and no real reason to
367 // pre-register the client ports either since all the addressing info is
368 // available in normal UDP programming. However, this is all necessary in
369 // the POSIX case since the POSIX aio functions were not designed with UDP
370 // in mind, and the addressing information is not available in UDP receive
371 // completion callbacks; thus, each socket needs to be fully connected before
372 // running session data. The addressing information needed to run this
373 // use-case in the "normal" way is available on Windows, but this test runs
374 // across many platforms, so can't rely on that information.
375 class Server
: public ACE_Handler
379 Server (TestData
*tester
, int id
);
382 int id (void) { return this->id_
; }
383 size_t get_total_snd (void) { return this->total_snd_
; }
384 size_t get_total_rcv (void) { return this->total_rcv_
; }
385 long get_total_w (void) { return this->total_w_
; }
386 long get_total_r (void) { return this->total_r_
; }
388 /// This is called after the new session has been established.
389 void go (ACE_HANDLE handle
, const ACE_INET_Addr
&client
);
395 * @name AIO callback handling
397 * These methods are called by the framework
399 /// This is called when asynchronous <read> operation from the
400 /// socket completes.
401 virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
);
403 /// This is called when an asynchronous <write> to the socket
405 virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result
&result
);
408 int initiate_read (void);
409 int initiate_write (ACE_Message_Block
*mb
, size_t nbytes
);
414 ACE_INET_Addr client_addr_
;
415 ACE_Asynch_Read_Dgram rs_
;
416 ACE_Asynch_Write_Dgram ws_
;
417 ACE_SYNCH_MUTEX lock_
;
419 int io_count_
; // Number of currently outstanding I/O requests
422 size_t total_snd_
; // Number of bytes successfully sent
423 size_t total_rcv_
; // Number of bytes successfully received
424 int total_w_
; // Number of write operations
425 int total_r_
; // Number of read operations
428 // *******************************************
430 // *******************************************
432 class Client
: public ACE_Handler
436 Client (TestData
*tester
, int id
);
439 void go (ACE_HANDLE h
, const ACE_INET_Addr
&server
);
440 int id (void) { return this->id_
; }
441 size_t get_total_snd (void) { return this->total_snd_
; }
442 size_t get_total_rcv (void) { return this->total_rcv_
; }
443 int get_total_w (void) { return this->total_w_
; }
444 int get_total_r (void) { return this->total_r_
; }
446 // This is called when asynchronous reads from the socket complete
447 virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
);
449 // This is called when asynchronous writes from the socket complete
450 virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result
&result
);
455 int initiate_read (void);
456 int initiate_write (void);
457 // FUZZ: disable check_for_lack_ACE_OS
459 // FUZZ: enable check_for_lack_ACE_OS
464 ACE_INET_Addr server_addr_
;
465 ACE_Asynch_Read_Dgram rs_
;
466 ACE_Asynch_Write_Dgram ws_
;
468 ACE_SYNCH_MUTEX lock_
;
471 int stop_writing_
; // Writes are shut down; just read.
479 // TestData collects and reports on test-related transfer and connection
485 bool testing_done (void);
486 Server
*server_up (void);
487 Client
*client_up (void);
488 void server_done (Server
*s
);
489 void client_done (Client
*c
);
490 void stop_all (void);
496 // Track number of sessions that report start, and those that report
497 // their end (and stats).
498 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_up_
;
499 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_down_
;
501 // Total read and write bytes for all sessions.
502 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_cnt_
;
503 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_cnt_
;
504 // Total read and write operations issued for all sessions.
505 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_ops_
;
506 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_ops_
;
507 } servers_
, clients_
;
509 ACE_SYNCH_MUTEX list_lock_
;
510 Server
*server_list_
[MAX_SERVERS
];
511 Client
*client_list_
[MAX_CLIENTS
];
514 TestData::TestData ()
517 for (i
= 0; i
< MAX_SERVERS
; ++i
)
518 this->server_list_
[i
] = 0;
519 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
520 this->client_list_
[i
] = 0;
// Reports whether the test has run to completion, judged purely from the
// per-side session counters kept in servers_ and clients_.
524 TestData::testing_done (void)
// Snapshot the "up" and "down" session counts for both sides.
526 int svr_up
= this->servers_
.sessions_up_
.value ();
527 int svr_dn
= this->servers_
.sessions_down_
.value ();
528 int clt_up
= this->clients_
.sessions_up_
.value ();
529 int clt_dn
= this->clients_
.sessions_down_
.value ();
// NOTE(review): the statement guarded by this check is not visible in this
// listing; presumably it reports "not done" while nothing has started yet.
531 if (svr_up
== 0 && clt_up
== 0) // No connections up yet
// Done once each side has seen at least as many sessions go down as came up.
534 return (svr_dn
>= svr_up
&& clt_dn
>= clt_up
);
// Registers a new server-side session: bumps the servers_ "up" counter,
// then, under list_lock_, creates a Server in the first empty slot of
// server_list_ and returns it. The Server is constructed with this TestData
// and the slot index.
// Both ACE_GUARD_RETURN and ACE_NEW_RETURN are given 0 as their bail-out
// value, so callers must be prepared for a 0 result.
// NOTE(review): sessions_up_ is incremented before the lock is taken and
// before a free slot is found, so a failed call still counts as "up" --
// confirm that is intended. What happens when no slot is free is not visible
// in this listing.
538 TestData::server_up (void)
540 ++this->servers_
.sessions_up_
;
541 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
// Linear scan for the first unused entry.
543 for (int i
= 0; i
< MAX_SERVERS
; ++i
)
545 if (this->server_list_
[i
] == 0)
547 ACE_NEW_RETURN (this->server_list_
[i
], Server (this, i
), 0);
548 ACE_DEBUG ((LM_DEBUG
,
549 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
551 this->servers_
.sessions_up_
.value (),
552 this->servers_
.sessions_down_
.value ()));
553 return this->server_list_
[i
];
// Registers a new client-side session: bumps the clients_ "up" counter,
// then, under list_lock_, creates a Client in the first empty slot of
// client_list_ and returns it. The Client is constructed with this TestData
// and the slot index.
// Both ACE_GUARD_RETURN and ACE_NEW_RETURN are given 0 as their bail-out
// value, so callers must be prepared for a 0 result.
// NOTE(review): sessions_up_ is incremented before the lock is taken and
// before a free slot is found, so a failed call still counts as "up" --
// confirm that is intended. What happens when no slot is free is not visible
// in this listing.
560 TestData::client_up (void)
562 ++this->clients_
.sessions_up_
;
563 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
// Linear scan for the first unused entry.
565 for (int i
= 0; i
< MAX_CLIENTS
; ++i
)
567 if (this->client_list_
[i
] == 0)
569 ACE_NEW_RETURN (this->client_list_
[i
], Client (this, i
), 0);
570 ACE_DEBUG ((LM_DEBUG
,
571 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
573 this->clients_
.sessions_up_
.value (),
574 this->clients_
.sessions_down_
.value ()));
575 return this->client_list_
[i
];
582 TestData::server_done (Server
*s
)
584 this->servers_
.w_cnt_
+= s
->get_total_snd ();
585 this->servers_
.r_cnt_
+= s
->get_total_rcv ();
586 this->servers_
.w_ops_
+= s
->get_total_w ();
587 this->servers_
.r_ops_
+= s
->get_total_r ();
588 ++this->servers_
.sessions_down_
;
589 ACE_DEBUG ((LM_DEBUG
,
590 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
592 this->servers_
.sessions_up_
.value (),
593 this->servers_
.sessions_down_
.value ()));
595 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
597 for (i
= 0; i
< MAX_SERVERS
; ++i
)
599 if (this->server_list_
[i
] == s
)
602 ACE_ERROR ((LM_ERROR
,
603 ACE_TEXT ("Server %d is pos %d in list\n"),
606 this->server_list_
[i
] = 0;
610 if (i
>= MAX_SERVERS
)
611 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Server %@ done but not listed\n"), s
));
617 TestData::client_done (Client
*c
)
619 this->clients_
.w_cnt_
+= c
->get_total_snd ();
620 this->clients_
.r_cnt_
+= c
->get_total_rcv ();
621 this->clients_
.w_ops_
+= c
->get_total_w ();
622 this->clients_
.r_ops_
+= c
->get_total_r ();
623 ++this->clients_
.sessions_down_
;
624 ACE_DEBUG ((LM_DEBUG
,
625 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
627 this->clients_
.sessions_up_
.value (),
628 this->clients_
.sessions_down_
.value ()));
630 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
632 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
634 if (this->client_list_
[i
] == c
)
637 ACE_ERROR ((LM_ERROR
,
638 ACE_TEXT ("Client %d is pos %d in list\n"),
641 this->client_list_
[i
] = 0;
645 if (i
>= MAX_CLIENTS
)
646 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Client %@ done but not listed\n"), c
));
// Shuts down every session still registered in client_list_ and
// server_list_. Works in two passes, each taken under list_lock_:
// first cancel() every live entry, then delete every entry still listed.
652 TestData::stop_all (void)
656 // Lock and cancel everything. Then release the lock, possibly allowing
657 // cleanups, then grab it again and delete all Servers and Clients.
// Pass 1: request cancellation on every non-empty slot.
659 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
660 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
662 if (this->client_list_
[i
] != 0)
663 this->client_list_
[i
]->cancel ();
666 for (i
= 0; i
< MAX_SERVERS
; ++i
)
668 if (this->server_list_
[i
] != 0)
669 this->server_list_
[i
]->cancel ();
// Pass 2: delete whatever is still listed.
// NOTE(review): the Server and Client destructors in this file call
// tester_->server_done()/client_done(), and those take list_lock_ with
// ACE_GUARD. Deleting here while this guard holds the same lock would
// re-acquire it on the same thread; if ACE_SYNCH_MUTEX is not recursive on
// the target platform this can self-deadlock -- confirm.
673 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
674 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
676 if (this->client_list_
[i
] != 0)
677 delete this->client_list_
[i
];
680 for (i
= 0; i
< MAX_SERVERS
; ++i
)
682 if (this->server_list_
[i
] != 0)
683 delete this->server_list_
[i
];
689 TestData::report (void)
692 ACE_TCHAR bufs
[256];
693 ACE_TCHAR bufr
[256];
695 ACE_OS::snprintf (bufs
, 256,
696 ACE_SIZE_T_FORMAT_SPECIFIER
697 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
698 this->clients_
.w_cnt_
.value (),
699 this->clients_
.w_ops_
.value ());
701 ACE_OS::snprintf (bufr
, 256,
702 ACE_SIZE_T_FORMAT_SPECIFIER
703 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
704 this->clients_
.r_cnt_
.value (),
705 this->clients_
.r_ops_
.value ());
707 ACE_DEBUG ((LM_DEBUG
,
708 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
712 ACE_OS::snprintf (bufs
, 256,
713 ACE_SIZE_T_FORMAT_SPECIFIER
714 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
715 this->servers_
.w_cnt_
.value (),
716 this->servers_
.w_ops_
.value ());
718 ACE_OS::snprintf (bufr
, 256,
719 ACE_SIZE_T_FORMAT_SPECIFIER
720 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
721 this->servers_
.r_cnt_
.value (),
722 this->servers_
.r_ops_
.value ());
724 ACE_DEBUG ((LM_DEBUG
,
725 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
729 if (this->clients_
.w_cnt_
.value () == 0 ||
730 this->clients_
.r_cnt_
.value () == 0 ||
731 this->servers_
.w_cnt_
.value () == 0 ||
732 this->servers_
.r_cnt_
.value () == 0 )
733 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("It appears that this test didn't ")
734 ACE_TEXT ("really do anything. Something is very wrong.\n")));
737 // Session set-up struct.
740 ACE_INT32 direction_
; // 0 == Start, 1 == Ack
741 ACE_INT32 addr_
; // Network byte order, must be IPv4
742 ACE_UINT16 port_
; // UDP port, network byte order
743 Session_Data() { ACE_OS::memset (this, 0, sizeof(*this)); }
746 // Master is the server-side receiver of session establishment requests.
747 // For each "start" dgram received, instantiates a new Server object,
748 // indicating the addressing info for the client.
749 // Master is initialized with a count of the number of expected sessions. After
750 // this number are set up, Master will stop listening for session requests.
751 // This is a bit fragile but is necessary because on HP-UX, AIX, et al., it
752 // is impossible to close/cancel a socket with an outstanding UDP receive
753 // (on AIX the process is so wedged the machine needs to be rebooted to
754 // clear it!). So, this bit of messiness is necessary for portability.
755 // When the Master is destroyed, it will try to stop establishing sessions
756 // but this will only work on Windows.
757 class Master
: public ACE_Handler
760 Master (TestData
*tester
, const ACE_INET_Addr
&recv_addr
, int expected
);
763 // Called when dgram receive operation completes.
764 virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
);
767 void start_recv (void);
770 ACE_INET_Addr recv_addr_
;
771 ACE_SOCK_Dgram sock_
;
772 ACE_Asynch_Read_Dgram rd_
;
773 ACE_Message_Block
*mb_
;
774 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_expected_
;
775 volatile bool recv_in_progress_
;
778 // *************************************************************
// Master constructor.
//   tester    - test bookkeeping object (how it is stored is not visible in
//               this listing).
//   recv_addr - local address to receive session-start datagrams on; copied
//               into recv_addr_ and used to open sock_.
//   expected  - number of sessions expected; seeds sessions_expected_.
// Opens the datagram socket, attaches the asynchronous reader rd_ to its
// handle with this object as the completion handler, and allocates the
// receive buffer. Failures of either open() are reported with ACE_ERROR;
// any further handling of those failures is not visible in this listing.
779 Master::Master (TestData
*tester
, const ACE_INET_Addr
&recv_addr
, int expected
)
781 recv_addr_ (recv_addr
),
783 sessions_expected_ (expected
),
784 recv_in_progress_ (false)
786 if (this->sock_
.open (recv_addr
) == -1)
787 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Master socket %p\n"), ACE_TEXT ("open")));
790 if (this->rd_
.open (*this, this->sock_
.get_handle ()) == -1)
791 ACE_ERROR ((LM_ERROR
,
792 ACE_TEXT ("Master reader %p\n"),
// Receive buffer sized for exactly one Session_Data record.
794 this->mb_
= new ACE_Message_Block (sizeof (Session_Data
));
799 Master::~Master (void)
801 if (this->recv_in_progress_
)
803 this->sock_
.close ();
807 this->mb_
->release ();
// Completion callback for the Master's asynchronous datagram receive.
// On success the payload is interpreted as a Session_Data "start" request:
// a connected datagram socket is opened toward the client, an Ack carrying
// that socket's local address is sent back over the Master's own socket,
// and a Server is created and started on the new socket. On failure the
// error is logged at a priority that depends on the error code.
813 Master::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
)
815 // We should only receive Start datagrams with valid addresses to reply to.
816 if (result
.success ())
// A datagram that is not exactly one Session_Data in size is reported.
818 if (result
.bytes_transferred () != sizeof (Session_Data
))
819 ACE_ERROR ((LM_ERROR
,
820 ACE_TEXT ("(%t) Master session data expected %B bytes; ")
821 ACE_TEXT ("received %B\n"),
822 sizeof (Session_Data
),
823 result
.bytes_transferred ()));
// View the received bytes in place as a Session_Data record.
826 ACE_Message_Block
*mb
= result
.message_block ();
827 Session_Data
*session
=
828 reinterpret_cast<Session_Data
*>(mb
->rd_ptr ());
// direction_ == 0 marks a Start request (see Session_Data).
829 if (session
->direction_
== 0)
831 ACE_INET_Addr client_addr
, me_addr
;
832 ACE_TCHAR client_str
[80], me_str
[80];
// Session_Data documents port_ and addr_ as network byte order; the
// trailing 0 presumably tells set() not to byte-swap them -- confirm
// against the ACE_INET_Addr::set documentation.
833 client_addr
.set ((u_short
)session
->port_
, session
->addr_
, 0);
834 client_addr
.addr_to_string (client_str
, 80);
836 // Set up the local and remote addresses - need fully-specified
837 // addresses to use UDP aio on Linux. This is the socket that
838 // the session will run over. The addressing info to be sent
839 // back to the Client will be sent over the receive socket
840 // to ensure it goes back to the client initiating the session.
841 ACE_SOCK_CODgram sock
;
842 if (sock
.open (client_addr
) == -1)
844 ACE_ERROR ((LM_ERROR
,
845 ACE_TEXT ("(%t) Master new socket for ")
846 ACE_TEXT ("client %s: %p\n"),
// Find out which local address the new session socket was given.
852 sock
.get_local_addr (me_addr
);
853 me_addr
.addr_to_string (me_str
, 80);
854 ACE_DEBUG ((LM_DEBUG
,
855 ACE_TEXT ("(%t) Master setting up server for ")
856 ACE_TEXT ("local %s, peer %s\n"),
// Build the Ack carrying the session socket's address and port.
// NOTE(review): this local reuses the name of the `session` pointer
// declared above; presumably it lives in a nested scope whose braces are
// not visible in this listing.
860 Session_Data session
;
861 session
.direction_
= 1; // Ack
862 session
.addr_
= ACE_HTONL (me_addr
.get_ip_address ());
863 session
.port_
= ACE_HTONS (me_addr
.get_port_number ());
// The Ack goes out over the Master's receive socket, not the new one.
864 if (this->sock_
.send (&session
,
868 ACE_ERROR ((LM_ERROR
,
869 ACE_TEXT ("(%t) Master reply %p\n"),
// NOTE(review): TestData::server_up() passes 0 as the bail-out value of
// its ACE_GUARD_RETURN / ACE_NEW_RETURN, so `server` may be 0 here; no
// null check is visible before the dereference below.
875 Server
*server
= this->tester_
->server_up ();
876 server
->go (sock
.get_handle (), client_addr
);
// Count this session against the expected total.
879 if (--this->sessions_expected_
== 0)
881 ACE_DEBUG ((LM_DEBUG
,
882 ACE_TEXT ("All expected sessions are up\n")));
887 ACE_ERROR ((LM_ERROR
,
888 ACE_TEXT ("(%t) Badly formed Session request\n")));
// Failure path. Priority starts at LM_ERROR; the platform-specific
// "operation cancelled" checks below presumably lower it to LM_DEBUG
// (the assignments are not visible in this listing).
894 ACE_Log_Priority prio
= LM_ERROR
;
895 #if defined (ACE_WIN32)
896 if (result
.error () == ERROR_OPERATION_ABORTED
)
899 if (result
.error () == ECANCELED
)
901 #endif /* ACE_WIN32 */
902 // Multiple steps to log the error without squashing errno.
903 ACE_LOG_MSG
->conditional_set (__FILE__
,
906 (int)(result
.error ()));
907 ACE_LOG_MSG
->log (prio
,
908 ACE_TEXT ("(%t) Master %p\n"),
910 // If canceled, don't try to restart.
911 if (prio
== LM_DEBUG
)
918 Master::start_recv (void)
925 if (this->rd_
.recv (this->mb_
, unused
, 0) == -1)
926 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) Master %p\n"), ACE_TEXT ("recv")));
928 this->recv_in_progress_
= true;
931 // ***************************************************
934 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
937 Server::Server (TestData
*tester
, int id
)
942 flg_closing_ (false),
950 Server::~Server (void)
952 ACE_DEBUG ((LM_DEBUG
,
953 ACE_TEXT ("(%t) Server %d dtor; %d sends (%B bytes); ")
954 ACE_TEXT ("%d recvs (%B bytes)\n"),
956 this->total_w_
, this->total_snd_
,
957 this->total_r_
, this->total_rcv_
));
958 if (this->io_count_
!= 0)
959 ACE_ERROR ((LM_WARNING
,
960 ACE_TEXT ("(%t) Server %d deleted with ")
961 ACE_TEXT ("%d I/O outstanding\n"),
965 // This test bounces data back and forth between Clients and Servers.
966 // Therefore, if there was significantly more data in one direction, that's
967 // a problem. Remember, the byte counts are unsigned values.
968 int issue_data_warning
= 0;
969 if (this->total_snd_
> this->total_rcv_
)
971 if (this->total_rcv_
== 0)
972 issue_data_warning
= 1;
973 else if (this->total_snd_
/ this->total_rcv_
> 2)
974 issue_data_warning
= 1;
978 if (this->total_snd_
== 0)
979 issue_data_warning
= 1;
980 else if (this->total_rcv_
/ this->total_snd_
> 2)
981 issue_data_warning
= 1;
983 if (issue_data_warning
)
984 ACE_DEBUG ((LM_WARNING
,
985 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
987 if (this->tester_
!= 0)
988 this->tester_
->server_done (this);
990 if (this->handle () != ACE_INVALID_HANDLE
)
991 ACE_OS::closesocket (this->handle ());
994 this->handle (ACE_INVALID_HANDLE
);
1000 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1002 this->flg_cancel_
= true;
1003 this->ws_
.cancel ();
1004 this->rs_
.cancel ();
1009 Server::go (ACE_HANDLE handle
, const ACE_INET_Addr
&client
)
1011 this->handle (handle
);
1012 this->client_addr_
.set (client
);
1014 // Lock this before initiating I/O, else it may complete while we're
1015 // still setting up.
1017 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1019 if (this->ws_
.open (*this, this->handle ()) == -1)
1020 ACE_ERROR ((LM_ERROR
,
1021 ACE_TEXT ("(%t) %p\n"),
1022 ACE_TEXT ("Server::ACE_Asynch_Write_Dgram::open")));
1023 else if (this->rs_
.open (*this, this->handle ()) == -1)
1024 ACE_ERROR ((LM_ERROR
,
1025 ACE_TEXT ("(%t) %p\n"),
1026 ACE_TEXT ("Server::ACE_Asynch_Read_Dgram::open")));
1028 this->initiate_read ();
1031 if (this->io_count_
> 0)
1034 delete this; // Error setting up I/O factories
1038 Server::initiate_read (void)
1040 if (this->flg_cancel_
|| this->handle () == ACE_INVALID_HANDLE
)
1043 ACE_Message_Block
*mb
= 0;
1045 ACE_Message_Block (1024), //BUFSIZ + 1),
1048 // Initiate receive
1050 if (this->rs_
.recv (mb
, unused
, 0) == -1)
1053 ACE_ERROR_RETURN ((LM_ERROR
,
1054 ACE_TEXT ("(%t) Server %d, %p\n"),
1066 Server::initiate_write (ACE_Message_Block
*mb
, size_t nbytes
)
1068 if (this->flg_cancel_
|| this->handle () == ACE_INVALID_HANDLE
)
1077 ACE_ERROR_RETURN((LM_ERROR
,
1078 ACE_TEXT ("(%t) Server %d write nbytes == 0\n"),
1083 if (this->ws_
.send (mb
, nbytes
, 0, this->client_addr_
) == -1)
1086 ACE_ERROR_RETURN((LM_ERROR
,
1087 ACE_TEXT ("(%t) Server %d, %p\n"),
1089 ACE_TEXT ("write")),
1099 Server::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
)
1102 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1104 ACE_Message_Block
*mb
= result
.message_block ();
1107 mb
->rd_ptr ()[result
.bytes_transferred ()] = '\0';
1113 ACE_DEBUG ((LM_DEBUG
,
1114 ACE_TEXT ("(%t) **** Server %d: handle_read_dgram() ****\n"),
1116 ACE_DEBUG ((LM_DEBUG
,
1117 ACE_TEXT ("%s = %B\n"),
1118 ACE_TEXT ("bytes_to_read"),
1119 result
.bytes_to_read ()));
1120 ACE_DEBUG ((LM_DEBUG
,
1121 ACE_TEXT ("%s = %d\n"),
1122 ACE_TEXT ("handle"),
1124 ACE_DEBUG ((LM_DEBUG
,
1125 ACE_TEXT ("%s = %B\n"),
1126 ACE_TEXT ("bytes_transfered"),
1127 result
.bytes_transferred ()));
1128 ACE_DEBUG ((LM_DEBUG
,
1129 ACE_TEXT ("%s = %@\n"),
1132 ACE_DEBUG ((LM_DEBUG
,
1133 ACE_TEXT ("%s = %d\n"),
1134 ACE_TEXT ("success"),
1135 result
.success ()));
1136 ACE_DEBUG ((LM_DEBUG
,
1137 ACE_TEXT ("%s = %@\n"),
1138 ACE_TEXT ("completion_key"),
1139 result
.completion_key ()));
1140 ACE_DEBUG ((LM_DEBUG
,
1141 ACE_TEXT ("%s = %d\n"),
1144 ACE_DEBUG ((LM_DEBUG
,
1145 ACE_TEXT ("%s = %s\n"),
1146 ACE_TEXT ("message_block"),
1148 ACE_DEBUG ((LM_DEBUG
,
1149 ACE_TEXT ("**** end of message ****************\n")));
1151 else if (result
.error () != 0)
1153 ACE_Log_Priority prio
;
1154 #if defined (ACE_WIN32)
1155 if (result
.error () == ERROR_OPERATION_ABORTED
)
1158 if (result
.error () == ECANCELED
)
1160 #endif /* ACE_WIN32 */
1163 ACE_LOG_MSG
->errnum (result
.error ());
1164 ACE_LOG_MSG
->log (prio
,
1165 ACE_TEXT ("(%t) Server %d; %p\n"),
1169 else if (loglevel
> 0)
1171 ACE_DEBUG ((LM_DEBUG
,
1172 ACE_TEXT ("(%t) Server %d: read %B bytes\n"),
1174 result
.bytes_transferred ()));
1177 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1179 this->total_rcv_
+= result
.bytes_transferred ();
1181 // If client says we're done, ack it; we're done reading.
1182 size_t to_send
= result
.bytes_transferred ();
1183 if (ACE_OS::strcmp (mb
->rd_ptr (), close_req_msg
) == 0)
1185 ACE_DEBUG ((LM_DEBUG
,
1186 ACE_TEXT ("(%t) Server %d saw close request; ack\n"),
1188 this->flg_closing_
= true;
1190 mb
->copy (close_ack_msg
);
1191 to_send
= mb
->length ();
1193 if (this->initiate_write (mb
, to_send
) == 0)
1195 if (duplex
!= 0 && !this->flg_closing_
)
1196 this->initiate_read ();
1203 if (this->io_count_
> 0)
1210 Server::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result
&result
)
1213 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1215 ACE_Message_Block
*mb
= result
.message_block ();
1221 //mb.rd_ptr () [0] = '\0';
1222 mb
->rd_ptr (mb
->rd_ptr () - result
.bytes_transferred ());
1224 ACE_DEBUG ((LM_DEBUG
,
1225 ACE_TEXT ("(%t) **** Server %d: handle_write_dgram() ****\n"),
1227 ACE_DEBUG ((LM_DEBUG
,
1228 ACE_TEXT ("%s = %B\n"),
1229 ACE_TEXT ("bytes_to_write"),
1230 result
.bytes_to_write ()));
1231 ACE_DEBUG ((LM_DEBUG
,
1232 ACE_TEXT ("%s = %d\n"),
1233 ACE_TEXT ("handle"),
1235 ACE_DEBUG ((LM_DEBUG
,
1236 ACE_TEXT ("%s = %B\n"),
1237 ACE_TEXT ("bytes_transfered"),
1238 result
.bytes_transferred ()));
1239 ACE_DEBUG ((LM_DEBUG
,
1240 ACE_TEXT ("%s = %@\n"),
1243 ACE_DEBUG ((LM_DEBUG
,
1244 ACE_TEXT ("%s = %d\n"),
1245 ACE_TEXT ("success"),
1246 result
.success ()));
1247 ACE_DEBUG ((LM_DEBUG
,
1248 ACE_TEXT ("%s = %@\n"),
1249 ACE_TEXT ("completion_key"),
1250 result
.completion_key ()));
1251 ACE_DEBUG ((LM_DEBUG
,
1252 ACE_TEXT ("%s = %d\n"),
1255 ACE_DEBUG ((LM_DEBUG
,
1256 ACE_TEXT ("%s = %s\n"),
1257 ACE_TEXT ("message_block"),
1259 ACE_DEBUG ((LM_DEBUG
,
1260 ACE_TEXT ("**** end of message ****************\n")));
1262 else if (result
.error () != 0)
1264 ACE_Log_Priority prio
;
1265 #if defined (ACE_WIN32)
1266 if (result
.error () == ERROR_OPERATION_ABORTED
)
1269 if (result
.error () == ECANCELED
)
1271 #endif /* ACE_WIN32 */
1274 ACE_LOG_MSG
->errnum (result
.error ());
1275 ACE_LOG_MSG
->log (prio
,
1276 ACE_TEXT ("(%t) Server %d; %p\n"),
1278 ACE_TEXT ("write"));
1280 else if (loglevel
> 0)
1282 ACE_DEBUG ((LM_DEBUG
,
1283 ACE_TEXT ("(%t) Server %d: wrote %B bytes ok\n"),
1285 result
.bytes_transferred ()));
1290 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1292 this->total_snd_
+= result
.bytes_transferred ();
1294 if (duplex
== 0 && !this->flg_closing_
)
1295 this->initiate_read ();
1299 if (this->io_count_
> 0)
1305 // *******************************************
1308 // Connector creates the proper number of Clients and initiates
1309 // sessions on them.
1310 // *******************************************
1315 Connector (TestData
*tester
);
1317 int start (const ACE_INET_Addr
&addr
, int num
);
1323 // *************************************************************
1325 Connector::Connector (TestData
*tester
)
// Starts up to `num` client sessions against the Master listening at `addr`.
// For each session: open a connected datagram socket to `addr`, send a
// Start Session_Data carrying this socket's local address and port, read
// the reply into the same record, create a Client via TestData::client_up(),
// re-open the socket toward the address taken from the reply, and hand the
// handle to the Client. Any socket failure leaves the loop via
// ACE_ERROR_BREAK. `rc` is the loop counter; its declaration and the
// function's return statement are not visible in this listing.
1331 Connector::start (const ACE_INET_Addr
& addr
, int num
)
1333 ACE_OS::sleep(3); // Let Master get going
// NOTE(review): the statement guarded by this check is not visible here;
// presumably it limits num to MAX_CLIENTS.
1334 if (num
> MAX_CLIENTS
)
1342 for (; rc
< num
; rc
++)
1344 ACE_SOCK_CODgram sock
;
1345 if (sock
.open (addr
) == -1)
1346 ACE_ERROR_BREAK ((LM_ERROR
,
1347 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1349 ACE_TEXT ("open")));
// Capture the local address chosen for this socket and convert it with
// ACE_HTONS / ACE_HTONL for the wire (Session_Data documents its fields
// as network byte order).
1351 sock
.get_local_addr (me
);
1352 u_short my_port
= ACE_HTONS (me
.get_port_number ());
1353 ACE_INT32 my_addr
= ACE_HTONL (me
.get_ip_address ());
1354 Session_Data session
;
1355 session
.direction_
= 0; // Start
1356 session
.addr_
= my_addr
;
1357 session
.port_
= my_port
;
1358 if (sock
.send (&session
, sizeof (session
)) == -1)
1359 ACE_ERROR_BREAK ((LM_ERROR
,
1360 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1362 ACE_TEXT ("send")));
// The reply overwrites `session` in place.
// NOTE(review): no check of the reply's size or direction_ is visible
// before its address fields are used below.
1363 if (sock
.recv (&session
, sizeof (session
)) == -1)
1364 ACE_ERROR_BREAK ((LM_ERROR
,
1365 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1367 ACE_TEXT ("recv")));
// Address of the per-session server socket, as reported in the reply; the
// trailing 0 presumably tells set() the values are already in network
// byte order -- confirm against the ACE_INET_Addr::set documentation.
1368 ACE_INET_Addr server
;
1369 server
.set (session
.port_
, session
.addr_
, 0);
// NOTE(review): TestData::client_up() passes 0 as the bail-out value of
// its ACE_GUARD_RETURN / ACE_NEW_RETURN, so `client` may be 0; no null
// check is visible before client->go() below.
1370 Client
*client
= this->tester_
->client_up ();
1371 ACE_TCHAR me_str
[80], server_str
[80];
1372 me
.addr_to_string (me_str
, 80);
1373 server
.addr_to_string (server_str
, 80);
1374 ACE_DEBUG ((LM_DEBUG
,
1375 ACE_TEXT ("(%t) Client %d setting up local %s, peer %s\n"),
// Re-open the socket with `server` as the peer and `me` as the local
// address, so the session runs between the two session-specific ports.
1380 if (sock
.open (server
, me
) == -1)
1381 ACE_ERROR_BREAK ((LM_ERROR
,
1382 ACE_TEXT ("(%t) Re-opening %p\n"),
1383 ACE_TEXT ("client")));
1384 client
->go (sock
.get_handle (), server
);
// Detach the handle from the local ACE_SOCK_CODgram object; presumably the
// Client now owns it (its destructor calls ACE_OS::closesocket on its
// handle).
1385 sock
.set_handle (ACE_INVALID_HANDLE
);
// Body of a constructor that is not meant to be called: it only logs an
// error.  Its header is not visible in this view.
1393 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
// Constructs a Client for the given TestData and numeric id.  Only the
// flg_cancel_ initializer is visible here; the Client starts not cancelled.
1396 Client::Client (TestData
*tester
, int id
)
1401 flg_cancel_ (false),
// Destructor: logs this Client's send/receive totals, warns about I/O still
// counted as outstanding and about lopsided byte counts, tells the tester
// (if any) that this client is done, and closes the socket handle if one is
// still set.
1409 Client::~Client (void)
1411 ACE_DEBUG ((LM_DEBUG
,
1412 ACE_TEXT ("(%t) Client %d dtor; %d sends (%B bytes); ")
1413 ACE_TEXT ("%d recvs (%B bytes)\n"),
1415 this->total_w_
, this->total_snd_
,
1416 this->total_r_
, this->total_rcv_
));
// A non-zero io_count_ at destruction time is reported as a warning.
1417 if (this->io_count_
!= 0)
1418 ACE_ERROR ((LM_WARNING
,
1419 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1423 // This test bounces data back and forth between Clients and Servers.
1424 // Therefore, if there was significantly more data in one direction, that's
1425 // a problem. Remember, the byte counts are unsigned values.
1426 int issue_data_warning
= 0;
// More sent than received: warn if nothing was received, or if the integer
// quotient sent/received exceeds 2 (i.e. at least three times as much).
1427 if (this->total_snd_
> this->total_rcv_
)
1429 if (this->total_rcv_
== 0)
1430 issue_data_warning
= 1;
1431 else if (this->total_snd_
/ this->total_rcv_
> 2)
1432 issue_data_warning
= 1;
// Mirror-image checks with the roles of sent and received swapped; the
// zero test also protects the division that follows it.
1436 if (this->total_snd_
== 0)
1437 issue_data_warning
= 1;
1438 else if (this->total_rcv_
/ this->total_snd_
> 2)
1439 issue_data_warning
= 1;
1441 if (issue_data_warning
)
1442 ACE_DEBUG ((LM_WARNING
,
1443 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
// Report completion to the owning TestData, when there is one.
1445 if (this->tester_
!= 0)
1446 this->tester_
->client_done (this);
// Close the datagram socket and mark the handle as invalid afterwards.
1449 if (this->handle () != ACE_INVALID_HANDLE
)
1451 ACE_OS::closesocket (this->handle ());
1453 this->handle (ACE_INVALID_HANDLE
);
// Body of the cancel routine (its header is not visible in this view).
// Under lock_, set flg_cancel_ so initiate_read()/initiate_write() take
// their cancelled branch, then cancel operations on both the write (ws_)
// and read (rs_) factories.
1459 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1461 this->flg_cancel_
= true;
1462 this->ws_
.cancel ();
1463 this->rs_
.cancel ();
// Body of the close routine (its header is not visible in this view).
// Raising stop_writing_ above zero makes initiate_write() take its
// close-request branch instead of the normal data-send path.
1470 // This must be called with the lock_ held.
1471 ++this->stop_writing_
;
1472 ACE_DEBUG ((LM_DEBUG
,
1473 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1474 this->id_
, this->io_count_
));
// Put this Client into operation on an already-open datagram socket.
//   handle - socket handle to use for this session.
//   server - address of the server end; copied into server_addr_.
// Under lock_, opens the write and read factories on <handle> and starts the
// first write; in duplex mode a read is started alongside it.
1479 Client::go (ACE_HANDLE handle
, const ACE_INET_Addr
&server
)
1482 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1484 this->handle (handle
);
1485 this->server_addr_
.set (server
);
1487 // Open send and receive factories.
// The steps form one else-if chain: a failure logs and skips the rest.
1488 if (this->ws_
.open (*this, handle
) == -1)
1489 ACE_ERROR ((LM_ERROR
,
1490 ACE_TEXT ("(%t) Client %d: %p\n"),
1492 ACE_TEXT ("ACE_Asynch_Write_Dgram::open")));
1493 else if (this->rs_
.open (*this, handle
) == -1)
1494 ACE_ERROR ((LM_ERROR
,
1495 ACE_TEXT ("(%t) Client %d: %p\n"),
1497 ACE_TEXT ("ACE_Asynch_Read_Dgram::open")));
// The first write starts the exchange; 0 means it was initiated successfully.
1498 else if (this->initiate_write () == 0)
1500 if (duplex
!= 0) // Start an asynchronous read
1501 this->initiate_read ();
// With I/O outstanding the Client must stay alive; the statements for both
// outcomes of this test are not visible here.
1504 if (this->io_count_
> 0)
// Start one asynchronous datagram send to server_addr_.
// Three cases are visible:
//   - cancelled or no valid handle: guarded by the first test;
//   - stop_writing_ > 0: send the close request (close_req_msg) once only;
//   - otherwise: send complete_message, as a chain of three message blocks
//     on ACE_WIN32 builds and as a single block elsewhere.
// A failing send is reported through ACE_ERROR_RETURN.
1511 Client::initiate_write (void)
1513 if (this->flg_cancel_
|| this->handle () == ACE_INVALID_HANDLE
)
1516 // stop_writing_ is set to 1 to say "stop". To avoid repeating the
1517 // close datagram for every echo, only send it once. Sure, there's a risk
1518 // it will get lost, but since this is most often intra-host, don't
1519 // worry about that very small risk.
1520 if (this->stop_writing_
> 0) // Need to tell server to "close"
1522 if (this->stop_writing_
> 1) // Already told server to close
// First time through here: the increment makes the "> 1" test above true on
// later calls, so the close request is sent a single time.
1525 ++this->stop_writing_
;
1526 ACE_DEBUG ((LM_DEBUG
,
1527 ACE_TEXT ("(%t) Client %d requesting close\n"),
// Block sized for the close text plus its terminating NUL; the text is
// copied into it with copy().
1529 ACE_Message_Block
*mb
=
1530 new ACE_Message_Block (ACE_OS::strlen (close_req_msg
) + 1);
1531 mb
->copy (close_req_msg
);
1532 size_t unused
; // Number of bytes sent
1533 if (this->ws_
.send (mb
, unused
, 0, this->server_addr_
) == -1)
1536 ACE_ERROR_RETURN ((LM_ERROR
,
1537 ACE_TEXT ("(%t) Client %d, %p\n"),
1539 ACE_TEXT ("initiating closing send")),
// Length of the test payload, computed once (function-local static).
1548 static const size_t complete_message_length
=
1549 ACE_OS::strlen (complete_message
);
1551 #if defined (ACE_WIN32)
// Win32 path: three blocks that each refer to complete_message, chained and
// passed to a single send() call.
1553 ACE_Message_Block
*mb1
= 0,
1557 // No need to allocate +1 for proper printing - the memory includes it already
1558 ACE_NEW_RETURN (mb1
,
1559 ACE_Message_Block ((char *)complete_message
,
1560 complete_message_length
),
1563 ACE_NEW_RETURN (mb2
,
1564 ACE_Message_Block ((char *)complete_message
,
1565 complete_message_length
),
1568 ACE_NEW_RETURN (mb3
,
1569 ACE_Message_Block ((char *)complete_message
,
1570 complete_message_length
),
// Advance each write pointer so the whole payload counts as data to send.
1573 mb1
->wr_ptr (complete_message_length
);
1574 mb2
->wr_ptr (complete_message_length
);
1575 mb3
->wr_ptr (complete_message_length
);
1577 // chain them together
1581 size_t unused
; // Number of bytes sent
1582 if (this->ws_
.send (mb1
, unused
, 0, this->server_addr_
) == -1)
1585 ACE_ERROR_RETURN((LM_ERROR
,
1586 ACE_TEXT ("(%t) %p\n"),
1587 ACE_TEXT ("Client::ACE_Asynch_Write_Dgram::send")),
1590 #else /* ACE_WIN32 */
// Non-Win32 path: a single block referring to complete_message.
1592 ACE_Message_Block
*mb
= 0;
1594 // No need to allocate +1 for proper printing - the memory includes
1597 ACE_Message_Block (complete_message
,
1598 complete_message_length
),
1600 mb
->wr_ptr (complete_message_length
);
1601 size_t unused
; // Number of bytes sent
1602 if (this->ws_
.send (mb
, unused
, 0, this->server_addr_
) == -1)
1605 ACE_ERROR_RETURN((LM_ERROR
,
1606 ACE_TEXT ("(%t) Client %d, %p\n"),
1611 #endif /* ACE_WIN32 */
// Start one asynchronous datagram receive on rs_.
// With ACE_HAS_WIN32_OVERLAPPED_IO, six message blocks of
// complete_message_length bytes each (plus one hidden byte) are allocated
// and mb1 is passed to recv(); the chaining statements are not visible here.
// Otherwise a single block of max (complete_message_length, BUFSIZ) + 1
// bytes is used.  A failing recv() is reported through ACE_ERROR_RETURN.
1619 Client::initiate_read (void)
1621 if (this->flg_cancel_
|| this->handle_
== ACE_INVALID_HANDLE
)
1624 static const size_t complete_message_length
=
1625 ACE_OS::strlen (complete_message
);
1627 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
1628 ACE_Message_Block
*mb1
= 0,
1635 // We allocate +1 only for proper printing - we can just set the last byte
1636 // to '\0' before printing out
1637 ACE_NEW_RETURN (mb1
, ACE_Message_Block (complete_message_length
+ 1), -1);
1638 ACE_NEW_RETURN (mb2
, ACE_Message_Block (complete_message_length
+ 1), -1);
1639 ACE_NEW_RETURN (mb3
, ACE_Message_Block (complete_message_length
+ 1), -1);
1641 // Let's allocate memory for one more triplet,
1642 // This improves performance
1643 // as we can receive more than one block at once
1644 // Generally, we can receive more triplets ....
1645 ACE_NEW_RETURN (mb4
, ACE_Message_Block (complete_message_length
+ 1), -1);
1646 ACE_NEW_RETURN (mb5
, ACE_Message_Block (complete_message_length
+ 1), -1);
1647 ACE_NEW_RETURN (mb6
, ACE_Message_Block (complete_message_length
+ 1), -1);
1657 // hide last byte in each message block, reserving it for later to set '\0'
1658 // for proper printouts
1659 mb1
->size (mb1
->size () - 1);
1660 mb2
->size (mb2
->size () - 1);
1661 mb3
->size (mb3
->size () - 1);
1663 mb4
->size (mb4
->size () - 1);
1664 mb5
->size (mb5
->size () - 1);
1665 mb6
->size (mb6
->size () - 1);
// NOTE(review): the error text below names "Read_Stream::readv" although the
// call made is rs_.recv (); the message looks stale -- confirm before relying
// on it when reading logs.
1669 if (this->rs_
.recv (mb1
, unused
, 0) == -1)
1672 ACE_ERROR_RETURN ((LM_ERROR
,
1673 ACE_TEXT ("(%t) %p\n"),
1674 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1677 #else /* ACE_HAS_WIN32_OVERLAPPED_IO */
1679 // Try to read more chunks
// Use whichever is larger: the test payload length or BUFSIZ.
1680 size_t blksize
= ( complete_message_length
> BUFSIZ
) ?
1681 complete_message_length
: BUFSIZ
;
1683 ACE_Message_Block
*mb
= 0;
1685 // We allocate +1 only for proper printing - we can just set the last byte
1686 // to '\0' before printing out
1688 ACE_Message_Block (blksize
+ 1),
1693 if (this->rs_
.recv (mb
, unused
, 0) == -1)
1696 ACE_ERROR_RETURN ((LM_ERROR
,
1697 ACE_TEXT ("(%t) Client %d, %p\n"),
1702 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
// Completion handler for an asynchronous datagram send.
//   result - completion data: byte counts, error code, message block, etc.
// Under lock_, it logs the outcome (a detailed dump, an error report, or a
// one-line success message), adds the bytes sent to total_snd_, and then
// starts the next operation: another write in duplex mode (subject to flow
// control) or a read in half-duplex mode.
1710 Client::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result
&result
)
1713 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1715 ACE_Message_Block
*mb
= result
.message_block ();
// Detailed dump of the completion.  The condition selecting this branch is
// not visible here; presumably it is the highest log level (print_usage
// describes level 2 as "operation parameters and results").
1721 ACE_DEBUG ((LM_DEBUG
,
1722 ACE_TEXT ("(%t) **** Client %d: handle_write_dgram() ****\n"),
1724 ACE_DEBUG ((LM_DEBUG
,
1725 ACE_TEXT ("%s = %B\n"),
1726 ACE_TEXT ("bytes_to_write"),
1727 result
.bytes_to_write ()));
1728 ACE_DEBUG ((LM_DEBUG
,
1729 ACE_TEXT ("%s = %d\n"),
1730 ACE_TEXT ("handle"),
1732 ACE_DEBUG ((LM_DEBUG
,
1733 ACE_TEXT ("%s = %B\n"),
1734 ACE_TEXT ("bytes_transfered"),
1735 result
.bytes_transferred ()));
1736 ACE_DEBUG ((LM_DEBUG
,
1737 ACE_TEXT ("%s = %@\n"),
1740 ACE_DEBUG ((LM_DEBUG
,
1741 ACE_TEXT ("%s = %d\n"),
1742 ACE_TEXT ("success"),
1743 result
.success ()));
1744 ACE_DEBUG ((LM_DEBUG
,
1745 ACE_TEXT ("%s = %@\n"),
1746 ACE_TEXT ("completion_key"),
1747 result
.completion_key ()));
1748 ACE_DEBUG ((LM_DEBUG
,
1749 ACE_TEXT ("%s = %d\n"),
1753 #if defined (ACE_WIN32)
// Win32: walk the chain of blocks, spreading bytes_transferred across them,
// and rewind each rd_ptr so the data that was sent can be printed.
1754 size_t bytes_transferred
= result
.bytes_transferred ();
1756 for (ACE_Message_Block
* mb_i
= mb
;
1757 (mb_i
!= 0) && (bytes_transferred
> 0);
1758 mb_i
= mb_i
->cont ())
1760 // write 0 at string end for proper printout (if end of mb,
1762 mb_i
->rd_ptr()[0] = '\0';
// len = number of bytes already consumed from this block (rd_ptr - base).
1764 size_t len
= mb_i
->rd_ptr () - mb_i
->base ();
1766 // move rd_ptr backwards as required for printout
// This block covers all remaining bytes: rewind by that amount and finish.
1767 if (len
>= bytes_transferred
)
1769 mb_i
->rd_ptr (0 - bytes_transferred
);
1770 bytes_transferred
= 0;
// Otherwise rewind the whole block and carry the remainder to the next one.
1774 mb_i
->rd_ptr (0 - len
);
1775 bytes_transferred
-= len
;
1779 ACE_DEBUG ((LM_DEBUG
,
1780 ACE_TEXT ("%s%d = %s\n"),
1781 ACE_TEXT ("message_block, part "),
1785 #else /* ACE_WIN32 */
1786 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1787 mb
->rd_ptr()[0] = '\0';
1788 // move rd_ptr backwards as required for printout
1789 mb
->rd_ptr (- result
.bytes_transferred ());
1790 ACE_DEBUG ((LM_DEBUG
,
1791 ACE_TEXT ("%s = %s\n"),
1792 ACE_TEXT ("message_block"),
1794 #endif /* ACE_WIN32 */
1796 ACE_DEBUG ((LM_DEBUG
,
1797 ACE_TEXT ("**** end of message ****************\n")));
// Error report.  The log priority depends on whether the error is the
// platform's cancellation code (ERROR_OPERATION_ABORTED on Win32, ECANCELED
// elsewhere); the priorities assigned are not visible here.
1799 else if (result
.error () != 0)
1801 ACE_Log_Priority prio
;
1802 #if defined (ACE_WIN32)
1803 if (result
.error () == ERROR_OPERATION_ABORTED
)
1806 if (result
.error () == ECANCELED
)
1808 #endif /* ACE_WIN32 */
// Load the error into the log-message object so %p prints its text.
1811 ACE_LOG_MSG
->errnum (result
.error ());
1812 ACE_LOG_MSG
->log (prio
,
1813 ACE_TEXT ("(%t) Client %d; %p\n"),
1815 ACE_TEXT ("write"));
// Brief success message at log level 1 and above.
1817 else if (loglevel
> 0)
1819 ACE_DEBUG ((LM_DEBUG
,
1820 ACE_TEXT ("(%t) Client %d: wrote %B bytes ok\n"),
1822 result
.bytes_transferred ()));
// Only successful, non-empty sends are counted and lead to further I/O.
1827 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1829 this->total_snd_
+= result
.bytes_transferred ();
// Transfer limit reached: log it (follow-up action not visible here).
1830 if (this->total_snd_
>= xfer_limit
)
1832 ACE_DEBUG ((LM_DEBUG
,
1833 ACE_TEXT ("(%t) Client %d sent %B, limit %B\n"),
1834 this->id_
, this->total_snd_
, xfer_limit
));
1837 if (duplex
!= 0) // full duplex, continue write
// Keep at most 32 KiB sent but not yet echoed back.
// NOTE(review): the destructor's comment says the byte counts are unsigned,
// so this difference would wrap if total_rcv_ ever exceeded total_snd_.
1839 if ((this->total_snd_
- this->total_rcv_
) < 1024*32 ) //flow control
1840 this->initiate_write ();
1842 else // half-duplex read reply, after read we will start write
1843 this->initiate_read ();
// Remaining outstanding I/O decides what happens to this Client next; the
// statements for both outcomes are not visible here.
1847 if (this->io_count_
> 0)
// Completion handler for an asynchronous datagram receive.
//   result - completion data: byte counts, error code, message block, etc.
// Under lock_, it logs the outcome, adds the bytes received to total_rcv_,
// checks for the server's close acknowledgement once this Client has asked
// to stop, and otherwise starts the next read or (half-duplex) write.
1854 Client::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
)
1857 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1859 ACE_Message_Block
*mb
= result
.message_block ();
// Detailed dump of the completion.  The condition selecting this branch is
// not visible here; presumably it is the highest log level.
1865 ACE_DEBUG ((LM_DEBUG
,
1866 ACE_TEXT ("(%t) **** Client %d: handle_read_dgram() ****\n"),
1868 ACE_DEBUG ((LM_DEBUG
,
1869 ACE_TEXT ("%s = %B\n"),
1870 ACE_TEXT ("bytes_to_read"),
1871 result
.bytes_to_read ()));
1872 ACE_DEBUG ((LM_DEBUG
,
1873 ACE_TEXT ("%s = %d\n"),
1874 ACE_TEXT ("handle"),
1876 ACE_DEBUG ((LM_DEBUG
,
1877 ACE_TEXT ("%s = %B\n"),
1878 ACE_TEXT ("bytes_transfered"),
1879 result
.bytes_transferred ()));
1880 ACE_DEBUG ((LM_DEBUG
,
1881 ACE_TEXT ("%s = %@\n"),
1884 ACE_DEBUG ((LM_DEBUG
,
1885 ACE_TEXT ("%s = %d\n"),
1886 ACE_TEXT ("success"),
1887 result
.success ()));
1888 ACE_DEBUG ((LM_DEBUG
,
1889 ACE_TEXT ("%s = %@\n"),
1890 ACE_TEXT ("completion_key"),
1891 result
.completion_key ()));
1892 ACE_DEBUG ((LM_DEBUG
,
1893 ACE_TEXT ("%s = %d\n"),
1897 #if defined (ACE_WIN32)
// Win32: print every block in the chain, NUL-terminating each at its write
// pointer (initiate_read reserves one hidden byte per block for this).
1899 for (ACE_Message_Block
* mb_i
= mb
;
1901 mb_i
= mb_i
->cont ())
1904 // write 0 at string end for proper printout
1905 mb_i
->wr_ptr()[0] = '\0';
1907 ACE_DEBUG ((LM_DEBUG
,
1908 ACE_TEXT ("%s%d = %s\n"),
1909 ACE_TEXT ("message_block, part "),
1913 #else /* ACE_WIN32 */
1914 // write 0 at string end for proper printout
1915 mb
->rd_ptr()[result
.bytes_transferred ()] = '\0'; // for proper printout
1916 ACE_DEBUG ((LM_DEBUG
,
1917 ACE_TEXT ("%s = %s\n"),
1918 ACE_TEXT ("message_block"),
1920 #endif /* ACE_WIN32 */
1922 ACE_DEBUG ((LM_DEBUG
,
1923 ACE_TEXT ("**** end of message ****************\n")));
// Error report.  The log priority depends on whether the error is the
// platform's cancellation code; the priorities assigned are not visible here.
1925 else if (result
.error () != 0)
1927 ACE_Log_Priority prio
;
1928 #if defined (ACE_WIN32)
1929 if (result
.error () == ERROR_OPERATION_ABORTED
)
1932 if (result
.error () == ECANCELED
)
1934 #endif /* ACE_WIN32 */
// Load the error into the log-message object so %p prints its text.
1937 ACE_Log_Msg::instance ()->errnum (result
.error ());
1938 ACE_Log_Msg::instance ()->log (prio
,
1939 ACE_TEXT ("(%t) Client %d; %p\n"),
// Brief success message at log level 1 and above.
1943 else if (loglevel
> 0)
1945 ACE_DEBUG ((LM_DEBUG
,
1946 ACE_TEXT ("(%t) Client %d: read %B bytes ok\n"),
1948 result
.bytes_transferred ()));
// Only successful, non-empty receives are counted and lead to further I/O.
1951 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1953 this->total_rcv_
+= result
.bytes_transferred ();
1955 // If we've closed and the server acked, we're done.
// The payload is compared as a C string against close_ack_msg.
1956 if (this->stop_writing_
&&
1957 ACE_OS::strcmp (mb
->rd_ptr (), close_ack_msg
) == 0)
1959 ACE_DEBUG ((LM_DEBUG
,
1960 ACE_TEXT ("(%t) Client %d recvd close-ack\n"),
// Otherwise keep the exchange going: another read, or in half-duplex mode
// the next write.  The condition choosing between them is not visible here.
1966 this->initiate_read ();
1967 else // half-duplex write, after write we will start read
1968 this->initiate_write ();
// Remaining outstanding I/O decides what happens to this Client next; the
// statements for both outcomes are not visible here.
1974 if (this->io_count_
> 0)
1980 // *************************************************************
1981 // Configuration helpers
1982 // *************************************************************
// Print the command-line help text for this test.
//   argc - unused (its name is commented out).
//   argv - argument vector; presumably argv[0] fills the %s in "usage: %s".
// The logging call around these literals and the return statement are not
// visible here; parse_args() returns this function's result directly.
1984 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1988 ACE_TEXT ("\nusage: %s")
1989 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1990 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1991 ACE_TEXT ("\n a AIOCB")
1992 ACE_TEXT ("\n i SIG")
1993 ACE_TEXT ("\n c CB")
1994 ACE_TEXT ("\n s SUN")
1995 ACE_TEXT ("\n d default")
1996 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1997 ACE_TEXT ("\n-h <host> for Client mode")
1998 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1999 ACE_TEXT ("\n-p <port to listen/connect>")
2000 ACE_TEXT ("\n-c <number of client instances>")
2001 ACE_TEXT ("\n-b run client and server at the same time")
2002 ACE_TEXT ("\n f file")
2003 ACE_TEXT ("\n c console")
2004 ACE_TEXT ("\n-v log level")
2005 ACE_TEXT ("\n 0 - log errors and highlights")
2006 ACE_TEXT ("\n 1 - log level 0 plus progress information")
2007 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
2008 ACE_TEXT ("\n-x max transfer byte count per Client")
2009 ACE_TEXT ("\n-u show this message")
// Select the global proactor_type from the first character of <ptype>,
// compared case-insensitively (the character is upper-cased first).
// The case labels are not visible here; print_usage lists the letters as
// d = default, a = AIOCB, i = SIG, s = SUN, c = CB.
// parse_args() treats a true result from this function as success.
2017 set_proactor_type (const ACE_TCHAR
*ptype
)
2022 switch (ACE_OS::ace_toupper (*ptype
))
2025 proactor_type
= DEFAULT
;
2028 proactor_type
= AIOCB
;
2031 proactor_type
= SIG
;
2035 proactor_type
= SUN
;
// The remaining choice is compiled only when the platform's sigevent
// structure is usable; its statements are not visible here.
2038 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
2042 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
// Establish the test's default configuration, then apply command-line
// overrides.
//   argc, argv - the program's argument vector.
// Options accepted (from the ACE_Get_Opt string): -x -t -o -n -p -d -h -c -v
// take a value; -u and -b do not.  run_main() treats a return of -1 as a
// failure; usage errors return whatever print_usage() returns.
2050 parse_args (int argc
, ACE_TCHAR
*argv
[])
2052 // First, set up all the defaults then let any args change them.
2053 both
= 1; // client and server simultaneously
2054 duplex
= 1; // full duplex is on
2055 host
= ACE_LOCALHOST
; // server to connect
2056 port
= ACE_DEFAULT_SERVER_PORT
; // port to connect/listen
2057 max_aio_operations
= 512; // POSIX Proactor params
2058 proactor_type
= DEFAULT
; // Proactor type = default
2059 threads
= 3; // size of Proactor thread pool
2060 clients
= 10; // number of clients
2061 loglevel
= 0; // log level : only errors and highlights
2062 // Default transfer limit 50 messages per Sender
2063 xfer_limit
= 50 * ACE_OS::strlen (complete_message
);
2065 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
// The Linux-specific adjustment itself is not visible here; presumably it
// turns duplex off -- TODO confirm.
2066 # if defined (ACE_LINUX)
2070 if (argc
== 1) // no arguments , so one button test
2073 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
2076 while ((c
= get_opt ()) != EOF
)
2080 case 'x': // xfer limit
// NOTE(review): a negative argument converts to a very large size_t here;
// only an exact zero is raised to the minimum below.
2081 xfer_limit
= static_cast<size_t> (ACE_OS::atoi (get_opt
.opt_arg ()));
2082 if (xfer_limit
== 0)
2083 xfer_limit
= 1; // Bare minimum.
2085 case 'b': // both client and server
2088 case 'v': // log level
2089 loglevel
= ACE_OS::atoi (get_opt
.opt_arg ());
// Duplex mode (the -d option in print_usage); its case label is not visible.
2092 duplex
= ACE_OS::atoi (get_opt
.opt_arg ());
2094 case 'h': // host for sender
2095 host
= get_opt
.opt_arg ();
2097 case 'p': // port number
2098 port
= ACE_OS::atoi (get_opt
.opt_arg ());
2100 case 'n': // thread pool size
2101 threads
= ACE_OS::atoi (get_opt
.opt_arg ());
2103 case 'c': // number of clients
2104 clients
= ACE_OS::atoi (get_opt
.opt_arg ());
// Clamp the request to the compiled-in maximum.
2105 if (clients
> MAX_CLIENTS
)
2106 clients
= MAX_CLIENTS
;
2108 case 'o': // max number of aio for proactor
2109 max_aio_operations
= ACE_OS::atoi (get_opt
.opt_arg ());
2111 case 't': // Proactor Type
// A true result means the type was accepted; otherwise show the usage text.
2112 if (set_proactor_type (get_opt
.opt_arg ()))
2114 return print_usage (argc
, argv
);
2117 return print_usage (argc
, argv
);
// The SUN proactor is limited to one thread, per the message below; the
// assignment that applies the change is not visible here.
2121 if (proactor_type
== SUN
&& threads
> 1)
2123 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Sun aiowait is not thread-safe; ")
2124 ACE_TEXT ("changing to 1 thread\n")));
// Test entry point for builds with threads and asynchronous I/O support.
// Parses the arguments, masks the real-time signal range and SIGPIPE,
// starts the proactor task, creates a Master and a Connector on the test
// address, launches the client sessions, and then waits until the TestData
// object reports that testing is done.
2132 run_main (int argc
, ACE_TCHAR
*argv
[])
2134 ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
2136 if (::parse_args (argc
, argv
) == -1)
2139 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
2140 disable_signal (SIGPIPE
, SIGPIPE
);
// Proceed only if the proactor task started (start() returned 0).
2145 if (task1
.start (threads
, proactor_type
, max_aio_operations
) == 0)
2147 // NOTE - there's no real reason this test is limited to IPv4 other
2148 // than the way Session_Data is set up - to expand this test to work
2149 // on IPv6 as well as IPv4, you need to do some work on passing the
2150 // Session_Data address differently.
2151 ACE_INET_Addr
addr (port
, ACE_LOCALHOST
, AF_INET
);
2152 Master
master (&test
, addr
, clients
);
2153 Connector
connector (&test
);
// Server side is wanted when running both roles or when no host was given.
2156 if (both
!= 0 || host
== 0) // Acceptor
2158 // Already running; if not needed will be deleted soon.
// Client side is wanted when running both roles or when a host was given.
2162 if (both
!= 0 || host
!= 0)
2165 host
= ACE_LOCALHOST
;
// Point addr at the chosen host, keeping its current address family; a
// failure to set it is logged.
2167 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
2168 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), host
));
// Add the Connector's result to rc.
2170 rc
+= connector
.start (addr
, clients
);
2173 // Wait a few seconds to let things get going, then poll til
2174 // all sessions are done. Note that when we exit this scope, the
2175 // Acceptor and Connector will be destroyed, which should prevent
2176 // further connections and also test how well destroyed handlers
2180 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
// Poll until every session has finished (loop body not visible here).
2181 while (!test
.testing_done ())
2186 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
// Fallback entry point for builds without threads or asynchronous I/O:
// logs that the test is unsupported and does not run it.  Both parameters
// are unnamed and unused.
2197 run_main (int, ACE_TCHAR
*[])
2199 ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
2201 ACE_DEBUG ((LM_INFO
,
2202 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
2203 ACE_TEXT ("Proactor_UDP_Test will not be run.\n")));
2210 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */