Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Proactor_Test_IPV6.cpp
blob341c604c30d97d4f664ff1bfb78b69bcf41f7141
1 // ============================================================================
2 /**
3 * @file Proactor_Test_IPV6.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that does various asynchronous
7 * operations.
9 * @author Alexander Libman <alibman@baltimore.com>
10 * @author Brian Buesker <bbuesker@qualcomm.com> - modified for IPv6 operation
12 // ============================================================================
14 #include "test_config.h"
16 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
17 // This only works on Win32 platforms and on Unix platforms
18 // supporting POSIX aio calls.
20 #include "ace/Signal.h"
22 #include "ace/Service_Config.h"
23 #include "ace/INET_Addr.h"
24 #include "ace/SOCK_Connector.h"
25 #include "ace/SOCK_Acceptor.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Object_Manager.h"
28 #include "ace/Get_Opt.h"
30 #include "ace/Proactor.h"
31 #include "ace/Asynch_Acceptor.h"
32 #include "ace/Asynch_Connector.h"
33 #include "ace/Task.h"
34 #include "ace/Thread_Semaphore.h"
35 #include "ace/OS_NS_ctype.h"
36 #include "ace/OS_NS_errno.h"
37 #include "ace/OS_NS_signal.h"
38 #include "ace/OS_NS_string.h"
39 #include "ace/OS_NS_unistd.h"
40 #include "ace/OS_NS_sys_socket.h"
41 #include "ace/os_include/netinet/os_tcp.h"
43 #include "ace/Atomic_Op.h"
44 #include "ace/Synch_Traits.h"
46 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
48 # include "ace/WIN32_Proactor.h"
50 #elif defined (ACE_HAS_AIO_CALLS)
52 # include "ace/POSIX_Proactor.h"
53 # include "ace/POSIX_CB_Proactor.h"
54 # include "ace/SUN_Proactor.h"
56 #endif /* defined (ACE_HAS_WIN32_OVERLAPPED_IO) */
58 #include "Proactor_Test.h"
61 // Proactor Type (UNIX only, Win32 ignored)
62 typedef enum { DEFAULT = 0, AIOCB, SIG, SUN, CB } ProactorType;
63 static ProactorType proactor_type = DEFAULT;
65 // POSIX : > 0 max number aio operations proactor,
66 static size_t max_aio_operations = 0;
68 // both: 0 run client or server / depends on host
69 // != 0 run client and server
70 static int both = 0;
72 // Host that we're connecting to.
73 static const ACE_TCHAR *host = 0;
75 // number of Client instances
76 static int clients = 1;
77 const int MAX_CLIENTS = 1000;
78 const int MAX_SERVERS = 1000;
80 // duplex mode: == 0 half-duplex
81 // != 0 full duplex
82 static int duplex = 0;
84 // number threads in the Proactor thread pool
85 static int threads = 1;
87 // Port that we're receiving connections on.
88 static u_short port = ACE_DEFAULT_SERVER_PORT;
90 // Log options
91 static int loglevel; // 0 full , 1 only errors
93 static size_t xfer_limit; // Number of bytes for Client to send.
95 static char complete_message[] =
96 "GET / HTTP/1.1\r\n"
97 "Accept: */*\r\n"
98 "Accept-Language: C++\r\n"
99 "Accept-Encoding: gzip, deflate\r\n"
100 "User-Agent: Proactor_Test_IPv6/1.0 (non-compatible)\r\n"
101 "Connection: Keep-Alive\r\n"
102 "\r\n";
104 class LogLocker
106 public:
108 LogLocker () { ACE_LOG_MSG->acquire (); }
109 virtual ~LogLocker () { ACE_LOG_MSG->release (); }
114 // Function to remove signals from the signal mask.
115 static int
116 disable_signal (int sigmin, int sigmax)
118 #if !defined (ACE_LACKS_UNIX_SIGNALS)
119 sigset_t signal_set;
120 if (ACE_OS::sigemptyset (&signal_set) == - 1)
121 ACE_ERROR ((LM_ERROR,
122 ACE_TEXT ("Error: (%P|%t):%p\n"),
123 ACE_TEXT ("sigemptyset failed")));
125 for (int i = sigmin; i <= sigmax; i++)
126 ACE_OS::sigaddset (&signal_set, i);
128 // Put the <signal_set>.
129 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
130 // In multi-threaded application this is not POSIX compliant
131 // but let's leave it just in case.
132 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
133 # else
134 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
135 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
136 ACE_ERROR_RETURN ((LM_ERROR,
137 ACE_TEXT ("Error: (%P|%t): %p\n"),
138 ACE_TEXT ("SIG_BLOCK failed")),
139 -1);
140 #else
141 ACE_UNUSED_ARG (sigmin);
142 ACE_UNUSED_ARG (sigmax);
143 #endif /* ACE_LACKS_UNIX_SIGNALS */
145 return 0;
149 // *************************************************************
150 // MyTask is ACE_Task resposible for :
151 // 1. creation and deletion of
152 // Proactor and Proactor thread pool
153 // 2. running Proactor event loop
154 // *************************************************************
157 * @class MyTask
159 * MyTask plays role for Proactor threads pool
161 * MyTask is ACE_Task resposible for:
162 * 1. Creation and deletion of Proactor and Proactor thread pool
163 * 2. Running Proactor event loop
165 class MyTask : public ACE_Task<ACE_MT_SYNCH>
167 public:
168 MyTask (void):
169 lock_ (),
170 sem_ ((unsigned int) 0),
171 proactor_(0) {}
173 virtual ~MyTask()
175 (void) this->stop ();
176 (void) this->delete_proactor();
179 virtual int svc (void);
181 int start (int num_threads,
182 ProactorType type_proactor,
183 size_t max_op );
184 int stop (void);
186 private:
187 int create_proactor (ProactorType type_proactor,
188 size_t max_op);
189 int delete_proactor (void);
191 ACE_SYNCH_RECURSIVE_MUTEX lock_;
192 ACE_Thread_Semaphore sem_;
193 ACE_Proactor * proactor_;
198 MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
200 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
201 monitor,
202 this->lock_,
203 -1);
205 ACE_TEST_ASSERT (this->proactor_ == 0);
207 #if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
209 ACE_UNUSED_ARG (type_proactor);
210 ACE_UNUSED_ARG (max_op);
212 ACE_WIN32_Proactor *proactor_impl = 0;
214 ACE_NEW_RETURN (proactor_impl,
215 ACE_WIN32_Proactor,
216 -1);
218 ACE_DEBUG ((LM_DEBUG,
219 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
221 #elif defined (ACE_HAS_AIO_CALLS)
223 ACE_POSIX_Proactor * proactor_impl = 0;
225 switch (type_proactor)
227 case AIOCB:
228 ACE_NEW_RETURN (proactor_impl,
229 ACE_POSIX_AIOCB_Proactor (max_op),
230 -1);
231 ACE_DEBUG ((LM_DEBUG,
232 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
233 break;
235 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
236 case SIG:
237 ACE_NEW_RETURN (proactor_impl,
238 ACE_POSIX_SIG_Proactor (max_op),
239 -1);
240 ACE_DEBUG ((LM_DEBUG,
241 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
242 break;
243 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
245 # if defined (sun)
246 case SUN:
247 ACE_NEW_RETURN (proactor_impl,
248 ACE_SUN_Proactor (max_op),
249 -1);
250 ACE_DEBUG ((LM_DEBUG,
251 ACE_TEXT("(%t) Create Proactor Type = SUN\n")));
252 break;
253 # endif /* sun */
255 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
256 case CB:
257 ACE_NEW_RETURN (proactor_impl,
258 ACE_POSIX_CB_Proactor (max_op),
259 -1);
260 ACE_DEBUG ((LM_DEBUG,
261 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
262 break;
263 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
265 default:
266 ACE_DEBUG ((LM_DEBUG,
267 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
268 break;
271 #endif // (ACE_WIN32) && !defined (ACE_HAS_WINCE)
273 // always delete implementation 1 , not !(proactor_impl == 0)
274 ACE_NEW_RETURN (this->proactor_,
275 ACE_Proactor (proactor_impl, 1 ),
276 -1);
277 // Set new singleton and delete it in close_singleton()
278 ACE_Proactor::instance (this->proactor_, 1);
279 return 0;
283 MyTask::delete_proactor (void)
285 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
286 monitor,
287 this->lock_,
288 -1);
290 ACE_DEBUG ((LM_DEBUG,
291 ACE_TEXT ("(%t) Delete Proactor\n")));
293 ACE_Proactor::close_singleton ();
294 this->proactor_ = 0;
296 return 0;
300 MyTask::start (int num_threads,
301 ProactorType type_proactor,
302 size_t max_op)
304 if (this->create_proactor (type_proactor, max_op) == -1)
305 ACE_ERROR_RETURN ((LM_ERROR,
306 ACE_TEXT ("%p.\n"),
307 ACE_TEXT ("unable to create proactor")),
308 -1);
310 if (this->activate (THR_NEW_LWP, num_threads) == -1)
311 ACE_ERROR_RETURN ((LM_ERROR,
312 ACE_TEXT ("%p.\n"),
313 ACE_TEXT ("unable to activate thread pool")),
314 -1);
316 for (; num_threads > 0; num_threads--)
318 sem_.acquire ();
321 return 0;
326 MyTask::stop ()
328 if (this->proactor_ != 0)
330 ACE_DEBUG ((LM_DEBUG,
331 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
333 ACE_Proactor::end_event_loop ();
336 if (this->wait () == -1)
337 ACE_ERROR ((LM_ERROR,
338 ACE_TEXT ("%p.\n"),
339 ACE_TEXT ("unable to stop thread pool")));
341 return 0;
345 MyTask::svc (void)
347 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n")));
349 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
350 disable_signal (SIGPIPE, SIGPIPE);
352 // signal that we are ready
353 sem_.release (1);
355 ACE_Proactor::run_event_loop ();
357 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n")));
358 return 0;
362 // TestData collects and reports on test-related transfer and connection
363 // statistics.
364 class TestData
366 public:
367 TestData ();
368 bool testing_done (void);
369 Server *server_up (void);
370 Client *client_up (void);
371 void server_done (Server *s);
372 void client_done (Client *c);
373 void stop_all (void);
374 void report (void);
376 private:
377 struct Local_Stats
379 // Track number of sessions that report start, and those that report
380 // their end (and stats).
381 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_up_;
382 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_down_;
384 // Total read and write bytes for all sessions.
385 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_cnt_;
386 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_cnt_;
387 // Total read and write operations issues for all sessions.
388 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_ops_;
389 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_ops_;
390 } servers_, clients_;
392 ACE_SYNCH_MUTEX list_lock_;
393 Server *server_list_[MAX_SERVERS];
394 Client *client_list_[MAX_CLIENTS];
397 TestData::TestData ()
399 int i;
400 for (i = 0; i < MAX_SERVERS; ++i)
401 this->server_list_[i] = 0;
402 for (i = 0; i < MAX_CLIENTS; ++i)
403 this->client_list_[i] = 0;
406 bool
407 TestData::testing_done (void)
409 int svr_up = this->servers_.sessions_up_.value ();
410 int svr_dn = this->servers_.sessions_down_.value ();
411 int clt_up = this->clients_.sessions_up_.value ();
412 int clt_dn = this->clients_.sessions_down_.value ();
414 if (svr_up == 0 && clt_up == 0) // No connections up yet
415 return false;
417 return (svr_dn >= svr_up && clt_dn >= clt_up);
420 Server *
421 TestData::server_up (void)
423 ++this->servers_.sessions_up_;
424 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
426 for (int i = 0; i < MAX_SERVERS; ++i)
428 if (this->server_list_[i] == 0)
430 ACE_NEW_RETURN (this->server_list_[i], Server (this, i), 0);
431 ACE_DEBUG ((LM_DEBUG,
432 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
434 this->servers_.sessions_up_.value (),
435 this->servers_.sessions_down_.value ()));
436 return this->server_list_[i];
439 return 0;
442 Client *
443 TestData::client_up (void)
445 ++this->clients_.sessions_up_;
446 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
448 for (int i = 0; i < MAX_CLIENTS; ++i)
450 if (this->client_list_[i] == 0)
452 ACE_NEW_RETURN (this->client_list_[i], Client (this, i), 0);
453 ACE_DEBUG ((LM_DEBUG,
454 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
456 this->clients_.sessions_up_.value (),
457 this->clients_.sessions_down_.value ()));
458 return this->client_list_[i];
461 return 0;
464 void
465 TestData::server_done (Server *s)
467 this->servers_.w_cnt_ += s->get_total_snd ();
468 this->servers_.r_cnt_ += s->get_total_rcv ();
469 this->servers_.w_ops_ += s->get_total_w ();
470 this->servers_.r_ops_ += s->get_total_r ();
471 ++this->servers_.sessions_down_;
472 ACE_DEBUG ((LM_DEBUG,
473 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
474 s->id (),
475 this->servers_.sessions_up_.value (),
476 this->servers_.sessions_down_.value ()));
478 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
479 int i;
480 for (i = 0; i < MAX_SERVERS; ++i)
482 if (this->server_list_[i] == s)
484 if (s->id () != i)
485 ACE_ERROR ((LM_ERROR,
486 ACE_TEXT ("Server %d is pos %d in list\n"),
487 s->id (),
488 i));
489 this->server_list_[i] = 0;
490 break;
493 if (i >= MAX_SERVERS)
494 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Server %@ done but not listed\n"), s));
496 return;
499 void
500 TestData::client_done (Client *c)
502 this->clients_.w_cnt_ += c->get_total_snd ();
503 this->clients_.r_cnt_ += c->get_total_rcv ();
504 this->clients_.w_ops_ += c->get_total_w ();
505 this->clients_.r_ops_ += c->get_total_r ();
506 ++this->clients_.sessions_down_;
507 ACE_DEBUG ((LM_DEBUG,
508 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
509 c->id (),
510 this->clients_.sessions_up_.value (),
511 this->clients_.sessions_down_.value ()));
513 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
514 int i;
515 for (i = 0; i < MAX_CLIENTS; ++i)
517 if (this->client_list_[i] == c)
519 if (c->id () != i)
520 ACE_ERROR ((LM_ERROR,
521 ACE_TEXT ("Client %d is pos %d in list\n"),
522 c->id (),
523 i));
524 this->client_list_[i] = 0;
525 break;
528 if (i >= MAX_CLIENTS)
529 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Client %@ done but not listed\n"), c));
531 return;
534 void
535 TestData::stop_all (void)
537 int i;
539 // Lock and cancel everything. Then release the lock, possibly allowing
540 // cleanups, then grab it again and delete all Servers and Clients.
542 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
543 for (i = 0; i < MAX_CLIENTS; ++i)
545 if (this->client_list_[i] != 0)
546 this->client_list_[i]->cancel ();
549 for (i = 0; i < MAX_SERVERS; ++i)
551 if (this->server_list_[i] != 0)
552 this->server_list_[i]->cancel ();
556 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
557 for (i = 0; i < MAX_CLIENTS; ++i)
559 if (this->client_list_[i] != 0)
560 delete this->client_list_[i];
563 for (i = 0; i < MAX_SERVERS; ++i)
565 if (this->server_list_[i] != 0)
566 delete this->server_list_[i];
571 void
572 TestData::report (void)
574 // Print statistics
575 ACE_TCHAR bufs [256];
576 ACE_TCHAR bufr [256];
578 ACE_OS::snprintf (bufs, 256,
579 ACE_SIZE_T_FORMAT_SPECIFIER
580 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
581 this->clients_.w_cnt_.value (),
582 this->clients_.w_ops_.value ());
584 ACE_OS::snprintf (bufr, 256,
585 ACE_SIZE_T_FORMAT_SPECIFIER
586 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
587 this->clients_.r_cnt_.value (),
588 this->clients_.r_ops_.value ());
590 ACE_DEBUG ((LM_DEBUG,
591 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
592 bufs,
593 bufr));
595 ACE_OS::snprintf (bufs, 256,
596 ACE_SIZE_T_FORMAT_SPECIFIER
597 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
598 this->servers_.w_cnt_.value (),
599 this->servers_.w_ops_.value ());
601 ACE_OS::snprintf (bufr, 256,
602 ACE_SIZE_T_FORMAT_SPECIFIER
603 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
604 this->servers_.r_cnt_.value (),
605 this->servers_.r_ops_.value ());
607 ACE_DEBUG ((LM_DEBUG,
608 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
609 bufs,
610 bufr));
612 if (this->clients_.w_cnt_.value () == 0 ||
613 this->clients_.r_cnt_.value () == 0 ||
614 this->servers_.w_cnt_.value () == 0 ||
615 this->servers_.r_cnt_.value () == 0 )
616 ACE_ERROR ((LM_ERROR, ACE_TEXT ("It appears that this test didn't ")
617 ACE_TEXT ("really do anything. Something is very wrong.\n")));
621 class Acceptor : public ACE_Asynch_Acceptor<Server>
623 public:
624 Acceptor (TestData *tester);
625 virtual ~Acceptor (void);
627 // Virtual from ACE_Asynch_Acceptor
628 Server *make_handler (void);
630 private:
631 TestData *tester_;
634 // *************************************************************
635 Acceptor::Acceptor (TestData *tester)
636 : tester_ (tester)
640 Acceptor::~Acceptor (void)
642 this->cancel ();
645 Server *
646 Acceptor::make_handler (void)
648 return this->tester_->server_up ();
651 // ***************************************************
652 Server::Server ()
654 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
657 Server::Server (TestData *tester, int id)
658 : tester_ (tester),
659 id_ (id),
660 handle_ (ACE_INVALID_HANDLE),
661 io_count_ (0),
662 flg_cancel_(0),
663 total_snd_(0),
664 total_rcv_(0),
665 total_w_ (0),
666 total_r_ (0)
670 Server::~Server (void)
672 ACE_DEBUG ((LM_DEBUG,
673 ACE_TEXT ("(%t) Server %d dtor; %d sends (%d bytes); ")
674 ACE_TEXT ("%d recvs (%d bytes)\n"),
675 this->id_,
676 this->total_w_, this->total_snd_,
677 this->total_r_, this->total_rcv_));
678 if (this->io_count_ != 0)
679 ACE_ERROR ((LM_WARNING,
680 ACE_TEXT ("(%t) Server %d deleted with ")
681 ACE_TEXT ("%d I/O outstanding\n"),
682 this->id_,
683 this->io_count_));
685 // This test bounces data back and forth between Clients and Servers.
686 // Therefore, if there was significantly more data in one direction, that's
687 // a problem. Remember, the byte counts are unsigned values.
688 int issue_data_warning = 0;
689 if (this->total_snd_ > this->total_rcv_)
691 if (this->total_rcv_ == 0)
692 issue_data_warning = 1;
693 else if (this->total_snd_ / this->total_rcv_ > 2)
694 issue_data_warning = 1;
696 else
698 if (this->total_snd_ == 0)
699 issue_data_warning = 1;
700 else if (this->total_rcv_ / this->total_snd_ > 2)
701 issue_data_warning = 1;
703 if (issue_data_warning)
704 ACE_DEBUG ((LM_WARNING,
705 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
707 if (this->tester_ != 0)
708 this->tester_->server_done (this);
710 if (this->handle_ != ACE_INVALID_HANDLE)
711 ACE_OS::closesocket (this->handle_);
713 this->id_ = -1;
714 this->handle_= ACE_INVALID_HANDLE;
717 void
718 Server::cancel ()
720 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
722 this->flg_cancel_ = 1;
723 this->ws_.cancel ();
724 this->rs_.cancel ();
725 return;
729 void
730 Server::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
732 ACE_TCHAR str[256];
733 if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
734 ACE_DEBUG ((LM_DEBUG,
735 ACE_TEXT ("(%t) Server %d connection from %s\n"),
736 this->id_,
737 str));
738 else
739 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Server %d %p\n"),
740 this->id_,
741 ACE_TEXT ("addr_to_string")));
742 return;
746 void
747 Server::open (ACE_HANDLE handle, ACE_Message_Block &)
750 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
752 // Don't buffer serial sends.
753 this->handle_ = handle;
754 int nodelay = 1;
755 ACE_SOCK_Stream option_setter (handle);
756 if (-1 == option_setter.set_option (ACE_IPPROTO_TCP,
757 TCP_NODELAY,
758 &nodelay,
759 sizeof (nodelay)))
760 ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
762 if (this->ws_.open (*this, this->handle_) == -1)
763 ACE_ERROR ((LM_ERROR,
764 ACE_TEXT ("(%t) %p\n"),
765 ACE_TEXT ("Server::ACE_Asynch_Write_Stream::open")));
766 else if (this->rs_.open (*this, this->handle_) == -1)
767 ACE_ERROR ((LM_ERROR,
768 ACE_TEXT ("(%t) %p\n"),
769 ACE_TEXT ("Server::ACE_Asynch_Read_Stream::open")));
770 else
771 this->initiate_read_stream ();
773 if (this->io_count_ > 0)
774 return;
776 delete this;
780 Server::initiate_read_stream (void)
782 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
783 return -1;
785 ACE_Message_Block *mb = 0;
786 ACE_NEW_RETURN (mb,
787 ACE_Message_Block (1024), //BUFSIZ + 1),
788 -1);
790 // Inititiate read
791 if (this->rs_.read (*mb, mb->size () - 1) == -1)
793 mb->release ();
794 #if defined (ACE_WIN32)
795 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
796 // a 0-byte read as we would if underlying calls used WSARecv.
797 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
798 ACE_ERROR_RETURN ((LM_DEBUG,
799 ACE_TEXT ("(%t) Server %d, peer closed\n"),
800 this->id_),
801 -1);
802 #endif /* ACE_WIN32 */
803 ACE_ERROR_RETURN ((LM_ERROR,
804 ACE_TEXT ("(%t) Server %d, %p\n"),
805 this->id_,
806 ACE_TEXT ("read")),
807 -1);
810 this->io_count_++;
811 this->total_r_++;
812 return 0;
816 Server::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
818 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
820 mb.release ();
821 return -1;
824 if (nbytes == 0)
826 mb.release ();
827 ACE_ERROR_RETURN((LM_ERROR,
828 ACE_TEXT ("(%t) Server::ACE_Asynch_Write_Stream::write nbytes <0 ")),
829 -1);
832 if (this->ws_.write (mb, nbytes) == -1)
834 mb.release ();
835 #if defined (ACE_WIN32)
836 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
837 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
838 ACE_ERROR_RETURN ((LM_DEBUG,
839 ACE_TEXT ("(%t) Server %d, peer gone\n"),
840 this->id_),
841 -1);
842 #endif /* ACE_WIN32 */
843 ACE_ERROR_RETURN((LM_ERROR,
844 ACE_TEXT ("(%t) Server %d, %p\n"),
845 this->id_,
846 ACE_TEXT ("write")),
847 -1);
850 this->io_count_++;
851 this->total_w_++;
852 return 0;
855 void
856 Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
859 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
861 ACE_Message_Block & mb = result.message_block ();
863 // Reset pointers.
864 mb.rd_ptr ()[result.bytes_transferred ()] = '\0';
866 if (loglevel > 1)
868 LogLocker log_lock;
870 ACE_DEBUG ((LM_DEBUG,
871 ACE_TEXT ("(%t) **** Server %d: handle_read_stream() ****\n"),
872 this->id_));
873 ACE_DEBUG ((LM_DEBUG,
874 ACE_TEXT ("%s = %d\n"),
875 ACE_TEXT ("bytes_to_read"),
876 result.bytes_to_read ()));
877 ACE_DEBUG ((LM_DEBUG,
878 ACE_TEXT ("%s = %d\n"),
879 ACE_TEXT ("handle"),
880 result.handle ()));
881 ACE_DEBUG ((LM_DEBUG,
882 ACE_TEXT ("%s = %d\n"),
883 ACE_TEXT ("bytes_transfered"),
884 result.bytes_transferred ()));
885 ACE_DEBUG ((LM_DEBUG,
886 ACE_TEXT ("%s = %@\n"),
887 ACE_TEXT ("act"),
888 result.act ()));
889 ACE_DEBUG ((LM_DEBUG,
890 ACE_TEXT ("%s = %d\n"),
891 ACE_TEXT ("success"),
892 result.success ()));
893 ACE_DEBUG ((LM_DEBUG,
894 ACE_TEXT ("%s = %@\n"),
895 ACE_TEXT ("completion_key"),
896 result.completion_key ()));
897 ACE_DEBUG ((LM_DEBUG,
898 ACE_TEXT ("%s = %d\n"),
899 ACE_TEXT ("error"),
900 result.error ()));
901 ACE_DEBUG ((LM_DEBUG,
902 ACE_TEXT ("%s = %s\n"),
903 ACE_TEXT ("message_block"),
904 mb.rd_ptr ()));
905 ACE_DEBUG ((LM_DEBUG,
906 ACE_TEXT ("**** end of message ****************\n")));
908 else if (result.error () != 0)
910 ACE_Log_Priority prio;
911 #if defined (ACE_WIN32)
912 if (result.error () == ERROR_OPERATION_ABORTED)
913 prio = LM_DEBUG;
914 #else
915 if (result.error () == ECANCELED)
916 prio = LM_DEBUG;
917 #endif /* ACE_WIN32 */
918 else
919 prio = LM_ERROR;
920 ACE_Log_Msg::instance ()->errnum (result.error ());
921 ACE_Log_Msg::instance ()->log (prio,
922 ACE_TEXT ("(%t) Server %d; %p\n"),
923 this->id_,
924 ACE_TEXT ("read"));
926 else if (loglevel > 0)
928 ACE_DEBUG ((LM_DEBUG,
929 ACE_TEXT ("(%t) Server %d: read %d bytes\n"),
930 this->id_,
931 result.bytes_transferred ()));
934 if (result.error () == 0 && result.bytes_transferred () > 0)
936 this->total_rcv_ += result.bytes_transferred ();
938 if (this->initiate_write_stream (mb,
939 result.bytes_transferred ()) == 0)
941 if (duplex != 0) // Initiate new read from the stream.
942 this->initiate_read_stream ();
945 else
946 mb.release ();
948 --this->io_count_;
949 if (this->io_count_ > 0)
950 return;
952 delete this;
955 void
956 Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
959 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
961 ACE_Message_Block & mb = result.message_block ();
963 if (loglevel > 1)
965 LogLocker log_lock;
967 //mb.rd_ptr () [0] = '\0';
968 mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
970 ACE_DEBUG ((LM_DEBUG,
971 ACE_TEXT ("(%t) **** Server %d: handle_write_stream() ****\n"),
972 this->id_));
973 ACE_DEBUG ((LM_DEBUG,
974 ACE_TEXT ("%s = %d\n"),
975 ACE_TEXT ("bytes_to_write"),
976 result.bytes_to_write ()));
977 ACE_DEBUG ((LM_DEBUG,
978 ACE_TEXT ("%s = %d\n"),
979 ACE_TEXT ("handle"),
980 result.handle ()));
981 ACE_DEBUG ((LM_DEBUG,
982 ACE_TEXT ("%s = %d\n"),
983 ACE_TEXT ("bytes_transfered"),
984 result.bytes_transferred ()));
985 ACE_DEBUG ((LM_DEBUG,
986 ACE_TEXT ("%s = %@\n"),
987 ACE_TEXT ("act"),
988 result.act ()));
989 ACE_DEBUG ((LM_DEBUG,
990 ACE_TEXT ("%s = %d\n"),
991 ACE_TEXT ("success"),
992 result.success ()));
993 ACE_DEBUG ((LM_DEBUG,
994 ACE_TEXT ("%s = %@\n"),
995 ACE_TEXT ("completion_key"),
996 result.completion_key ()));
997 ACE_DEBUG ((LM_DEBUG,
998 ACE_TEXT ("%s = %d\n"),
999 ACE_TEXT ("error"),
1000 result.error ()));
1001 ACE_DEBUG ((LM_DEBUG,
1002 ACE_TEXT ("%s = %s\n"),
1003 ACE_TEXT ("message_block"),
1004 mb.rd_ptr ()));
1005 ACE_DEBUG ((LM_DEBUG,
1006 ACE_TEXT ("**** end of message ****************\n")));
1008 else if (result.error () != 0)
1010 ACE_Log_Priority prio;
1011 #if defined (ACE_WIN32)
1012 if (result.error () == ERROR_OPERATION_ABORTED)
1013 prio = LM_DEBUG;
1014 #else
1015 if (result.error () == ECANCELED)
1016 prio = LM_DEBUG;
1017 #endif /* ACE_WIN32 */
1018 else
1019 prio = LM_ERROR;
1020 ACE_Log_Msg::instance ()->errnum (result.error ());
1021 ACE_Log_Msg::instance ()->log (prio,
1022 ACE_TEXT ("(%t) Server %d; %p\n"),
1023 this->id_,
1024 ACE_TEXT ("write"));
1026 else if (loglevel > 0)
1028 ACE_DEBUG ((LM_DEBUG,
1029 ACE_TEXT ("(%t) Server %d: wrote %d bytes ok\n"),
1030 this->id_,
1031 result.bytes_transferred ()));
1034 mb.release ();
1036 if (result.error () == 0 && result.bytes_transferred () > 0)
1038 this->total_snd_ += result.bytes_transferred ();
1040 if (duplex == 0)
1041 this->initiate_read_stream ();
1044 --this->io_count_;
1045 if (this->io_count_ > 0)
1046 return;
1048 delete this;
1051 // *******************************************
1052 // Connector
1053 // *******************************************
1055 class Connector : public ACE_Asynch_Connector<Client>
1057 public:
1058 Connector (TestData *tester);
1059 virtual ~Connector (void);
1061 int start (const ACE_INET_Addr &addr, int num);
1063 // Virtual from ACE_Asynch_Connector
1064 Client *make_handler (void);
1066 private:
1067 TestData *tester_;
1070 // *************************************************************
1072 Connector::Connector (TestData *tester)
1073 : tester_ (tester)
1077 Connector::~Connector (void)
1079 this->cancel ();
1082 Client *
1083 Connector::make_handler (void)
1085 return this->tester_->client_up ();
1090 Connector::start (const ACE_INET_Addr& addr, int num)
1092 if (num > MAX_CLIENTS)
1093 num = MAX_CLIENTS;
1095 if (num < 0)
1096 num = 1;
1098 int rc = 0;
1100 // int open ( int pass_addresses = 0,
1101 // ACE_Proactor *proactor = 0,
1102 // int validate_new_connection = 0 );
1104 if (this->open (1, 0, 1) != 0)
1106 ACE_ERROR ((LM_ERROR,
1107 ACE_TEXT ("(%t) %p\n"),
1108 ACE_TEXT ("Connector::open failed")));
1109 return rc;
1112 for (; rc < num; rc++)
1114 ACE_INET_Addr localAddr;
1115 if (this->connect (addr, localAddr) != 0)
1117 ACE_ERROR ((LM_ERROR,
1118 ACE_TEXT ("(%t) %p\n"),
1119 ACE_TEXT ("Connector::connect failed for IPv6")));
1120 break;
1123 return rc;
1127 Client::Client ()
1129 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
1132 Client::Client (TestData *tester, int id)
1133 : tester_ (tester),
1134 id_ (id),
1135 handle_ (ACE_INVALID_HANDLE),
1136 io_count_ (0),
1137 stop_writing_ (0),
1138 flg_cancel_ (0),
1139 total_snd_ (0),
1140 total_rcv_ (0),
1141 total_w_ (0),
1142 total_r_ (0)
1146 Client::~Client (void)
1148 ACE_DEBUG ((LM_DEBUG,
1149 ACE_TEXT ("(%t) Client %d dtor; %d sends (%d bytes); ")
1150 ACE_TEXT ("%d recvs (%d bytes)\n"),
1151 this->id_,
1152 this->total_w_, this->total_snd_,
1153 this->total_r_, this->total_rcv_));
1154 if (this->io_count_ != 0)
1155 ACE_ERROR ((LM_WARNING,
1156 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1157 this->id_,
1158 this->io_count_));
1160 // This test bounces data back and forth between Clients and Servers.
1161 // Therefore, if there was significantly more data in one direction, that's
1162 // a problem. Remember, the byte counts are unsigned values.
1163 int issue_data_warning = 0;
1164 if (this->total_snd_ > this->total_rcv_)
1166 if (this->total_rcv_ == 0)
1167 issue_data_warning = 1;
1168 else if (this->total_snd_ / this->total_rcv_ > 2)
1169 issue_data_warning = 1;
1171 else
1173 if (this->total_snd_ == 0)
1174 issue_data_warning = 1;
1175 else if (this->total_rcv_ / this->total_snd_ > 2)
1176 issue_data_warning = 1;
1178 if (issue_data_warning)
1179 ACE_DEBUG ((LM_WARNING,
1180 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1182 if (this->tester_ != 0)
1183 this->tester_->client_done (this);
1185 this->id_ = -1;
1186 this->handle_= ACE_INVALID_HANDLE;
1187 if (this->handle_ != ACE_INVALID_HANDLE)
1189 ACE_OS::closesocket (this->handle_);
1191 this->handle_= ACE_INVALID_HANDLE;
1194 void
1195 Client::cancel ()
1197 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1199 this->flg_cancel_ = 1;
1200 this->ws_.cancel ();
1201 this->rs_.cancel ();
1202 return;
1205 void
1206 Client::close ()
1208 // This must be called with the lock_ held.
1209 ACE_DEBUG ((LM_DEBUG,
1210 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1211 this->id_, this->io_count_));
1212 ACE_OS::shutdown (this->handle_, ACE_SHUTDOWN_WRITE);
1213 this->stop_writing_ = 1;
1214 return;
1218 void
1219 Client::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr& local)
1221 char my_name[256];
1222 char peer_name[256];
1223 ACE_TCHAR local_str[256];
1224 ACE_INET_Addr addr ((u_short) 0, host);
1226 // This checks to make sure the peer address given to us matches what
1227 // we expect it to be.
1228 if (0 != peer.get_host_addr (peer_name, sizeof (peer_name)))
1230 if (0 != addr.get_host_addr (my_name, sizeof (my_name)))
1232 if (0 != ACE_OS::strncmp (peer_name, my_name, sizeof (my_name)))
1234 ACE_ERROR
1235 ((LM_ERROR,
1236 ACE_TEXT ("(%t) Sender %d peer address (%C) does not ")
1237 ACE_TEXT ("match host address (%C)\n"),
1238 this->id_,
1239 peer_name, my_name));
1240 return;
1243 else
1245 ACE_ERROR
1246 ((LM_ERROR,
1247 ACE_TEXT ("(%t) Sender %d unable to convert host addr\n"),
1248 this->id_));
1249 return;
1252 else
1254 ACE_ERROR ((LM_ERROR,
1255 ACE_TEXT ("(%t) Sender %d unable to convert peer addr\n"),
1256 this->id_));
1257 return;
1260 if (0 == local.addr_to_string (local_str,
1261 sizeof (local_str) / sizeof (ACE_TCHAR)))
1262 ACE_DEBUG ((LM_DEBUG,
1263 ACE_TEXT ("(%t) Client %d connected on %s\n"),
1264 this->id_,
1265 local_str));
1266 else
1267 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Client %d %p\n"),
1268 this->id_,
1269 ACE_TEXT ("addr_to_string")));
1270 return;
1274 void
1275 Client::open (ACE_HANDLE handle, ACE_Message_Block &)
1278 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1280 // Don't buffer serial sends.
1281 this->handle_ = handle;
1282 int nodelay = 1;
1283 ACE_SOCK_Stream option_setter (handle);
1284 if (option_setter.set_option (ACE_IPPROTO_TCP,
1285 TCP_NODELAY,
1286 &nodelay,
1287 sizeof (nodelay)))
1288 ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
1290 // Open ACE_Asynch_Write_Stream
1291 if (this->ws_.open (*this, this->handle_) == -1)
1292 ACE_ERROR ((LM_ERROR,
1293 ACE_TEXT ("(%t) %p\n"),
1294 ACE_TEXT ("Client::ACE_Asynch_Write_Stream::open")));
1296 // Open ACE_Asynch_Read_Stream
1297 else if (this->rs_.open (*this, this->handle_) == -1)
1298 ACE_ERROR ((LM_ERROR,
1299 ACE_TEXT ("(%t) %p\n"),
1300 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::open")));
1302 else if (this->initiate_write_stream () == 0)
1304 if (duplex != 0) // Start an asynchronous read
1305 this->initiate_read_stream ();
1308 if (this->io_count_ > 0)
1309 return;
1311 delete this;
1315 Client::initiate_write_stream (void)
1317 if (this->flg_cancel_ != 0 ||
1318 this->stop_writing_ ||
1319 this->handle_ == ACE_INVALID_HANDLE)
1320 return -1;
1322 static const size_t complete_message_length = ACE_OS::strlen (complete_message);
1324 #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
1326 ACE_Message_Block *mb1 = 0,
1327 *mb2 = 0,
1328 *mb3 = 0;
1330 // No need to allocate +1 for proper printing - the memory includes it already
1331 ACE_NEW_RETURN (mb1,
1332 ACE_Message_Block ((char *)complete_message,
1333 complete_message_length),
1334 -1);
1336 ACE_NEW_RETURN (mb2,
1337 ACE_Message_Block ((char *)complete_message,
1338 complete_message_length),
1339 -1);
1341 ACE_NEW_RETURN (mb3,
1342 ACE_Message_Block ((char *)complete_message,
1343 complete_message_length),
1344 -1);
1346 mb1->wr_ptr (complete_message_length);
1347 mb2->wr_ptr (complete_message_length);
1348 mb3->wr_ptr (complete_message_length);
1350 // chain them together
1351 mb1->cont (mb2);
1352 mb2->cont (mb3);
1354 if (this->ws_.writev (*mb1, mb1->total_length ()) == -1)
1356 mb1->release ();
1357 ACE_ERROR_RETURN((LM_ERROR,
1358 ACE_TEXT ("(%t) %p\n"),
1359 ACE_TEXT ("Client::ACE_Asynch_Stream::writev")),
1360 -1);
1362 #else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1364 ACE_Message_Block *mb = 0;
1366 // No need to allocate +1 for proper printing - the memory includes it already
1367 ACE_NEW_RETURN (mb,
1368 ACE_Message_Block (complete_message, complete_message_length),
1369 -1);
1370 mb->wr_ptr (complete_message_length);
1372 if (this->ws_.write (*mb, mb->length ()) == -1)
1374 mb->release ();
1375 #if defined (ACE_WIN32)
1376 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
1377 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
1378 ACE_ERROR_RETURN ((LM_DEBUG,
1379 ACE_TEXT ("(%t) Client %d, peer gone\n"),
1380 this->id_),
1381 -1);
1382 #endif /* ACE_WIN32 */
1383 ACE_ERROR_RETURN((LM_ERROR,
1384 ACE_TEXT ("(%t) Client %d, %p\n"),
1385 this->id_,
1386 ACE_TEXT ("write")),
1387 -1);
1389 #endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1391 this->io_count_++;
1392 this->total_w_++;
1393 return 0;
1397 Client::initiate_read_stream (void)
1399 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
1400 return -1;
1402 static const size_t complete_message_length =
1403 ACE_OS::strlen (complete_message);
1405 #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
1406 ACE_Message_Block *mb1 = 0,
1407 *mb2 = 0,
1408 *mb3 = 0,
1409 *mb4 = 0,
1410 *mb5 = 0,
1411 *mb6 = 0;
1413 // We allocate +1 only for proper printing - we can just set the last byte
1414 // to '\0' before printing out
1415 ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1);
1416 ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1);
1417 ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1);
1419 // Let allocate memory for one more triplet,
1420 // This improves performance
1421 // as we can receive more the than one block at once
1422 // Generally, we can receive more triplets ....
1423 ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1);
1424 ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1);
1425 ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1);
1427 mb1->cont (mb2);
1428 mb2->cont (mb3);
1430 mb3->cont (mb4);
1431 mb4->cont (mb5);
1432 mb5->cont (mb6);
1435 // hide last byte in each message block, reserving it for later to set '\0'
1436 // for proper printouts
1437 mb1->size (mb1->size () - 1);
1438 mb2->size (mb2->size () - 1);
1439 mb3->size (mb3->size () - 1);
1441 mb4->size (mb4->size () - 1);
1442 mb5->size (mb5->size () - 1);
1443 mb6->size (mb6->size () - 1);
1445 // Inititiate read
1446 if (this->rs_.readv (*mb1, mb1->total_size () - 1) == -1)
1448 mb1->release ();
1449 ACE_ERROR_RETURN ((LM_ERROR,
1450 ACE_TEXT ("(%t) %p\n"),
1451 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1452 -1);
1454 #else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1456 // Try to read more chunks
1457 size_t blksize = ( complete_message_length > BUFSIZ ) ?
1458 complete_message_length : BUFSIZ;
1460 ACE_Message_Block *mb = 0;
1462 // We allocate +1 only for proper printing - we can just set the last byte
1463 // to '\0' before printing out
1464 ACE_NEW_RETURN (mb,
1465 ACE_Message_Block (blksize + 1),
1466 -1);
1468 // Inititiate read
1469 if (this->rs_.read (*mb, mb->size () - 1) == -1)
1471 mb->release ();
1472 #if defined (ACE_WIN32)
1473 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
1474 // a 0-byte read as we would if underlying calls used WSARecv.
1475 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
1476 ACE_ERROR_RETURN ((LM_DEBUG,
1477 ACE_TEXT ("(%t) Client %d, peer closed\n"),
1478 this->id_),
1479 -1);
1480 #endif /* ACE_WIN32 */
1481 ACE_ERROR_RETURN ((LM_ERROR,
1482 ACE_TEXT ("(%t) Client %d, %p\n"),
1483 this->id_,
1484 ACE_TEXT ("read")),
1485 -1);
1487 #endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1489 this->io_count_++;
1490 this->total_r_++;
1491 return 0;
1494 void
1495 Client::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
1498 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1500 ACE_Message_Block & mb = result.message_block ();
1502 if (loglevel > 1)
1504 LogLocker log_lock;
1506 ACE_DEBUG ((LM_DEBUG,
1507 ACE_TEXT ("(%t) **** Client %d: handle_write_stream() ****\n"),
1508 this->id_));
1509 ACE_DEBUG ((LM_DEBUG,
1510 ACE_TEXT ("%s = %d\n"),
1511 ACE_TEXT ("bytes_to_write"),
1512 result.bytes_to_write ()));
1513 ACE_DEBUG ((LM_DEBUG,
1514 ACE_TEXT ("%s = %d\n"),
1515 ACE_TEXT ("handle"),
1516 result.handle ()));
1517 ACE_DEBUG ((LM_DEBUG,
1518 ACE_TEXT ("%s = %d\n"),
1519 ACE_TEXT ("bytes_transfered"),
1520 result.bytes_transferred ()));
1521 ACE_DEBUG ((LM_DEBUG,
1522 ACE_TEXT ("%s = %@\n"),
1523 ACE_TEXT ("act"),
1524 result.act ()));
1525 ACE_DEBUG ((LM_DEBUG,
1526 ACE_TEXT ("%s = %d\n"),
1527 ACE_TEXT ("success"),
1528 result.success ()));
1529 ACE_DEBUG ((LM_DEBUG,
1530 ACE_TEXT ("%s = %@\n"),
1531 ACE_TEXT ("completion_key"),
1532 result.completion_key ()));
1533 ACE_DEBUG ((LM_DEBUG,
1534 ACE_TEXT ("%s = %d\n"),
1535 ACE_TEXT ("error"),
1536 result.error ()));
1538 #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
1539 size_t bytes_transferred = result.bytes_transferred ();
1540 char index = 0;
1541 for (ACE_Message_Block* mb_i = &mb;
1542 (mb_i != 0) && (bytes_transferred > 0);
1543 mb_i = mb_i->cont ())
1545 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1546 mb_i->rd_ptr()[0] = '\0';
1548 size_t len = mb_i->rd_ptr () - mb_i->base ();
1550 // move rd_ptr backwards as required for printout
1551 if (len >= bytes_transferred)
1553 mb_i->rd_ptr (0 - bytes_transferred);
1554 bytes_transferred = 0;
1556 else
1558 mb_i->rd_ptr (0 - len);
1559 bytes_transferred -= len;
1562 ++index;
1563 ACE_DEBUG ((LM_DEBUG,
1564 ACE_TEXT ("%s%d = %s\n"),
1565 ACE_TEXT ("message_block, part "),
1566 index,
1567 mb_i->rd_ptr ()));
1569 #else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1570 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1571 mb.rd_ptr()[0] = '\0';
1572 // move rd_ptr backwards as required for printout
1573 mb.rd_ptr (- result.bytes_transferred ());
1574 ACE_DEBUG ((LM_DEBUG,
1575 ACE_TEXT ("%s = %s\n"),
1576 ACE_TEXT ("message_block"),
1577 mb.rd_ptr ()));
1578 #endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
1580 ACE_DEBUG ((LM_DEBUG,
1581 ACE_TEXT ("**** end of message ****************\n")));
1583 else if (result.error () != 0)
1585 ACE_Log_Priority prio;
1586 #if defined (ACE_WIN32)
1587 if (result.error () == ERROR_OPERATION_ABORTED)
1588 prio = LM_DEBUG;
1589 #else
1590 if (result.error () == ECANCELED)
1591 prio = LM_DEBUG;
1592 #endif /* ACE_WIN32 */
1593 else
1594 prio = LM_ERROR;
1595 ACE_Log_Msg::instance ()->errnum (result.error ());
1596 ACE_Log_Msg::instance ()->log (prio,
1597 ACE_TEXT ("(%t) Client %d; %p\n"),
1598 this->id_,
1599 ACE_TEXT ("write"));
1601 else if (loglevel > 0)
1603 ACE_DEBUG ((LM_DEBUG,
1604 ACE_TEXT ("(%t) Client %d: wrote %d bytes ok\n"),
1605 this->id_,
1606 result.bytes_transferred ()));
1609 mb.release ();
1611 if (result.error () == 0 && result.bytes_transferred () > 0)
1613 this->total_snd_ += result.bytes_transferred ();
1614 if (this->total_snd_ >= xfer_limit)
1616 ACE_DEBUG ((LM_DEBUG,
1617 ACE_TEXT ("(%t) Client %d sent %d, limit %d\n"),
1618 this->id_, this->total_snd_, xfer_limit));
1619 this->close ();
1621 if (duplex != 0) // full duplex, continue write
1623 if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
1624 this->initiate_write_stream ();
1626 else // half-duplex read reply, after read we will start write
1627 this->initiate_read_stream ();
1630 --this->io_count_;
1631 if (this->io_count_ > 0)
1632 return;
1634 delete this;
1637 void
1638 Client::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
1641 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1643 ACE_Message_Block & mb = result.message_block ();
1645 if (loglevel > 1)
1647 LogLocker log_lock;
1649 ACE_DEBUG ((LM_DEBUG,
1650 ACE_TEXT ("(%t) **** Client %d: handle_read_stream() ****\n"),
1651 this->id_));
1652 ACE_DEBUG ((LM_DEBUG,
1653 ACE_TEXT ("%s = %d\n"),
1654 ACE_TEXT ("bytes_to_read"),
1655 result.bytes_to_read ()));
1656 ACE_DEBUG ((LM_DEBUG,
1657 ACE_TEXT ("%s = %d\n"),
1658 ACE_TEXT ("handle"),
1659 result.handle ()));
1660 ACE_DEBUG ((LM_DEBUG,
1661 ACE_TEXT ("%s = %d\n"),
1662 ACE_TEXT ("bytes_transfered"),
1663 result.bytes_transferred ()));
1664 ACE_DEBUG ((LM_DEBUG,
1665 ACE_TEXT ("%s = %@\n"),
1666 ACE_TEXT ("act"),
1667 result.act ()));
1668 ACE_DEBUG ((LM_DEBUG,
1669 ACE_TEXT ("%s = %d\n"),
1670 ACE_TEXT ("success"),
1671 result.success ()));
1672 ACE_DEBUG ((LM_DEBUG,
1673 ACE_TEXT ("%s = %@\n"),
1674 ACE_TEXT ("completion_key"),
1675 result.completion_key ()));
1676 ACE_DEBUG ((LM_DEBUG,
1677 ACE_TEXT ("%s = %d\n"),
1678 ACE_TEXT ("error"),
1679 result.error ()));
1681 #if defined (ACE_WIN32)
1682 char index = 0;
1683 for (ACE_Message_Block* mb_i = &mb;
1684 mb_i != 0;
1685 mb_i = mb_i->cont ())
1687 ++index;
1688 // write 0 at string end for proper printout
1689 mb_i->wr_ptr()[0] = '\0';
1691 ACE_DEBUG ((LM_DEBUG,
1692 ACE_TEXT ("%s%d = %s\n"),
1693 ACE_TEXT ("message_block, part "),
1694 index,
1695 mb_i->rd_ptr ()));
1697 #else /* ACE_WIN32 */
1698 // write 0 at string end for proper printout
1699 mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
1700 ACE_DEBUG ((LM_DEBUG,
1701 ACE_TEXT ("%s = %s\n"),
1702 ACE_TEXT ("message_block"),
1703 mb.rd_ptr ()));
1704 #endif /* ACE_WIN32 */
1706 ACE_DEBUG ((LM_DEBUG,
1707 ACE_TEXT ("**** end of message ****************\n")));
1709 else if (result.error () != 0)
1711 ACE_Log_Priority prio;
1712 #if defined (ACE_WIN32)
1713 if (result.error () == ERROR_OPERATION_ABORTED)
1714 prio = LM_DEBUG;
1715 #else
1716 if (result.error () == ECANCELED)
1717 prio = LM_DEBUG;
1718 #endif /* ACE_WIN32 */
1719 else
1720 prio = LM_ERROR;
1721 ACE_Log_Msg::instance ()->errnum (result.error ());
1722 ACE_Log_Msg::instance ()->log (prio,
1723 ACE_TEXT ("(%t) Client %d; %p\n"),
1724 this->id_,
1725 ACE_TEXT ("read"));
1727 else if (loglevel > 0)
1729 ACE_DEBUG ((LM_DEBUG,
1730 ACE_TEXT ("(%t) Client %d: read %d bytes ok\n"),
1731 this->id_,
1732 result.bytes_transferred ()));
1735 mb.release ();
1737 if (result.error () == 0 && result.bytes_transferred () > 0)
1739 this->total_rcv_ += result.bytes_transferred ();
1741 if (duplex != 0 || this->stop_writing_) // full duplex, continue read
1742 this->initiate_read_stream ();
1743 else // half-duplex write, after write we will start read
1744 this->initiate_write_stream ();
1747 --this->io_count_;
1748 if (this->io_count_ > 0)
1749 return;
1751 delete this;
1754 // *************************************************************
1755 // Configuration helpers
1756 // *************************************************************
1758 print_usage (int /* argc */, ACE_TCHAR *argv[])
1760 ACE_ERROR
1761 ((LM_ERROR,
1762 ACE_TEXT ("\nusage: %s")
1763 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1764 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1765 ACE_TEXT ("\n a AIOCB")
1766 ACE_TEXT ("\n i SIG")
1767 ACE_TEXT ("\n c CB")
1768 ACE_TEXT ("\n s SUN")
1769 ACE_TEXT ("\n d default")
1770 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1771 ACE_TEXT ("\n-h <host> for Client mode")
1772 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1773 ACE_TEXT ("\n-p <port to listen/connect>")
1774 ACE_TEXT ("\n-c <number of client instances>")
1775 ACE_TEXT ("\n-b run client and server at the same time")
1776 ACE_TEXT ("\n f file")
1777 ACE_TEXT ("\n c console")
1778 ACE_TEXT ("\n-v log level")
1779 ACE_TEXT ("\n 0 - log errors and highlights")
1780 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1781 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1782 ACE_TEXT ("\n-x max transfer byte count per Client")
1783 ACE_TEXT ("\n-u show this message")
1784 ACE_TEXT ("\n"),
1785 argv[0]
1787 return -1;
1790 static int
1791 set_proactor_type (const ACE_TCHAR *ptype)
1793 if (!ptype)
1794 return 0;
1796 switch (ACE_OS::ace_toupper (*ptype))
1798 case 'D':
1799 proactor_type = DEFAULT;
1800 return 1;
1801 case 'A':
1802 proactor_type = AIOCB;
1803 return 1;
1804 case 'I':
1805 proactor_type = SIG;
1806 return 1;
1807 #if defined (sun)
1808 case 'S':
1809 proactor_type = SUN;
1810 return 1;
1811 #endif /* sun */
1812 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
1813 case 'C':
1814 proactor_type = CB;
1815 return 1;
1816 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
1817 default:
1818 break;
1820 return 0;
1823 static int
1824 parse_args (int argc, ACE_TCHAR *argv[])
1826 // First, set up all the defaults then let any args change them.
1827 both = 1; // client and server simultaneosly
1828 duplex = 1; // full duplex is on
1829 #if defined (ACE_HAS_IPV6)
1830 host = ACE_IPV6_LOCALHOST; // server to connect (IPv6 localhost)
1831 #else
1832 host = ACE_LOCALHOST;
1833 #endif /*ACE_HAS_IPV6*/
1834 port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
1835 max_aio_operations = 512; // POSIX Proactor params
1836 proactor_type = DEFAULT; // Proactor type = default
1837 threads = 3; // size of Proactor thread pool
1838 clients = 10; // number of clients
1839 loglevel = 0; // log level : only errors and highlights
1840 // Default transfer limit 50 messages per Sender
1841 xfer_limit = 50 * ACE_OS::strlen (complete_message);
1843 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
1844 # if defined (ACE_LINUX)
1845 duplex = 0;
1846 #endif
1848 if (argc == 1) // no arguments , so one button test
1849 return 0;
1851 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
1852 int c;
1854 while ((c = get_opt ()) != EOF)
1856 switch (c)
1858 case 'x': // xfer limit
1859 xfer_limit = static_cast<size_t> (ACE_OS::atoi (get_opt.opt_arg ()));
1860 if (xfer_limit == 0)
1861 xfer_limit = 1; // Bare minimum.
1862 break;
1863 case 'b': // both client and server
1864 both = 1;
1865 break;
1866 case 'v': // log level
1867 loglevel = ACE_OS::atoi (get_opt.opt_arg ());
1868 break;
1869 case 'd': // duplex
1870 duplex = ACE_OS::atoi (get_opt.opt_arg ());
1871 break;
1872 case 'h': // host for sender
1873 host = get_opt.opt_arg ();
1874 break;
1875 case 'p': // port number
1876 port = ACE_OS::atoi (get_opt.opt_arg ());
1877 break;
1878 case 'n': // thread pool size
1879 threads = ACE_OS::atoi (get_opt.opt_arg ());
1880 break;
1881 case 'c': // number of clients
1882 clients = ACE_OS::atoi (get_opt.opt_arg ());
1883 if (clients > MAX_CLIENTS)
1884 clients = MAX_CLIENTS;
1885 break;
1886 case 'o': // max number of aio for proactor
1887 max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
1888 break;
1889 case 't': // Proactor Type
1890 if (set_proactor_type (get_opt.opt_arg ()))
1891 break;
1892 return print_usage (argc, argv);
1893 case 'u':
1894 default:
1895 return print_usage (argc, argv);
1896 } // switch
1897 } // while
1899 if (proactor_type == SUN && threads > 1)
1901 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sun aiowait is not thread-safe; ")
1902 ACE_TEXT ("changing to 1 thread\n")));
1903 threads = 1;
1906 return 0;
1910 run_main (int argc, ACE_TCHAR *argv[])
1912 ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPV6"));
1914 if (::parse_args (argc, argv) == -1)
1915 return -1;
1917 #if defined (ACE_HAS_IPV6)
1918 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
1919 disable_signal (SIGPIPE, SIGPIPE);
1921 MyTask task1;
1922 TestData test;
1924 if (task1.start (threads, proactor_type, max_aio_operations) == 0)
1926 Acceptor acceptor (&test);
1927 Connector connector (&test);
1928 ACE_INET_Addr addr (port, "::");
1930 int rc = 0;
1932 if (both != 0 || host == 0) // Acceptor
1934 // Simplify, initial read with zero size
1935 if (acceptor.open (addr, 0, 1) == 0)
1936 rc = 1;
1939 if (both != 0 || host != 0)
1941 if (host == 0)
1942 host = ACE_IPV6_LOCALHOST;
1944 if (addr.set (port, host, 1, addr.get_type ()) == -1)
1945 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
1946 else
1947 rc += connector.start (addr, clients);
1950 // Wait a few seconds to let things get going, then poll til
1951 // all sessions are done. Note that when we exit this scope, the
1952 // Acceptor and Connector will be destroyed, which should prevent
1953 // further connections and also test how well destroyed handlers
1954 // are handled.
1955 ACE_OS::sleep (3);
1957 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
1958 while (!test.testing_done ())
1959 ACE_OS::sleep (1);
1961 test.stop_all ();
1963 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
1964 task1.stop ();
1966 #endif /* ACE_HAS_IPV6 */
1968 ACE_END_TEST;
1970 return 0;
1973 #else
1976 run_main (int, ACE_TCHAR *[])
1978 ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPV6"));
1980 ACE_DEBUG ((LM_INFO,
1981 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
1982 ACE_TEXT ("Proactor_Test_IPV6 will not be run.\n")));
1984 ACE_END_TEST;
1986 return 0;
1989 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */