1 // ============================================================================
3 * @file Proactor_UDP_Test.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that uses UDP/IP communications.
8 * @author Steve Huston <shuston@riverace.com>, based on Proactor_Test.cpp
10 // ============================================================================
12 #include "test_config.h"
14 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
15 // This only works on Win32 platforms and on Unix platforms
16 // supporting POSIX aio calls.
18 #include "ace/Signal.h"
20 #include "ace/Service_Config.h"
21 #include "ace/INET_Addr.h"
22 #include "ace/SOCK_CODgram.h"
23 #include "ace/SOCK_Dgram.h"
24 #include "ace/Object_Manager.h"
25 #include "ace/Get_Opt.h"
27 #include "ace/Proactor.h"
29 #include "ace/Thread_Semaphore.h"
30 #include "ace/OS_NS_ctype.h"
31 #include "ace/OS_NS_errno.h"
32 #include "ace/OS_NS_signal.h"
33 #include "ace/OS_NS_string.h"
34 #include "ace/OS_NS_unistd.h"
35 #include "ace/OS_NS_sys_socket.h"
36 #include "ace/Sock_Connect.h"
37 #include "ace/os_include/netinet/os_tcp.h"
39 #include "ace/Atomic_Op.h"
40 #include "ace/Synch_Traits.h"
42 #if defined (ACE_WIN32)
44 # include "ace/WIN32_Proactor.h"
46 #elif defined (ACE_HAS_AIO_CALLS)
48 # include "ace/POSIX_Proactor.h"
49 # include "ace/POSIX_CB_Proactor.h"
51 #endif /* ACE_WIN32 */
53 // Proactor Type (UNIX only, Win32 ignored)
54 using ProactorType
= enum { DEFAULT
= 0, AIOCB
, SIG
, CB
};
55 static ProactorType proactor_type
= DEFAULT
;
57 // POSIX : > 0 max number aio operations proactor,
58 static size_t max_aio_operations
= 0;
60 // both: 0 run client or server / depends on host
61 // != 0 run client and server
64 // Host that we're connecting to.
65 static const ACE_TCHAR
*host
= 0;
67 // number of Client instances
68 static int clients
= 1;
69 const int MAX_CLIENTS
= 1000;
70 const int MAX_SERVERS
= 1000;
72 // duplex mode: == 0 half-duplex
74 static int duplex
= 0;
76 // number threads in the Proactor thread pool
77 static int threads
= 1;
79 // Port that we're receiving session initiations on.
80 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
83 static int loglevel
; // 0 full , 1 only errors
85 static size_t xfer_limit
; // Number of bytes for Client to send.
87 static char complete_message
[] =
90 "Accept-Language: C++\r\n"
91 "Accept-Encoding: gzip, deflate\r\n"
92 "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
93 "Connection: Keep-Alive\r\n"
96 static char close_req_msg
[] = "CLOSE";
97 static char close_ack_msg
[] = "CLOSE-ACK";
102 LogLocker () { ACE_LOG_MSG
->acquire (); }
103 virtual ~LogLocker () { ACE_LOG_MSG
->release (); }
106 // Function to remove signals from the signal mask.
108 disable_signal (int sigmin
, int sigmax
)
110 #if !defined (ACE_LACKS_UNIX_SIGNALS)
112 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
113 ACE_ERROR ((LM_ERROR
,
114 ACE_TEXT ("Error: (%P|%t):%p\n"),
115 ACE_TEXT ("sigemptyset failed")));
117 for (int i
= sigmin
; i
<= sigmax
; i
++)
118 ACE_OS::sigaddset (&signal_set
, i
);
120 // Put the <signal_set>.
121 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
122 // In multi-threaded application this is not POSIX compliant
123 // but let's leave it just in case.
124 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
126 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
127 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
128 ACE_ERROR_RETURN ((LM_ERROR
,
129 ACE_TEXT ("Error: (%P|%t): %p\n"),
130 ACE_TEXT ("SIG_BLOCK failed")),
133 ACE_UNUSED_ARG (sigmin
);
134 ACE_UNUSED_ARG (sigmax
);
135 #endif /* ACE_LACKS_UNIX_SIGNALS */
140 // *************************************************************
141 // MyTask is ACE_Task resposible for :
142 // 1. creation and deletion of
143 // Proactor and Proactor thread pool
144 // 2. running Proactor event loop
145 // *************************************************************
150 * MyTask plays role for Proactor threads pool
152 * MyTask is ACE_Task resposible for:
153 * 1. Creation and deletion of Proactor and Proactor thread pool
154 * 2. Running Proactor event loop
156 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
161 sem_ ((unsigned int) 0),
166 (void) this->stop ();
167 (void) this->delete_proactor();
172 int start (int num_threads
,
173 ProactorType type_proactor
,
178 int create_proactor (ProactorType type_proactor
,
180 int delete_proactor ();
182 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
183 ACE_Thread_Semaphore sem_
;
184 ACE_Proactor
* proactor_
;
188 MyTask::create_proactor (ProactorType type_proactor
, size_t max_op
)
190 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
195 ACE_TEST_ASSERT (this->proactor_
== 0);
197 #if defined (ACE_WIN32)
199 ACE_UNUSED_ARG (type_proactor
);
200 ACE_UNUSED_ARG (max_op
);
202 ACE_WIN32_Proactor
*proactor_impl
= 0;
204 ACE_NEW_RETURN (proactor_impl
,
208 ACE_DEBUG ((LM_DEBUG
,
209 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
211 #elif defined (ACE_HAS_AIO_CALLS)
213 ACE_POSIX_Proactor
* proactor_impl
= 0;
215 switch (type_proactor
)
218 ACE_NEW_RETURN (proactor_impl
,
219 ACE_POSIX_AIOCB_Proactor (max_op
),
221 ACE_DEBUG ((LM_DEBUG
,
222 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
225 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
227 ACE_NEW_RETURN (proactor_impl
,
228 ACE_POSIX_SIG_Proactor (max_op
),
230 ACE_DEBUG ((LM_DEBUG
,
231 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
233 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
235 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
237 ACE_NEW_RETURN (proactor_impl
,
238 ACE_POSIX_CB_Proactor (max_op
),
240 ACE_DEBUG ((LM_DEBUG
,
241 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
243 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
246 ACE_DEBUG ((LM_DEBUG
,
247 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
251 #endif /* ACE_WIN32 */
253 // always delete implementation 1 , not !(proactor_impl == 0)
254 ACE_NEW_RETURN (this->proactor_
,
255 ACE_Proactor (proactor_impl
, 1 ),
257 // Set new singleton and delete it in close_singleton()
258 ACE_Proactor::instance (this->proactor_
, 1);
263 MyTask::delete_proactor ()
265 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
270 ACE_DEBUG ((LM_DEBUG
,
271 ACE_TEXT ("(%t) Delete Proactor\n")));
273 ACE_Proactor::close_singleton ();
280 MyTask::start (int num_threads
,
281 ProactorType type_proactor
,
284 if (this->create_proactor (type_proactor
, max_op
) == -1)
285 ACE_ERROR_RETURN ((LM_ERROR
,
287 ACE_TEXT ("unable to create proactor")),
290 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
291 ACE_ERROR_RETURN ((LM_ERROR
,
293 ACE_TEXT ("unable to activate thread pool")),
296 for (; num_threads
> 0; num_threads
--)
308 if (this->proactor_
!= 0)
310 ACE_DEBUG ((LM_DEBUG
,
311 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
313 this->proactor_
->proactor_end_event_loop ();
316 if (this->wait () == -1)
317 ACE_ERROR ((LM_ERROR
,
319 ACE_TEXT ("unable to stop thread pool")));
327 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask started\n")));
329 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
330 disable_signal (SIGPIPE
, SIGPIPE
);
332 // signal that we are ready
335 this->proactor_
->proactor_run_event_loop ();
337 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask finished\n")));
341 // forward declaration
344 // "Server" is one side of a session. It's the same idea as in TCP, but
345 // there's no acceptor in UDP; sessions are started by the client sending
346 // a "start" datagram to a well-known UDP port. The start datagram tells
347 // which port number the client is receiving on. The server then sends an
348 // "ack" datagram to indicate the session is set up successfully and to say
349 // which port the server is listening on. Thus, a unique pairing of server
350 // and client port numbers is established. Each session will require a
351 // separate server-side socket as well as the client. Note that experienced
352 // UDP programmers will be quivering at this point knowing that there's no
353 // reason to have multiple server-side sockets, and no real reason to
354 // pre-register the client ports either since all the addressing info is
355 // available on normal UDP programming. However, this is all necessary in
356 // the POSIX case since the POSIX aio functions were not designed with UDP
357 // in mind, and the addressing information is not available in UDP receive
358 // completion callbacks; thus, each socket needs to be fully connected before
359 // running session data. The addressing information needed to run this
360 // use-case in the "normal" way is available on Windows, but this test runs
361 // across many platforms, so can't rely on that information.
362 class Server
: public ACE_Handler
366 Server (TestData
*tester
, int id
);
369 int id () { return this->id_
; }
370 size_t get_total_snd () { return this->total_snd_
; }
371 size_t get_total_rcv () { return this->total_rcv_
; }
372 long get_total_w () { return this->total_w_
; }
373 long get_total_r () { return this->total_r_
; }
375 /// This is called after the new session has been established.
376 void go (ACE_HANDLE handle
, const ACE_INET_Addr
&client
);
382 * @name AIO callback handling
384 * These methods are called by the framework
386 /// This is called when asynchronous <read> operation from the
387 /// socket completes.
388 void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
) override
;
390 /// This is called when an asynchronous <write> to the socket
392 void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result
&result
) override
;
395 int initiate_read ();
396 int initiate_write (ACE_Message_Block
*mb
, size_t nbytes
);
401 ACE_INET_Addr client_addr_
;
402 ACE_Asynch_Read_Dgram rs_
;
403 ACE_Asynch_Write_Dgram ws_
;
404 ACE_SYNCH_MUTEX lock_
;
406 int io_count_
; // Number of currently outstanding I/O requests
409 size_t total_snd_
; // Number of bytes successfully sent
410 size_t total_rcv_
; // Number of bytes successfully received
411 int total_w_
; // Number of write operations
412 int total_r_
; // Number of read operations
415 // *******************************************
417 // *******************************************
419 class Client
: public ACE_Handler
423 Client (TestData
*tester
, int id
);
426 void go (ACE_HANDLE h
, const ACE_INET_Addr
&server
);
427 int id () { return this->id_
; }
428 size_t get_total_snd () { return this->total_snd_
; }
429 size_t get_total_rcv () { return this->total_rcv_
; }
430 int get_total_w () { return this->total_w_
; }
431 int get_total_r () { return this->total_r_
; }
433 // This is called when asynchronous reads from the socket complete
434 void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
) override
;
436 // This is called when asynchronous writes from the socket complete
437 void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result
&result
) override
;
442 int initiate_read ();
443 int initiate_write ();
444 // FUZZ: disable check_for_lack_ACE_OS
446 // FUZZ: enable check_for_lack_ACE_OS
451 ACE_INET_Addr server_addr_
;
452 ACE_Asynch_Read_Dgram rs_
;
453 ACE_Asynch_Write_Dgram ws_
;
455 ACE_SYNCH_MUTEX lock_
;
458 int stop_writing_
; // Writes are shut down; just read.
466 // TestData collects and reports on test-related transfer and connection
472 bool testing_done ();
473 Server
*server_up ();
474 Client
*client_up ();
475 void server_done (Server
*s
);
476 void client_done (Client
*c
);
483 // Track number of sessions that report start, and those that report
484 // their end (and stats).
485 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_up_
;
486 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_down_
;
488 // Total read and write bytes for all sessions.
489 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_cnt_
;
490 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_cnt_
;
491 // Total read and write operations issues for all sessions.
492 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_ops_
;
493 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_ops_
;
494 } servers_
, clients_
;
496 ACE_SYNCH_MUTEX list_lock_
;
497 Server
*server_list_
[MAX_SERVERS
];
498 Client
*client_list_
[MAX_CLIENTS
];
501 TestData::TestData ()
504 for (i
= 0; i
< MAX_SERVERS
; ++i
)
505 this->server_list_
[i
] = 0;
506 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
507 this->client_list_
[i
] = 0;
511 TestData::testing_done ()
513 int svr_up
= this->servers_
.sessions_up_
.value ();
514 int svr_dn
= this->servers_
.sessions_down_
.value ();
515 int clt_up
= this->clients_
.sessions_up_
.value ();
516 int clt_dn
= this->clients_
.sessions_down_
.value ();
518 if (svr_up
== 0 && clt_up
== 0) // No connections up yet
521 return (svr_dn
>= svr_up
&& clt_dn
>= clt_up
);
525 TestData::server_up ()
527 ++this->servers_
.sessions_up_
;
528 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
530 for (int i
= 0; i
< MAX_SERVERS
; ++i
)
532 if (this->server_list_
[i
] == 0)
534 ACE_NEW_RETURN (this->server_list_
[i
], Server (this, i
), 0);
535 ACE_DEBUG ((LM_DEBUG
,
536 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
538 this->servers_
.sessions_up_
.value (),
539 this->servers_
.sessions_down_
.value ()));
540 return this->server_list_
[i
];
547 TestData::client_up ()
549 ++this->clients_
.sessions_up_
;
550 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
552 for (int i
= 0; i
< MAX_CLIENTS
; ++i
)
554 if (this->client_list_
[i
] == 0)
556 ACE_NEW_RETURN (this->client_list_
[i
], Client (this, i
), 0);
557 ACE_DEBUG ((LM_DEBUG
,
558 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
560 this->clients_
.sessions_up_
.value (),
561 this->clients_
.sessions_down_
.value ()));
562 return this->client_list_
[i
];
569 TestData::server_done (Server
*s
)
571 this->servers_
.w_cnt_
+= s
->get_total_snd ();
572 this->servers_
.r_cnt_
+= s
->get_total_rcv ();
573 this->servers_
.w_ops_
+= s
->get_total_w ();
574 this->servers_
.r_ops_
+= s
->get_total_r ();
575 ++this->servers_
.sessions_down_
;
576 ACE_DEBUG ((LM_DEBUG
,
577 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
579 this->servers_
.sessions_up_
.value (),
580 this->servers_
.sessions_down_
.value ()));
582 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
584 for (i
= 0; i
< MAX_SERVERS
; ++i
)
586 if (this->server_list_
[i
] == s
)
589 ACE_ERROR ((LM_ERROR
,
590 ACE_TEXT ("Server %d is pos %d in list\n"),
593 this->server_list_
[i
] = 0;
597 if (i
>= MAX_SERVERS
)
598 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Server %@ done but not listed\n"), s
));
604 TestData::client_done (Client
*c
)
606 this->clients_
.w_cnt_
+= c
->get_total_snd ();
607 this->clients_
.r_cnt_
+= c
->get_total_rcv ();
608 this->clients_
.w_ops_
+= c
->get_total_w ();
609 this->clients_
.r_ops_
+= c
->get_total_r ();
610 ++this->clients_
.sessions_down_
;
611 ACE_DEBUG ((LM_DEBUG
,
612 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
614 this->clients_
.sessions_up_
.value (),
615 this->clients_
.sessions_down_
.value ()));
617 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
619 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
621 if (this->client_list_
[i
] == c
)
624 ACE_ERROR ((LM_ERROR
,
625 ACE_TEXT ("Client %d is pos %d in list\n"),
628 this->client_list_
[i
] = 0;
632 if (i
>= MAX_CLIENTS
)
633 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Client %@ done but not listed\n"), c
));
639 TestData::stop_all ()
643 // Lock and cancel everything. Then release the lock, possibly allowing
644 // cleanups, then grab it again and delete all Servers and Clients.
646 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
647 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
649 if (this->client_list_
[i
] != 0)
650 this->client_list_
[i
]->cancel ();
653 for (i
= 0; i
< MAX_SERVERS
; ++i
)
655 if (this->server_list_
[i
] != 0)
656 this->server_list_
[i
]->cancel ();
660 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
661 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
663 if (this->client_list_
[i
] != 0)
664 delete this->client_list_
[i
];
667 for (i
= 0; i
< MAX_SERVERS
; ++i
)
669 if (this->server_list_
[i
] != 0)
670 delete this->server_list_
[i
];
679 ACE_TCHAR bufs
[256];
680 ACE_TCHAR bufr
[256];
682 ACE_OS::snprintf (bufs
, 256,
683 ACE_SIZE_T_FORMAT_SPECIFIER
684 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
685 this->clients_
.w_cnt_
.value (),
686 this->clients_
.w_ops_
.value ());
688 ACE_OS::snprintf (bufr
, 256,
689 ACE_SIZE_T_FORMAT_SPECIFIER
690 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
691 this->clients_
.r_cnt_
.value (),
692 this->clients_
.r_ops_
.value ());
694 ACE_DEBUG ((LM_DEBUG
,
695 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
699 ACE_OS::snprintf (bufs
, 256,
700 ACE_SIZE_T_FORMAT_SPECIFIER
701 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
702 this->servers_
.w_cnt_
.value (),
703 this->servers_
.w_ops_
.value ());
705 ACE_OS::snprintf (bufr
, 256,
706 ACE_SIZE_T_FORMAT_SPECIFIER
707 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
708 this->servers_
.r_cnt_
.value (),
709 this->servers_
.r_ops_
.value ());
711 ACE_DEBUG ((LM_DEBUG
,
712 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
716 if (this->clients_
.w_cnt_
.value () == 0 ||
717 this->clients_
.r_cnt_
.value () == 0 ||
718 this->servers_
.w_cnt_
.value () == 0 ||
719 this->servers_
.r_cnt_
.value () == 0 )
720 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("It appears that this test didn't ")
721 ACE_TEXT ("really do anything. Something is very wrong.\n")));
724 // Session set-up struct.
727 ACE_INT32 direction_
; // 0 == Start, 1 == Ack
728 ACE_INT32 addr_
; // Network byte order, must be IPv4
729 ACE_UINT16 port_
; // UDP port, network byte order
730 Session_Data() { ACE_OS::memset (this, 0, sizeof(*this)); }
733 // Master is the server-side receiver of session establishment requests.
734 // For each "start" dgram received, instantiates a new Server object,
735 // indicating the addressing info for the client.
736 // Master is initialized with a count of the number of expected sessions. After
737 // this number are set up, Master will stop listening for session requests.
738 // This is a bit fragile but is necessary because on HP-UX, et al., it
739 // is impossible to close/cancel a socket with an outstanding UDP receive
740 // So, this bit of messiness is necessary for portability.
741 // When the Master is destroyed, it will try to stop establishing sessions
742 // but this will only work on Windows.
743 class Master
: public ACE_Handler
746 Master (TestData
*tester
, const ACE_INET_Addr
&recv_addr
, int expected
);
749 // Called when dgram receive operation completes.
750 void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
) override
;
756 ACE_INET_Addr recv_addr_
;
757 ACE_SOCK_Dgram sock_
;
758 ACE_Asynch_Read_Dgram rd_
;
759 ACE_Message_Block
*mb_
;
760 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_expected_
;
761 volatile bool recv_in_progress_
;
764 // *************************************************************
765 Master::Master (TestData
*tester
, const ACE_INET_Addr
&recv_addr
, int expected
)
767 recv_addr_ (recv_addr
),
769 sessions_expected_ (expected
),
770 recv_in_progress_ (false)
772 if (this->sock_
.open (recv_addr
) == -1)
773 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Master socket %p\n"), ACE_TEXT ("open")));
776 if (this->rd_
.open (*this, this->sock_
.get_handle ()) == -1)
777 ACE_ERROR ((LM_ERROR
,
778 ACE_TEXT ("Master reader %p\n"),
780 this->mb_
= new ACE_Message_Block (sizeof (Session_Data
));
787 if (this->recv_in_progress_
)
789 this->sock_
.close ();
793 this->mb_
->release ();
799 Master::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
)
801 // We should only receive Start datagrams with valid addresses to reply to.
802 if (result
.success ())
804 if (result
.bytes_transferred () != sizeof (Session_Data
))
805 ACE_ERROR ((LM_ERROR
,
806 ACE_TEXT ("(%t) Master session data expected %B bytes; ")
807 ACE_TEXT ("received %B\n"),
808 sizeof (Session_Data
),
809 result
.bytes_transferred ()));
812 ACE_Message_Block
*mb
= result
.message_block ();
813 Session_Data
*session
=
814 reinterpret_cast<Session_Data
*>(mb
->rd_ptr ());
815 if (session
->direction_
== 0)
817 ACE_INET_Addr client_addr
, me_addr
;
818 ACE_TCHAR client_str
[80], me_str
[80];
819 client_addr
.set ((u_short
)session
->port_
, session
->addr_
, 0);
820 client_addr
.addr_to_string (client_str
, 80);
822 // Set up the local and remote addresses - need fully-specified
823 // addresses to use UDP aio on Linux. This is the socket that
824 // the session will run over. The addressing info to be sent
825 // back to the Client will be sent over the receive socket
826 // to ensure it goes back to the client initiating the session.
827 ACE_SOCK_CODgram sock
;
828 if (sock
.open (client_addr
) == -1)
830 ACE_ERROR ((LM_ERROR
,
831 ACE_TEXT ("(%t) Master new socket for ")
832 ACE_TEXT ("client %s: %p\n"),
838 sock
.get_local_addr (me_addr
);
839 me_addr
.addr_to_string (me_str
, 80);
840 ACE_DEBUG ((LM_DEBUG
,
841 ACE_TEXT ("(%t) Master setting up server for ")
842 ACE_TEXT ("local %s, peer %s\n"),
846 Session_Data session
;
847 session
.direction_
= 1; // Ack
848 session
.addr_
= ACE_HTONL (me_addr
.get_ip_address ());
849 session
.port_
= ACE_HTONS (me_addr
.get_port_number ());
850 if (this->sock_
.send (&session
,
854 ACE_ERROR ((LM_ERROR
,
855 ACE_TEXT ("(%t) Master reply %p\n"),
861 Server
*server
= this->tester_
->server_up ();
862 server
->go (sock
.get_handle (), client_addr
);
865 if (--this->sessions_expected_
== 0)
867 ACE_DEBUG ((LM_DEBUG
,
868 ACE_TEXT ("All expected sessions are up\n")));
873 ACE_ERROR ((LM_ERROR
,
874 ACE_TEXT ("(%t) Badly formed Session request\n")));
880 ACE_Log_Priority prio
= LM_ERROR
;
881 #if defined (ACE_WIN32)
882 if (result
.error () == ERROR_OPERATION_ABORTED
)
885 if (result
.error () == ECANCELED
)
887 #endif /* ACE_WIN32 */
888 // Multiple steps to log the error without squashing errno.
889 ACE_LOG_MSG
->conditional_set (__FILE__
,
892 (int)(result
.error ()));
893 ACE_LOG_MSG
->log (prio
,
894 ACE_TEXT ("(%t) Master %p\n"),
896 // If canceled, don't try to restart.
897 if (prio
== LM_DEBUG
)
904 Master::start_recv ()
911 if (this->rd_
.recv (this->mb_
, unused
, 0) == -1)
912 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) Master %p\n"), ACE_TEXT ("recv")));
914 this->recv_in_progress_
= true;
917 // ***************************************************
920 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
923 Server::Server (TestData
*tester
, int id
)
928 flg_closing_ (false),
938 ACE_DEBUG ((LM_DEBUG
,
939 ACE_TEXT ("(%t) Server %d dtor; %d sends (%B bytes); ")
940 ACE_TEXT ("%d recvs (%B bytes)\n"),
942 this->total_w_
, this->total_snd_
,
943 this->total_r_
, this->total_rcv_
));
944 if (this->io_count_
!= 0)
945 ACE_ERROR ((LM_WARNING
,
946 ACE_TEXT ("(%t) Server %d deleted with ")
947 ACE_TEXT ("%d I/O outstanding\n"),
951 // This test bounces data back and forth between Clients and Servers.
952 // Therefore, if there was significantly more data in one direction, that's
953 // a problem. Remember, the byte counts are unsigned values.
954 int issue_data_warning
= 0;
955 if (this->total_snd_
> this->total_rcv_
)
957 if (this->total_rcv_
== 0)
958 issue_data_warning
= 1;
959 else if (this->total_snd_
/ this->total_rcv_
> 2)
960 issue_data_warning
= 1;
964 if (this->total_snd_
== 0)
965 issue_data_warning
= 1;
966 else if (this->total_rcv_
/ this->total_snd_
> 2)
967 issue_data_warning
= 1;
969 if (issue_data_warning
)
970 ACE_DEBUG ((LM_WARNING
,
971 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
973 if (this->tester_
!= 0)
974 this->tester_
->server_done (this);
976 if (this->handle () != ACE_INVALID_HANDLE
)
977 ACE_OS::closesocket (this->handle ());
980 this->handle (ACE_INVALID_HANDLE
);
986 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
988 this->flg_cancel_
= true;
995 Server::go (ACE_HANDLE handle
, const ACE_INET_Addr
&client
)
997 this->handle (handle
);
998 this->client_addr_
.set (client
);
1000 // Lock this before initiating I/O, else it may complete while we're
1001 // still setting up.
1003 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1005 if (this->ws_
.open (*this, this->handle ()) == -1)
1006 ACE_ERROR ((LM_ERROR
,
1007 ACE_TEXT ("(%t) %p\n"),
1008 ACE_TEXT ("Server::ACE_Asynch_Write_Dgram::open")));
1009 else if (this->rs_
.open (*this, this->handle ()) == -1)
1010 ACE_ERROR ((LM_ERROR
,
1011 ACE_TEXT ("(%t) %p\n"),
1012 ACE_TEXT ("Server::ACE_Asynch_Read_Dgram::open")));
1014 this->initiate_read ();
1017 if (this->io_count_
> 0)
1020 delete this; // Error setting up I/O factories
1024 Server::initiate_read ()
1026 if (this->flg_cancel_
|| this->handle () == ACE_INVALID_HANDLE
)
1029 ACE_Message_Block
*mb
= 0;
1031 ACE_Message_Block (1024), //BUFSIZ + 1),
1034 // Inititiate receive
1036 if (this->rs_
.recv (mb
, unused
, 0) == -1)
1039 ACE_ERROR_RETURN ((LM_ERROR
,
1040 ACE_TEXT ("(%t) Server %d, %p\n"),
1052 Server::initiate_write (ACE_Message_Block
*mb
, size_t nbytes
)
1054 if (this->flg_cancel_
|| this->handle () == ACE_INVALID_HANDLE
)
1063 ACE_ERROR_RETURN((LM_ERROR
,
1064 ACE_TEXT ("(%t) Server %d write nbytes == 0\n"),
1069 if (this->ws_
.send (mb
, nbytes
, 0, this->client_addr_
) == -1)
1072 ACE_ERROR_RETURN((LM_ERROR
,
1073 ACE_TEXT ("(%t) Server %d, %p\n"),
1075 ACE_TEXT ("write")),
1085 Server::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
)
1088 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1090 ACE_Message_Block
*mb
= result
.message_block ();
1093 mb
->rd_ptr ()[result
.bytes_transferred ()] = '\0';
1099 ACE_DEBUG ((LM_DEBUG
,
1100 ACE_TEXT ("(%t) **** Server %d: handle_read_dgram() ****\n"),
1102 ACE_DEBUG ((LM_DEBUG
,
1103 ACE_TEXT ("%s = %B\n"),
1104 ACE_TEXT ("bytes_to_read"),
1105 result
.bytes_to_read ()));
1106 ACE_DEBUG ((LM_DEBUG
,
1107 ACE_TEXT ("%s = %d\n"),
1108 ACE_TEXT ("handle"),
1110 ACE_DEBUG ((LM_DEBUG
,
1111 ACE_TEXT ("%s = %B\n"),
1112 ACE_TEXT ("bytes_transfered"),
1113 result
.bytes_transferred ()));
1114 ACE_DEBUG ((LM_DEBUG
,
1115 ACE_TEXT ("%s = %@\n"),
1118 ACE_DEBUG ((LM_DEBUG
,
1119 ACE_TEXT ("%s = %d\n"),
1120 ACE_TEXT ("success"),
1121 result
.success ()));
1122 ACE_DEBUG ((LM_DEBUG
,
1123 ACE_TEXT ("%s = %@\n"),
1124 ACE_TEXT ("completion_key"),
1125 result
.completion_key ()));
1126 ACE_DEBUG ((LM_DEBUG
,
1127 ACE_TEXT ("%s = %d\n"),
1130 ACE_DEBUG ((LM_DEBUG
,
1131 ACE_TEXT ("%s = %s\n"),
1132 ACE_TEXT ("message_block"),
1134 ACE_DEBUG ((LM_DEBUG
,
1135 ACE_TEXT ("**** end of message ****************\n")));
1137 else if (result
.error () != 0)
1139 ACE_Log_Priority prio
;
1140 #if defined (ACE_WIN32)
1141 if (result
.error () == ERROR_OPERATION_ABORTED
)
1144 if (result
.error () == ECANCELED
)
1146 #endif /* ACE_WIN32 */
1149 ACE_LOG_MSG
->errnum (result
.error ());
1150 ACE_LOG_MSG
->log (prio
,
1151 ACE_TEXT ("(%t) Server %d; %p\n"),
1155 else if (loglevel
> 0)
1157 ACE_DEBUG ((LM_DEBUG
,
1158 ACE_TEXT ("(%t) Server %d: read %B bytes\n"),
1160 result
.bytes_transferred ()));
1163 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1165 this->total_rcv_
+= result
.bytes_transferred ();
1167 // If client says we're done, ack it; we're done reading.
1168 size_t to_send
= result
.bytes_transferred ();
1169 if (ACE_OS::strcmp (mb
->rd_ptr (), close_req_msg
) == 0)
1171 ACE_DEBUG ((LM_DEBUG
,
1172 ACE_TEXT ("(%t) Server %d saw close request; ack\n"),
1174 this->flg_closing_
= true;
1176 mb
->copy (close_ack_msg
);
1177 to_send
= mb
->length ();
1179 if (this->initiate_write (mb
, to_send
) == 0)
1181 if (duplex
!= 0 && !this->flg_closing_
)
1182 this->initiate_read ();
1189 if (this->io_count_
> 0)
1196 Server::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result
&result
)
1199 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1201 ACE_Message_Block
*mb
= result
.message_block ();
1207 //mb.rd_ptr () [0] = '\0';
1208 mb
->rd_ptr (mb
->rd_ptr () - result
.bytes_transferred ());
1210 ACE_DEBUG ((LM_DEBUG
,
1211 ACE_TEXT ("(%t) **** Server %d: handle_write_dgram() ****\n"),
1213 ACE_DEBUG ((LM_DEBUG
,
1214 ACE_TEXT ("%s = %B\n"),
1215 ACE_TEXT ("bytes_to_write"),
1216 result
.bytes_to_write ()));
1217 ACE_DEBUG ((LM_DEBUG
,
1218 ACE_TEXT ("%s = %d\n"),
1219 ACE_TEXT ("handle"),
1221 ACE_DEBUG ((LM_DEBUG
,
1222 ACE_TEXT ("%s = %B\n"),
1223 ACE_TEXT ("bytes_transfered"),
1224 result
.bytes_transferred ()));
1225 ACE_DEBUG ((LM_DEBUG
,
1226 ACE_TEXT ("%s = %@\n"),
1229 ACE_DEBUG ((LM_DEBUG
,
1230 ACE_TEXT ("%s = %d\n"),
1231 ACE_TEXT ("success"),
1232 result
.success ()));
1233 ACE_DEBUG ((LM_DEBUG
,
1234 ACE_TEXT ("%s = %@\n"),
1235 ACE_TEXT ("completion_key"),
1236 result
.completion_key ()));
1237 ACE_DEBUG ((LM_DEBUG
,
1238 ACE_TEXT ("%s = %d\n"),
1241 ACE_DEBUG ((LM_DEBUG
,
1242 ACE_TEXT ("%s = %s\n"),
1243 ACE_TEXT ("message_block"),
1245 ACE_DEBUG ((LM_DEBUG
,
1246 ACE_TEXT ("**** end of message ****************\n")));
1248 else if (result
.error () != 0)
1250 ACE_Log_Priority prio
;
1251 #if defined (ACE_WIN32)
1252 if (result
.error () == ERROR_OPERATION_ABORTED
)
1255 if (result
.error () == ECANCELED
)
1257 #endif /* ACE_WIN32 */
1260 ACE_LOG_MSG
->errnum (result
.error ());
1261 ACE_LOG_MSG
->log (prio
,
1262 ACE_TEXT ("(%t) Server %d; %p\n"),
1264 ACE_TEXT ("write"));
1266 else if (loglevel
> 0)
1268 ACE_DEBUG ((LM_DEBUG
,
1269 ACE_TEXT ("(%t) Server %d: wrote %B bytes ok\n"),
1271 result
.bytes_transferred ()));
1276 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1278 this->total_snd_
+= result
.bytes_transferred ();
1280 if (duplex
== 0 && !this->flg_closing_
)
1281 this->initiate_read ();
1285 if (this->io_count_
> 0)
1291 // *******************************************
1294 // Connector creates the proper number of Clients and initiates
1295 // sessions on them.
1296 // *******************************************
1301 Connector (TestData
*tester
);
1303 int start (const ACE_INET_Addr
&addr
, int num
);
1309 // *************************************************************
1311 Connector::Connector (TestData
*tester
)
1317 Connector::start (const ACE_INET_Addr
& addr
, int num
)
1319 ACE_OS::sleep(3); // Let Master get going
1320 if (num
> MAX_CLIENTS
)
1328 for (; rc
< num
; rc
++)
1330 ACE_SOCK_CODgram sock
;
1331 if (sock
.open (addr
) == -1)
1332 ACE_ERROR_BREAK ((LM_ERROR
,
1333 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1335 ACE_TEXT ("open")));
1337 sock
.get_local_addr (me
);
1338 u_short my_port
= ACE_HTONS (me
.get_port_number ());
1339 ACE_INT32 my_addr
= ACE_HTONL (me
.get_ip_address ());
1340 Session_Data session
;
1341 session
.direction_
= 0; // Start
1342 session
.addr_
= my_addr
;
1343 session
.port_
= my_port
;
1344 if (sock
.send (&session
, sizeof (session
)) == -1)
1345 ACE_ERROR_BREAK ((LM_ERROR
,
1346 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1348 ACE_TEXT ("send")));
1349 if (sock
.recv (&session
, sizeof (session
)) == -1)
1350 ACE_ERROR_BREAK ((LM_ERROR
,
1351 ACE_TEXT ("(%t) Starting client %d: %p\n"),
1353 ACE_TEXT ("recv")));
1354 ACE_INET_Addr server
;
1355 server
.set (session
.port_
, session
.addr_
, 0);
1356 Client
*client
= this->tester_
->client_up ();
1357 ACE_TCHAR me_str
[80], server_str
[80];
1358 me
.addr_to_string (me_str
, 80);
1359 server
.addr_to_string (server_str
, 80);
1360 ACE_DEBUG ((LM_DEBUG
,
1361 ACE_TEXT ("(%t) Client %d setting up local %s, peer %s\n"),
1366 if (sock
.open (server
, me
) == -1)
1367 ACE_ERROR_BREAK ((LM_ERROR
,
1368 ACE_TEXT ("(%t) Re-opening %p\n"),
1369 ACE_TEXT ("client")));
1370 client
->go (sock
.get_handle (), server
);
1371 sock
.set_handle (ACE_INVALID_HANDLE
);
1379 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
1382 Client::Client (TestData
*tester
, int id
)
1387 flg_cancel_ (false),
1397 ACE_DEBUG ((LM_DEBUG
,
1398 ACE_TEXT ("(%t) Client %d dtor; %d sends (%B bytes); ")
1399 ACE_TEXT ("%d recvs (%B bytes)\n"),
1401 this->total_w_
, this->total_snd_
,
1402 this->total_r_
, this->total_rcv_
));
1403 if (this->io_count_
!= 0)
1404 ACE_ERROR ((LM_WARNING
,
1405 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1409 // This test bounces data back and forth between Clients and Servers.
1410 // Therefore, if there was significantly more data in one direction, that's
1411 // a problem. Remember, the byte counts are unsigned values.
1412 int issue_data_warning
= 0;
1413 if (this->total_snd_
> this->total_rcv_
)
1415 if (this->total_rcv_
== 0)
1416 issue_data_warning
= 1;
1417 else if (this->total_snd_
/ this->total_rcv_
> 2)
1418 issue_data_warning
= 1;
1422 if (this->total_snd_
== 0)
1423 issue_data_warning
= 1;
1424 else if (this->total_rcv_
/ this->total_snd_
> 2)
1425 issue_data_warning
= 1;
1427 if (issue_data_warning
)
1428 ACE_DEBUG ((LM_WARNING
,
1429 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1431 if (this->tester_
!= 0)
1432 this->tester_
->client_done (this);
1435 if (this->handle () != ACE_INVALID_HANDLE
)
1437 ACE_OS::closesocket (this->handle ());
1439 this->handle (ACE_INVALID_HANDLE
);
1445 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1447 this->flg_cancel_
= true;
1448 this->ws_
.cancel ();
1449 this->rs_
.cancel ();
1456 // This must be called with the lock_ held.
1457 ++this->stop_writing_
;
1458 ACE_DEBUG ((LM_DEBUG
,
1459 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1460 this->id_
, this->io_count_
));
1465 Client::go (ACE_HANDLE handle
, const ACE_INET_Addr
&server
)
1468 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1470 this->handle (handle
);
1471 this->server_addr_
.set (server
);
1473 // Open send and receive factories.
1474 if (this->ws_
.open (*this, handle
) == -1)
1475 ACE_ERROR ((LM_ERROR
,
1476 ACE_TEXT ("(%t) Client %d: %p\n"),
1478 ACE_TEXT ("ACE_Asynch_Write_Dgram::open")));
1479 else if (this->rs_
.open (*this, handle
) == -1)
1480 ACE_ERROR ((LM_ERROR
,
1481 ACE_TEXT ("(%t) Client %d: %p\n"),
1483 ACE_TEXT ("ACE_Asynch_Read_Dgram::open")));
1484 else if (this->initiate_write () == 0)
1486 if (duplex
!= 0) // Start an asynchronous read
1487 this->initiate_read ();
1490 if (this->io_count_
> 0)
1497 Client::initiate_write ()
1499 if (this->flg_cancel_
|| this->handle () == ACE_INVALID_HANDLE
)
1502 // stop_writing_ is set to 1 to say "stop". To avoid repeating the
1503 // close datagram for every echo, only send it once. Sure, there's a risk
1504 // it will get lost, but since this is most often intra-host, don't
1505 // worry about that very small risk.
1506 if (this->stop_writing_
> 0) // Need to tell server to "close"
1508 if (this->stop_writing_
> 1) // Already told server to close
1511 ++this->stop_writing_
;
1512 ACE_DEBUG ((LM_DEBUG
,
1513 ACE_TEXT ("(%t) Client %d requesting close\n"),
1515 ACE_Message_Block
*mb
=
1516 new ACE_Message_Block (ACE_OS::strlen (close_req_msg
) + 1);
1517 mb
->copy (close_req_msg
);
1518 size_t unused
; // Number of bytes sent
1519 if (this->ws_
.send (mb
, unused
, 0, this->server_addr_
) == -1)
1522 ACE_ERROR_RETURN ((LM_ERROR
,
1523 ACE_TEXT ("(%t) Client %d, %p\n"),
1525 ACE_TEXT ("initiating closing send")),
1534 static const size_t complete_message_length
=
1535 ACE_OS::strlen (complete_message
);
1537 #if defined (ACE_WIN32)
1539 ACE_Message_Block
*mb1
= 0,
1543 // No need to allocate +1 for proper printing - the memory includes it already
1544 ACE_NEW_RETURN (mb1
,
1545 ACE_Message_Block ((char *)complete_message
,
1546 complete_message_length
),
1549 ACE_NEW_RETURN (mb2
,
1550 ACE_Message_Block ((char *)complete_message
,
1551 complete_message_length
),
1554 ACE_NEW_RETURN (mb3
,
1555 ACE_Message_Block ((char *)complete_message
,
1556 complete_message_length
),
1559 mb1
->wr_ptr (complete_message_length
);
1560 mb2
->wr_ptr (complete_message_length
);
1561 mb3
->wr_ptr (complete_message_length
);
1563 // chain them together
1567 size_t unused
; // Number of bytes sent
1568 if (this->ws_
.send (mb1
, unused
, 0, this->server_addr_
) == -1)
1571 ACE_ERROR_RETURN((LM_ERROR
,
1572 ACE_TEXT ("(%t) %p\n"),
1573 ACE_TEXT ("Client::ACE_Asynch_Write_Dgram::send")),
1576 #else /* ACE_WIN32 */
1578 ACE_Message_Block
*mb
= 0;
1580 // No need to allocate +1 for proper printing - the memory includes
1583 ACE_Message_Block (complete_message
,
1584 complete_message_length
),
1586 mb
->wr_ptr (complete_message_length
);
1587 size_t unused
; // Number of bytes sent
1588 if (this->ws_
.send (mb
, unused
, 0, this->server_addr_
) == -1)
1591 ACE_ERROR_RETURN((LM_ERROR
,
1592 ACE_TEXT ("(%t) Client %d, %p\n"),
1597 #endif /* ACE_WIN32 */
1605 Client::initiate_read ()
1607 if (this->flg_cancel_
|| this->handle_
== ACE_INVALID_HANDLE
)
1610 static const size_t complete_message_length
=
1611 ACE_OS::strlen (complete_message
);
1613 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
1614 ACE_Message_Block
*mb1
= 0,
1621 // We allocate +1 only for proper printing - we can just set the last byte
1622 // to '\0' before printing out
1623 ACE_NEW_RETURN (mb1
, ACE_Message_Block (complete_message_length
+ 1), -1);
1624 ACE_NEW_RETURN (mb2
, ACE_Message_Block (complete_message_length
+ 1), -1);
1625 ACE_NEW_RETURN (mb3
, ACE_Message_Block (complete_message_length
+ 1), -1);
1627 // Let allocate memory for one more triplet,
1628 // This improves performance
1629 // as we can receive more the than one block at once
1630 // Generally, we can receive more triplets ....
1631 ACE_NEW_RETURN (mb4
, ACE_Message_Block (complete_message_length
+ 1), -1);
1632 ACE_NEW_RETURN (mb5
, ACE_Message_Block (complete_message_length
+ 1), -1);
1633 ACE_NEW_RETURN (mb6
, ACE_Message_Block (complete_message_length
+ 1), -1);
1643 // hide last byte in each message block, reserving it for later to set '\0'
1644 // for proper printouts
1645 mb1
->size (mb1
->size () - 1);
1646 mb2
->size (mb2
->size () - 1);
1647 mb3
->size (mb3
->size () - 1);
1649 mb4
->size (mb4
->size () - 1);
1650 mb5
->size (mb5
->size () - 1);
1651 mb6
->size (mb6
->size () - 1);
1655 if (this->rs_
.recv (mb1
, unused
, 0) == -1)
1658 ACE_ERROR_RETURN ((LM_ERROR
,
1659 ACE_TEXT ("(%t) %p\n"),
1660 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1663 #else /* ACE_HAS_WIN32_OVERLAPPED_IO */
1665 // Try to read more chunks
1666 size_t blksize
= ( complete_message_length
> BUFSIZ
) ?
1667 complete_message_length
: BUFSIZ
;
1669 ACE_Message_Block
*mb
= 0;
1671 // We allocate +1 only for proper printing - we can just set the last byte
1672 // to '\0' before printing out
1674 ACE_Message_Block (blksize
+ 1),
1679 if (this->rs_
.recv (mb
, unused
, 0) == -1)
1682 ACE_ERROR_RETURN ((LM_ERROR
,
1683 ACE_TEXT ("(%t) Client %d, %p\n"),
1688 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
1696 Client::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result
&result
)
1699 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1701 ACE_Message_Block
*mb
= result
.message_block ();
1707 ACE_DEBUG ((LM_DEBUG
,
1708 ACE_TEXT ("(%t) **** Client %d: handle_write_dgram() ****\n"),
1710 ACE_DEBUG ((LM_DEBUG
,
1711 ACE_TEXT ("%s = %B\n"),
1712 ACE_TEXT ("bytes_to_write"),
1713 result
.bytes_to_write ()));
1714 ACE_DEBUG ((LM_DEBUG
,
1715 ACE_TEXT ("%s = %d\n"),
1716 ACE_TEXT ("handle"),
1718 ACE_DEBUG ((LM_DEBUG
,
1719 ACE_TEXT ("%s = %B\n"),
1720 ACE_TEXT ("bytes_transfered"),
1721 result
.bytes_transferred ()));
1722 ACE_DEBUG ((LM_DEBUG
,
1723 ACE_TEXT ("%s = %@\n"),
1726 ACE_DEBUG ((LM_DEBUG
,
1727 ACE_TEXT ("%s = %d\n"),
1728 ACE_TEXT ("success"),
1729 result
.success ()));
1730 ACE_DEBUG ((LM_DEBUG
,
1731 ACE_TEXT ("%s = %@\n"),
1732 ACE_TEXT ("completion_key"),
1733 result
.completion_key ()));
1734 ACE_DEBUG ((LM_DEBUG
,
1735 ACE_TEXT ("%s = %d\n"),
1739 #if defined (ACE_WIN32)
1740 size_t bytes_transferred
= result
.bytes_transferred ();
1742 for (ACE_Message_Block
* mb_i
= mb
;
1743 (mb_i
!= 0) && (bytes_transferred
> 0);
1744 mb_i
= mb_i
->cont ())
1746 // write 0 at string end for proper printout (if end of mb,
1748 mb_i
->rd_ptr()[0] = '\0';
1750 size_t len
= mb_i
->rd_ptr () - mb_i
->base ();
1752 // move rd_ptr backwards as required for printout
1753 if (len
>= bytes_transferred
)
1755 mb_i
->rd_ptr (0 - bytes_transferred
);
1756 bytes_transferred
= 0;
1760 mb_i
->rd_ptr (0 - len
);
1761 bytes_transferred
-= len
;
1765 ACE_DEBUG ((LM_DEBUG
,
1766 ACE_TEXT ("%s%d = %s\n"),
1767 ACE_TEXT ("message_block, part "),
1771 #else /* ACE_WIN32 */
1772 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1773 mb
->rd_ptr()[0] = '\0';
1774 // move rd_ptr backwards as required for printout
1775 mb
->rd_ptr (- result
.bytes_transferred ());
1776 ACE_DEBUG ((LM_DEBUG
,
1777 ACE_TEXT ("%s = %s\n"),
1778 ACE_TEXT ("message_block"),
1780 #endif /* ACE_WIN32 */
1782 ACE_DEBUG ((LM_DEBUG
,
1783 ACE_TEXT ("**** end of message ****************\n")));
1785 else if (result
.error () != 0)
1787 ACE_Log_Priority prio
;
1788 #if defined (ACE_WIN32)
1789 if (result
.error () == ERROR_OPERATION_ABORTED
)
1792 if (result
.error () == ECANCELED
)
1794 #endif /* ACE_WIN32 */
1797 ACE_LOG_MSG
->errnum (result
.error ());
1798 ACE_LOG_MSG
->log (prio
,
1799 ACE_TEXT ("(%t) Client %d; %p\n"),
1801 ACE_TEXT ("write"));
1803 else if (loglevel
> 0)
1805 ACE_DEBUG ((LM_DEBUG
,
1806 ACE_TEXT ("(%t) Client %d: wrote %B bytes ok\n"),
1808 result
.bytes_transferred ()));
1813 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1815 this->total_snd_
+= result
.bytes_transferred ();
1816 if (this->total_snd_
>= xfer_limit
)
1818 ACE_DEBUG ((LM_DEBUG
,
1819 ACE_TEXT ("(%t) Client %d sent %B, limit %B\n"),
1820 this->id_
, this->total_snd_
, xfer_limit
));
1823 if (duplex
!= 0) // full duplex, continue write
1825 if ((this->total_snd_
- this->total_rcv_
) < 1024*32 ) //flow control
1826 this->initiate_write ();
1828 else // half-duplex read reply, after read we will start write
1829 this->initiate_read ();
1833 if (this->io_count_
> 0)
1840 Client::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result
&result
)
1843 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1845 ACE_Message_Block
*mb
= result
.message_block ();
1851 ACE_DEBUG ((LM_DEBUG
,
1852 ACE_TEXT ("(%t) **** Client %d: handle_read_dgram() ****\n"),
1854 ACE_DEBUG ((LM_DEBUG
,
1855 ACE_TEXT ("%s = %B\n"),
1856 ACE_TEXT ("bytes_to_read"),
1857 result
.bytes_to_read ()));
1858 ACE_DEBUG ((LM_DEBUG
,
1859 ACE_TEXT ("%s = %d\n"),
1860 ACE_TEXT ("handle"),
1862 ACE_DEBUG ((LM_DEBUG
,
1863 ACE_TEXT ("%s = %B\n"),
1864 ACE_TEXT ("bytes_transfered"),
1865 result
.bytes_transferred ()));
1866 ACE_DEBUG ((LM_DEBUG
,
1867 ACE_TEXT ("%s = %@\n"),
1870 ACE_DEBUG ((LM_DEBUG
,
1871 ACE_TEXT ("%s = %d\n"),
1872 ACE_TEXT ("success"),
1873 result
.success ()));
1874 ACE_DEBUG ((LM_DEBUG
,
1875 ACE_TEXT ("%s = %@\n"),
1876 ACE_TEXT ("completion_key"),
1877 result
.completion_key ()));
1878 ACE_DEBUG ((LM_DEBUG
,
1879 ACE_TEXT ("%s = %d\n"),
1883 #if defined (ACE_WIN32)
1885 for (ACE_Message_Block
* mb_i
= mb
;
1887 mb_i
= mb_i
->cont ())
1890 // write 0 at string end for proper printout
1891 mb_i
->wr_ptr()[0] = '\0';
1893 ACE_DEBUG ((LM_DEBUG
,
1894 ACE_TEXT ("%s%d = %s\n"),
1895 ACE_TEXT ("message_block, part "),
1899 #else /* ACE_WIN32 */
1900 // write 0 at string end for proper printout
1901 mb
->rd_ptr()[result
.bytes_transferred ()] = '\0'; // for proper printout
1902 ACE_DEBUG ((LM_DEBUG
,
1903 ACE_TEXT ("%s = %s\n"),
1904 ACE_TEXT ("message_block"),
1906 #endif /* ACE_WIN32 */
1908 ACE_DEBUG ((LM_DEBUG
,
1909 ACE_TEXT ("**** end of message ****************\n")));
1911 else if (result
.error () != 0)
1913 ACE_Log_Priority prio
;
1914 #if defined (ACE_WIN32)
1915 if (result
.error () == ERROR_OPERATION_ABORTED
)
1918 if (result
.error () == ECANCELED
)
1920 #endif /* ACE_WIN32 */
1923 ACE_Log_Msg::instance ()->errnum (result
.error ());
1924 ACE_Log_Msg::instance ()->log (prio
,
1925 ACE_TEXT ("(%t) Client %d; %p\n"),
1929 else if (loglevel
> 0)
1931 ACE_DEBUG ((LM_DEBUG
,
1932 ACE_TEXT ("(%t) Client %d: read %B bytes ok\n"),
1934 result
.bytes_transferred ()));
1937 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1939 this->total_rcv_
+= result
.bytes_transferred ();
1941 // If we've closed and the server acked, we're done.
1942 if (this->stop_writing_
&&
1943 ACE_OS::strcmp (mb
->rd_ptr (), close_ack_msg
) == 0)
1945 ACE_DEBUG ((LM_DEBUG
,
1946 ACE_TEXT ("(%t) Client %d recvd close-ack\n"),
1952 this->initiate_read ();
1953 else // half-duplex write, after write we will start read
1954 this->initiate_write ();
1960 if (this->io_count_
> 0)
1966 // *************************************************************
1967 // Configuration helpers
1968 // *************************************************************
1970 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1974 ACE_TEXT ("\nusage: %s")
1975 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1976 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1977 ACE_TEXT ("\n a AIOCB")
1978 ACE_TEXT ("\n i SIG")
1979 ACE_TEXT ("\n c CB")
1980 ACE_TEXT ("\n d default")
1981 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1982 ACE_TEXT ("\n-h <host> for Client mode")
1983 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1984 ACE_TEXT ("\n-p <port to listen/connect>")
1985 ACE_TEXT ("\n-c <number of client instances>")
1986 ACE_TEXT ("\n-b run client and server at the same time")
1987 ACE_TEXT ("\n f file")
1988 ACE_TEXT ("\n c console")
1989 ACE_TEXT ("\n-v log level")
1990 ACE_TEXT ("\n 0 - log errors and highlights")
1991 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1992 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1993 ACE_TEXT ("\n-x max transfer byte count per Client")
1994 ACE_TEXT ("\n-u show this message")
2002 set_proactor_type (const ACE_TCHAR
*ptype
)
2007 switch (ACE_OS::ace_toupper (*ptype
))
2010 proactor_type
= DEFAULT
;
2013 proactor_type
= AIOCB
;
2016 proactor_type
= SIG
;
2018 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
2022 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
2030 parse_args (int argc
, ACE_TCHAR
*argv
[])
2032 // First, set up all the defaults then let any args change them.
2033 both
= 1; // client and server simultaneosly
2034 duplex
= 1; // full duplex is on
2035 host
= ACE_LOCALHOST
; // server to connect
2036 port
= ACE_DEFAULT_SERVER_PORT
; // port to connect/listen
2037 max_aio_operations
= 512; // POSIX Proactor params
2038 proactor_type
= DEFAULT
; // Proactor type = default
2039 threads
= 3; // size of Proactor thread pool
2040 clients
= 10; // number of clients
2041 loglevel
= 0; // log level : only errors and highlights
2042 // Default transfer limit 50 messages per Sender
2043 xfer_limit
= 50 * ACE_OS::strlen (complete_message
);
2045 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
2046 # if defined (ACE_LINUX)
2050 if (argc
== 1) // no arguments , so one button test
2053 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
2056 while ((c
= get_opt ()) != EOF
)
2060 case 'x': // xfer limit
2061 xfer_limit
= static_cast<size_t> (ACE_OS::atoi (get_opt
.opt_arg ()));
2062 if (xfer_limit
== 0)
2063 xfer_limit
= 1; // Bare minimum.
2065 case 'b': // both client and server
2068 case 'v': // log level
2069 loglevel
= ACE_OS::atoi (get_opt
.opt_arg ());
2072 duplex
= ACE_OS::atoi (get_opt
.opt_arg ());
2074 case 'h': // host for sender
2075 host
= get_opt
.opt_arg ();
2077 case 'p': // port number
2078 port
= ACE_OS::atoi (get_opt
.opt_arg ());
2080 case 'n': // thread pool size
2081 threads
= ACE_OS::atoi (get_opt
.opt_arg ());
2083 case 'c': // number of clients
2084 clients
= ACE_OS::atoi (get_opt
.opt_arg ());
2085 if (clients
> MAX_CLIENTS
)
2086 clients
= MAX_CLIENTS
;
2088 case 'o': // max number of aio for proactor
2089 max_aio_operations
= ACE_OS::atoi (get_opt
.opt_arg ());
2091 case 't': // Proactor Type
2092 if (set_proactor_type (get_opt
.opt_arg ()))
2094 return print_usage (argc
, argv
);
2097 return print_usage (argc
, argv
);
2105 run_main (int argc
, ACE_TCHAR
*argv
[])
2107 ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
2109 if (::parse_args (argc
, argv
) == -1)
2112 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
2113 disable_signal (SIGPIPE
, SIGPIPE
);
2118 if (task1
.start (threads
, proactor_type
, max_aio_operations
) == 0)
2120 // NOTE - there's no real reason this test is limited to IPv4 other
2121 // than the way Session_Data is set up - to expand this test to work
2122 // on IPv6 as well as IPv4, you need to do some work on passing the
2123 // Session_Data address differently.
2124 ACE_INET_Addr
addr (port
, ACE_LOCALHOST
, AF_INET
);
2125 Master
master (&test
, addr
, clients
);
2126 Connector
connector (&test
);
2129 if (both
!= 0 || host
== 0) // Acceptor
2131 // Already running; if not needed will be deleted soon.
2135 if (both
!= 0 || host
!= 0)
2138 host
= ACE_LOCALHOST
;
2140 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
2141 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), host
));
2143 rc
+= connector
.start (addr
, clients
);
2146 // Wait a few seconds to let things get going, then poll til
2147 // all sessions are done. Note that when we exit this scope, the
2148 // Acceptor and Connector will be destroyed, which should prevent
2149 // further connections and also test how well destroyed handlers
2153 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
2154 while (!test
.testing_done ())
2159 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
2170 run_main (int, ACE_TCHAR
*[])
2172 ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
2174 ACE_DEBUG ((LM_INFO
,
2175 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
2176 ACE_TEXT ("Proactor_UDP_Test will not be run.\n")));
2183 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */