1 // ============================================================================
3 * @file Proactor_Test_IPV6.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that does various asynchronous
9 * @author Alexander Libman <alibman@baltimore.com>
10 * @author Brian Buesker <bbuesker@qualcomm.com> - modified for IPv6 operation
12 // ============================================================================
14 #include "test_config.h"
16 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
17 // This only works on Win32 platforms and on Unix platforms
18 // supporting POSIX aio calls.
20 #include "ace/Signal.h"
22 #include "ace/Service_Config.h"
23 #include "ace/INET_Addr.h"
24 #include "ace/SOCK_Connector.h"
25 #include "ace/SOCK_Acceptor.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Object_Manager.h"
28 #include "ace/Get_Opt.h"
30 #include "ace/Proactor.h"
31 #include "ace/Asynch_Acceptor.h"
32 #include "ace/Asynch_Connector.h"
34 #include "ace/Thread_Semaphore.h"
35 #include "ace/OS_NS_ctype.h"
36 #include "ace/OS_NS_errno.h"
37 #include "ace/OS_NS_signal.h"
38 #include "ace/OS_NS_string.h"
39 #include "ace/OS_NS_unistd.h"
40 #include "ace/OS_NS_sys_socket.h"
41 #include "ace/os_include/netinet/os_tcp.h"
43 #include "ace/Atomic_Op.h"
44 #include "ace/Synch_Traits.h"
46 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
48 # include "ace/WIN32_Proactor.h"
50 #elif defined (ACE_HAS_AIO_CALLS)
52 # include "ace/POSIX_Proactor.h"
53 # include "ace/POSIX_CB_Proactor.h"
55 #endif /* defined (ACE_HAS_WIN32_OVERLAPPED_IO) */
57 #include "Proactor_Test.h"
// Proactor implementation to instantiate (UNIX only; ignored on Win32).
enum ProactorType
{
  DEFAULT = 0,  // no explicit implementation requested
  AIOCB,        // ACE_POSIX_AIOCB_Proactor
  SIG,          // ACE_POSIX_SIG_Proactor (needs POSIX realtime signals)
  CB            // ACE_POSIX_CB_Proactor
};

// Proactor implementation selected for this run.
static ProactorType proactor_type = DEFAULT;

// POSIX: when > 0, the maximum number of aio operations for the proactor.
// NOTE(review): the meaning of 0 is not documented in the visible part of
// the file - presumably "use the implementation's default"; confirm.
static size_t max_aio_operations = 0;
66 // both: 0 run client or server / depends on host
67 // != 0 run client and server
70 // Host that we're connecting to.
71 static const ACE_TCHAR
*host
= 0;
// Number of Client instances to start.
static int clients = 1;

// Capacities of the fixed-size Client and Server bookkeeping arrays.
constexpr int MAX_CLIENTS = 1000;
constexpr int MAX_SERVERS = 1000;

// Duplex mode: 0 selects half-duplex.  With a non-zero value the handlers
// start a new read without waiting for the pending write to complete.
static int duplex = 0;

// Number of threads in the Proactor thread pool.
static int threads = 1;
85 // Port that we're receiving connections on.
86 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
// Logging verbosity: 0 = full, 1 = only errors.
static int loglevel = 0;

// Number of bytes for Client to send.
static size_t xfer_limit = 0;
93 static char complete_message
[] =
96 "Accept-Language: C++\r\n"
97 "Accept-Encoding: gzip, deflate\r\n"
98 "User-Agent: Proactor_Test_IPv6/1.0 (non-compatible)\r\n"
99 "Connection: Keep-Alive\r\n"
105 LogLocker () { ACE_LOG_MSG
->acquire (); }
106 virtual ~LogLocker () { ACE_LOG_MSG
->release (); }
109 #if defined (ACE_HAS_IPV6)
111 // Block the signals in the inclusive range [sigmin, sigmax] by adding
// them to the signal mask with SIG_BLOCK.
//
// @param sigmin  first signal number to block
// @param sigmax  last signal number to block (inclusive)
//
// When ACE_LACKS_UNIX_SIGNALS is defined, both arguments are only marked as
// unused.
// NOTE(review): the return type, the declaration of signal_set and the
// returned values are not visible in this view - confirm against the full
// file before relying on them.
113 disable_signal (int sigmin
, int sigmax
)
115 #if !defined (ACE_LACKS_UNIX_SIGNALS)
// Start from an empty set; a failure here is only logged.
117 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
118 ACE_ERROR ((LM_ERROR
,
119 ACE_TEXT ("Error: (%P|%t):%p\n"),
120 ACE_TEXT ("sigemptyset failed")));
// Add every signal number of the requested range to the set.
122 for (int i
= sigmin
; i
<= sigmax
; i
++)
123 ACE_OS::sigaddset (&signal_set
, i
);
125 // Block every signal contained in <signal_set>.
126 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
127 // In multi-threaded application this is not POSIX compliant
128 // but let's leave it just in case.
129 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
// Otherwise the mask is changed with thr_sigsetmask.
131 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
132 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
// Either call failing is reported as "SIG_BLOCK failed" and ends the function.
133 ACE_ERROR_RETURN ((LM_ERROR
,
134 ACE_TEXT ("Error: (%P|%t): %p\n"),
135 ACE_TEXT ("SIG_BLOCK failed")),
// No UNIX signals on this platform: nothing to block.
138 ACE_UNUSED_ARG (sigmin
);
139 ACE_UNUSED_ARG (sigmax
);
140 #endif /* ACE_LACKS_UNIX_SIGNALS */
145 // *************************************************************
146 // MyTask is the ACE_Task responsible for:
147 // 1. creation and deletion of
148 // Proactor and Proactor thread pool
149 // 2. running Proactor event loop
150 // *************************************************************
155 * MyTask plays the role of the Proactor thread pool.
157 * MyTask is the ACE_Task responsible for:
158 * 1. Creation and deletion of Proactor and Proactor thread pool
159 * 2. Running Proactor event loop
161 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
166 sem_ ((unsigned int) 0),
171 (void) this->stop ();
172 (void) this->delete_proactor();
177 int start (int num_threads
,
178 ProactorType type_proactor
,
183 int create_proactor (ProactorType type_proactor
,
185 int delete_proactor ();
187 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
188 ACE_Thread_Semaphore sem_
;
189 ACE_Proactor
* proactor_
;
193 MyTask::create_proactor (ProactorType type_proactor
, size_t max_op
)
195 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
200 ACE_TEST_ASSERT (this->proactor_
== 0);
202 #if defined (ACE_WIN32)
204 ACE_UNUSED_ARG (type_proactor
);
205 ACE_UNUSED_ARG (max_op
);
207 ACE_WIN32_Proactor
*proactor_impl
= 0;
209 ACE_NEW_RETURN (proactor_impl
,
213 ACE_DEBUG ((LM_DEBUG
,
214 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
216 #elif defined (ACE_HAS_AIO_CALLS)
218 ACE_POSIX_Proactor
* proactor_impl
= 0;
220 switch (type_proactor
)
223 ACE_NEW_RETURN (proactor_impl
,
224 ACE_POSIX_AIOCB_Proactor (max_op
),
226 ACE_DEBUG ((LM_DEBUG
,
227 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
230 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
232 ACE_NEW_RETURN (proactor_impl
,
233 ACE_POSIX_SIG_Proactor (max_op
),
235 ACE_DEBUG ((LM_DEBUG
,
236 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
238 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
240 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
242 ACE_NEW_RETURN (proactor_impl
,
243 ACE_POSIX_CB_Proactor (max_op
),
245 ACE_DEBUG ((LM_DEBUG
,
246 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
248 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
251 ACE_DEBUG ((LM_DEBUG
,
252 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
256 #endif // (ACE_WIN32)
258 // always delete implementation 1 , not !(proactor_impl == 0)
259 ACE_NEW_RETURN (this->proactor_
,
260 ACE_Proactor (proactor_impl
, 1 ),
262 // Set new singleton and delete it in close_singleton()
263 ACE_Proactor::instance (this->proactor_
, 1);
268 MyTask::delete_proactor ()
270 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
275 ACE_DEBUG ((LM_DEBUG
,
276 ACE_TEXT ("(%t) Delete Proactor\n")));
278 ACE_Proactor::close_singleton ();
285 MyTask::start (int num_threads
,
286 ProactorType type_proactor
,
289 if (this->create_proactor (type_proactor
, max_op
) == -1)
290 ACE_ERROR_RETURN ((LM_ERROR
,
292 ACE_TEXT ("unable to create proactor")),
295 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
296 ACE_ERROR_RETURN ((LM_ERROR
,
298 ACE_TEXT ("unable to activate thread pool")),
301 for (; num_threads
> 0; num_threads
--)
313 if (this->proactor_
!= 0)
315 ACE_DEBUG ((LM_DEBUG
,
316 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
318 ACE_Proactor::end_event_loop ();
321 if (this->wait () == -1)
322 ACE_ERROR ((LM_ERROR
,
324 ACE_TEXT ("unable to stop thread pool")));
332 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask started\n")));
334 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
335 disable_signal (SIGPIPE
, SIGPIPE
);
337 // signal that we are ready
340 ACE_Proactor::run_event_loop ();
342 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask finished\n")));
345 #endif /* ACE_HAS_IPV6 */
347 // TestData collects and reports on test-related transfer and connection
353 bool testing_done ();
354 Server
*server_up ();
355 Client
*client_up ();
356 void server_done (Server
*s
);
357 void client_done (Client
*c
);
364 // Track number of sessions that report start, and those that report
365 // their end (and stats).
366 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_up_
;
367 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_down_
;
369 // Total read and write bytes for all sessions.
370 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_cnt_
;
371 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_cnt_
;
372 // Total read and write operations issued for all sessions.
373 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_ops_
;
374 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_ops_
;
375 } servers_
, clients_
;
377 ACE_SYNCH_MUTEX list_lock_
;
378 Server
*server_list_
[MAX_SERVERS
];
379 Client
*client_list_
[MAX_CLIENTS
];
382 TestData::TestData ()
385 for (i
= 0; i
< MAX_SERVERS
; ++i
)
386 this->server_list_
[i
] = 0;
387 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
388 this->client_list_
[i
] = 0;
// Reports whether every session that was started has also finished.
// Takes one snapshot of each up/down counter for servers and clients and
// compares the snapshots, so the answer reflects the moment of the call.
392 TestData::testing_done ()
394 int svr_up
= this->servers_
.sessions_up_
.value ();
395 int svr_dn
= this->servers_
.sessions_down_
.value ();
396 int clt_up
= this->clients_
.sessions_up_
.value ();
397 int clt_dn
= this->clients_
.sessions_down_
.value ();
// NOTE(review): the statement executed by this branch is not visible in
// this view - presumably it reports "not done yet"; confirm.
399 if (svr_up
== 0 && clt_up
== 0) // No connections up yet
// Done once at least as many sessions went down as came up, on both sides.
402 return (svr_dn
>= svr_up
&& clt_dn
>= clt_up
);
// Registers a new Server session: counts it as "up", then creates a Server
// in the first free slot of server_list_ and returns it.
// The 0 passed to ACE_GUARD_RETURN / ACE_NEW_RETURN is the value handed back
// when the lock or the allocation cannot be obtained.
// NOTE(review): sessions_up_ is incremented before the lock is taken and a
// slot is found, so a failed registration still counts as "up" - confirm
// that this is intended.
406 TestData::server_up ()
408 ++this->servers_
.sessions_up_
;
// Protect server_list_ while searching for and filling a slot.
409 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
411 for (int i
= 0; i
< MAX_SERVERS
; ++i
)
// An entry equal to 0 marks a free slot.
413 if (this->server_list_
[i
] == 0)
// The slot index is passed to the new Server as its second argument.
415 ACE_NEW_RETURN (this->server_list_
[i
], Server (this, i
), 0);
416 ACE_DEBUG ((LM_DEBUG
,
417 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
419 this->servers_
.sessions_up_
.value (),
420 this->servers_
.sessions_down_
.value ()));
421 return this->server_list_
[i
];
// Registers a new Client session: counts it as "up", then creates a Client
// in the first free slot of client_list_ and returns it.
// The 0 passed to ACE_GUARD_RETURN / ACE_NEW_RETURN is the value handed back
// when the lock or the allocation cannot be obtained.
// NOTE(review): sessions_up_ is incremented before the lock is taken and a
// slot is found, so a failed registration still counts as "up" - confirm
// that this is intended.
428 TestData::client_up ()
430 ++this->clients_
.sessions_up_
;
// Protect client_list_ while searching for and filling a slot.
431 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
433 for (int i
= 0; i
< MAX_CLIENTS
; ++i
)
// An entry equal to 0 marks a free slot.
435 if (this->client_list_
[i
] == 0)
// The slot index is passed to the new Client as its second argument.
437 ACE_NEW_RETURN (this->client_list_
[i
], Client (this, i
), 0);
438 ACE_DEBUG ((LM_DEBUG
,
439 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
441 this->clients_
.sessions_up_
.value (),
442 this->clients_
.sessions_down_
.value ()));
443 return this->client_list_
[i
];
450 TestData::server_done (Server
*s
)
452 this->servers_
.w_cnt_
+= s
->get_total_snd ();
453 this->servers_
.r_cnt_
+= s
->get_total_rcv ();
454 this->servers_
.w_ops_
+= s
->get_total_w ();
455 this->servers_
.r_ops_
+= s
->get_total_r ();
456 ++this->servers_
.sessions_down_
;
457 ACE_DEBUG ((LM_DEBUG
,
458 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
460 this->servers_
.sessions_up_
.value (),
461 this->servers_
.sessions_down_
.value ()));
463 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
465 for (i
= 0; i
< MAX_SERVERS
; ++i
)
467 if (this->server_list_
[i
] == s
)
470 ACE_ERROR ((LM_ERROR
,
471 ACE_TEXT ("Server %d is pos %d in list\n"),
474 this->server_list_
[i
] = 0;
478 if (i
>= MAX_SERVERS
)
479 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Server %@ done but not listed\n"), s
));
485 TestData::client_done (Client
*c
)
487 this->clients_
.w_cnt_
+= c
->get_total_snd ();
488 this->clients_
.r_cnt_
+= c
->get_total_rcv ();
489 this->clients_
.w_ops_
+= c
->get_total_w ();
490 this->clients_
.r_ops_
+= c
->get_total_r ();
491 ++this->clients_
.sessions_down_
;
492 ACE_DEBUG ((LM_DEBUG
,
493 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
495 this->clients_
.sessions_up_
.value (),
496 this->clients_
.sessions_down_
.value ()));
498 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
500 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
502 if (this->client_list_
[i
] == c
)
505 ACE_ERROR ((LM_ERROR
,
506 ACE_TEXT ("Client %d is pos %d in list\n"),
509 this->client_list_
[i
] = 0;
513 if (i
>= MAX_CLIENTS
)
514 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Client %@ done but not listed\n"), c
));
520 TestData::stop_all ()
524 // Lock and cancel everything. Then release the lock, possibly allowing
525 // cleanups, then grab it again and delete all Servers and Clients.
527 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
528 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
530 if (this->client_list_
[i
] != 0)
531 this->client_list_
[i
]->cancel ();
534 for (i
= 0; i
< MAX_SERVERS
; ++i
)
536 if (this->server_list_
[i
] != 0)
537 this->server_list_
[i
]->cancel ();
541 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
542 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
544 if (this->client_list_
[i
] != 0)
545 delete this->client_list_
[i
];
548 for (i
= 0; i
< MAX_SERVERS
; ++i
)
550 if (this->server_list_
[i
] != 0)
551 delete this->server_list_
[i
];
560 ACE_TCHAR bufs
[256];
561 ACE_TCHAR bufr
[256];
563 ACE_OS::snprintf (bufs
, 256,
564 ACE_SIZE_T_FORMAT_SPECIFIER
565 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
566 this->clients_
.w_cnt_
.value (),
567 this->clients_
.w_ops_
.value ());
569 ACE_OS::snprintf (bufr
, 256,
570 ACE_SIZE_T_FORMAT_SPECIFIER
571 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
572 this->clients_
.r_cnt_
.value (),
573 this->clients_
.r_ops_
.value ());
575 ACE_DEBUG ((LM_DEBUG
,
576 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
580 ACE_OS::snprintf (bufs
, 256,
581 ACE_SIZE_T_FORMAT_SPECIFIER
582 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
583 this->servers_
.w_cnt_
.value (),
584 this->servers_
.w_ops_
.value ());
586 ACE_OS::snprintf (bufr
, 256,
587 ACE_SIZE_T_FORMAT_SPECIFIER
588 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
589 this->servers_
.r_cnt_
.value (),
590 this->servers_
.r_ops_
.value ());
592 ACE_DEBUG ((LM_DEBUG
,
593 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
597 if (this->clients_
.w_cnt_
.value () == 0 ||
598 this->clients_
.r_cnt_
.value () == 0 ||
599 this->servers_
.w_cnt_
.value () == 0 ||
600 this->servers_
.r_cnt_
.value () == 0 )
601 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("It appears that this test didn't ")
602 ACE_TEXT ("really do anything. Something is very wrong.\n")));
605 class Acceptor
: public ACE_Asynch_Acceptor
<Server
>
608 Acceptor (TestData
*tester
);
609 ~Acceptor () override
;
611 // Virtual from ACE_Asynch_Acceptor
612 Server
*make_handler () override
;
618 // *************************************************************
619 Acceptor::Acceptor (TestData
*tester
)
624 Acceptor::~Acceptor ()
630 Acceptor::make_handler ()
632 return this->tester_
->server_up ();
635 // ***************************************************
638 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
641 Server::Server (TestData
*tester
, int id
)
644 handle_ (ACE_INVALID_HANDLE
),
656 ACE_DEBUG ((LM_DEBUG
,
657 ACE_TEXT ("(%t) Server %d dtor; %d sends (%d bytes); ")
658 ACE_TEXT ("%d recvs (%d bytes)\n"),
660 this->total_w_
, this->total_snd_
,
661 this->total_r_
, this->total_rcv_
));
662 if (this->io_count_
!= 0)
663 ACE_ERROR ((LM_WARNING
,
664 ACE_TEXT ("(%t) Server %d deleted with ")
665 ACE_TEXT ("%d I/O outstanding\n"),
669 // This test bounces data back and forth between Clients and Servers.
670 // Therefore, if there was significantly more data in one direction, that's
671 // a problem. Remember, the byte counts are unsigned values.
672 int issue_data_warning
= 0;
673 if (this->total_snd_
> this->total_rcv_
)
675 if (this->total_rcv_
== 0)
676 issue_data_warning
= 1;
677 else if (this->total_snd_
/ this->total_rcv_
> 2)
678 issue_data_warning
= 1;
682 if (this->total_snd_
== 0)
683 issue_data_warning
= 1;
684 else if (this->total_rcv_
/ this->total_snd_
> 2)
685 issue_data_warning
= 1;
687 if (issue_data_warning
)
688 ACE_DEBUG ((LM_WARNING
,
689 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
691 if (this->tester_
!= 0)
692 this->tester_
->server_done (this);
694 if (this->handle_
!= ACE_INVALID_HANDLE
)
695 ACE_OS::closesocket (this->handle_
);
698 this->handle_
= ACE_INVALID_HANDLE
;
704 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
706 this->flg_cancel_
= 1;
714 Server::addresses (const ACE_INET_Addr
& peer
, const ACE_INET_Addr
&)
717 if (0 == peer
.addr_to_string (str
, sizeof (str
) / sizeof (ACE_TCHAR
)))
718 ACE_DEBUG ((LM_DEBUG
,
719 ACE_TEXT ("(%t) Server %d connection from %s\n"),
723 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) Server %d %p\n"),
725 ACE_TEXT ("addr_to_string")));
731 Server::open (ACE_HANDLE handle
, ACE_Message_Block
&)
734 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
736 // Don't buffer serial sends.
737 this->handle_
= handle
;
739 ACE_SOCK_Stream
option_setter (handle
);
740 if (-1 == option_setter
.set_option (ACE_IPPROTO_TCP
,
744 ACE_ERROR ((LM_ERROR
, "%p\n", "set_option"));
746 if (this->ws_
.open (*this, this->handle_
) == -1)
747 ACE_ERROR ((LM_ERROR
,
748 ACE_TEXT ("(%t) %p\n"),
749 ACE_TEXT ("Server::ACE_Asynch_Write_Stream::open")));
750 else if (this->rs_
.open (*this, this->handle_
) == -1)
751 ACE_ERROR ((LM_ERROR
,
752 ACE_TEXT ("(%t) %p\n"),
753 ACE_TEXT ("Server::ACE_Asynch_Read_Stream::open")));
755 this->initiate_read_stream ();
757 if (this->io_count_
> 0)
764 Server::initiate_read_stream ()
766 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
769 ACE_Message_Block
*mb
= 0;
771 ACE_Message_Block (1024), //BUFSIZ + 1),
775 if (this->rs_
.read (*mb
, mb
->size () - 1) == -1)
778 #if defined (ACE_WIN32)
779 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
780 // a 0-byte read as we would if underlying calls used WSARecv.
781 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
782 ACE_ERROR_RETURN ((LM_DEBUG
,
783 ACE_TEXT ("(%t) Server %d, peer closed\n"),
786 #endif /* ACE_WIN32 */
787 ACE_ERROR_RETURN ((LM_ERROR
,
788 ACE_TEXT ("(%t) Server %d, %p\n"),
800 Server::initiate_write_stream (ACE_Message_Block
&mb
, size_t nbytes
)
802 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
811 ACE_ERROR_RETURN((LM_ERROR
,
812 ACE_TEXT ("(%t) Server::ACE_Asynch_Write_Stream::write nbytes <0 ")),
816 if (this->ws_
.write (mb
, nbytes
) == -1)
819 #if defined (ACE_WIN32)
820 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
821 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
822 ACE_ERROR_RETURN ((LM_DEBUG
,
823 ACE_TEXT ("(%t) Server %d, peer gone\n"),
826 #endif /* ACE_WIN32 */
827 ACE_ERROR_RETURN((LM_ERROR
,
828 ACE_TEXT ("(%t) Server %d, %p\n"),
840 Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
843 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
845 ACE_Message_Block
& mb
= result
.message_block ();
848 mb
.rd_ptr ()[result
.bytes_transferred ()] = '\0';
854 ACE_DEBUG ((LM_DEBUG
,
855 ACE_TEXT ("(%t) **** Server %d: handle_read_stream() ****\n"),
857 ACE_DEBUG ((LM_DEBUG
,
858 ACE_TEXT ("%s = %d\n"),
859 ACE_TEXT ("bytes_to_read"),
860 result
.bytes_to_read ()));
861 ACE_DEBUG ((LM_DEBUG
,
862 ACE_TEXT ("%s = %d\n"),
865 ACE_DEBUG ((LM_DEBUG
,
866 ACE_TEXT ("%s = %d\n"),
867 ACE_TEXT ("bytes_transfered"),
868 result
.bytes_transferred ()));
869 ACE_DEBUG ((LM_DEBUG
,
870 ACE_TEXT ("%s = %@\n"),
873 ACE_DEBUG ((LM_DEBUG
,
874 ACE_TEXT ("%s = %d\n"),
875 ACE_TEXT ("success"),
877 ACE_DEBUG ((LM_DEBUG
,
878 ACE_TEXT ("%s = %@\n"),
879 ACE_TEXT ("completion_key"),
880 result
.completion_key ()));
881 ACE_DEBUG ((LM_DEBUG
,
882 ACE_TEXT ("%s = %d\n"),
885 ACE_DEBUG ((LM_DEBUG
,
886 ACE_TEXT ("%s = %s\n"),
887 ACE_TEXT ("message_block"),
889 ACE_DEBUG ((LM_DEBUG
,
890 ACE_TEXT ("**** end of message ****************\n")));
892 else if (result
.error () != 0)
894 ACE_Log_Priority prio
;
895 #if defined (ACE_WIN32)
896 if (result
.error () == ERROR_OPERATION_ABORTED
)
899 if (result
.error () == ECANCELED
)
901 #endif /* ACE_WIN32 */
904 ACE_Log_Msg::instance ()->errnum (result
.error ());
905 ACE_Log_Msg::instance ()->log (prio
,
906 ACE_TEXT ("(%t) Server %d; %p\n"),
910 else if (loglevel
> 0)
912 ACE_DEBUG ((LM_DEBUG
,
913 ACE_TEXT ("(%t) Server %d: read %d bytes\n"),
915 result
.bytes_transferred ()));
918 if (result
.error () == 0 && result
.bytes_transferred () > 0)
920 this->total_rcv_
+= result
.bytes_transferred ();
922 if (this->initiate_write_stream (mb
,
923 result
.bytes_transferred ()) == 0)
925 if (duplex
!= 0) // Initiate new read from the stream.
926 this->initiate_read_stream ();
933 if (this->io_count_
> 0)
940 Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
943 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
945 ACE_Message_Block
& mb
= result
.message_block ();
951 //mb.rd_ptr () [0] = '\0';
952 mb
.rd_ptr (mb
.rd_ptr () - result
.bytes_transferred ());
954 ACE_DEBUG ((LM_DEBUG
,
955 ACE_TEXT ("(%t) **** Server %d: handle_write_stream() ****\n"),
957 ACE_DEBUG ((LM_DEBUG
,
958 ACE_TEXT ("%s = %d\n"),
959 ACE_TEXT ("bytes_to_write"),
960 result
.bytes_to_write ()));
961 ACE_DEBUG ((LM_DEBUG
,
962 ACE_TEXT ("%s = %d\n"),
965 ACE_DEBUG ((LM_DEBUG
,
966 ACE_TEXT ("%s = %d\n"),
967 ACE_TEXT ("bytes_transfered"),
968 result
.bytes_transferred ()));
969 ACE_DEBUG ((LM_DEBUG
,
970 ACE_TEXT ("%s = %@\n"),
973 ACE_DEBUG ((LM_DEBUG
,
974 ACE_TEXT ("%s = %d\n"),
975 ACE_TEXT ("success"),
977 ACE_DEBUG ((LM_DEBUG
,
978 ACE_TEXT ("%s = %@\n"),
979 ACE_TEXT ("completion_key"),
980 result
.completion_key ()));
981 ACE_DEBUG ((LM_DEBUG
,
982 ACE_TEXT ("%s = %d\n"),
985 ACE_DEBUG ((LM_DEBUG
,
986 ACE_TEXT ("%s = %s\n"),
987 ACE_TEXT ("message_block"),
989 ACE_DEBUG ((LM_DEBUG
,
990 ACE_TEXT ("**** end of message ****************\n")));
992 else if (result
.error () != 0)
994 ACE_Log_Priority prio
;
995 #if defined (ACE_WIN32)
996 if (result
.error () == ERROR_OPERATION_ABORTED
)
999 if (result
.error () == ECANCELED
)
1001 #endif /* ACE_WIN32 */
1004 ACE_Log_Msg::instance ()->errnum (result
.error ());
1005 ACE_Log_Msg::instance ()->log (prio
,
1006 ACE_TEXT ("(%t) Server %d; %p\n"),
1008 ACE_TEXT ("write"));
1010 else if (loglevel
> 0)
1012 ACE_DEBUG ((LM_DEBUG
,
1013 ACE_TEXT ("(%t) Server %d: wrote %d bytes ok\n"),
1015 result
.bytes_transferred ()));
1020 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1022 this->total_snd_
+= result
.bytes_transferred ();
1025 this->initiate_read_stream ();
1029 if (this->io_count_
> 0)
1035 // *******************************************
1037 // *******************************************
1039 class Connector
: public ACE_Asynch_Connector
<Client
>
1042 Connector (TestData
*tester
);
1043 ~Connector () override
;
1045 int start (const ACE_INET_Addr
&addr
, int num
);
1047 // Virtual from ACE_Asynch_Connector
1048 Client
*make_handler () override
;
1054 // *************************************************************
1056 Connector::Connector (TestData
*tester
)
1061 Connector::~Connector ()
1067 Connector::make_handler ()
1069 return this->tester_
->client_up ();
1074 Connector::start (const ACE_INET_Addr
& addr
, int num
)
1076 if (num
> MAX_CLIENTS
)
1084 // int open ( int pass_addresses = 0,
1085 // ACE_Proactor *proactor = 0,
1086 // int validate_new_connection = 0 );
1088 if (this->open (1, 0, 1) != 0)
1090 ACE_ERROR ((LM_ERROR
,
1091 ACE_TEXT ("(%t) %p\n"),
1092 ACE_TEXT ("Connector::open failed")));
1096 for (; rc
< num
; rc
++)
1098 ACE_INET_Addr localAddr
;
1099 if (this->connect (addr
, localAddr
) != 0)
1101 ACE_ERROR ((LM_ERROR
,
1102 ACE_TEXT ("(%t) %p\n"),
1103 ACE_TEXT ("Connector::connect failed for IPv6")));
1113 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
1116 Client::Client (TestData
*tester
, int id
)
1119 handle_ (ACE_INVALID_HANDLE
),
1132 ACE_DEBUG ((LM_DEBUG
,
1133 ACE_TEXT ("(%t) Client %d dtor; %d sends (%d bytes); ")
1134 ACE_TEXT ("%d recvs (%d bytes)\n"),
1136 this->total_w_
, this->total_snd_
,
1137 this->total_r_
, this->total_rcv_
));
1138 if (this->io_count_
!= 0)
1139 ACE_ERROR ((LM_WARNING
,
1140 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1144 // This test bounces data back and forth between Clients and Servers.
1145 // Therefore, if there was significantly more data in one direction, that's
1146 // a problem. Remember, the byte counts are unsigned values.
1147 int issue_data_warning
= 0;
1148 if (this->total_snd_
> this->total_rcv_
)
1150 if (this->total_rcv_
== 0)
1151 issue_data_warning
= 1;
1152 else if (this->total_snd_
/ this->total_rcv_
> 2)
1153 issue_data_warning
= 1;
1157 if (this->total_snd_
== 0)
1158 issue_data_warning
= 1;
1159 else if (this->total_rcv_
/ this->total_snd_
> 2)
1160 issue_data_warning
= 1;
1162 if (issue_data_warning
)
1163 ACE_DEBUG ((LM_WARNING
,
1164 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1166 if (this->tester_
!= 0)
1167 this->tester_
->client_done (this);
1170 this->handle_
= ACE_INVALID_HANDLE
;
1171 if (this->handle_
!= ACE_INVALID_HANDLE
)
1173 ACE_OS::closesocket (this->handle_
);
1175 this->handle_
= ACE_INVALID_HANDLE
;
1181 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1183 this->flg_cancel_
= 1;
1184 this->ws_
.cancel ();
1185 this->rs_
.cancel ();
1192 // This must be called with the lock_ held.
1193 ACE_DEBUG ((LM_DEBUG
,
1194 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1195 this->id_
, this->io_count_
));
1196 ACE_OS::shutdown (this->handle_
, ACE_SHUTDOWN_WRITE
);
1197 this->stop_writing_
= 1;
1203 Client::addresses (const ACE_INET_Addr
& peer
, const ACE_INET_Addr
& local
)
1206 char peer_name
[256];
1207 ACE_TCHAR local_str
[256];
1208 ACE_INET_Addr
addr ((u_short
) 0, host
);
1210 // This checks to make sure the peer address given to us matches what
1211 // we expect it to be.
1212 if (0 != peer
.get_host_addr (peer_name
, sizeof (peer_name
)))
1214 if (0 != addr
.get_host_addr (my_name
, sizeof (my_name
)))
1216 if (0 != ACE_OS::strncmp (peer_name
, my_name
, sizeof (my_name
)))
1220 ACE_TEXT ("(%t) Sender %d peer address (%C) does not ")
1221 ACE_TEXT ("match host address (%C)\n"),
1223 peer_name
, my_name
));
1231 ACE_TEXT ("(%t) Sender %d unable to convert host addr\n"),
1238 ACE_ERROR ((LM_ERROR
,
1239 ACE_TEXT ("(%t) Sender %d unable to convert peer addr\n"),
1244 if (0 == local
.addr_to_string (local_str
,
1245 sizeof (local_str
) / sizeof (ACE_TCHAR
)))
1246 ACE_DEBUG ((LM_DEBUG
,
1247 ACE_TEXT ("(%t) Client %d connected on %s\n"),
1251 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) Client %d %p\n"),
1253 ACE_TEXT ("addr_to_string")));
1259 Client::open (ACE_HANDLE handle
, ACE_Message_Block
&)
1262 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1264 // Don't buffer serial sends.
1265 this->handle_
= handle
;
1267 ACE_SOCK_Stream
option_setter (handle
);
1268 if (option_setter
.set_option (ACE_IPPROTO_TCP
,
1272 ACE_ERROR ((LM_ERROR
, "%p\n", "set_option"));
1274 // Open ACE_Asynch_Write_Stream
1275 if (this->ws_
.open (*this, this->handle_
) == -1)
1276 ACE_ERROR ((LM_ERROR
,
1277 ACE_TEXT ("(%t) %p\n"),
1278 ACE_TEXT ("Client::ACE_Asynch_Write_Stream::open")));
1280 // Open ACE_Asynch_Read_Stream
1281 else if (this->rs_
.open (*this, this->handle_
) == -1)
1282 ACE_ERROR ((LM_ERROR
,
1283 ACE_TEXT ("(%t) %p\n"),
1284 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::open")));
1286 else if (this->initiate_write_stream () == 0)
1288 if (duplex
!= 0) // Start an asynchronous read
1289 this->initiate_read_stream ();
1292 if (this->io_count_
> 0)
1299 Client::initiate_write_stream ()
1301 if (this->flg_cancel_
!= 0 ||
1302 this->stop_writing_
||
1303 this->handle_
== ACE_INVALID_HANDLE
)
1306 static const size_t complete_message_length
= ACE_OS::strlen (complete_message
);
1308 #if defined (ACE_WIN32)
1309 ACE_Message_Block
*mb1
= 0,
1313 // No need to allocate +1 for proper printing - the memory includes it already
1314 ACE_NEW_RETURN (mb1
,
1315 ACE_Message_Block ((char *)complete_message
,
1316 complete_message_length
),
1319 ACE_NEW_RETURN (mb2
,
1320 ACE_Message_Block ((char *)complete_message
,
1321 complete_message_length
),
1324 ACE_NEW_RETURN (mb3
,
1325 ACE_Message_Block ((char *)complete_message
,
1326 complete_message_length
),
1329 mb1
->wr_ptr (complete_message_length
);
1330 mb2
->wr_ptr (complete_message_length
);
1331 mb3
->wr_ptr (complete_message_length
);
1333 // chain them together
1337 if (this->ws_
.writev (*mb1
, mb1
->total_length ()) == -1)
1340 ACE_ERROR_RETURN((LM_ERROR
,
1341 ACE_TEXT ("(%t) %p\n"),
1342 ACE_TEXT ("Client::ACE_Asynch_Stream::writev")),
1345 #else /* defined (ACE_WIN32) */
1347 ACE_Message_Block
*mb
= 0;
1349 // No need to allocate +1 for proper printing - the memory includes it already
1351 ACE_Message_Block (complete_message
, complete_message_length
),
1353 mb
->wr_ptr (complete_message_length
);
1355 if (this->ws_
.write (*mb
, mb
->length ()) == -1)
1358 #if defined (ACE_WIN32)
1359 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
1360 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
1361 ACE_ERROR_RETURN ((LM_DEBUG
,
1362 ACE_TEXT ("(%t) Client %d, peer gone\n"),
1365 #endif /* ACE_WIN32 */
1366 ACE_ERROR_RETURN((LM_ERROR
,
1367 ACE_TEXT ("(%t) Client %d, %p\n"),
1369 ACE_TEXT ("write")),
1372 #endif /* defined (ACE_WIN32) */
1380 Client::initiate_read_stream ()
1382 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
1385 static const size_t complete_message_length
=
1386 ACE_OS::strlen (complete_message
);
1388 #if defined (ACE_WIN32)
1389 ACE_Message_Block
*mb1
= 0,
1396 // We allocate +1 only for proper printing - we can just set the last byte
1397 // to '\0' before printing out
1398 ACE_NEW_RETURN (mb1
, ACE_Message_Block (complete_message_length
+ 1), -1);
1399 ACE_NEW_RETURN (mb2
, ACE_Message_Block (complete_message_length
+ 1), -1);
1400 ACE_NEW_RETURN (mb3
, ACE_Message_Block (complete_message_length
+ 1), -1);
1402 // Allocate memory for one more triplet.
1403 // This improves performance,
1404 // as we can receive more than one block at once.
1405 // Generally, we can receive more triplets ....
1406 ACE_NEW_RETURN (mb4
, ACE_Message_Block (complete_message_length
+ 1), -1);
1407 ACE_NEW_RETURN (mb5
, ACE_Message_Block (complete_message_length
+ 1), -1);
1408 ACE_NEW_RETURN (mb6
, ACE_Message_Block (complete_message_length
+ 1), -1);
1418 // hide last byte in each message block, reserving it for later to set '\0'
1419 // for proper printouts
1420 mb1
->size (mb1
->size () - 1);
1421 mb2
->size (mb2
->size () - 1);
1422 mb3
->size (mb3
->size () - 1);
1424 mb4
->size (mb4
->size () - 1);
1425 mb5
->size (mb5
->size () - 1);
1426 mb6
->size (mb6
->size () - 1);
1429 if (this->rs_
.readv (*mb1
, mb1
->total_size () - 1) == -1)
1432 ACE_ERROR_RETURN ((LM_ERROR
,
1433 ACE_TEXT ("(%t) %p\n"),
1434 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1437 #else /* defined (ACE_WIN32) */
1439 // Try to read more chunks
1440 size_t blksize
= ( complete_message_length
> BUFSIZ
) ?
1441 complete_message_length
: BUFSIZ
;
1443 ACE_Message_Block
*mb
= 0;
1445 // We allocate +1 only for proper printing - we can just set the last byte
1446 // to '\0' before printing out
1448 ACE_Message_Block (blksize
+ 1),
1452 if (this->rs_
.read (*mb
, mb
->size () - 1) == -1)
1455 #if defined (ACE_WIN32)
1456 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
1457 // a 0-byte read as we would if underlying calls used WSARecv.
1458 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
1459 ACE_ERROR_RETURN ((LM_DEBUG
,
1460 ACE_TEXT ("(%t) Client %d, peer closed\n"),
1463 #endif /* ACE_WIN32 */
1464 ACE_ERROR_RETURN ((LM_ERROR
,
1465 ACE_TEXT ("(%t) Client %d, %p\n"),
1470 #endif /* defined (ACE_WIN32) */
// Completion callback for an asynchronous write issued on this Client's
// stream.  It serializes on this->lock_, reports the outcome of the write,
// adds the transferred byte count to total_snd_, and then starts the next
// write or read depending on the duplex setting.
//
// @param result  Completion result of the write (byte counts, error code,
//                completion key and the message block that was written).
1478 Client::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
// Hold the per-client mutex while the handler runs.
1481 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
// The message block that was handed to the write operation.
1483 ACE_Message_Block
& mb
= result
.message_block ();
// Detailed dump of the individual result fields.
// NOTE(review): the condition guarding this dump is not visible in this
// excerpt; the later "else if" branches suggest it is the first arm of a
// log-level check -- confirm against the full file.
1489 ACE_DEBUG ((LM_DEBUG
,
1490 ACE_TEXT ("(%t) **** Client %d: handle_write_stream() ****\n"),
1492 ACE_DEBUG ((LM_DEBUG
,
1493 ACE_TEXT ("%s = %d\n"),
1494 ACE_TEXT ("bytes_to_write"),
1495 result
.bytes_to_write ()));
1496 ACE_DEBUG ((LM_DEBUG
,
1497 ACE_TEXT ("%s = %d\n"),
1498 ACE_TEXT ("handle"),
1500 ACE_DEBUG ((LM_DEBUG
,
1501 ACE_TEXT ("%s = %d\n"),
1502 ACE_TEXT ("bytes_transfered"),
1503 result
.bytes_transferred ()));
1504 ACE_DEBUG ((LM_DEBUG
,
1505 ACE_TEXT ("%s = %@\n"),
1508 ACE_DEBUG ((LM_DEBUG
,
1509 ACE_TEXT ("%s = %d\n"),
1510 ACE_TEXT ("success"),
1511 result
.success ()));
1512 ACE_DEBUG ((LM_DEBUG
,
1513 ACE_TEXT ("%s = %@\n"),
1514 ACE_TEXT ("completion_key"),
1515 result
.completion_key ()));
1516 ACE_DEBUG ((LM_DEBUG
,
1517 ACE_TEXT ("%s = %d\n"),
// Win32 variant: the written data is treated as a chain of message blocks.
// Walk the chain through cont() and rewind each rd_ptr by the number of
// bytes attributed to that block so the sent text can be printed part by part.
1521 #if defined (ACE_WIN32)
1522 size_t bytes_transferred
= result
.bytes_transferred ();
1524 for (ACE_Message_Block
* mb_i
= &mb
;
1525 (mb_i
!= 0) && (bytes_transferred
> 0);
1526 mb_i
= mb_i
->cont ())
1528 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1529 mb_i
->rd_ptr()[0] = '\0';
// Distance between the block's base and its current read pointer.
1531 size_t len
= mb_i
->rd_ptr () - mb_i
->base ();
1533 // move rd_ptr backwards as required for printout
// If this block covers all remaining bytes, rewind by exactly that amount
// and finish; otherwise rewind by len and carry the rest to the next block.
1534 if (len
>= bytes_transferred
)
1536 mb_i
->rd_ptr (0 - bytes_transferred
);
1537 bytes_transferred
= 0;
1541 mb_i
->rd_ptr (0 - len
);
1542 bytes_transferred
-= len
;
1546 ACE_DEBUG ((LM_DEBUG
,
1547 ACE_TEXT ("%s%d = %s\n"),
1548 ACE_TEXT ("message_block, part "),
1552 #else /* defined (ACE_WIN32) */
1553 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1554 mb
.rd_ptr()[0] = '\0';
1555 // move rd_ptr backwards as required for printout
1556 mb
.rd_ptr (- result
.bytes_transferred ());
1557 ACE_DEBUG ((LM_DEBUG
,
1558 ACE_TEXT ("%s = %s\n"),
1559 ACE_TEXT ("message_block"),
1561 #endif /* defined (ACE_WIN32) */
1563 ACE_DEBUG ((LM_DEBUG
,
1564 ACE_TEXT ("**** end of message ****************\n")));
// Failure path: pick a log priority and log the error text.  The
// "operation aborted" code (ERROR_OPERATION_ABORTED on Win32, ECANCELED
// otherwise) is tested explicitly; the priorities assigned in each case are
// not visible in this excerpt.
1566 else if (result
.error () != 0)
1568 ACE_Log_Priority prio
;
1569 #if defined (ACE_WIN32)
1570 if (result
.error () == ERROR_OPERATION_ABORTED
)
1573 if (result
.error () == ECANCELED
)
1575 #endif /* ACE_WIN32 */
// Store the error number in the logger so that %p can expand it.
1578 ACE_Log_Msg::instance ()->errnum (result
.error ());
1579 ACE_Log_Msg::instance ()->log (prio
,
1580 ACE_TEXT ("(%t) Client %d; %p\n"),
1582 ACE_TEXT ("write"));
// Success with loglevel above 0: a single progress line.
1584 else if (loglevel
> 0)
1586 ACE_DEBUG ((LM_DEBUG
,
1587 ACE_TEXT ("(%t) Client %d: wrote %d bytes ok\n"),
1589 result
.bytes_transferred ()));
// Only account for the write and schedule more I/O when the operation
// succeeded and actually moved data.
1594 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1596 this->total_snd_
+= result
.bytes_transferred ();
// Report once this client has sent at least xfer_limit bytes.
1597 if (this->total_snd_
>= xfer_limit
)
1599 ACE_DEBUG ((LM_DEBUG
,
1600 ACE_TEXT ("(%t) Client %d sent %d, limit %d\n"),
1601 this->id_
, this->total_snd_
, xfer_limit
));
1604 if (duplex
!= 0) // full duplex, continue write
// Start another write only while fewer than 32 KiB more have been sent
// than received.
1606 if ((this->total_snd_
- this->total_rcv_
) < 1024*32 ) //flow control
1607 this->initiate_write_stream ();
1609 else // half-duplex read reply, after read we will start write
1610 this->initiate_read_stream ();
// NOTE(review): the action taken while I/O is still outstanding is not
// visible in this excerpt.
1614 if (this->io_count_
> 0)
// Completion callback for an asynchronous read issued on this Client's
// stream.  It serializes on this->lock_, reports the outcome of the read,
// adds the transferred byte count to total_rcv_, and then starts the next
// read or write depending on the duplex setting and stop_writing_.
//
// @param result  Completion result of the read (byte counts, error code,
//                completion key and the message block that was filled).
1621 Client::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
// Hold the per-client mutex while the handler runs.
1624 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
// The message block that received the data.
1626 ACE_Message_Block
& mb
= result
.message_block ();
// Detailed dump of the individual result fields.
// NOTE(review): the condition guarding this dump is not visible in this
// excerpt; the later "else if" branches suggest it is the first arm of a
// log-level check -- confirm against the full file.
1632 ACE_DEBUG ((LM_DEBUG
,
1633 ACE_TEXT ("(%t) **** Client %d: handle_read_stream() ****\n"),
1635 ACE_DEBUG ((LM_DEBUG
,
1636 ACE_TEXT ("%s = %d\n"),
1637 ACE_TEXT ("bytes_to_read"),
1638 result
.bytes_to_read ()));
1639 ACE_DEBUG ((LM_DEBUG
,
1640 ACE_TEXT ("%s = %d\n"),
1641 ACE_TEXT ("handle"),
1643 ACE_DEBUG ((LM_DEBUG
,
1644 ACE_TEXT ("%s = %d\n"),
1645 ACE_TEXT ("bytes_transfered"),
1646 result
.bytes_transferred ()));
1647 ACE_DEBUG ((LM_DEBUG
,
1648 ACE_TEXT ("%s = %@\n"),
1651 ACE_DEBUG ((LM_DEBUG
,
1652 ACE_TEXT ("%s = %d\n"),
1653 ACE_TEXT ("success"),
1654 result
.success ()));
1655 ACE_DEBUG ((LM_DEBUG
,
1656 ACE_TEXT ("%s = %@\n"),
1657 ACE_TEXT ("completion_key"),
1658 result
.completion_key ()));
1659 ACE_DEBUG ((LM_DEBUG
,
1660 ACE_TEXT ("%s = %d\n"),
// Win32 variant: walk the chain of message blocks through cont(),
// NUL-terminating each one at its write pointer before printing it.
// NOTE(review): the loop's continuation condition is not visible here.
1664 #if defined (ACE_WIN32)
1666 for (ACE_Message_Block
* mb_i
= &mb
;
1668 mb_i
= mb_i
->cont ())
1671 // write 0 at string end for proper printout
1672 mb_i
->wr_ptr()[0] = '\0';
1674 ACE_DEBUG ((LM_DEBUG
,
1675 ACE_TEXT ("%s%d = %s\n"),
1676 ACE_TEXT ("message_block, part "),
1680 #else /* ACE_WIN32 */
1681 // write 0 at string end for proper printout
// The terminator goes bytes_transferred() past rd_ptr, i.e. directly after
// the data just read.
1682 mb
.rd_ptr()[result
.bytes_transferred ()] = '\0'; // for proper printout
1683 ACE_DEBUG ((LM_DEBUG
,
1684 ACE_TEXT ("%s = %s\n"),
1685 ACE_TEXT ("message_block"),
1687 #endif /* ACE_WIN32 */
1689 ACE_DEBUG ((LM_DEBUG
,
1690 ACE_TEXT ("**** end of message ****************\n")));
// Failure path: pick a log priority and log the error text.  The
// "operation aborted" code (ERROR_OPERATION_ABORTED on Win32, ECANCELED
// otherwise) is tested explicitly; the priorities assigned in each case are
// not visible in this excerpt.
1692 else if (result
.error () != 0)
1694 ACE_Log_Priority prio
;
1695 #if defined (ACE_WIN32)
1696 if (result
.error () == ERROR_OPERATION_ABORTED
)
1699 if (result
.error () == ECANCELED
)
1701 #endif /* ACE_WIN32 */
// Store the error number in the logger so that %p can expand it.
1704 ACE_Log_Msg::instance ()->errnum (result
.error ());
1705 ACE_Log_Msg::instance ()->log (prio
,
1706 ACE_TEXT ("(%t) Client %d; %p\n"),
// Success with loglevel above 0: a single progress line.
1710 else if (loglevel
> 0)
1712 ACE_DEBUG ((LM_DEBUG
,
1713 ACE_TEXT ("(%t) Client %d: read %d bytes ok\n"),
1715 result
.bytes_transferred ()));
// Only account for the read and schedule more I/O when the operation
// succeeded and actually delivered data.
1720 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1722 this->total_rcv_
+= result
.bytes_transferred ();
// Keep reading in full duplex, or when stop_writing_ is set; otherwise
// alternate back to a write.
1724 if (duplex
!= 0 || this->stop_writing_
) // full duplex, continue read
1725 this->initiate_read_stream ();
1726 else // half-duplex write, after write we will start read
1727 this->initiate_write_stream ();
// NOTE(review): the action taken while I/O is still outstanding is not
// visible in this excerpt.
1731 if (this->io_count_
> 0)
1737 // *************************************************************
1738 // Configuration helpers
1739 // *************************************************************
// Emits the command-line help text listing every supported option.
// The argc parameter is intentionally unnamed (unused).
// NOTE(review): the logging call that consumes these strings and the return
// statement are not visible in this excerpt; the "%s" in the first line is
// presumably filled with the program name from argv -- confirm.  parse_args
// returns this function's result directly to its caller.
1741 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1745 ACE_TEXT ("\nusage: %s")
1746 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1747 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1748 ACE_TEXT ("\n a AIOCB")
1749 ACE_TEXT ("\n i SIG")
1750 ACE_TEXT ("\n c CB")
1751 ACE_TEXT ("\n d default")
1752 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1753 ACE_TEXT ("\n-h <host> for Client mode")
1754 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1755 ACE_TEXT ("\n-p <port to listen/connect>")
1756 ACE_TEXT ("\n-c <number of client instances>")
1757 ACE_TEXT ("\n-b run client and server at the same time")
1758 ACE_TEXT ("\n f file")
1759 ACE_TEXT ("\n c console")
1760 ACE_TEXT ("\n-v log level")
1761 ACE_TEXT ("\n 0 - log errors and highlights")
1762 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1763 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1764 ACE_TEXT ("\n-x max transfer byte count per Client")
1765 ACE_TEXT ("\n-u show this message")
// Selects the global proactor_type from the first character of ptype,
// compared case-insensitively (the character is upper-cased before the
// switch).
// NOTE(review): the case labels and return statements are not visible in
// this excerpt.  The usage text maps 'd' -> DEFAULT, 'a' -> AIOCB,
// 'i' -> SIG and 'c' -> CB; the region guarded by
// !ACE_HAS_BROKEN_SIGEVENT_STRUCT presumably holds the CB case -- confirm.
1773 set_proactor_type (const ACE_TCHAR
*ptype
)
1778 switch (ACE_OS::ace_toupper (*ptype
))
1781 proactor_type
= DEFAULT
;
1784 proactor_type
= AIOCB
;
1787 proactor_type
= SIG
;
1789 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
1793 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
// Initializes the test's global configuration to its defaults and then
// applies any command-line overrides parsed with ACE_Get_Opt.
//
// Options (option string "x:t:o:n:p:d:h:c:v:ub"):
//   -x  transfer limit per client (raised to 1 if it parses as 0)
//   -b  run both client and server
//   -v  log level
//   -d  duplex mode
//   -h  host to connect to
//   -p  port number
//   -n  proactor thread pool size
//   -c  number of clients (capped at MAX_CLIENTS)
//   -o  maximum number of aio operations for the proactor
//   -t  proactor type (see set_proactor_type)
//   -u  usage
// Paths that fall through to print_usage return its result.
1801 parse_args (int argc
, ACE_TCHAR
*argv
[])
1803 // First, set up all the defaults then let any args change them.
1804 both
= 1; // client and server simultaneously
1805 duplex
= 1; // full duplex is on
1806 #if defined (ACE_HAS_IPV6)
1807 host
= ACE_IPV6_LOCALHOST
; // server to connect (IPv6 localhost)
1809 host
= ACE_LOCALHOST
;
1810 #endif /*ACE_HAS_IPV6*/
1811 port
= ACE_DEFAULT_SERVER_PORT
; // port to connect/listen
1812 max_aio_operations
= 512; // POSIX Proactor params
1813 proactor_type
= DEFAULT
; // Proactor type = default
1814 threads
= 3; // size of Proactor thread pool
1815 clients
= 10; // number of clients
1816 loglevel
= 0; // log level : only errors and highlights
1817 // Default transfer limit 50 messages per Sender
1818 xfer_limit
= 50 * ACE_OS::strlen (complete_message
);
1820 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
1821 # if defined (ACE_LINUX)
// With no command-line arguments the defaults above are used as-is.
1825 if (argc
== 1) // no arguments, so one-button test
1828 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
// get_opt () yields the next option character, or EOF when done.
1831 while ((c
= get_opt ()) != EOF
)
1835 case 'x': // xfer limit
1836 xfer_limit
= static_cast<size_t> (ACE_OS::atoi (get_opt
.opt_arg ()));
1837 if (xfer_limit
== 0)
1838 xfer_limit
= 1; // Bare minimum.
1840 case 'b': // both client and server
1843 case 'v': // log level
1844 loglevel
= ACE_OS::atoi (get_opt
.opt_arg ());
1847 duplex
= ACE_OS::atoi (get_opt
.opt_arg ());
1849 case 'h': // host for sender
1850 host
= get_opt
.opt_arg ();
1852 case 'p': // port number
1853 port
= ACE_OS::atoi (get_opt
.opt_arg ());
1855 case 'n': // thread pool size
1856 threads
= ACE_OS::atoi (get_opt
.opt_arg ());
1858 case 'c': // number of clients
1859 clients
= ACE_OS::atoi (get_opt
.opt_arg ());
1860 if (clients
> MAX_CLIENTS
)
1861 clients
= MAX_CLIENTS
;
1863 case 'o': // max number of aio for proactor
1864 max_aio_operations
= ACE_OS::atoi (get_opt
.opt_arg ());
1866 case 't': // Proactor Type
1867 if (set_proactor_type (get_opt
.opt_arg ()))
1869 return print_usage (argc
, argv
);
1872 return print_usage (argc
, argv
);
// Test entry point for builds with threads and asynchronous I/O support.
// Parses the arguments and, when ACE_HAS_IPV6 is defined, starts the
// proactor thread pool, opens an Acceptor on the IPv6 wildcard address "::"
// and/or starts the Connector's client sessions, then waits until the test
// object reports that all sessions are done.
// NOTE(review): the declarations of task1, test and rc, the handling of a
// parse_args failure, and the shutdown sequence are not visible in this
// excerpt.
1880 run_main (int argc
, ACE_TCHAR
*argv
[])
1882 ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPV6"));
1884 if (::parse_args (argc
, argv
) == -1)
1887 #if defined (ACE_HAS_IPV6)
// Disable the real-time signal range and SIGPIPE before any I/O starts.
1888 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
1889 disable_signal (SIGPIPE
, SIGPIPE
);
// Proceed only if the proactor task started successfully (returned 0).
1894 if (task1
.start (threads
, proactor_type
, max_aio_operations
) == 0)
1896 Acceptor
acceptor (&test
);
1897 Connector
connector (&test
);
// Listen address: the configured port on the IPv6 wildcard "::".
1898 ACE_INET_Addr
addr (port
, "::");
1902 if (both
!= 0 || host
== 0) // Acceptor
1904 // Simplify, initial read with zero size
1905 if (acceptor
.open (addr
, 0, 1) == 0)
// Connector side: reuse addr, re-pointing it at the target host and port.
1909 if (both
!= 0 || host
!= 0)
1912 host
= ACE_IPV6_LOCALHOST
;
// Keep the address family that addr already has when resolving host.
1914 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
1915 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), host
));
1917 rc
+= connector
.start (addr
, clients
);
1920 // Wait a few seconds to let things get going, then poll until
1921 // all sessions are done. Note that when we exit this scope, the
1922 // Acceptor and Connector will be destroyed, which should prevent
1923 // further connections and also test how well destroyed handlers
1927 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
1928 while (!test
.testing_done ())
1933 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
1936 #endif /* ACE_HAS_IPV6 */
// Fallback test entry point, compiled when the file-level guard
// (ACE_HAS_THREADS plus ACE_HAS_WIN32_OVERLAPPED_IO or ACE_HAS_AIO_CALLS)
// is not satisfied.  It ignores its arguments and only logs that the test
// will not be run.
1946 run_main (int, ACE_TCHAR
*[])
1948 ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPV6"));
1950 ACE_DEBUG ((LM_INFO
,
1951 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
1952 ACE_TEXT ("Proactor_Test_IPV6 will not be run.\n")));
1959 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */