1 // ============================================================================
3 * @file Proactor_Test.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that does various asynchronous
9 * @author Alexander Libman <alibman@baltimore.com>
11 // ============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
16 // This only works on Win32 platforms and on Unix platforms
17 // supporting POSIX aio calls.
19 #include "ace/Signal.h"
21 #include "ace/Service_Config.h"
22 #include "ace/INET_Addr.h"
23 #include "ace/SOCK_Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/SOCK_Stream.h"
26 #include "ace/Object_Manager.h"
27 #include "ace/Get_Opt.h"
29 #include "ace/Proactor.h"
30 #include "ace/Asynch_Acceptor.h"
31 #include "ace/Asynch_Connector.h"
33 #include "ace/Thread_Semaphore.h"
34 #include "ace/OS_NS_ctype.h"
35 #include "ace/OS_NS_errno.h"
36 #include "ace/OS_NS_signal.h"
37 #include "ace/OS_NS_string.h"
38 #include "ace/OS_NS_unistd.h"
39 #include "ace/OS_NS_sys_socket.h"
40 #include "ace/os_include/netinet/os_tcp.h"
42 #include "ace/Atomic_Op.h"
43 #include "ace/Synch_Traits.h"
45 #if defined (ACE_WIN32)
47 # include "ace/WIN32_Proactor.h"
49 #elif defined (ACE_HAS_AIO_CALLS)
51 # include "ace/POSIX_Proactor.h"
52 # include "ace/POSIX_CB_Proactor.h"
54 #endif /* ACE_WIN32 */
56 #include "Proactor_Test.h"
59 // Proactor Type (UNIX only, Win32 ignored)
// Selects which proactor implementation MyTask::create_proactor() builds
// when ACE_HAS_AIO_CALLS is in effect (it switches on this value); in the
// ACE_WIN32 branch the argument is marked unused.
60 using ProactorType
= enum { DEFAULT
= 0, AIOCB
, SIG
, CB
};
// Proactor type in effect for this run; starts out as DEFAULT.
61 static ProactorType proactor_type
= DEFAULT
;
63 // POSIX: if > 0, the max number of aio operations for the proactor.
64 static size_t max_aio_operations
= 0;
66 // both: 0 run client or server / depends on host
67 // != 0 run client and server
70 // Host that we're connecting to.
71 static const ACE_TCHAR
*host
= 0;
73 // number of Client instances
74 static int clients
= 1;
// Capacity of TestData::client_list_; Connector::start() also compares its
// requested connection count against this limit.
75 const int MAX_CLIENTS
= 1000;
// Capacity of TestData::server_list_.
76 const int MAX_SERVERS
= 1000;
78 // duplex mode: == 0 half-duplex
80 static int duplex
= 0;
82 // number of threads in the Proactor thread pool
83 static int threads
= 1;
85 // Port that we're receiving connections on.
86 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
89 static int loglevel
; // 0 full , 1 only errors
91 static size_t xfer_limit
; // Number of bytes for Client to send.
93 static char complete_message
[] =
96 "Accept-Language: C++\r\n"
97 "Accept-Encoding: gzip, deflate\r\n"
98 "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
99 "Connection: Keep-Alive\r\n"
105 LogLocker () { ACE_LOG_MSG
->acquire (); }
106 virtual ~LogLocker () { ACE_LOG_MSG
->release (); }
110 // Function to remove signals from the signal mask.
112 disable_signal (int sigmin
, int sigmax
)
114 #if !defined (ACE_LACKS_UNIX_SIGNALS)
116 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
117 ACE_ERROR ((LM_ERROR
,
118 ACE_TEXT ("Error: (%P|%t):%p\n"),
119 ACE_TEXT ("sigemptyset failed")));
121 for (int i
= sigmin
; i
<= sigmax
; i
++)
122 ACE_OS::sigaddset (&signal_set
, i
);
124 // Put the <signal_set>.
125 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
126 // In a multi-threaded application this is not POSIX compliant
127 // but let's leave it just in case.
128 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
130 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
131 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
132 ACE_ERROR_RETURN ((LM_ERROR
,
133 ACE_TEXT ("Error: (%P|%t): %p\n"),
134 ACE_TEXT ("SIG_BLOCK failed")),
137 ACE_UNUSED_ARG (sigmin
);
138 ACE_UNUSED_ARG (sigmax
);
139 #endif /* ACE_LACKS_UNIX_SIGNALS */
145 // *************************************************************
146 // MyTask is the ACE_Task responsible for:
147 // 1. creation and deletion of
148 // Proactor and Proactor thread pool
149 // 2. running Proactor event loop
150 // *************************************************************
155 * MyTask plays the role of the Proactor thread pool
157 * MyTask is the ACE_Task responsible for:
158 * 1. Creation and deletion of Proactor and Proactor thread pool
159 * 2. Running Proactor event loop
161 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
166 sem_ ((unsigned int) 0),
171 (void) this->stop ();
172 (void) this->delete_proactor();
177 int start (int num_threads
,
178 ProactorType type_proactor
,
183 int create_proactor (ProactorType type_proactor
,
185 int delete_proactor ();
187 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
188 ACE_Thread_Semaphore sem_
;
189 ACE_Proactor
* proactor_
;
193 MyTask::create_proactor (ProactorType type_proactor
, size_t max_op
)
195 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
200 ACE_TEST_ASSERT (this->proactor_
== 0);
202 #if defined (ACE_WIN32)
204 ACE_UNUSED_ARG (type_proactor
);
205 ACE_UNUSED_ARG (max_op
);
207 ACE_WIN32_Proactor
*proactor_impl
= 0;
209 ACE_NEW_RETURN (proactor_impl
,
213 ACE_DEBUG ((LM_DEBUG
,
214 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
216 #elif defined (ACE_HAS_AIO_CALLS)
218 ACE_POSIX_Proactor
* proactor_impl
= 0;
220 switch (type_proactor
)
223 ACE_NEW_RETURN (proactor_impl
,
224 ACE_POSIX_AIOCB_Proactor (max_op
),
226 ACE_DEBUG ((LM_DEBUG
,
227 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
230 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
232 ACE_NEW_RETURN (proactor_impl
,
233 ACE_POSIX_SIG_Proactor (max_op
),
235 ACE_DEBUG ((LM_DEBUG
,
236 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
238 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
240 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
242 ACE_NEW_RETURN (proactor_impl
,
243 ACE_POSIX_CB_Proactor (max_op
),
245 ACE_DEBUG ((LM_DEBUG
,
246 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
248 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
251 ACE_DEBUG ((LM_DEBUG
,
252 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
256 #endif /* ACE_WIN32 */
258 // Always pass 1 as the delete-implementation flag, rather than !(proactor_impl == 0)
259 ACE_NEW_RETURN (this->proactor_
,
260 ACE_Proactor (proactor_impl
, 1 ),
262 // Set new singleton and delete it in close_singleton()
263 ACE_Proactor::instance (this->proactor_
, 1);
268 MyTask::delete_proactor ()
270 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
275 ACE_DEBUG ((LM_DEBUG
,
276 ACE_TEXT ("(%t) Delete Proactor\n")));
278 ACE_Proactor::close_singleton ();
285 MyTask::start (int num_threads
,
286 ProactorType type_proactor
,
289 if (this->create_proactor (type_proactor
, max_op
) == -1)
290 ACE_ERROR_RETURN ((LM_ERROR
,
292 ACE_TEXT ("unable to create proactor")),
295 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
296 ACE_ERROR_RETURN ((LM_ERROR
,
298 ACE_TEXT ("unable to activate thread pool")),
301 for (; num_threads
> 0; num_threads
--)
313 if (this->proactor_
!= 0)
315 ACE_DEBUG ((LM_DEBUG
,
316 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
318 this->proactor_
->proactor_end_event_loop ();
321 if (this->wait () == -1)
322 ACE_ERROR ((LM_ERROR
,
324 ACE_TEXT ("unable to stop thread pool")));
332 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask started\n")));
334 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
335 disable_signal (SIGPIPE
, SIGPIPE
);
337 // signal that we are ready
340 ACE_Proactor::run_event_loop ();
342 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask finished\n")));
347 // TestData collects and reports on test-related transfer and connection
353 bool testing_done ();
354 Server
*server_up ();
355 Client
*client_up ();
356 void server_done (Server
*s
);
357 void client_done (Client
*c
);
364 // Track number of sessions that report start, and those that report
365 // their end (and stats).
366 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_up_
;
367 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_down_
;
369 // Total read and write bytes for all sessions.
370 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_cnt_
;
371 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_cnt_
;
372 // Total read and write operations issued for all sessions.
373 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_ops_
;
374 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_ops_
;
375 } servers_
, clients_
;
377 ACE_SYNCH_MUTEX list_lock_
;
378 Server
*server_list_
[MAX_SERVERS
];
379 Client
*client_list_
[MAX_CLIENTS
];
382 TestData::TestData ()
385 for (i
= 0; i
< MAX_SERVERS
; ++i
)
386 this->server_list_
[i
] = 0;
387 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
388 this->client_list_
[i
] = 0;
392 TestData::testing_done ()
394 int svr_up
= this->servers_
.sessions_up_
.value ();
395 int svr_dn
= this->servers_
.sessions_down_
.value ();
396 int clt_up
= this->clients_
.sessions_up_
.value ();
397 int clt_dn
= this->clients_
.sessions_down_
.value ();
399 if (svr_up
== 0 && clt_up
== 0) // No connections up yet
402 return (svr_dn
>= svr_up
&& clt_dn
>= clt_up
);
406 TestData::server_up ()
408 ++this->servers_
.sessions_up_
;
409 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
411 for (int i
= 0; i
< MAX_SERVERS
; ++i
)
413 if (this->server_list_
[i
] == 0)
415 ACE_NEW_RETURN (this->server_list_
[i
], Server (this, i
), 0);
416 ACE_DEBUG ((LM_DEBUG
,
417 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
419 this->servers_
.sessions_up_
.value (),
420 this->servers_
.sessions_down_
.value ()));
421 return this->server_list_
[i
];
428 TestData::client_up ()
430 ++this->clients_
.sessions_up_
;
431 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
433 for (int i
= 0; i
< MAX_CLIENTS
; ++i
)
435 if (this->client_list_
[i
] == 0)
437 ACE_NEW_RETURN (this->client_list_
[i
], Client (this, i
), 0);
438 ACE_DEBUG ((LM_DEBUG
,
439 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
441 this->clients_
.sessions_up_
.value (),
442 this->clients_
.sessions_down_
.value ()));
443 return this->client_list_
[i
];
450 TestData::server_done (Server
*s
)
452 this->servers_
.w_cnt_
+= s
->get_total_snd ();
453 this->servers_
.r_cnt_
+= s
->get_total_rcv ();
454 this->servers_
.w_ops_
+= s
->get_total_w ();
455 this->servers_
.r_ops_
+= s
->get_total_r ();
456 ++this->servers_
.sessions_down_
;
457 ACE_DEBUG ((LM_DEBUG
,
458 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
460 this->servers_
.sessions_up_
.value (),
461 this->servers_
.sessions_down_
.value ()));
463 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
465 for (i
= 0; i
< MAX_SERVERS
; ++i
)
467 if (this->server_list_
[i
] == s
)
470 ACE_ERROR ((LM_ERROR
,
471 ACE_TEXT ("Server %d is pos %d in list\n"),
474 this->server_list_
[i
] = 0;
478 if (i
>= MAX_SERVERS
)
479 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Server %@ done but not listed\n"), s
));
485 TestData::client_done (Client
*c
)
487 this->clients_
.w_cnt_
+= c
->get_total_snd ();
488 this->clients_
.r_cnt_
+= c
->get_total_rcv ();
489 this->clients_
.w_ops_
+= c
->get_total_w ();
490 this->clients_
.r_ops_
+= c
->get_total_r ();
491 ++this->clients_
.sessions_down_
;
492 ACE_DEBUG ((LM_DEBUG
,
493 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
495 this->clients_
.sessions_up_
.value (),
496 this->clients_
.sessions_down_
.value ()));
498 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
500 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
502 if (this->client_list_
[i
] == c
)
505 ACE_ERROR ((LM_ERROR
,
506 ACE_TEXT ("Client %d is pos %d in list\n"),
509 this->client_list_
[i
] = 0;
513 if (i
>= MAX_CLIENTS
)
514 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Client %@ done but not listed\n"), c
));
520 TestData::stop_all ()
524 // Lock and cancel everything. Then release the lock, possibly allowing
525 // cleanups, then grab it again and delete all Servers and Clients.
527 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
528 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
530 if (this->client_list_
[i
] != 0)
531 this->client_list_
[i
]->cancel ();
534 for (i
= 0; i
< MAX_SERVERS
; ++i
)
536 if (this->server_list_
[i
] != 0)
537 this->server_list_
[i
]->cancel ();
541 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
542 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
544 if (this->client_list_
[i
] != 0)
545 delete this->client_list_
[i
];
548 for (i
= 0; i
< MAX_SERVERS
; ++i
)
550 if (this->server_list_
[i
] != 0)
551 delete this->server_list_
[i
];
560 ACE_TCHAR bufs
[256];
561 ACE_TCHAR bufr
[256];
563 ACE_OS::snprintf (bufs
, 256,
564 ACE_SIZE_T_FORMAT_SPECIFIER
565 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
566 this->clients_
.w_cnt_
.value (),
567 this->clients_
.w_ops_
.value ());
569 ACE_OS::snprintf (bufr
, 256,
570 ACE_SIZE_T_FORMAT_SPECIFIER
571 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
572 this->clients_
.r_cnt_
.value (),
573 this->clients_
.r_ops_
.value ());
575 ACE_DEBUG ((LM_DEBUG
,
576 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
580 ACE_OS::snprintf (bufs
, 256,
581 ACE_SIZE_T_FORMAT_SPECIFIER
582 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
583 this->servers_
.w_cnt_
.value (),
584 this->servers_
.w_ops_
.value ());
586 ACE_OS::snprintf (bufr
, 256,
587 ACE_SIZE_T_FORMAT_SPECIFIER
588 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
589 this->servers_
.r_cnt_
.value (),
590 this->servers_
.r_ops_
.value ());
592 ACE_DEBUG ((LM_DEBUG
,
593 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
597 if (this->clients_
.w_cnt_
.value () == 0 ||
598 this->clients_
.r_cnt_
.value () == 0 ||
599 this->servers_
.w_cnt_
.value () == 0 ||
600 this->servers_
.r_cnt_
.value () == 0 )
601 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("It appears that this test didn't ")
602 ACE_TEXT ("really do anything. Something is very wrong.\n")));
606 class Acceptor
: public ACE_Asynch_Acceptor
<Server
>
609 Acceptor (TestData
*tester
);
610 ~Acceptor () override
;
612 // Virtual from ACE_Asynch_Acceptor
613 Server
*make_handler () override
;
619 // *************************************************************
620 Acceptor::Acceptor (TestData
*tester
)
625 Acceptor::~Acceptor ()
631 Acceptor::make_handler ()
633 return this->tester_
->server_up ();
636 // ***************************************************
639 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
642 Server::Server (TestData
*tester
, int id
)
645 handle_ (ACE_INVALID_HANDLE
),
657 ACE_DEBUG ((LM_DEBUG
,
658 ACE_TEXT ("(%t) Server %d dtor; %d sends (%B bytes); ")
659 ACE_TEXT ("%d recvs (%B bytes)\n"),
661 this->total_w_
, this->total_snd_
,
662 this->total_r_
, this->total_rcv_
));
663 if (this->io_count_
!= 0)
664 ACE_ERROR ((LM_WARNING
,
665 ACE_TEXT ("(%t) Server %d deleted with ")
666 ACE_TEXT ("%d I/O outstanding\n"),
670 // This test bounces data back and forth between Clients and Servers.
671 // Therefore, if there was significantly more data in one direction, that's
672 // a problem. Remember, the byte counts are unsigned values.
673 int issue_data_warning
= 0;
674 if (this->total_snd_
> this->total_rcv_
)
676 if (this->total_rcv_
== 0)
677 issue_data_warning
= 1;
678 else if (this->total_snd_
/ this->total_rcv_
> 2)
679 issue_data_warning
= 1;
683 if (this->total_snd_
== 0)
684 issue_data_warning
= 1;
685 else if (this->total_rcv_
/ this->total_snd_
> 2)
686 issue_data_warning
= 1;
688 if (issue_data_warning
)
689 ACE_DEBUG ((LM_WARNING
,
690 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
692 if (this->tester_
!= 0)
693 this->tester_
->server_done (this);
695 if (this->handle_
!= ACE_INVALID_HANDLE
)
696 ACE_OS::closesocket (this->handle_
);
699 this->handle_
= ACE_INVALID_HANDLE
;
705 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
707 this->flg_cancel_
= 1;
715 Server::addresses (const ACE_INET_Addr
& peer
, const ACE_INET_Addr
&)
718 if (0 == peer
.addr_to_string (str
, sizeof (str
) / sizeof (ACE_TCHAR
)))
719 ACE_DEBUG ((LM_DEBUG
,
720 ACE_TEXT ("(%t) Server %d connection from %s\n"),
724 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) Server %d %p\n"),
726 ACE_TEXT ("addr_to_string")));
732 Server::open (ACE_HANDLE handle
, ACE_Message_Block
&)
735 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
737 // Don't buffer serial sends.
738 this->handle_
= handle
;
740 ACE_SOCK_Stream
option_setter (handle
);
741 if (-1 == option_setter
.set_option (ACE_IPPROTO_TCP
,
745 ACE_ERROR ((LM_ERROR
, "%p\n", "set_option"));
747 if (this->ws_
.open (*this, this->handle_
) == -1)
748 ACE_ERROR ((LM_ERROR
,
749 ACE_TEXT ("(%t) %p\n"),
750 ACE_TEXT ("Server::ACE_Asynch_Write_Stream::open")));
751 else if (this->rs_
.open (*this, this->handle_
) == -1)
752 ACE_ERROR ((LM_ERROR
,
753 ACE_TEXT ("(%t) %p\n"),
754 ACE_TEXT ("Server::ACE_Asynch_Read_Stream::open")));
756 this->initiate_read_stream ();
758 if (this->io_count_
> 0)
765 Server::initiate_read_stream ()
767 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
770 ACE_Message_Block
*mb
= 0;
772 ACE_Message_Block (1024), //BUFSIZ + 1),
776 if (this->rs_
.read (*mb
, mb
->size () - 1) == -1)
779 #if defined (ACE_WIN32)
780 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
781 // a 0-byte read as we would if underlying calls used WSARecv.
782 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
783 ACE_ERROR_RETURN ((LM_DEBUG
,
784 ACE_TEXT ("(%t) Server %d, peer closed\n"),
787 #endif /* ACE_WIN32 */
788 ACE_ERROR_RETURN ((LM_ERROR
,
789 ACE_TEXT ("(%t) Server %d, %p\n"),
801 Server::initiate_write_stream (ACE_Message_Block
&mb
, size_t nbytes
)
803 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
812 ACE_ERROR_RETURN((LM_ERROR
,
813 ACE_TEXT ("(%t) Server::ACE_Asynch_Write_Stream::write nbytes <0 ")),
817 if (this->ws_
.write (mb
, nbytes
) == -1)
820 #if defined (ACE_WIN32)
821 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
822 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
823 ACE_ERROR_RETURN ((LM_DEBUG
,
824 ACE_TEXT ("(%t) Server %d, peer gone\n"),
827 #endif /* ACE_WIN32 */
828 ACE_ERROR_RETURN((LM_ERROR
,
829 ACE_TEXT ("(%t) Server %d, %p\n"),
841 Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
844 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
846 ACE_Message_Block
& mb
= result
.message_block ();
849 mb
.rd_ptr ()[result
.bytes_transferred ()] = '\0';
855 ACE_DEBUG ((LM_DEBUG
,
856 ACE_TEXT ("(%t) **** Server %d: handle_read_stream() ****\n"),
858 ACE_DEBUG ((LM_DEBUG
,
859 ACE_TEXT ("%s = %B\n"),
860 ACE_TEXT ("bytes_to_read"),
861 result
.bytes_to_read ()));
862 ACE_DEBUG ((LM_DEBUG
,
863 ACE_TEXT ("%s = %d\n"),
866 ACE_DEBUG ((LM_DEBUG
,
867 ACE_TEXT ("%s = %B\n"),
868 ACE_TEXT ("bytes_transfered"),
869 result
.bytes_transferred ()));
870 ACE_DEBUG ((LM_DEBUG
,
871 ACE_TEXT ("%s = %@\n"),
874 ACE_DEBUG ((LM_DEBUG
,
875 ACE_TEXT ("%s = %d\n"),
876 ACE_TEXT ("success"),
878 ACE_DEBUG ((LM_DEBUG
,
879 ACE_TEXT ("%s = %@\n"),
880 ACE_TEXT ("completion_key"),
881 result
.completion_key ()));
882 ACE_DEBUG ((LM_DEBUG
,
883 ACE_TEXT ("%s = %d\n"),
886 ACE_DEBUG ((LM_DEBUG
,
887 ACE_TEXT ("%s = %s\n"),
888 ACE_TEXT ("message_block"),
890 ACE_DEBUG ((LM_DEBUG
,
891 ACE_TEXT ("**** end of message ****************\n")));
893 else if (result
.error () != 0)
895 ACE_Log_Priority prio
;
896 #if defined (ACE_WIN32)
897 if (result
.error () == ERROR_OPERATION_ABORTED
)
900 if (result
.error () == ECANCELED
)
902 #endif /* ACE_WIN32 */
905 ACE_Log_Msg::instance ()->errnum (result
.error ());
906 ACE_Log_Msg::instance ()->log (prio
,
907 ACE_TEXT ("(%t) Server %d; %p\n"),
911 else if (loglevel
> 0)
913 ACE_DEBUG ((LM_DEBUG
,
914 ACE_TEXT ("(%t) Server %d: read %B bytes\n"),
916 result
.bytes_transferred ()));
919 if (result
.error () == 0 && result
.bytes_transferred () > 0)
921 this->total_rcv_
+= result
.bytes_transferred ();
923 if (this->initiate_write_stream (mb
,
924 result
.bytes_transferred ()) == 0)
926 if (duplex
!= 0) // Initiate new read from the stream.
927 this->initiate_read_stream ();
934 if (this->io_count_
> 0)
941 Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
944 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
946 ACE_Message_Block
& mb
= result
.message_block ();
952 //mb.rd_ptr () [0] = '\0';
953 mb
.rd_ptr (mb
.rd_ptr () - result
.bytes_transferred ());
955 ACE_DEBUG ((LM_DEBUG
,
956 ACE_TEXT ("(%t) **** Server %d: handle_write_stream() ****\n"),
958 ACE_DEBUG ((LM_DEBUG
,
959 ACE_TEXT ("%s = %B\n"),
960 ACE_TEXT ("bytes_to_write"),
961 result
.bytes_to_write ()));
962 ACE_DEBUG ((LM_DEBUG
,
963 ACE_TEXT ("%s = %d\n"),
966 ACE_DEBUG ((LM_DEBUG
,
967 ACE_TEXT ("%s = %B\n"),
968 ACE_TEXT ("bytes_transfered"),
969 result
.bytes_transferred ()));
970 ACE_DEBUG ((LM_DEBUG
,
971 ACE_TEXT ("%s = %@\n"),
974 ACE_DEBUG ((LM_DEBUG
,
975 ACE_TEXT ("%s = %d\n"),
976 ACE_TEXT ("success"),
978 ACE_DEBUG ((LM_DEBUG
,
979 ACE_TEXT ("%s = %@\n"),
980 ACE_TEXT ("completion_key"),
981 result
.completion_key ()));
982 ACE_DEBUG ((LM_DEBUG
,
983 ACE_TEXT ("%s = %d\n"),
986 ACE_DEBUG ((LM_DEBUG
,
987 ACE_TEXT ("%s = %s\n"),
988 ACE_TEXT ("message_block"),
990 ACE_DEBUG ((LM_DEBUG
,
991 ACE_TEXT ("**** end of message ****************\n")));
993 else if (result
.error () != 0)
995 ACE_Log_Priority prio
;
996 #if defined (ACE_WIN32)
997 if (result
.error () == ERROR_OPERATION_ABORTED
)
1000 if (result
.error () == ECANCELED
)
1002 #endif /* ACE_WIN32 */
1005 ACE_Log_Msg::instance ()->errnum (result
.error ());
1006 ACE_Log_Msg::instance ()->log (prio
,
1007 ACE_TEXT ("(%t) Server %d; %p\n"),
1009 ACE_TEXT ("write"));
1011 else if (loglevel
> 0)
1013 ACE_DEBUG ((LM_DEBUG
,
1014 ACE_TEXT ("(%t) Server %d: wrote %B bytes ok\n"),
1016 result
.bytes_transferred ()));
1021 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1023 this->total_snd_
+= result
.bytes_transferred ();
1026 this->initiate_read_stream ();
1030 if (this->io_count_
> 0)
1036 // *******************************************
1038 // *******************************************
1040 class Connector
: public ACE_Asynch_Connector
<Client
>
1043 Connector (TestData
*tester
);
1044 ~Connector () override
;
1046 int start (const ACE_INET_Addr
&addr
, int num
);
1048 // Virtual from ACE_Asynch_Connector
1049 Client
*make_handler () override
;
1055 // *************************************************************
1057 Connector::Connector (TestData
*tester
)
1062 Connector::~Connector ()
1068 Connector::make_handler ()
1070 return this->tester_
->client_up ();
1075 Connector::start (const ACE_INET_Addr
& addr
, int num
)
1077 if (num
> MAX_CLIENTS
)
1085 // int open ( int pass_addresses = 0,
1086 // ACE_Proactor *proactor = 0,
1087 // int validate_new_connection = 0 );
1089 if (this->open (1, 0, 1) != 0)
1091 ACE_ERROR ((LM_ERROR
,
1092 ACE_TEXT ("(%t) %p\n"),
1093 ACE_TEXT ("Connector::open failed")));
1097 for (; rc
< num
; rc
++)
1099 if (this->connect (addr
) != 0)
1101 ACE_ERROR ((LM_ERROR
,
1102 ACE_TEXT ("(%t) %p\n"),
1103 ACE_TEXT ("Connector::connect failed")));
1113 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
1116 Client::Client (TestData
*tester
, int id
)
1119 handle_ (ACE_INVALID_HANDLE
),
1132 ACE_DEBUG ((LM_DEBUG
,
1133 ACE_TEXT ("(%t) Client %d dtor; %d sends (%B bytes); ")
1134 ACE_TEXT ("%d recvs (%B bytes)\n"),
1136 this->total_w_
, this->total_snd_
,
1137 this->total_r_
, this->total_rcv_
));
1138 if (this->io_count_
!= 0)
1139 ACE_ERROR ((LM_WARNING
,
1140 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1144 // This test bounces data back and forth between Clients and Servers.
1145 // Therefore, if there was significantly more data in one direction, that's
1146 // a problem. Remember, the byte counts are unsigned values.
1147 int issue_data_warning
= 0;
1148 if (this->total_snd_
> this->total_rcv_
)
1150 if (this->total_rcv_
== 0)
1151 issue_data_warning
= 1;
1152 else if (this->total_snd_
/ this->total_rcv_
> 2)
1153 issue_data_warning
= 1;
1157 if (this->total_snd_
== 0)
1158 issue_data_warning
= 1;
1159 else if (this->total_rcv_
/ this->total_snd_
> 2)
1160 issue_data_warning
= 1;
1162 if (issue_data_warning
)
1163 ACE_DEBUG ((LM_WARNING
,
1164 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1166 if (this->tester_
!= 0)
1167 this->tester_
->client_done (this);
1170 this->handle_
= ACE_INVALID_HANDLE
;
1171 if (this->handle_
!= ACE_INVALID_HANDLE
)
1173 ACE_OS::closesocket (this->handle_
);
1175 this->handle_
= ACE_INVALID_HANDLE
;
1181 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1183 this->flg_cancel_
= 1;
1184 this->ws_
.cancel ();
1185 this->rs_
.cancel ();
1192 // This must be called with the lock_ held.
1193 ACE_DEBUG ((LM_DEBUG
,
1194 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1195 this->id_
, this->io_count_
));
1196 ACE_OS::shutdown (this->handle_
, ACE_SHUTDOWN_WRITE
);
1197 this->stop_writing_
= 1;
1203 Client::addresses (const ACE_INET_Addr
& /* peer */, const ACE_INET_Addr
& local
)
1206 if (0 == local
.addr_to_string (str
, sizeof (str
) / sizeof (ACE_TCHAR
)))
1207 ACE_DEBUG ((LM_DEBUG
,
1208 ACE_TEXT ("(%t) Client %d connected on %s\n"),
1212 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) Client %d %p\n"),
1214 ACE_TEXT ("addr_to_string")));
1220 Client::open (ACE_HANDLE handle
, ACE_Message_Block
&)
1223 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1225 // Don't buffer serial sends.
1226 this->handle_
= handle
;
1228 ACE_SOCK_Stream
option_setter (handle
);
1229 if (option_setter
.set_option (ACE_IPPROTO_TCP
,
1233 ACE_ERROR ((LM_ERROR
, "%p\n", "set_option"));
1235 // Open ACE_Asynch_Write_Stream
1236 if (this->ws_
.open (*this, this->handle_
) == -1)
1237 ACE_ERROR ((LM_ERROR
,
1238 ACE_TEXT ("(%t) %p\n"),
1239 ACE_TEXT ("Client::ACE_Asynch_Write_Stream::open")));
1241 // Open ACE_Asynch_Read_Stream
1242 else if (this->rs_
.open (*this, this->handle_
) == -1)
1243 ACE_ERROR ((LM_ERROR
,
1244 ACE_TEXT ("(%t) %p\n"),
1245 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::open")));
1247 else if (this->initiate_write_stream () == 0)
1249 if (duplex
!= 0) // Start an asynchronous read
1250 this->initiate_read_stream ();
1253 if (this->io_count_
> 0)
1260 Client::initiate_write_stream ()
1262 if (this->flg_cancel_
!= 0 ||
1263 this->stop_writing_
||
1264 this->handle_
== ACE_INVALID_HANDLE
)
1267 static const size_t complete_message_length
= ACE_OS::strlen (complete_message
);
1269 #if defined (ACE_WIN32)
1271 ACE_Message_Block
*mb1
= 0,
1275 // No need to allocate +1 for proper printing - the memory includes it already
1276 ACE_NEW_RETURN (mb1
,
1277 ACE_Message_Block ((char *)complete_message
,
1278 complete_message_length
),
1281 ACE_NEW_RETURN (mb2
,
1282 ACE_Message_Block ((char *)complete_message
,
1283 complete_message_length
),
1286 ACE_NEW_RETURN (mb3
,
1287 ACE_Message_Block ((char *)complete_message
,
1288 complete_message_length
),
1291 mb1
->wr_ptr (complete_message_length
);
1292 mb2
->wr_ptr (complete_message_length
);
1293 mb3
->wr_ptr (complete_message_length
);
1295 // chain them together
1299 if (this->ws_
.writev (*mb1
, mb1
->total_length ()) == -1)
1302 ACE_ERROR_RETURN((LM_ERROR
,
1303 ACE_TEXT ("(%t) %p\n"),
1304 ACE_TEXT ("Client::ACE_Asynch_Stream::writev")),
1307 #else /* ACE_WIN32 */
1309 ACE_Message_Block
*mb
= 0;
1311 // No need to allocate +1 for proper printing - the memory includes it already
1313 ACE_Message_Block (complete_message
, complete_message_length
),
1315 mb
->wr_ptr (complete_message_length
);
1317 if (this->ws_
.write (*mb
, mb
->length ()) == -1)
1320 #if defined (ACE_WIN32)
1321 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
1322 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
1323 ACE_ERROR_RETURN ((LM_DEBUG
,
1324 ACE_TEXT ("(%t) Client %d, peer gone\n"),
1327 #endif /* ACE_WIN32 */
1328 ACE_ERROR_RETURN((LM_ERROR
,
1329 ACE_TEXT ("(%t) Client %d, %p\n"),
1331 ACE_TEXT ("write")),
1334 #endif /* ACE_WIN32 */
1342 Client::initiate_read_stream ()
1344 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
1347 static const size_t complete_message_length
=
1348 ACE_OS::strlen (complete_message
);
1350 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
1351 ACE_Message_Block
*mb1
= 0,
1358 // We allocate +1 only for proper printing - we can just set the last byte
1359 // to '\0' before printing out
1360 ACE_NEW_RETURN (mb1
, ACE_Message_Block (complete_message_length
+ 1), -1);
1361 ACE_NEW_RETURN (mb2
, ACE_Message_Block (complete_message_length
+ 1), -1);
1362 ACE_NEW_RETURN (mb3
, ACE_Message_Block (complete_message_length
+ 1), -1);
1364 // Allocate memory for one more triplet.
1365 // This improves performance,
1366 // as we can receive more than one block at once.
1367 // Generally, we could receive even more triplets ....
1368 ACE_NEW_RETURN (mb4
, ACE_Message_Block (complete_message_length
+ 1), -1);
1369 ACE_NEW_RETURN (mb5
, ACE_Message_Block (complete_message_length
+ 1), -1);
1370 ACE_NEW_RETURN (mb6
, ACE_Message_Block (complete_message_length
+ 1), -1);
1380 // hide last byte in each message block, reserving it for later to set '\0'
1381 // for proper printouts
1382 mb1
->size (mb1
->size () - 1);
1383 mb2
->size (mb2
->size () - 1);
1384 mb3
->size (mb3
->size () - 1);
1386 mb4
->size (mb4
->size () - 1);
1387 mb5
->size (mb5
->size () - 1);
1388 mb6
->size (mb6
->size () - 1);
1391 if (this->rs_
.readv (*mb1
, mb1
->total_size () - 1) == -1)
1394 ACE_ERROR_RETURN ((LM_ERROR
,
1395 ACE_TEXT ("(%t) %p\n"),
1396 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1399 #else /* ACE_HAS_WIN32_OVERLAPPED_IO */
1401 // Try to read more chunks
1402 size_t blksize
= ( complete_message_length
> BUFSIZ
) ?
1403 complete_message_length
: BUFSIZ
;
1405 ACE_Message_Block
*mb
= 0;
1407 // We allocate +1 only for proper printing - we can just set the last byte
1408 // to '\0' before printing out
1410 ACE_Message_Block (blksize
+ 1),
1414 if (this->rs_
.read (*mb
, mb
->size () - 1) == -1)
1417 #if defined (ACE_WIN32)
1418 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
1419 // a 0-byte read as we would if underlying calls used WSARecv.
1420 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
1421 ACE_ERROR_RETURN ((LM_DEBUG
,
1422 ACE_TEXT ("(%t) Client %d, peer closed\n"),
1425 #endif /* ACE_WIN32 */
1426 ACE_ERROR_RETURN ((LM_ERROR
,
1427 ACE_TEXT ("(%t) Client %d, %p\n"),
1432 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
// Client::handle_write_stream -- completion callback for an asynchronous
// write issued by this Client.
//
// @param result  completion result of the write; the handler queries its
//                message block, byte counts, success flag, error code and
//                completion key.
//
// From the visible lines: the handler takes this->lock_, logs the outcome,
// adds the transferred byte count to total_snd_ and then either starts
// another write (full duplex, subject to flow control) or starts a read
// (half duplex).
//
// NOTE(review): this excerpt is missing several lines of the definition
// (return type, braces, some guarding conditions and macro arguments).
// The comments below describe only what the visible lines establish.
1440 Client::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
// Guard this->lock_ (guard object named `monitor`) for the handler body.
1443 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1445 ACE_Message_Block
& mb
= result
.message_block ();
// Detailed dump of the completion result, one field per ACE_DEBUG call.
// NOTE(review): the condition that enables this dump is not visible here;
// presumably a loglevel check -- confirm.
1451 ACE_DEBUG ((LM_DEBUG
,
1452 ACE_TEXT ("(%t) **** Client %d: handle_write_stream() ****\n"),
1454 ACE_DEBUG ((LM_DEBUG
,
1455 ACE_TEXT ("%s = %B\n"),
1456 ACE_TEXT ("bytes_to_write"),
1457 result
.bytes_to_write ()));
1458 ACE_DEBUG ((LM_DEBUG
,
1459 ACE_TEXT ("%s = %d\n"),
1460 ACE_TEXT ("handle"),
1462 ACE_DEBUG ((LM_DEBUG
,
1463 ACE_TEXT ("%s = %B\n"),
1464 ACE_TEXT ("bytes_transfered"),
1465 result
.bytes_transferred ()));
1466 ACE_DEBUG ((LM_DEBUG
,
1467 ACE_TEXT ("%s = %@\n"),
1470 ACE_DEBUG ((LM_DEBUG
,
1471 ACE_TEXT ("%s = %d\n"),
1472 ACE_TEXT ("success"),
1473 result
.success ()));
1474 ACE_DEBUG ((LM_DEBUG
,
1475 ACE_TEXT ("%s = %@\n"),
1476 ACE_TEXT ("completion_key"),
1477 result
.completion_key ()));
1478 ACE_DEBUG ((LM_DEBUG
,
1479 ACE_TEXT ("%s = %d\n"),
// Win32 branch: walk the message block chain through cont () while there
// are transferred bytes left to account for.  For each block, rd_ptr is
// moved back by the share of bytes_transferred that the block covers
// (at most rd_ptr () - base ()), so the written data can be printed.
1483 #if defined (ACE_WIN32)
1484 size_t bytes_transferred
= result
.bytes_transferred ();
1486 for (ACE_Message_Block
* mb_i
= &mb
;
1487 (mb_i
!= 0) && (bytes_transferred
> 0);
1488 mb_i
= mb_i
->cont ())
1490 // write 0 at string end for proper printout (if end of mb,
1492 mb_i
->rd_ptr()[0] = '\0';
1494 size_t len
= mb_i
->rd_ptr () - mb_i
->base ();
1496 // move rd_ptr backwards as required for printout
1497 if (len
>= bytes_transferred
)
1499 mb_i
->rd_ptr (0 - bytes_transferred
);
1500 bytes_transferred
= 0;
// NOTE(review): the `else` introducing the two statements below is not
// visible in this excerpt; presumably they run when len < bytes_transferred.
1504 mb_i
->rd_ptr (0 - len
);
1505 bytes_transferred
-= len
;
1509 ACE_DEBUG ((LM_DEBUG
,
1510 ACE_TEXT ("%s%d = %s\n"),
1511 ACE_TEXT ("message_block, part "),
// Non-Win32 branch: only the block `mb` itself is handled; it is
// terminated at rd_ptr and rd_ptr is moved back by bytes_transferred ().
1515 #else /* ACE_WIN32 */
1516 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1517 mb
.rd_ptr()[0] = '\0';
1518 // move rd_ptr backwards as required for printout
1519 mb
.rd_ptr (- result
.bytes_transferred ());
1520 ACE_DEBUG ((LM_DEBUG
,
1521 ACE_TEXT ("%s = %s\n"),
1522 ACE_TEXT ("message_block"),
1524 #endif /* ACE_WIN32 */
1526 ACE_DEBUG ((LM_DEBUG
,
1527 ACE_TEXT ("**** end of message ****************\n")));
// Error path: a log priority is chosen depending on whether the error is
// the platform's "operation cancelled" code (ERROR_OPERATION_ABORTED on
// Win32, ECANCELED otherwise), then the error is logged through
// ACE_Log_Msg with errnum set to result.error () so %p can describe it.
// NOTE(review): the priority values assigned to prio are not visible here.
1529 else if (result
.error () != 0)
1531 ACE_Log_Priority prio
;
1532 #if defined (ACE_WIN32)
1533 if (result
.error () == ERROR_OPERATION_ABORTED
)
1536 if (result
.error () == ECANCELED
)
1538 #endif /* ACE_WIN32 */
1541 ACE_Log_Msg::instance ()->errnum (result
.error ());
1542 ACE_Log_Msg::instance ()->log (prio
,
1543 ACE_TEXT ("(%t) Client %d; %p\n"),
1545 ACE_TEXT ("write"));
// No error and loglevel > 0: short progress message only.
1547 else if (loglevel
> 0)
1549 ACE_DEBUG ((LM_DEBUG
,
1550 ACE_TEXT ("(%t) Client %d: wrote %B bytes ok\n"),
1552 result
.bytes_transferred ()));
// Successful, non-empty write: account for the bytes and decide what to
// start next.
1557 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1559 this->total_snd_
+= result
.bytes_transferred ();
// Transfer limit reached: report it.
// NOTE(review): any further action taken on reaching the limit is not
// visible in this excerpt.
1560 if (this->total_snd_
>= xfer_limit
)
1562 ACE_DEBUG ((LM_DEBUG
,
1563 ACE_TEXT ("(%t) Client %d sent %B, limit %B\n"),
1564 this->id_
, this->total_snd_
, xfer_limit
));
1567 if (duplex
!= 0) // full duplex, continue write
// Flow control: another write is started only while total_snd_ is ahead
// of total_rcv_ by less than 32 KiB (1024*32 bytes).
1569 if ((this->total_snd_
- this->total_rcv_
) < 1024*32 ) //flow control
1570 this->initiate_write_stream ();
1572 else // half-duplex read reply, after read we will start write
1573 this->initiate_read_stream ();
// NOTE(review): the statement controlled by this check is not visible;
// io_count_ presumably counts outstanding I/O operations -- confirm.
1577 if (this->io_count_
> 0)
// Client::handle_read_stream -- completion callback for an asynchronous
// read issued by this Client.
//
// @param result  completion result of the read; the handler queries its
//                message block, byte counts, success flag, error code and
//                completion key.
//
// From the visible lines: the handler takes this->lock_, logs the outcome,
// adds the transferred byte count to total_rcv_ and then either starts
// another read (full duplex, or once stop_writing_ is set) or starts a
// write (half duplex).
//
// NOTE(review): this excerpt is missing several lines of the definition
// (return type, braces, some guarding conditions and macro arguments).
// The comments below describe only what the visible lines establish.
1584 Client::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
// Guard this->lock_ (guard object named `monitor`) for the handler body.
1587 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1589 ACE_Message_Block
& mb
= result
.message_block ();
// Detailed dump of the completion result, one field per ACE_DEBUG call.
// NOTE(review): the condition that enables this dump is not visible here;
// presumably a loglevel check -- confirm.
1595 ACE_DEBUG ((LM_DEBUG
,
1596 ACE_TEXT ("(%t) **** Client %d: handle_read_stream() ****\n"),
1598 ACE_DEBUG ((LM_DEBUG
,
1599 ACE_TEXT ("%s = %B\n"),
1600 ACE_TEXT ("bytes_to_read"),
1601 result
.bytes_to_read ()));
1602 ACE_DEBUG ((LM_DEBUG
,
1603 ACE_TEXT ("%s = %d\n"),
1604 ACE_TEXT ("handle"),
1606 ACE_DEBUG ((LM_DEBUG
,
1607 ACE_TEXT ("%s = %B\n"),
1608 ACE_TEXT ("bytes_transfered"),
1609 result
.bytes_transferred ()));
1610 ACE_DEBUG ((LM_DEBUG
,
1611 ACE_TEXT ("%s = %@\n"),
1614 ACE_DEBUG ((LM_DEBUG
,
1615 ACE_TEXT ("%s = %d\n"),
1616 ACE_TEXT ("success"),
1617 result
.success ()));
1618 ACE_DEBUG ((LM_DEBUG
,
1619 ACE_TEXT ("%s = %@\n"),
1620 ACE_TEXT ("completion_key"),
1621 result
.completion_key ()));
1622 ACE_DEBUG ((LM_DEBUG
,
1623 ACE_TEXT ("%s = %d\n"),
// Win32 branch: iterate over the message block chain through cont (),
// terminating each block at its wr_ptr and printing it as one "part".
// NOTE(review): the loop condition is not visible in this excerpt.
1627 #if defined (ACE_WIN32)
1629 for (ACE_Message_Block
* mb_i
= &mb
;
1631 mb_i
= mb_i
->cont ())
1634 // write 0 at string end for proper printout
1635 mb_i
->wr_ptr()[0] = '\0';
1637 ACE_DEBUG ((LM_DEBUG
,
1638 ACE_TEXT ("%s%d = %s\n"),
1639 ACE_TEXT ("message_block, part "),
// Non-Win32 branch: only the block `mb` itself is printed.
// NOTE(review): the terminator is stored at offset bytes_transferred ()
// from rd_ptr, i.e. one byte past the data just read; this needs the
// block to have spare room beyond the requested read size -- confirm
// against the code that allocates the block and issues the read.
1643 #else /* ACE_WIN32 */
1644 // write 0 at string end for proper printout
1645 mb
.rd_ptr()[result
.bytes_transferred ()] = '\0'; // for proper printout
1646 ACE_DEBUG ((LM_DEBUG
,
1647 ACE_TEXT ("%s = %s\n"),
1648 ACE_TEXT ("message_block"),
1650 #endif /* ACE_WIN32 */
1652 ACE_DEBUG ((LM_DEBUG
,
1653 ACE_TEXT ("**** end of message ****************\n")));
// Error path: a log priority is chosen depending on whether the error is
// the platform's "operation cancelled" code (ERROR_OPERATION_ABORTED on
// Win32, ECANCELED otherwise), then the error is logged through
// ACE_Log_Msg with errnum set to result.error () so %p can describe it.
// NOTE(review): the priority values assigned to prio are not visible here.
1655 else if (result
.error () != 0)
1657 ACE_Log_Priority prio
;
1658 #if defined (ACE_WIN32)
1659 if (result
.error () == ERROR_OPERATION_ABORTED
)
1662 if (result
.error () == ECANCELED
)
1664 #endif /* ACE_WIN32 */
1667 ACE_Log_Msg::instance ()->errnum (result
.error ());
1668 ACE_Log_Msg::instance ()->log (prio
,
1669 ACE_TEXT ("(%t) Client %d; %p\n"),
// No error and loglevel > 0: short progress message only.
1673 else if (loglevel
> 0)
1675 ACE_DEBUG ((LM_DEBUG
,
1676 ACE_TEXT ("(%t) Client %d: read %B bytes ok\n"),
1678 result
.bytes_transferred ()));
// Successful, non-empty read: account for the bytes and decide what to
// start next.
1683 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1685 this->total_rcv_
+= result
.bytes_transferred ();
// Keep reading in full duplex mode or once stop_writing_ is set;
// otherwise (half duplex) answer with a write.
1687 if (duplex
!= 0 || this->stop_writing_
) // full duplex, continue read
1688 this->initiate_read_stream ();
1689 else // half-duplex write, after write we will start read
1690 this->initiate_write_stream ();
// NOTE(review): the statement controlled by this check is not visible;
// io_count_ presumably counts outstanding I/O operations -- confirm.
1694 if (this->io_count_
> 0)
1700 // *************************************************************
1701 // Configuration helpers
1702 // *************************************************************
// print_usage -- emits the command line help text for this test.
//
// @param argc  unused (the parameter name is commented out).
// @param argv  argument vector.  NOTE(review): its use is not visible in
//              this excerpt; the "%s" in the first usage line presumably
//              receives the program name -- confirm.
//
// NOTE(review): the return statement is not visible here.  parse_args
// returns this function's result directly, and run_main compares the
// parse_args result with -1.
//
// NOTE(review): the "f file" / "c console" lines below are not preceded by
// an option line, and the option string given to ACE_Get_Opt in parse_args
// ("x:t:o:n:p:d:h:c:v:ub") has no option they could belong to; they look
// like leftovers of a removed option.  They are runtime text, so they are
// left untouched here.
1704 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1708 ACE_TEXT ("\nusage: %s")
1709 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1710 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1711 ACE_TEXT ("\n a AIOCB")
1712 ACE_TEXT ("\n i SIG")
1713 ACE_TEXT ("\n c CB")
1714 ACE_TEXT ("\n d default")
1715 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1716 ACE_TEXT ("\n-h <host> for Client mode")
1717 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1718 ACE_TEXT ("\n-p <port to listen/connect>")
1719 ACE_TEXT ("\n-c <number of client instances>")
1720 ACE_TEXT ("\n-b run client and server at the same time")
1721 ACE_TEXT ("\n f file")
1722 ACE_TEXT ("\n c console")
1723 ACE_TEXT ("\n-v log level")
1724 ACE_TEXT ("\n 0 - log errors and highlights")
1725 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1726 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1727 ACE_TEXT ("\n-x max transfer byte count per Client")
1728 ACE_TEXT ("\n-u show this message")
// set_proactor_type -- selects the global proactor_type from the argument
// of the -t command line option.
//
// @param ptype  option argument; only its first character is examined,
//               after conversion to upper case.
//
// NOTE(review): the case labels and the return statements are not visible
// in this excerpt.  parse_args treats a non-zero result as "type accepted"
// and otherwise prints the usage text.
1736 set_proactor_type (const ACE_TCHAR
*ptype
)
// Dispatch on the upper-cased first character of the argument.
1741 switch (ACE_OS::ace_toupper (*ptype
))
1744 proactor_type
= DEFAULT
;
1747 proactor_type
= AIOCB
;
1750 proactor_type
= SIG
;
// The section below is compiled only when ACE_HAS_BROKEN_SIGEVENT_STRUCT
// is not defined.  NOTE(review): its content is not visible here;
// presumably it handles the remaining CB type -- confirm.
1752 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
1756 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
// parse_args -- initializes the global test settings to their defaults and
// then applies the command line options.
//
// @param argc  argument count; when it is 1 the defaults are used as they
//              are ("one button test").
// @param argv  argument vector handed to ACE_Get_Opt.
//
// Options are read with the option string "x:t:o:n:p:d:h:c:v:ub".  An
// unaccepted -t argument and the fall-through at the end of the switch
// both return the result of print_usage.
//
// NOTE(review): case labels for some options, the `break` statements and
// the final return are not visible in this excerpt.
1764 parse_args (int argc
, ACE_TCHAR
*argv
[])
1766 // First, set up all the defaults then let any args change them.
1767 both
= 1; // client and server simultaneously
1768 duplex
= 1; // full duplex is on
1769 host
= ACE_LOCALHOST
; // server to connect
1770 port
= ACE_DEFAULT_SERVER_PORT
; // port to connect/listen
1771 max_aio_operations
= 512; // POSIX Proactor params
1772 proactor_type
= DEFAULT
; // Proactor type = default
1773 threads
= 3; // size of Proactor thread pool
1774 clients
= 10; // number of clients
1775 loglevel
= 0; // log level : only errors and highlights
1776 // Default transfer limit 50 messages per Sender
1777 xfer_limit
= 50 * ACE_OS::strlen (complete_message
);
1779 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
// NOTE(review): the statement inside this ACE_LINUX section is not visible;
// given the comment above it presumably switches duplex off -- confirm.
1780 # if defined (ACE_LINUX)
1784 if (argc
== 1) // no arguments , so one button test
1787 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
1790 while ((c
= get_opt ()) != EOF
)
// NOTE(review): the atoi result is converted with static_cast<size_t>
// before any range check, and only the value 0 is adjusted below; a
// negative argument would therefore become a very large limit.
1794 case 'x': // xfer limit
1795 xfer_limit
= static_cast<size_t> (ACE_OS::atoi (get_opt
.opt_arg ()));
1796 if (xfer_limit
== 0)
1797 xfer_limit
= 1; // Bare minimum.
1799 case 'b': // both client and server
1802 case 'v': // log level
1803 loglevel
= ACE_OS::atoi (get_opt
.opt_arg ());
1806 duplex
= ACE_OS::atoi (get_opt
.opt_arg ());
1808 case 'h': // host for sender
1809 host
= get_opt
.opt_arg ();
1811 case 'p': // port number
1812 port
= ACE_OS::atoi (get_opt
.opt_arg ());
1814 case 'n': // thread pool size
1815 threads
= ACE_OS::atoi (get_opt
.opt_arg ());
// The client count is capped at MAX_CLIENTS.
// NOTE(review): no lower bound check is visible for this value.
1817 case 'c': // number of clients
1818 clients
= ACE_OS::atoi (get_opt
.opt_arg ());
1819 if (clients
> MAX_CLIENTS
)
1820 clients
= MAX_CLIENTS
;
1822 case 'o': // max number of aio for proactor
1823 max_aio_operations
= ACE_OS::atoi (get_opt
.opt_arg ());
// A non-zero result from set_proactor_type means the type was accepted;
// otherwise the usage text is printed and its result returned.
1825 case 't': // Proactor Type
1826 if (set_proactor_type (get_opt
.opt_arg ()))
1828 return print_usage (argc
, argv
);
1831 return print_usage (argc
, argv
);
// run_main -- test entry point for builds with threads and asynchronous
// I/O support.
//
// @param argc  argument count, forwarded to parse_args.
// @param argv  argument vector, forwarded to parse_args.
//
// Visible sequence: start the test log, parse the arguments, disable the
// real-time signal range and SIGPIPE, start the proactor task with the
// configured thread count / proactor type / aio limit, open the Acceptor
// and/or start the Connector depending on `both` and `host`, then wait
// until the test reports that it is done and stop the thread pool task.
//
// NOTE(review): declarations of `test`, `task1` and `rc`, the waiting loop
// body, the shutdown calls and the return statements are not visible in
// this excerpt.
1839 run_main (int argc
, ACE_TCHAR
*argv
[])
1841 ACE_START_TEST (ACE_TEXT ("Proactor_Test"));
// parse_args signals a usage error with -1.
1843 if (::parse_args (argc
, argv
) == -1)
1846 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
1847 disable_signal (SIGPIPE
, SIGPIPE
);
// Continue only if the proactor task started successfully (result 0).
1852 if (task1
.start (threads
, proactor_type
, max_aio_operations
) == 0)
1854 Acceptor
acceptor (&test
);
1855 Connector
connector (&test
);
1856 ACE_INET_Addr
addr (port
);
// Server side: needed when running both roles or when no host was given.
1860 if (both
!= 0 || host
== 0) // Acceptor
1862 // Simplify, initial read with zero size
1863 if (acceptor
.open (addr
, 0, 1) == 0)
// Client side: needed when running both roles or when a host was given.
1867 if (both
!= 0 || host
!= 0)
// NOTE(review): the condition guarding this fallback to ACE_LOCALHOST is
// not visible; presumably it applies when host is still 0 -- confirm.
1870 host
= ACE_LOCALHOST
;
// Resolve port/host into addr; on failure log the host with %p.
// NOTE(review): the `else` that presumably precedes connector.start is
// not visible in this excerpt.
1872 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
1873 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), host
));
1875 rc
+= connector
.start (addr
, clients
);
1878 // Wait a few seconds to let things get going, then poll til
1879 // all sessions are done. Note that when we exit this scope, the
1880 // Acceptor and Connector will be destroyed, which should prevent
1881 // further connections and also test how well destroyed handlers
1885 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
1886 while (!test
.testing_done ())
1891 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
// run_main -- fallback test entry point, used when the threads /
// asynchronous I/O condition closed by the trailing #endif is not met.
// Both parameters are unnamed and unused.  The visible body only starts
// the test log and reports that Proactor_Test will not be run.
// NOTE(review): the end-of-test macro and the return statement are not
// visible in this excerpt.
1902 run_main (int, ACE_TCHAR
*[])
1904 ACE_START_TEST (ACE_TEXT ("Proactor_Test"));
1906 ACE_DEBUG ((LM_INFO
,
1907 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
1908 ACE_TEXT ("Proactor_Test will not be run.\n")));
1915 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */