1 // ============================================================================
3 * @file Proactor_Test_IPV6.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that does various asynchronous
9 * @author Alexander Libman <alibman@baltimore.com>
10 * @author Brian Buesker <bbuesker@qualcomm.com> - modified for IPv6 operation
12 // ============================================================================
14 #include "test_config.h"
16 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
17 // This only works on Win32 platforms and on Unix platforms
18 // supporting POSIX aio calls.
20 #include "ace/Signal.h"
22 #include "ace/Service_Config.h"
23 #include "ace/INET_Addr.h"
24 #include "ace/SOCK_Connector.h"
25 #include "ace/SOCK_Acceptor.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Object_Manager.h"
28 #include "ace/Get_Opt.h"
30 #include "ace/Proactor.h"
31 #include "ace/Asynch_Acceptor.h"
32 #include "ace/Asynch_Connector.h"
34 #include "ace/Thread_Semaphore.h"
35 #include "ace/OS_NS_ctype.h"
36 #include "ace/OS_NS_errno.h"
37 #include "ace/OS_NS_signal.h"
38 #include "ace/OS_NS_string.h"
39 #include "ace/OS_NS_unistd.h"
40 #include "ace/OS_NS_sys_socket.h"
41 #include "ace/os_include/netinet/os_tcp.h"
43 #include "ace/Atomic_Op.h"
44 #include "ace/Synch_Traits.h"
46 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
48 # include "ace/WIN32_Proactor.h"
50 #elif defined (ACE_HAS_AIO_CALLS)
52 # include "ace/POSIX_Proactor.h"
53 # include "ace/POSIX_CB_Proactor.h"
54 # include "ace/SUN_Proactor.h"
56 #endif /* defined (ACE_HAS_WIN32_OVERLAPPED_IO) */
58 #include "Proactor_Test.h"
61 // Proactor Type (UNIX only, Win32 ignored)
62 typedef enum { DEFAULT
= 0, AIOCB
, SIG
, SUN
, CB
} ProactorType
;
63 static ProactorType proactor_type
= DEFAULT
;
65 // POSIX : > 0 max number aio operations proactor,
66 static size_t max_aio_operations
= 0;
68 // both: 0 run client or server / depends on host
69 // != 0 run client and server
72 // Host that we're connecting to.
73 static const ACE_TCHAR
*host
= 0;
75 // number of Client instances
76 static int clients
= 1;
77 const int MAX_CLIENTS
= 1000;
78 const int MAX_SERVERS
= 1000;
80 // duplex mode: == 0 half-duplex
82 static int duplex
= 0;
84 // number threads in the Proactor thread pool
85 static int threads
= 1;
87 // Port that we're receiving connections on.
88 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
91 static int loglevel
; // 0 full , 1 only errors
93 static size_t xfer_limit
; // Number of bytes for Client to send.
95 static char complete_message
[] =
98 "Accept-Language: C++\r\n"
99 "Accept-Encoding: gzip, deflate\r\n"
100 "User-Agent: Proactor_Test_IPv6/1.0 (non-compatible)\r\n"
101 "Connection: Keep-Alive\r\n"
108 LogLocker () { ACE_LOG_MSG
->acquire (); }
109 virtual ~LogLocker () { ACE_LOG_MSG
->release (); }
114 // Function to remove signals from the signal mask.
116 disable_signal (int sigmin
, int sigmax
)
118 #if !defined (ACE_LACKS_UNIX_SIGNALS)
120 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
121 ACE_ERROR ((LM_ERROR
,
122 ACE_TEXT ("Error: (%P|%t):%p\n"),
123 ACE_TEXT ("sigemptyset failed")));
125 for (int i
= sigmin
; i
<= sigmax
; i
++)
126 ACE_OS::sigaddset (&signal_set
, i
);
128 // Put the <signal_set>.
129 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
130 // In multi-threaded application this is not POSIX compliant
131 // but let's leave it just in case.
132 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
134 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
135 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
136 ACE_ERROR_RETURN ((LM_ERROR
,
137 ACE_TEXT ("Error: (%P|%t): %p\n"),
138 ACE_TEXT ("SIG_BLOCK failed")),
141 ACE_UNUSED_ARG (sigmin
);
142 ACE_UNUSED_ARG (sigmax
);
143 #endif /* ACE_LACKS_UNIX_SIGNALS */
149 // *************************************************************
150 // MyTask is ACE_Task resposible for :
151 // 1. creation and deletion of
152 // Proactor and Proactor thread pool
153 // 2. running Proactor event loop
154 // *************************************************************
159 * MyTask plays role for Proactor threads pool
161 * MyTask is ACE_Task resposible for:
162 * 1. Creation and deletion of Proactor and Proactor thread pool
163 * 2. Running Proactor event loop
165 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
170 sem_ ((unsigned int) 0),
175 (void) this->stop ();
176 (void) this->delete_proactor();
179 virtual int svc (void);
181 int start (int num_threads
,
182 ProactorType type_proactor
,
187 int create_proactor (ProactorType type_proactor
,
189 int delete_proactor (void);
191 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
192 ACE_Thread_Semaphore sem_
;
193 ACE_Proactor
* proactor_
;
198 MyTask::create_proactor (ProactorType type_proactor
, size_t max_op
)
200 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
205 ACE_TEST_ASSERT (this->proactor_
== 0);
207 #if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
209 ACE_UNUSED_ARG (type_proactor
);
210 ACE_UNUSED_ARG (max_op
);
212 ACE_WIN32_Proactor
*proactor_impl
= 0;
214 ACE_NEW_RETURN (proactor_impl
,
218 ACE_DEBUG ((LM_DEBUG
,
219 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
221 #elif defined (ACE_HAS_AIO_CALLS)
223 ACE_POSIX_Proactor
* proactor_impl
= 0;
225 switch (type_proactor
)
228 ACE_NEW_RETURN (proactor_impl
,
229 ACE_POSIX_AIOCB_Proactor (max_op
),
231 ACE_DEBUG ((LM_DEBUG
,
232 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
235 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
237 ACE_NEW_RETURN (proactor_impl
,
238 ACE_POSIX_SIG_Proactor (max_op
),
240 ACE_DEBUG ((LM_DEBUG
,
241 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
243 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
247 ACE_NEW_RETURN (proactor_impl
,
248 ACE_SUN_Proactor (max_op
),
250 ACE_DEBUG ((LM_DEBUG
,
251 ACE_TEXT("(%t) Create Proactor Type = SUN\n")));
255 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
257 ACE_NEW_RETURN (proactor_impl
,
258 ACE_POSIX_CB_Proactor (max_op
),
260 ACE_DEBUG ((LM_DEBUG
,
261 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
263 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
266 ACE_DEBUG ((LM_DEBUG
,
267 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
271 #endif // (ACE_WIN32) && !defined (ACE_HAS_WINCE)
273 // always delete implementation 1 , not !(proactor_impl == 0)
274 ACE_NEW_RETURN (this->proactor_
,
275 ACE_Proactor (proactor_impl
, 1 ),
277 // Set new singleton and delete it in close_singleton()
278 ACE_Proactor::instance (this->proactor_
, 1);
283 MyTask::delete_proactor (void)
285 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
290 ACE_DEBUG ((LM_DEBUG
,
291 ACE_TEXT ("(%t) Delete Proactor\n")));
293 ACE_Proactor::close_singleton ();
300 MyTask::start (int num_threads
,
301 ProactorType type_proactor
,
304 if (this->create_proactor (type_proactor
, max_op
) == -1)
305 ACE_ERROR_RETURN ((LM_ERROR
,
307 ACE_TEXT ("unable to create proactor")),
310 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
311 ACE_ERROR_RETURN ((LM_ERROR
,
313 ACE_TEXT ("unable to activate thread pool")),
316 for (; num_threads
> 0; num_threads
--)
328 if (this->proactor_
!= 0)
330 ACE_DEBUG ((LM_DEBUG
,
331 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
333 ACE_Proactor::end_event_loop ();
336 if (this->wait () == -1)
337 ACE_ERROR ((LM_ERROR
,
339 ACE_TEXT ("unable to stop thread pool")));
347 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask started\n")));
349 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
350 disable_signal (SIGPIPE
, SIGPIPE
);
352 // signal that we are ready
355 ACE_Proactor::run_event_loop ();
357 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) MyTask finished\n")));
362 // TestData collects and reports on test-related transfer and connection
368 bool testing_done (void);
369 Server
*server_up (void);
370 Client
*client_up (void);
371 void server_done (Server
*s
);
372 void client_done (Client
*c
);
373 void stop_all (void);
379 // Track number of sessions that report start, and those that report
380 // their end (and stats).
381 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_up_
;
382 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> sessions_down_
;
384 // Total read and write bytes for all sessions.
385 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_cnt_
;
386 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_cnt_
;
387 // Total read and write operations issues for all sessions.
388 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> w_ops_
;
389 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, size_t> r_ops_
;
390 } servers_
, clients_
;
392 ACE_SYNCH_MUTEX list_lock_
;
393 Server
*server_list_
[MAX_SERVERS
];
394 Client
*client_list_
[MAX_CLIENTS
];
397 TestData::TestData ()
400 for (i
= 0; i
< MAX_SERVERS
; ++i
)
401 this->server_list_
[i
] = 0;
402 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
403 this->client_list_
[i
] = 0;
407 TestData::testing_done (void)
409 int svr_up
= this->servers_
.sessions_up_
.value ();
410 int svr_dn
= this->servers_
.sessions_down_
.value ();
411 int clt_up
= this->clients_
.sessions_up_
.value ();
412 int clt_dn
= this->clients_
.sessions_down_
.value ();
414 if (svr_up
== 0 && clt_up
== 0) // No connections up yet
417 return (svr_dn
>= svr_up
&& clt_dn
>= clt_up
);
421 TestData::server_up (void)
423 ++this->servers_
.sessions_up_
;
424 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
426 for (int i
= 0; i
< MAX_SERVERS
; ++i
)
428 if (this->server_list_
[i
] == 0)
430 ACE_NEW_RETURN (this->server_list_
[i
], Server (this, i
), 0);
431 ACE_DEBUG ((LM_DEBUG
,
432 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
434 this->servers_
.sessions_up_
.value (),
435 this->servers_
.sessions_down_
.value ()));
436 return this->server_list_
[i
];
443 TestData::client_up (void)
445 ++this->clients_
.sessions_up_
;
446 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
, 0);
448 for (int i
= 0; i
< MAX_CLIENTS
; ++i
)
450 if (this->client_list_
[i
] == 0)
452 ACE_NEW_RETURN (this->client_list_
[i
], Client (this, i
), 0);
453 ACE_DEBUG ((LM_DEBUG
,
454 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
456 this->clients_
.sessions_up_
.value (),
457 this->clients_
.sessions_down_
.value ()));
458 return this->client_list_
[i
];
465 TestData::server_done (Server
*s
)
467 this->servers_
.w_cnt_
+= s
->get_total_snd ();
468 this->servers_
.r_cnt_
+= s
->get_total_rcv ();
469 this->servers_
.w_ops_
+= s
->get_total_w ();
470 this->servers_
.r_ops_
+= s
->get_total_r ();
471 ++this->servers_
.sessions_down_
;
472 ACE_DEBUG ((LM_DEBUG
,
473 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
475 this->servers_
.sessions_up_
.value (),
476 this->servers_
.sessions_down_
.value ()));
478 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
480 for (i
= 0; i
< MAX_SERVERS
; ++i
)
482 if (this->server_list_
[i
] == s
)
485 ACE_ERROR ((LM_ERROR
,
486 ACE_TEXT ("Server %d is pos %d in list\n"),
489 this->server_list_
[i
] = 0;
493 if (i
>= MAX_SERVERS
)
494 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Server %@ done but not listed\n"), s
));
500 TestData::client_done (Client
*c
)
502 this->clients_
.w_cnt_
+= c
->get_total_snd ();
503 this->clients_
.r_cnt_
+= c
->get_total_rcv ();
504 this->clients_
.w_ops_
+= c
->get_total_w ();
505 this->clients_
.r_ops_
+= c
->get_total_r ();
506 ++this->clients_
.sessions_down_
;
507 ACE_DEBUG ((LM_DEBUG
,
508 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
510 this->clients_
.sessions_up_
.value (),
511 this->clients_
.sessions_down_
.value ()));
513 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
515 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
517 if (this->client_list_
[i
] == c
)
520 ACE_ERROR ((LM_ERROR
,
521 ACE_TEXT ("Client %d is pos %d in list\n"),
524 this->client_list_
[i
] = 0;
528 if (i
>= MAX_CLIENTS
)
529 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Client %@ done but not listed\n"), c
));
535 TestData::stop_all (void)
539 // Lock and cancel everything. Then release the lock, possibly allowing
540 // cleanups, then grab it again and delete all Servers and Clients.
542 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
543 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
545 if (this->client_list_
[i
] != 0)
546 this->client_list_
[i
]->cancel ();
549 for (i
= 0; i
< MAX_SERVERS
; ++i
)
551 if (this->server_list_
[i
] != 0)
552 this->server_list_
[i
]->cancel ();
556 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->list_lock_
);
557 for (i
= 0; i
< MAX_CLIENTS
; ++i
)
559 if (this->client_list_
[i
] != 0)
560 delete this->client_list_
[i
];
563 for (i
= 0; i
< MAX_SERVERS
; ++i
)
565 if (this->server_list_
[i
] != 0)
566 delete this->server_list_
[i
];
572 TestData::report (void)
575 ACE_TCHAR bufs
[256];
576 ACE_TCHAR bufr
[256];
578 ACE_OS::snprintf (bufs
, 256,
579 ACE_SIZE_T_FORMAT_SPECIFIER
580 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
581 this->clients_
.w_cnt_
.value (),
582 this->clients_
.w_ops_
.value ());
584 ACE_OS::snprintf (bufr
, 256,
585 ACE_SIZE_T_FORMAT_SPECIFIER
586 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
587 this->clients_
.r_cnt_
.value (),
588 this->clients_
.r_ops_
.value ());
590 ACE_DEBUG ((LM_DEBUG
,
591 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
595 ACE_OS::snprintf (bufs
, 256,
596 ACE_SIZE_T_FORMAT_SPECIFIER
597 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
598 this->servers_
.w_cnt_
.value (),
599 this->servers_
.w_ops_
.value ());
601 ACE_OS::snprintf (bufr
, 256,
602 ACE_SIZE_T_FORMAT_SPECIFIER
603 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT (")"),
604 this->servers_
.r_cnt_
.value (),
605 this->servers_
.r_ops_
.value ());
607 ACE_DEBUG ((LM_DEBUG
,
608 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
612 if (this->clients_
.w_cnt_
.value () == 0 ||
613 this->clients_
.r_cnt_
.value () == 0 ||
614 this->servers_
.w_cnt_
.value () == 0 ||
615 this->servers_
.r_cnt_
.value () == 0 )
616 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("It appears that this test didn't ")
617 ACE_TEXT ("really do anything. Something is very wrong.\n")));
621 class Acceptor
: public ACE_Asynch_Acceptor
<Server
>
624 Acceptor (TestData
*tester
);
625 virtual ~Acceptor (void);
627 // Virtual from ACE_Asynch_Acceptor
628 Server
*make_handler (void);
634 // *************************************************************
635 Acceptor::Acceptor (TestData
*tester
)
640 Acceptor::~Acceptor (void)
646 Acceptor::make_handler (void)
648 return this->tester_
->server_up ();
651 // ***************************************************
654 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
657 Server::Server (TestData
*tester
, int id
)
660 handle_ (ACE_INVALID_HANDLE
),
670 Server::~Server (void)
672 ACE_DEBUG ((LM_DEBUG
,
673 ACE_TEXT ("(%t) Server %d dtor; %d sends (%d bytes); ")
674 ACE_TEXT ("%d recvs (%d bytes)\n"),
676 this->total_w_
, this->total_snd_
,
677 this->total_r_
, this->total_rcv_
));
678 if (this->io_count_
!= 0)
679 ACE_ERROR ((LM_WARNING
,
680 ACE_TEXT ("(%t) Server %d deleted with ")
681 ACE_TEXT ("%d I/O outstanding\n"),
685 // This test bounces data back and forth between Clients and Servers.
686 // Therefore, if there was significantly more data in one direction, that's
687 // a problem. Remember, the byte counts are unsigned values.
688 int issue_data_warning
= 0;
689 if (this->total_snd_
> this->total_rcv_
)
691 if (this->total_rcv_
== 0)
692 issue_data_warning
= 1;
693 else if (this->total_snd_
/ this->total_rcv_
> 2)
694 issue_data_warning
= 1;
698 if (this->total_snd_
== 0)
699 issue_data_warning
= 1;
700 else if (this->total_rcv_
/ this->total_snd_
> 2)
701 issue_data_warning
= 1;
703 if (issue_data_warning
)
704 ACE_DEBUG ((LM_WARNING
,
705 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
707 if (this->tester_
!= 0)
708 this->tester_
->server_done (this);
710 if (this->handle_
!= ACE_INVALID_HANDLE
)
711 ACE_OS::closesocket (this->handle_
);
714 this->handle_
= ACE_INVALID_HANDLE
;
720 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
722 this->flg_cancel_
= 1;
730 Server::addresses (const ACE_INET_Addr
& peer
, const ACE_INET_Addr
&)
733 if (0 == peer
.addr_to_string (str
, sizeof (str
) / sizeof (ACE_TCHAR
)))
734 ACE_DEBUG ((LM_DEBUG
,
735 ACE_TEXT ("(%t) Server %d connection from %s\n"),
739 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) Server %d %p\n"),
741 ACE_TEXT ("addr_to_string")));
747 Server::open (ACE_HANDLE handle
, ACE_Message_Block
&)
750 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
752 // Don't buffer serial sends.
753 this->handle_
= handle
;
755 ACE_SOCK_Stream
option_setter (handle
);
756 if (-1 == option_setter
.set_option (ACE_IPPROTO_TCP
,
760 ACE_ERROR ((LM_ERROR
, "%p\n", "set_option"));
762 if (this->ws_
.open (*this, this->handle_
) == -1)
763 ACE_ERROR ((LM_ERROR
,
764 ACE_TEXT ("(%t) %p\n"),
765 ACE_TEXT ("Server::ACE_Asynch_Write_Stream::open")));
766 else if (this->rs_
.open (*this, this->handle_
) == -1)
767 ACE_ERROR ((LM_ERROR
,
768 ACE_TEXT ("(%t) %p\n"),
769 ACE_TEXT ("Server::ACE_Asynch_Read_Stream::open")));
771 this->initiate_read_stream ();
773 if (this->io_count_
> 0)
780 Server::initiate_read_stream (void)
782 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
785 ACE_Message_Block
*mb
= 0;
787 ACE_Message_Block (1024), //BUFSIZ + 1),
791 if (this->rs_
.read (*mb
, mb
->size () - 1) == -1)
794 #if defined (ACE_WIN32)
795 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
796 // a 0-byte read as we would if underlying calls used WSARecv.
797 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
798 ACE_ERROR_RETURN ((LM_DEBUG
,
799 ACE_TEXT ("(%t) Server %d, peer closed\n"),
802 #endif /* ACE_WIN32 */
803 ACE_ERROR_RETURN ((LM_ERROR
,
804 ACE_TEXT ("(%t) Server %d, %p\n"),
816 Server::initiate_write_stream (ACE_Message_Block
&mb
, size_t nbytes
)
818 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
827 ACE_ERROR_RETURN((LM_ERROR
,
828 ACE_TEXT ("(%t) Server::ACE_Asynch_Write_Stream::write nbytes <0 ")),
832 if (this->ws_
.write (mb
, nbytes
) == -1)
835 #if defined (ACE_WIN32)
836 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
837 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
838 ACE_ERROR_RETURN ((LM_DEBUG
,
839 ACE_TEXT ("(%t) Server %d, peer gone\n"),
842 #endif /* ACE_WIN32 */
843 ACE_ERROR_RETURN((LM_ERROR
,
844 ACE_TEXT ("(%t) Server %d, %p\n"),
856 Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
859 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
861 ACE_Message_Block
& mb
= result
.message_block ();
864 mb
.rd_ptr ()[result
.bytes_transferred ()] = '\0';
870 ACE_DEBUG ((LM_DEBUG
,
871 ACE_TEXT ("(%t) **** Server %d: handle_read_stream() ****\n"),
873 ACE_DEBUG ((LM_DEBUG
,
874 ACE_TEXT ("%s = %d\n"),
875 ACE_TEXT ("bytes_to_read"),
876 result
.bytes_to_read ()));
877 ACE_DEBUG ((LM_DEBUG
,
878 ACE_TEXT ("%s = %d\n"),
881 ACE_DEBUG ((LM_DEBUG
,
882 ACE_TEXT ("%s = %d\n"),
883 ACE_TEXT ("bytes_transfered"),
884 result
.bytes_transferred ()));
885 ACE_DEBUG ((LM_DEBUG
,
886 ACE_TEXT ("%s = %@\n"),
889 ACE_DEBUG ((LM_DEBUG
,
890 ACE_TEXT ("%s = %d\n"),
891 ACE_TEXT ("success"),
893 ACE_DEBUG ((LM_DEBUG
,
894 ACE_TEXT ("%s = %@\n"),
895 ACE_TEXT ("completion_key"),
896 result
.completion_key ()));
897 ACE_DEBUG ((LM_DEBUG
,
898 ACE_TEXT ("%s = %d\n"),
901 ACE_DEBUG ((LM_DEBUG
,
902 ACE_TEXT ("%s = %s\n"),
903 ACE_TEXT ("message_block"),
905 ACE_DEBUG ((LM_DEBUG
,
906 ACE_TEXT ("**** end of message ****************\n")));
908 else if (result
.error () != 0)
910 ACE_Log_Priority prio
;
911 #if defined (ACE_WIN32)
912 if (result
.error () == ERROR_OPERATION_ABORTED
)
915 if (result
.error () == ECANCELED
)
917 #endif /* ACE_WIN32 */
920 ACE_Log_Msg::instance ()->errnum (result
.error ());
921 ACE_Log_Msg::instance ()->log (prio
,
922 ACE_TEXT ("(%t) Server %d; %p\n"),
926 else if (loglevel
> 0)
928 ACE_DEBUG ((LM_DEBUG
,
929 ACE_TEXT ("(%t) Server %d: read %d bytes\n"),
931 result
.bytes_transferred ()));
934 if (result
.error () == 0 && result
.bytes_transferred () > 0)
936 this->total_rcv_
+= result
.bytes_transferred ();
938 if (this->initiate_write_stream (mb
,
939 result
.bytes_transferred ()) == 0)
941 if (duplex
!= 0) // Initiate new read from the stream.
942 this->initiate_read_stream ();
949 if (this->io_count_
> 0)
956 Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
959 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
961 ACE_Message_Block
& mb
= result
.message_block ();
967 //mb.rd_ptr () [0] = '\0';
968 mb
.rd_ptr (mb
.rd_ptr () - result
.bytes_transferred ());
970 ACE_DEBUG ((LM_DEBUG
,
971 ACE_TEXT ("(%t) **** Server %d: handle_write_stream() ****\n"),
973 ACE_DEBUG ((LM_DEBUG
,
974 ACE_TEXT ("%s = %d\n"),
975 ACE_TEXT ("bytes_to_write"),
976 result
.bytes_to_write ()));
977 ACE_DEBUG ((LM_DEBUG
,
978 ACE_TEXT ("%s = %d\n"),
981 ACE_DEBUG ((LM_DEBUG
,
982 ACE_TEXT ("%s = %d\n"),
983 ACE_TEXT ("bytes_transfered"),
984 result
.bytes_transferred ()));
985 ACE_DEBUG ((LM_DEBUG
,
986 ACE_TEXT ("%s = %@\n"),
989 ACE_DEBUG ((LM_DEBUG
,
990 ACE_TEXT ("%s = %d\n"),
991 ACE_TEXT ("success"),
993 ACE_DEBUG ((LM_DEBUG
,
994 ACE_TEXT ("%s = %@\n"),
995 ACE_TEXT ("completion_key"),
996 result
.completion_key ()));
997 ACE_DEBUG ((LM_DEBUG
,
998 ACE_TEXT ("%s = %d\n"),
1001 ACE_DEBUG ((LM_DEBUG
,
1002 ACE_TEXT ("%s = %s\n"),
1003 ACE_TEXT ("message_block"),
1005 ACE_DEBUG ((LM_DEBUG
,
1006 ACE_TEXT ("**** end of message ****************\n")));
1008 else if (result
.error () != 0)
1010 ACE_Log_Priority prio
;
1011 #if defined (ACE_WIN32)
1012 if (result
.error () == ERROR_OPERATION_ABORTED
)
1015 if (result
.error () == ECANCELED
)
1017 #endif /* ACE_WIN32 */
1020 ACE_Log_Msg::instance ()->errnum (result
.error ());
1021 ACE_Log_Msg::instance ()->log (prio
,
1022 ACE_TEXT ("(%t) Server %d; %p\n"),
1024 ACE_TEXT ("write"));
1026 else if (loglevel
> 0)
1028 ACE_DEBUG ((LM_DEBUG
,
1029 ACE_TEXT ("(%t) Server %d: wrote %d bytes ok\n"),
1031 result
.bytes_transferred ()));
1036 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1038 this->total_snd_
+= result
.bytes_transferred ();
1041 this->initiate_read_stream ();
1045 if (this->io_count_
> 0)
1051 // *******************************************
1053 // *******************************************
1055 class Connector
: public ACE_Asynch_Connector
<Client
>
1058 Connector (TestData
*tester
);
1059 virtual ~Connector (void);
1061 int start (const ACE_INET_Addr
&addr
, int num
);
1063 // Virtual from ACE_Asynch_Connector
1064 Client
*make_handler (void);
1070 // *************************************************************
1072 Connector::Connector (TestData
*tester
)
1077 Connector::~Connector (void)
1083 Connector::make_handler (void)
1085 return this->tester_
->client_up ();
1090 Connector::start (const ACE_INET_Addr
& addr
, int num
)
1092 if (num
> MAX_CLIENTS
)
1100 // int open ( int pass_addresses = 0,
1101 // ACE_Proactor *proactor = 0,
1102 // int validate_new_connection = 0 );
1104 if (this->open (1, 0, 1) != 0)
1106 ACE_ERROR ((LM_ERROR
,
1107 ACE_TEXT ("(%t) %p\n"),
1108 ACE_TEXT ("Connector::open failed")));
1112 for (; rc
< num
; rc
++)
1114 ACE_INET_Addr localAddr
;
1115 if (this->connect (addr
, localAddr
) != 0)
1117 ACE_ERROR ((LM_ERROR
,
1118 ACE_TEXT ("(%t) %p\n"),
1119 ACE_TEXT ("Connector::connect failed for IPv6")));
1129 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Shouldn't use this constructor!\n")));
1132 Client::Client (TestData
*tester
, int id
)
1135 handle_ (ACE_INVALID_HANDLE
),
1146 Client::~Client (void)
1148 ACE_DEBUG ((LM_DEBUG
,
1149 ACE_TEXT ("(%t) Client %d dtor; %d sends (%d bytes); ")
1150 ACE_TEXT ("%d recvs (%d bytes)\n"),
1152 this->total_w_
, this->total_snd_
,
1153 this->total_r_
, this->total_rcv_
));
1154 if (this->io_count_
!= 0)
1155 ACE_ERROR ((LM_WARNING
,
1156 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1160 // This test bounces data back and forth between Clients and Servers.
1161 // Therefore, if there was significantly more data in one direction, that's
1162 // a problem. Remember, the byte counts are unsigned values.
1163 int issue_data_warning
= 0;
1164 if (this->total_snd_
> this->total_rcv_
)
1166 if (this->total_rcv_
== 0)
1167 issue_data_warning
= 1;
1168 else if (this->total_snd_
/ this->total_rcv_
> 2)
1169 issue_data_warning
= 1;
1173 if (this->total_snd_
== 0)
1174 issue_data_warning
= 1;
1175 else if (this->total_rcv_
/ this->total_snd_
> 2)
1176 issue_data_warning
= 1;
1178 if (issue_data_warning
)
1179 ACE_DEBUG ((LM_WARNING
,
1180 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1182 if (this->tester_
!= 0)
1183 this->tester_
->client_done (this);
1186 this->handle_
= ACE_INVALID_HANDLE
;
1187 if (this->handle_
!= ACE_INVALID_HANDLE
)
1189 ACE_OS::closesocket (this->handle_
);
1191 this->handle_
= ACE_INVALID_HANDLE
;
1197 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1199 this->flg_cancel_
= 1;
1200 this->ws_
.cancel ();
1201 this->rs_
.cancel ();
1208 // This must be called with the lock_ held.
1209 ACE_DEBUG ((LM_DEBUG
,
1210 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1211 this->id_
, this->io_count_
));
1212 ACE_OS::shutdown (this->handle_
, ACE_SHUTDOWN_WRITE
);
1213 this->stop_writing_
= 1;
1219 Client::addresses (const ACE_INET_Addr
& peer
, const ACE_INET_Addr
& local
)
1222 char peer_name
[256];
1223 ACE_TCHAR local_str
[256];
1224 ACE_INET_Addr
addr ((u_short
) 0, host
);
1226 // This checks to make sure the peer address given to us matches what
1227 // we expect it to be.
1228 if (0 != peer
.get_host_addr (peer_name
, sizeof (peer_name
)))
1230 if (0 != addr
.get_host_addr (my_name
, sizeof (my_name
)))
1232 if (0 != ACE_OS::strncmp (peer_name
, my_name
, sizeof (my_name
)))
1236 ACE_TEXT ("(%t) Sender %d peer address (%C) does not ")
1237 ACE_TEXT ("match host address (%C)\n"),
1239 peer_name
, my_name
));
1247 ACE_TEXT ("(%t) Sender %d unable to convert host addr\n"),
1254 ACE_ERROR ((LM_ERROR
,
1255 ACE_TEXT ("(%t) Sender %d unable to convert peer addr\n"),
1260 if (0 == local
.addr_to_string (local_str
,
1261 sizeof (local_str
) / sizeof (ACE_TCHAR
)))
1262 ACE_DEBUG ((LM_DEBUG
,
1263 ACE_TEXT ("(%t) Client %d connected on %s\n"),
1267 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) Client %d %p\n"),
1269 ACE_TEXT ("addr_to_string")));
1275 Client::open (ACE_HANDLE handle
, ACE_Message_Block
&)
1278 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1280 // Don't buffer serial sends.
1281 this->handle_
= handle
;
1283 ACE_SOCK_Stream
option_setter (handle
);
1284 if (option_setter
.set_option (ACE_IPPROTO_TCP
,
1288 ACE_ERROR ((LM_ERROR
, "%p\n", "set_option"));
1290 // Open ACE_Asynch_Write_Stream
1291 if (this->ws_
.open (*this, this->handle_
) == -1)
1292 ACE_ERROR ((LM_ERROR
,
1293 ACE_TEXT ("(%t) %p\n"),
1294 ACE_TEXT ("Client::ACE_Asynch_Write_Stream::open")));
1296 // Open ACE_Asynch_Read_Stream
1297 else if (this->rs_
.open (*this, this->handle_
) == -1)
1298 ACE_ERROR ((LM_ERROR
,
1299 ACE_TEXT ("(%t) %p\n"),
1300 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::open")));
1302 else if (this->initiate_write_stream () == 0)
1304 if (duplex
!= 0) // Start an asynchronous read
1305 this->initiate_read_stream ();
1308 if (this->io_count_
> 0)
1315 Client::initiate_write_stream (void)
1317 if (this->flg_cancel_
!= 0 ||
1318 this->stop_writing_
||
1319 this->handle_
== ACE_INVALID_HANDLE
)
1322 static const size_t complete_message_length
= ACE_OS::strlen (complete_message
);
1324 #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
1326 ACE_Message_Block
*mb1
= 0,
1330 // No need to allocate +1 for proper printing - the memory includes it already
1331 ACE_NEW_RETURN (mb1
,
1332 ACE_Message_Block ((char *)complete_message
,
1333 complete_message_length
),
1336 ACE_NEW_RETURN (mb2
,
1337 ACE_Message_Block ((char *)complete_message
,
1338 complete_message_length
),
1341 ACE_NEW_RETURN (mb3
,
1342 ACE_Message_Block ((char *)complete_message
,
1343 complete_message_length
),
1346 mb1
->wr_ptr (complete_message_length
);
1347 mb2
->wr_ptr (complete_message_length
);
1348 mb3
->wr_ptr (complete_message_length
);
1350 // chain them together
1354 if (this->ws_
.writev (*mb1
, mb1
->total_length ()) == -1)
1357 ACE_ERROR_RETURN((LM_ERROR
,
1358 ACE_TEXT ("(%t) %p\n"),
1359 ACE_TEXT ("Client::ACE_Asynch_Stream::writev")),
1362 #else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1364 ACE_Message_Block
*mb
= 0;
1366 // No need to allocate +1 for proper printing - the memory includes it already
1368 ACE_Message_Block (complete_message
, complete_message_length
),
1370 mb
->wr_ptr (complete_message_length
);
1372 if (this->ws_
.write (*mb
, mb
->length ()) == -1)
1375 #if defined (ACE_WIN32)
1376 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
1377 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
1378 ACE_ERROR_RETURN ((LM_DEBUG
,
1379 ACE_TEXT ("(%t) Client %d, peer gone\n"),
1382 #endif /* ACE_WIN32 */
1383 ACE_ERROR_RETURN((LM_ERROR
,
1384 ACE_TEXT ("(%t) Client %d, %p\n"),
1386 ACE_TEXT ("write")),
1389 #endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1397 Client::initiate_read_stream (void)
1399 if (this->flg_cancel_
!= 0 || this->handle_
== ACE_INVALID_HANDLE
)
1402 static const size_t complete_message_length
=
1403 ACE_OS::strlen (complete_message
);
1405 #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
1406 ACE_Message_Block
*mb1
= 0,
1413 // We allocate +1 only for proper printing - we can just set the last byte
1414 // to '\0' before printing out
1415 ACE_NEW_RETURN (mb1
, ACE_Message_Block (complete_message_length
+ 1), -1);
1416 ACE_NEW_RETURN (mb2
, ACE_Message_Block (complete_message_length
+ 1), -1);
1417 ACE_NEW_RETURN (mb3
, ACE_Message_Block (complete_message_length
+ 1), -1);
1419 // Let allocate memory for one more triplet,
1420 // This improves performance
1421 // as we can receive more the than one block at once
1422 // Generally, we can receive more triplets ....
1423 ACE_NEW_RETURN (mb4
, ACE_Message_Block (complete_message_length
+ 1), -1);
1424 ACE_NEW_RETURN (mb5
, ACE_Message_Block (complete_message_length
+ 1), -1);
1425 ACE_NEW_RETURN (mb6
, ACE_Message_Block (complete_message_length
+ 1), -1);
1435 // hide last byte in each message block, reserving it for later to set '\0'
1436 // for proper printouts
1437 mb1
->size (mb1
->size () - 1);
1438 mb2
->size (mb2
->size () - 1);
1439 mb3
->size (mb3
->size () - 1);
1441 mb4
->size (mb4
->size () - 1);
1442 mb5
->size (mb5
->size () - 1);
1443 mb6
->size (mb6
->size () - 1);
1446 if (this->rs_
.readv (*mb1
, mb1
->total_size () - 1) == -1)
1449 ACE_ERROR_RETURN ((LM_ERROR
,
1450 ACE_TEXT ("(%t) %p\n"),
1451 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1454 #else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1456 // Try to read more chunks
1457 size_t blksize
= ( complete_message_length
> BUFSIZ
) ?
1458 complete_message_length
: BUFSIZ
;
1460 ACE_Message_Block
*mb
= 0;
1462 // We allocate +1 only for proper printing - we can just set the last byte
1463 // to '\0' before printing out
1465 ACE_Message_Block (blksize
+ 1),
1469 if (this->rs_
.read (*mb
, mb
->size () - 1) == -1)
1472 #if defined (ACE_WIN32)
1473 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
1474 // a 0-byte read as we would if underlying calls used WSARecv.
1475 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED
)
1476 ACE_ERROR_RETURN ((LM_DEBUG
,
1477 ACE_TEXT ("(%t) Client %d, peer closed\n"),
1480 #endif /* ACE_WIN32 */
1481 ACE_ERROR_RETURN ((LM_ERROR
,
1482 ACE_TEXT ("(%t) Client %d, %p\n"),
1487 #endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1495 Client::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
1498 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1500 ACE_Message_Block
& mb
= result
.message_block ();
1506 ACE_DEBUG ((LM_DEBUG
,
1507 ACE_TEXT ("(%t) **** Client %d: handle_write_stream() ****\n"),
1509 ACE_DEBUG ((LM_DEBUG
,
1510 ACE_TEXT ("%s = %d\n"),
1511 ACE_TEXT ("bytes_to_write"),
1512 result
.bytes_to_write ()));
1513 ACE_DEBUG ((LM_DEBUG
,
1514 ACE_TEXT ("%s = %d\n"),
1515 ACE_TEXT ("handle"),
1517 ACE_DEBUG ((LM_DEBUG
,
1518 ACE_TEXT ("%s = %d\n"),
1519 ACE_TEXT ("bytes_transfered"),
1520 result
.bytes_transferred ()));
1521 ACE_DEBUG ((LM_DEBUG
,
1522 ACE_TEXT ("%s = %@\n"),
1525 ACE_DEBUG ((LM_DEBUG
,
1526 ACE_TEXT ("%s = %d\n"),
1527 ACE_TEXT ("success"),
1528 result
.success ()));
1529 ACE_DEBUG ((LM_DEBUG
,
1530 ACE_TEXT ("%s = %@\n"),
1531 ACE_TEXT ("completion_key"),
1532 result
.completion_key ()));
1533 ACE_DEBUG ((LM_DEBUG
,
1534 ACE_TEXT ("%s = %d\n"),
1538 #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
1539 size_t bytes_transferred
= result
.bytes_transferred ();
1541 for (ACE_Message_Block
* mb_i
= &mb
;
1542 (mb_i
!= 0) && (bytes_transferred
> 0);
1543 mb_i
= mb_i
->cont ())
1545 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1546 mb_i
->rd_ptr()[0] = '\0';
1548 size_t len
= mb_i
->rd_ptr () - mb_i
->base ();
1550 // move rd_ptr backwards as required for printout
1551 if (len
>= bytes_transferred
)
1553 mb_i
->rd_ptr (0 - bytes_transferred
);
1554 bytes_transferred
= 0;
1558 mb_i
->rd_ptr (0 - len
);
1559 bytes_transferred
-= len
;
1563 ACE_DEBUG ((LM_DEBUG
,
1564 ACE_TEXT ("%s%d = %s\n"),
1565 ACE_TEXT ("message_block, part "),
1569 #else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1570 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1571 mb
.rd_ptr()[0] = '\0';
1572 // move rd_ptr backwards as required for printout
1573 mb
.rd_ptr (- result
.bytes_transferred ());
1574 ACE_DEBUG ((LM_DEBUG
,
1575 ACE_TEXT ("%s = %s\n"),
1576 ACE_TEXT ("message_block"),
1578 #endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1580 ACE_DEBUG ((LM_DEBUG
,
1581 ACE_TEXT ("**** end of message ****************\n")));
1583 else if (result
.error () != 0)
1585 ACE_Log_Priority prio
;
1586 #if defined (ACE_WIN32)
1587 if (result
.error () == ERROR_OPERATION_ABORTED
)
1590 if (result
.error () == ECANCELED
)
1592 #endif /* ACE_WIN32 */
1595 ACE_Log_Msg::instance ()->errnum (result
.error ());
1596 ACE_Log_Msg::instance ()->log (prio
,
1597 ACE_TEXT ("(%t) Client %d; %p\n"),
1599 ACE_TEXT ("write"));
1601 else if (loglevel
> 0)
1603 ACE_DEBUG ((LM_DEBUG
,
1604 ACE_TEXT ("(%t) Client %d: wrote %d bytes ok\n"),
1606 result
.bytes_transferred ()));
1611 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1613 this->total_snd_
+= result
.bytes_transferred ();
1614 if (this->total_snd_
>= xfer_limit
)
1616 ACE_DEBUG ((LM_DEBUG
,
1617 ACE_TEXT ("(%t) Client %d sent %d, limit %d\n"),
1618 this->id_
, this->total_snd_
, xfer_limit
));
1621 if (duplex
!= 0) // full duplex, continue write
1623 if ((this->total_snd_
- this->total_rcv_
) < 1024*32 ) //flow control
1624 this->initiate_write_stream ();
1626 else // half-duplex read reply, after read we will start write
1627 this->initiate_read_stream ();
1631 if (this->io_count_
> 0)
1638 Client::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
1641 ACE_GUARD (ACE_SYNCH_MUTEX
, monitor
, this->lock_
);
1643 ACE_Message_Block
& mb
= result
.message_block ();
1649 ACE_DEBUG ((LM_DEBUG
,
1650 ACE_TEXT ("(%t) **** Client %d: handle_read_stream() ****\n"),
1652 ACE_DEBUG ((LM_DEBUG
,
1653 ACE_TEXT ("%s = %d\n"),
1654 ACE_TEXT ("bytes_to_read"),
1655 result
.bytes_to_read ()));
1656 ACE_DEBUG ((LM_DEBUG
,
1657 ACE_TEXT ("%s = %d\n"),
1658 ACE_TEXT ("handle"),
1660 ACE_DEBUG ((LM_DEBUG
,
1661 ACE_TEXT ("%s = %d\n"),
1662 ACE_TEXT ("bytes_transfered"),
1663 result
.bytes_transferred ()));
1664 ACE_DEBUG ((LM_DEBUG
,
1665 ACE_TEXT ("%s = %@\n"),
1668 ACE_DEBUG ((LM_DEBUG
,
1669 ACE_TEXT ("%s = %d\n"),
1670 ACE_TEXT ("success"),
1671 result
.success ()));
1672 ACE_DEBUG ((LM_DEBUG
,
1673 ACE_TEXT ("%s = %@\n"),
1674 ACE_TEXT ("completion_key"),
1675 result
.completion_key ()));
1676 ACE_DEBUG ((LM_DEBUG
,
1677 ACE_TEXT ("%s = %d\n"),
1681 #if defined (ACE_WIN32)
1683 for (ACE_Message_Block
* mb_i
= &mb
;
1685 mb_i
= mb_i
->cont ())
1688 // write 0 at string end for proper printout
1689 mb_i
->wr_ptr()[0] = '\0';
1691 ACE_DEBUG ((LM_DEBUG
,
1692 ACE_TEXT ("%s%d = %s\n"),
1693 ACE_TEXT ("message_block, part "),
1697 #else /* ACE_WIN32 */
1698 // write 0 at string end for proper printout
1699 mb
.rd_ptr()[result
.bytes_transferred ()] = '\0'; // for proper printout
1700 ACE_DEBUG ((LM_DEBUG
,
1701 ACE_TEXT ("%s = %s\n"),
1702 ACE_TEXT ("message_block"),
1704 #endif /* ACE_WIN32 */
1706 ACE_DEBUG ((LM_DEBUG
,
1707 ACE_TEXT ("**** end of message ****************\n")));
1709 else if (result
.error () != 0)
1711 ACE_Log_Priority prio
;
1712 #if defined (ACE_WIN32)
1713 if (result
.error () == ERROR_OPERATION_ABORTED
)
1716 if (result
.error () == ECANCELED
)
1718 #endif /* ACE_WIN32 */
1721 ACE_Log_Msg::instance ()->errnum (result
.error ());
1722 ACE_Log_Msg::instance ()->log (prio
,
1723 ACE_TEXT ("(%t) Client %d; %p\n"),
1727 else if (loglevel
> 0)
1729 ACE_DEBUG ((LM_DEBUG
,
1730 ACE_TEXT ("(%t) Client %d: read %d bytes ok\n"),
1732 result
.bytes_transferred ()));
1737 if (result
.error () == 0 && result
.bytes_transferred () > 0)
1739 this->total_rcv_
+= result
.bytes_transferred ();
1741 if (duplex
!= 0 || this->stop_writing_
) // full duplex, continue read
1742 this->initiate_read_stream ();
1743 else // half-duplex write, after write we will start read
1744 this->initiate_write_stream ();
1748 if (this->io_count_
> 0)
1754 // *************************************************************
1755 // Configuration helpers
1756 // *************************************************************
1758 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1762 ACE_TEXT ("\nusage: %s")
1763 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1764 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1765 ACE_TEXT ("\n a AIOCB")
1766 ACE_TEXT ("\n i SIG")
1767 ACE_TEXT ("\n c CB")
1768 ACE_TEXT ("\n s SUN")
1769 ACE_TEXT ("\n d default")
1770 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1771 ACE_TEXT ("\n-h <host> for Client mode")
1772 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1773 ACE_TEXT ("\n-p <port to listen/connect>")
1774 ACE_TEXT ("\n-c <number of client instances>")
1775 ACE_TEXT ("\n-b run client and server at the same time")
1776 ACE_TEXT ("\n f file")
1777 ACE_TEXT ("\n c console")
1778 ACE_TEXT ("\n-v log level")
1779 ACE_TEXT ("\n 0 - log errors and highlights")
1780 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1781 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1782 ACE_TEXT ("\n-x max transfer byte count per Client")
1783 ACE_TEXT ("\n-u show this message")
1791 set_proactor_type (const ACE_TCHAR
*ptype
)
1796 switch (ACE_OS::ace_toupper (*ptype
))
1799 proactor_type
= DEFAULT
;
1802 proactor_type
= AIOCB
;
1805 proactor_type
= SIG
;
1809 proactor_type
= SUN
;
1812 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
1816 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
1824 parse_args (int argc
, ACE_TCHAR
*argv
[])
1826 // First, set up all the defaults then let any args change them.
1827 both
= 1; // client and server simultaneosly
1828 duplex
= 1; // full duplex is on
1829 #if defined (ACE_HAS_IPV6)
1830 host
= ACE_IPV6_LOCALHOST
; // server to connect (IPv6 localhost)
1832 host
= ACE_LOCALHOST
;
1833 #endif /*ACE_HAS_IPV6*/
1834 port
= ACE_DEFAULT_SERVER_PORT
; // port to connect/listen
1835 max_aio_operations
= 512; // POSIX Proactor params
1836 proactor_type
= DEFAULT
; // Proactor type = default
1837 threads
= 3; // size of Proactor thread pool
1838 clients
= 10; // number of clients
1839 loglevel
= 0; // log level : only errors and highlights
1840 // Default transfer limit 50 messages per Sender
1841 xfer_limit
= 50 * ACE_OS::strlen (complete_message
);
1843 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
1844 # if defined (ACE_LINUX)
1848 if (argc
== 1) // no arguments , so one button test
1851 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
1854 while ((c
= get_opt ()) != EOF
)
1858 case 'x': // xfer limit
1859 xfer_limit
= static_cast<size_t> (ACE_OS::atoi (get_opt
.opt_arg ()));
1860 if (xfer_limit
== 0)
1861 xfer_limit
= 1; // Bare minimum.
1863 case 'b': // both client and server
1866 case 'v': // log level
1867 loglevel
= ACE_OS::atoi (get_opt
.opt_arg ());
1870 duplex
= ACE_OS::atoi (get_opt
.opt_arg ());
1872 case 'h': // host for sender
1873 host
= get_opt
.opt_arg ();
1875 case 'p': // port number
1876 port
= ACE_OS::atoi (get_opt
.opt_arg ());
1878 case 'n': // thread pool size
1879 threads
= ACE_OS::atoi (get_opt
.opt_arg ());
1881 case 'c': // number of clients
1882 clients
= ACE_OS::atoi (get_opt
.opt_arg ());
1883 if (clients
> MAX_CLIENTS
)
1884 clients
= MAX_CLIENTS
;
1886 case 'o': // max number of aio for proactor
1887 max_aio_operations
= ACE_OS::atoi (get_opt
.opt_arg ());
1889 case 't': // Proactor Type
1890 if (set_proactor_type (get_opt
.opt_arg ()))
1892 return print_usage (argc
, argv
);
1895 return print_usage (argc
, argv
);
1899 if (proactor_type
== SUN
&& threads
> 1)
1901 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Sun aiowait is not thread-safe; ")
1902 ACE_TEXT ("changing to 1 thread\n")));
1910 run_main (int argc
, ACE_TCHAR
*argv
[])
1912 ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPV6"));
1914 if (::parse_args (argc
, argv
) == -1)
1917 #if defined (ACE_HAS_IPV6)
1918 disable_signal (ACE_SIGRTMIN
, ACE_SIGRTMAX
);
1919 disable_signal (SIGPIPE
, SIGPIPE
);
1924 if (task1
.start (threads
, proactor_type
, max_aio_operations
) == 0)
1926 Acceptor
acceptor (&test
);
1927 Connector
connector (&test
);
1928 ACE_INET_Addr
addr (port
, "::");
1932 if (both
!= 0 || host
== 0) // Acceptor
1934 // Simplify, initial read with zero size
1935 if (acceptor
.open (addr
, 0, 1) == 0)
1939 if (both
!= 0 || host
!= 0)
1942 host
= ACE_IPV6_LOCALHOST
;
1944 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
1945 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), host
));
1947 rc
+= connector
.start (addr
, clients
);
1950 // Wait a few seconds to let things get going, then poll til
1951 // all sessions are done. Note that when we exit this scope, the
1952 // Acceptor and Connector will be destroyed, which should prevent
1953 // further connections and also test how well destroyed handlers
1957 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
1958 while (!test
.testing_done ())
1963 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
1966 #endif /* ACE_HAS_IPV6 */
1976 run_main (int, ACE_TCHAR
*[])
1978 ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPV6"));
1980 ACE_DEBUG ((LM_INFO
,
1981 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
1982 ACE_TEXT ("Proactor_Test_IPV6 will not be run.\n")));
1989 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */