Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Proactor_Test.cpp
blob6424635ae5246189523ec37a1f8d6c0530fda6ef
// ============================================================================
/**
 *  @file Proactor_Test.cpp
 *
 *  This program illustrates how the ACE_Proactor can be used to
 *  implement an application that performs various asynchronous
 *  operations.
 *
 *  @author Alexander Libman <alibman@baltimore.com>
 */
// ============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
16 // This only works on Win32 platforms and on Unix platforms
17 // supporting POSIX aio calls.
19 #include "ace/Signal.h"
21 #include "ace/Service_Config.h"
22 #include "ace/INET_Addr.h"
23 #include "ace/SOCK_Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/SOCK_Stream.h"
26 #include "ace/Object_Manager.h"
27 #include "ace/Get_Opt.h"
29 #include "ace/Proactor.h"
30 #include "ace/Asynch_Acceptor.h"
31 #include "ace/Asynch_Connector.h"
32 #include "ace/Task.h"
33 #include "ace/Thread_Semaphore.h"
34 #include "ace/OS_NS_ctype.h"
35 #include "ace/OS_NS_errno.h"
36 #include "ace/OS_NS_signal.h"
37 #include "ace/OS_NS_string.h"
38 #include "ace/OS_NS_unistd.h"
39 #include "ace/OS_NS_sys_socket.h"
40 #include "ace/os_include/netinet/os_tcp.h"
42 #include "ace/Atomic_Op.h"
43 #include "ace/Synch_Traits.h"
45 #if defined (ACE_WIN32)
47 # include "ace/WIN32_Proactor.h"
49 #elif defined (ACE_HAS_AIO_CALLS)
51 # include "ace/POSIX_Proactor.h"
52 # include "ace/POSIX_CB_Proactor.h"
53 # include "ace/SUN_Proactor.h"
55 #endif /* ACE_WIN32 */
57 #include "Proactor_Test.h"
60 // Proactor Type (UNIX only, Win32 ignored)
61 typedef enum { DEFAULT = 0, AIOCB, SIG, SUN, CB } ProactorType;
62 static ProactorType proactor_type = DEFAULT;
64 // POSIX : > 0 max number aio operations proactor,
65 static size_t max_aio_operations = 0;
67 // both: 0 run client or server / depends on host
68 // != 0 run client and server
69 static int both = 0;
71 // Host that we're connecting to.
72 static const ACE_TCHAR *host = 0;
74 // number of Client instances
75 static int clients = 1;
76 const int MAX_CLIENTS = 1000;
77 const int MAX_SERVERS = 1000;
79 // duplex mode: == 0 half-duplex
80 // != 0 full duplex
81 static int duplex = 0;
83 // number threads in the Proactor thread pool
84 static int threads = 1;
86 // Port that we're receiving connections on.
87 static u_short port = ACE_DEFAULT_SERVER_PORT;
89 // Log options
90 static int loglevel; // 0 full , 1 only errors
92 static size_t xfer_limit; // Number of bytes for Client to send.
94 static char complete_message[] =
95 "GET / HTTP/1.1\r\n"
96 "Accept: */*\r\n"
97 "Accept-Language: C++\r\n"
98 "Accept-Encoding: gzip, deflate\r\n"
99 "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
100 "Connection: Keep-Alive\r\n"
101 "\r\n";
103 class LogLocker
105 public:
107 LogLocker () { ACE_LOG_MSG->acquire (); }
108 virtual ~LogLocker () { ACE_LOG_MSG->release (); }
113 // Function to remove signals from the signal mask.
114 static int
115 disable_signal (int sigmin, int sigmax)
117 #if !defined (ACE_LACKS_UNIX_SIGNALS)
118 sigset_t signal_set;
119 if (ACE_OS::sigemptyset (&signal_set) == - 1)
120 ACE_ERROR ((LM_ERROR,
121 ACE_TEXT ("Error: (%P|%t):%p\n"),
122 ACE_TEXT ("sigemptyset failed")));
124 for (int i = sigmin; i <= sigmax; i++)
125 ACE_OS::sigaddset (&signal_set, i);
127 // Put the <signal_set>.
128 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
129 // In multi-threaded application this is not POSIX compliant
130 // but let's leave it just in case.
131 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
132 # else
133 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
134 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
135 ACE_ERROR_RETURN ((LM_ERROR,
136 ACE_TEXT ("Error: (%P|%t): %p\n"),
137 ACE_TEXT ("SIG_BLOCK failed")),
138 -1);
139 #else
140 ACE_UNUSED_ARG (sigmin);
141 ACE_UNUSED_ARG (sigmax);
142 #endif /* ACE_LACKS_UNIX_SIGNALS */
144 return 0;
// *************************************************************
//  MyTask is the ACE_Task responsible for:
//  1. creation and deletion of the
//     Proactor and the Proactor thread pool
//  2. running the Proactor event loop
// *************************************************************
156 * @class MyTask
158 * MyTask plays role for Proactor threads pool
160 * MyTask is ACE_Task resposible for:
161 * 1. Creation and deletion of Proactor and Proactor thread pool
162 * 2. Running Proactor event loop
164 class MyTask : public ACE_Task<ACE_MT_SYNCH>
166 public:
167 MyTask (void):
168 lock_ (),
169 sem_ ((unsigned int) 0),
170 proactor_(0) {}
172 virtual ~MyTask()
174 (void) this->stop ();
175 (void) this->delete_proactor();
178 virtual int svc (void);
180 int start (int num_threads,
181 ProactorType type_proactor,
182 size_t max_op );
183 int stop (void);
185 private:
186 int create_proactor (ProactorType type_proactor,
187 size_t max_op);
188 int delete_proactor (void);
190 ACE_SYNCH_RECURSIVE_MUTEX lock_;
191 ACE_Thread_Semaphore sem_;
192 ACE_Proactor * proactor_;
197 MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
199 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
200 monitor,
201 this->lock_,
202 -1);
204 ACE_TEST_ASSERT (this->proactor_ == 0);
206 #if defined (ACE_WIN32)
208 ACE_UNUSED_ARG (type_proactor);
209 ACE_UNUSED_ARG (max_op);
211 ACE_WIN32_Proactor *proactor_impl = 0;
213 ACE_NEW_RETURN (proactor_impl,
214 ACE_WIN32_Proactor,
215 -1);
217 ACE_DEBUG ((LM_DEBUG,
218 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
220 #elif defined (ACE_HAS_AIO_CALLS)
222 ACE_POSIX_Proactor * proactor_impl = 0;
224 switch (type_proactor)
226 case AIOCB:
227 ACE_NEW_RETURN (proactor_impl,
228 ACE_POSIX_AIOCB_Proactor (max_op),
229 -1);
230 ACE_DEBUG ((LM_DEBUG,
231 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
232 break;
234 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
235 case SIG:
236 ACE_NEW_RETURN (proactor_impl,
237 ACE_POSIX_SIG_Proactor (max_op),
238 -1);
239 ACE_DEBUG ((LM_DEBUG,
240 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
241 break;
242 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
244 # if defined (sun)
245 case SUN:
246 ACE_NEW_RETURN (proactor_impl,
247 ACE_SUN_Proactor (max_op),
248 -1);
249 ACE_DEBUG ((LM_DEBUG,
250 ACE_TEXT("(%t) Create Proactor Type = SUN\n")));
251 break;
252 # endif /* sun */
254 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
255 case CB:
256 ACE_NEW_RETURN (proactor_impl,
257 ACE_POSIX_CB_Proactor (max_op),
258 -1);
259 ACE_DEBUG ((LM_DEBUG,
260 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
261 break;
262 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
264 default:
265 ACE_DEBUG ((LM_DEBUG,
266 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
267 break;
270 #endif /* ACE_WIN32 */
272 // always delete implementation 1 , not !(proactor_impl == 0)
273 ACE_NEW_RETURN (this->proactor_,
274 ACE_Proactor (proactor_impl, 1 ),
275 -1);
276 // Set new singleton and delete it in close_singleton()
277 ACE_Proactor::instance (this->proactor_, 1);
278 return 0;
282 MyTask::delete_proactor (void)
284 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
285 monitor,
286 this->lock_,
287 -1);
289 ACE_DEBUG ((LM_DEBUG,
290 ACE_TEXT ("(%t) Delete Proactor\n")));
292 ACE_Proactor::close_singleton ();
293 this->proactor_ = 0;
295 return 0;
299 MyTask::start (int num_threads,
300 ProactorType type_proactor,
301 size_t max_op)
303 if (this->create_proactor (type_proactor, max_op) == -1)
304 ACE_ERROR_RETURN ((LM_ERROR,
305 ACE_TEXT ("%p.\n"),
306 ACE_TEXT ("unable to create proactor")),
307 -1);
309 if (this->activate (THR_NEW_LWP, num_threads) == -1)
310 ACE_ERROR_RETURN ((LM_ERROR,
311 ACE_TEXT ("%p.\n"),
312 ACE_TEXT ("unable to activate thread pool")),
313 -1);
315 for (; num_threads > 0; num_threads--)
317 sem_.acquire ();
320 return 0;
325 MyTask::stop ()
327 if (this->proactor_ != 0)
329 ACE_DEBUG ((LM_DEBUG,
330 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
332 this->proactor_->proactor_end_event_loop ();
335 if (this->wait () == -1)
336 ACE_ERROR ((LM_ERROR,
337 ACE_TEXT ("%p.\n"),
338 ACE_TEXT ("unable to stop thread pool")));
340 return 0;
344 MyTask::svc (void)
346 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n")));
348 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
349 disable_signal (SIGPIPE, SIGPIPE);
351 // signal that we are ready
352 sem_.release (1);
354 ACE_Proactor::run_event_loop ();
356 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n")));
357 return 0;
361 // TestData collects and reports on test-related transfer and connection
362 // statistics.
363 class TestData
365 public:
366 TestData ();
367 bool testing_done (void);
368 Server *server_up (void);
369 Client *client_up (void);
370 void server_done (Server *s);
371 void client_done (Client *c);
372 void stop_all (void);
373 void report (void);
375 private:
376 struct Local_Stats
378 // Track number of sessions that report start, and those that report
379 // their end (and stats).
380 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_up_;
381 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_down_;
383 // Total read and write bytes for all sessions.
384 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_cnt_;
385 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_cnt_;
386 // Total read and write operations issues for all sessions.
387 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_ops_;
388 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_ops_;
389 } servers_, clients_;
391 ACE_SYNCH_MUTEX list_lock_;
392 Server *server_list_[MAX_SERVERS];
393 Client *client_list_[MAX_CLIENTS];
396 TestData::TestData ()
398 int i;
399 for (i = 0; i < MAX_SERVERS; ++i)
400 this->server_list_[i] = 0;
401 for (i = 0; i < MAX_CLIENTS; ++i)
402 this->client_list_[i] = 0;
405 bool
406 TestData::testing_done (void)
408 int svr_up = this->servers_.sessions_up_.value ();
409 int svr_dn = this->servers_.sessions_down_.value ();
410 int clt_up = this->clients_.sessions_up_.value ();
411 int clt_dn = this->clients_.sessions_down_.value ();
413 if (svr_up == 0 && clt_up == 0) // No connections up yet
414 return false;
416 return (svr_dn >= svr_up && clt_dn >= clt_up);
419 Server *
420 TestData::server_up (void)
422 ++this->servers_.sessions_up_;
423 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
425 for (int i = 0; i < MAX_SERVERS; ++i)
427 if (this->server_list_[i] == 0)
429 ACE_NEW_RETURN (this->server_list_[i], Server (this, i), 0);
430 ACE_DEBUG ((LM_DEBUG,
431 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
433 this->servers_.sessions_up_.value (),
434 this->servers_.sessions_down_.value ()));
435 return this->server_list_[i];
438 return 0;
441 Client *
442 TestData::client_up (void)
444 ++this->clients_.sessions_up_;
445 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
447 for (int i = 0; i < MAX_CLIENTS; ++i)
449 if (this->client_list_[i] == 0)
451 ACE_NEW_RETURN (this->client_list_[i], Client (this, i), 0);
452 ACE_DEBUG ((LM_DEBUG,
453 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
455 this->clients_.sessions_up_.value (),
456 this->clients_.sessions_down_.value ()));
457 return this->client_list_[i];
460 return 0;
463 void
464 TestData::server_done (Server *s)
466 this->servers_.w_cnt_ += s->get_total_snd ();
467 this->servers_.r_cnt_ += s->get_total_rcv ();
468 this->servers_.w_ops_ += s->get_total_w ();
469 this->servers_.r_ops_ += s->get_total_r ();
470 ++this->servers_.sessions_down_;
471 ACE_DEBUG ((LM_DEBUG,
472 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
473 s->id (),
474 this->servers_.sessions_up_.value (),
475 this->servers_.sessions_down_.value ()));
477 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
478 int i;
479 for (i = 0; i < MAX_SERVERS; ++i)
481 if (this->server_list_[i] == s)
483 if (s->id () != i)
484 ACE_ERROR ((LM_ERROR,
485 ACE_TEXT ("Server %d is pos %d in list\n"),
486 s->id (),
487 i));
488 this->server_list_[i] = 0;
489 break;
492 if (i >= MAX_SERVERS)
493 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Server %@ done but not listed\n"), s));
495 return;
498 void
499 TestData::client_done (Client *c)
501 this->clients_.w_cnt_ += c->get_total_snd ();
502 this->clients_.r_cnt_ += c->get_total_rcv ();
503 this->clients_.w_ops_ += c->get_total_w ();
504 this->clients_.r_ops_ += c->get_total_r ();
505 ++this->clients_.sessions_down_;
506 ACE_DEBUG ((LM_DEBUG,
507 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
508 c->id (),
509 this->clients_.sessions_up_.value (),
510 this->clients_.sessions_down_.value ()));
512 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
513 int i;
514 for (i = 0; i < MAX_CLIENTS; ++i)
516 if (this->client_list_[i] == c)
518 if (c->id () != i)
519 ACE_ERROR ((LM_ERROR,
520 ACE_TEXT ("Client %d is pos %d in list\n"),
521 c->id (),
522 i));
523 this->client_list_[i] = 0;
524 break;
527 if (i >= MAX_CLIENTS)
528 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Client %@ done but not listed\n"), c));
530 return;
533 void
534 TestData::stop_all (void)
536 int i;
538 // Lock and cancel everything. Then release the lock, possibly allowing
539 // cleanups, then grab it again and delete all Servers and Clients.
541 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
542 for (i = 0; i < MAX_CLIENTS; ++i)
544 if (this->client_list_[i] != 0)
545 this->client_list_[i]->cancel ();
548 for (i = 0; i < MAX_SERVERS; ++i)
550 if (this->server_list_[i] != 0)
551 this->server_list_[i]->cancel ();
555 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
556 for (i = 0; i < MAX_CLIENTS; ++i)
558 if (this->client_list_[i] != 0)
559 delete this->client_list_[i];
562 for (i = 0; i < MAX_SERVERS; ++i)
564 if (this->server_list_[i] != 0)
565 delete this->server_list_[i];
570 void
571 TestData::report (void)
573 // Print statistics
574 ACE_TCHAR bufs [256];
575 ACE_TCHAR bufr [256];
577 ACE_OS::snprintf (bufs, 256,
578 ACE_SIZE_T_FORMAT_SPECIFIER
579 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
580 this->clients_.w_cnt_.value (),
581 this->clients_.w_ops_.value ());
583 ACE_OS::snprintf (bufr, 256,
584 ACE_SIZE_T_FORMAT_SPECIFIER
585 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
586 this->clients_.r_cnt_.value (),
587 this->clients_.r_ops_.value ());
589 ACE_DEBUG ((LM_DEBUG,
590 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
591 bufs,
592 bufr));
594 ACE_OS::snprintf (bufs, 256,
595 ACE_SIZE_T_FORMAT_SPECIFIER
596 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
597 this->servers_.w_cnt_.value (),
598 this->servers_.w_ops_.value ());
600 ACE_OS::snprintf (bufr, 256,
601 ACE_SIZE_T_FORMAT_SPECIFIER
602 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
603 this->servers_.r_cnt_.value (),
604 this->servers_.r_ops_.value ());
606 ACE_DEBUG ((LM_DEBUG,
607 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
608 bufs,
609 bufr));
611 if (this->clients_.w_cnt_.value () == 0 ||
612 this->clients_.r_cnt_.value () == 0 ||
613 this->servers_.w_cnt_.value () == 0 ||
614 this->servers_.r_cnt_.value () == 0 )
615 ACE_ERROR ((LM_ERROR, ACE_TEXT ("It appears that this test didn't ")
616 ACE_TEXT ("really do anything. Something is very wrong.\n")));
620 class Acceptor : public ACE_Asynch_Acceptor<Server>
622 public:
623 Acceptor (TestData *tester);
624 virtual ~Acceptor (void);
626 // Virtual from ACE_Asynch_Acceptor
627 Server *make_handler (void);
629 private:
630 TestData *tester_;
633 // *************************************************************
634 Acceptor::Acceptor (TestData *tester)
635 : tester_ (tester)
639 Acceptor::~Acceptor (void)
641 this->cancel ();
644 Server *
645 Acceptor::make_handler (void)
647 return this->tester_->server_up ();
650 // ***************************************************
651 Server::Server ()
653 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
656 Server::Server (TestData *tester, int id)
657 : tester_ (tester),
658 id_ (id),
659 handle_ (ACE_INVALID_HANDLE),
660 io_count_ (0),
661 flg_cancel_(0),
662 total_snd_(0),
663 total_rcv_(0),
664 total_w_ (0),
665 total_r_ (0)
669 Server::~Server (void)
671 ACE_DEBUG ((LM_DEBUG,
672 ACE_TEXT ("(%t) Server %d dtor; %d sends (%B bytes); ")
673 ACE_TEXT ("%d recvs (%B bytes)\n"),
674 this->id_,
675 this->total_w_, this->total_snd_,
676 this->total_r_, this->total_rcv_));
677 if (this->io_count_ != 0)
678 ACE_ERROR ((LM_WARNING,
679 ACE_TEXT ("(%t) Server %d deleted with ")
680 ACE_TEXT ("%d I/O outstanding\n"),
681 this->id_,
682 this->io_count_));
684 // This test bounces data back and forth between Clients and Servers.
685 // Therefore, if there was significantly more data in one direction, that's
686 // a problem. Remember, the byte counts are unsigned values.
687 int issue_data_warning = 0;
688 if (this->total_snd_ > this->total_rcv_)
690 if (this->total_rcv_ == 0)
691 issue_data_warning = 1;
692 else if (this->total_snd_ / this->total_rcv_ > 2)
693 issue_data_warning = 1;
695 else
697 if (this->total_snd_ == 0)
698 issue_data_warning = 1;
699 else if (this->total_rcv_ / this->total_snd_ > 2)
700 issue_data_warning = 1;
702 if (issue_data_warning)
703 ACE_DEBUG ((LM_WARNING,
704 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
706 if (this->tester_ != 0)
707 this->tester_->server_done (this);
709 if (this->handle_ != ACE_INVALID_HANDLE)
710 ACE_OS::closesocket (this->handle_);
712 this->id_ = -1;
713 this->handle_= ACE_INVALID_HANDLE;
716 void
717 Server::cancel ()
719 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
721 this->flg_cancel_ = 1;
722 this->ws_.cancel ();
723 this->rs_.cancel ();
724 return;
728 void
729 Server::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
731 ACE_TCHAR str[256];
732 if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
733 ACE_DEBUG ((LM_DEBUG,
734 ACE_TEXT ("(%t) Server %d connection from %s\n"),
735 this->id_,
736 str));
737 else
738 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Server %d %p\n"),
739 this->id_,
740 ACE_TEXT ("addr_to_string")));
741 return;
745 void
746 Server::open (ACE_HANDLE handle, ACE_Message_Block &)
749 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
751 // Don't buffer serial sends.
752 this->handle_ = handle;
753 int nodelay = 1;
754 ACE_SOCK_Stream option_setter (handle);
755 if (-1 == option_setter.set_option (ACE_IPPROTO_TCP,
756 TCP_NODELAY,
757 &nodelay,
758 sizeof (nodelay)))
759 ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
761 if (this->ws_.open (*this, this->handle_) == -1)
762 ACE_ERROR ((LM_ERROR,
763 ACE_TEXT ("(%t) %p\n"),
764 ACE_TEXT ("Server::ACE_Asynch_Write_Stream::open")));
765 else if (this->rs_.open (*this, this->handle_) == -1)
766 ACE_ERROR ((LM_ERROR,
767 ACE_TEXT ("(%t) %p\n"),
768 ACE_TEXT ("Server::ACE_Asynch_Read_Stream::open")));
769 else
770 this->initiate_read_stream ();
772 if (this->io_count_ > 0)
773 return;
775 delete this;
779 Server::initiate_read_stream (void)
781 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
782 return -1;
784 ACE_Message_Block *mb = 0;
785 ACE_NEW_RETURN (mb,
786 ACE_Message_Block (1024), //BUFSIZ + 1),
787 -1);
789 // Inititiate read
790 if (this->rs_.read (*mb, mb->size () - 1) == -1)
792 mb->release ();
793 #if defined (ACE_WIN32)
794 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
795 // a 0-byte read as we would if underlying calls used WSARecv.
796 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
797 ACE_ERROR_RETURN ((LM_DEBUG,
798 ACE_TEXT ("(%t) Server %d, peer closed\n"),
799 this->id_),
800 -1);
801 #endif /* ACE_WIN32 */
802 ACE_ERROR_RETURN ((LM_ERROR,
803 ACE_TEXT ("(%t) Server %d, %p\n"),
804 this->id_,
805 ACE_TEXT ("read")),
806 -1);
809 this->io_count_++;
810 this->total_r_++;
811 return 0;
815 Server::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
817 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
819 mb.release ();
820 return -1;
823 if (nbytes == 0)
825 mb.release ();
826 ACE_ERROR_RETURN((LM_ERROR,
827 ACE_TEXT ("(%t) Server::ACE_Asynch_Write_Stream::write nbytes <0 ")),
828 -1);
831 if (this->ws_.write (mb, nbytes) == -1)
833 mb.release ();
834 #if defined (ACE_WIN32)
835 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
836 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
837 ACE_ERROR_RETURN ((LM_DEBUG,
838 ACE_TEXT ("(%t) Server %d, peer gone\n"),
839 this->id_),
840 -1);
841 #endif /* ACE_WIN32 */
842 ACE_ERROR_RETURN((LM_ERROR,
843 ACE_TEXT ("(%t) Server %d, %p\n"),
844 this->id_,
845 ACE_TEXT ("write")),
846 -1);
849 this->io_count_++;
850 this->total_w_++;
851 return 0;
854 void
855 Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
858 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
860 ACE_Message_Block & mb = result.message_block ();
862 // Reset pointers.
863 mb.rd_ptr ()[result.bytes_transferred ()] = '\0';
865 if (loglevel > 1)
867 LogLocker log_lock;
869 ACE_DEBUG ((LM_DEBUG,
870 ACE_TEXT ("(%t) **** Server %d: handle_read_stream() ****\n"),
871 this->id_));
872 ACE_DEBUG ((LM_DEBUG,
873 ACE_TEXT ("%s = %B\n"),
874 ACE_TEXT ("bytes_to_read"),
875 result.bytes_to_read ()));
876 ACE_DEBUG ((LM_DEBUG,
877 ACE_TEXT ("%s = %d\n"),
878 ACE_TEXT ("handle"),
879 result.handle ()));
880 ACE_DEBUG ((LM_DEBUG,
881 ACE_TEXT ("%s = %B\n"),
882 ACE_TEXT ("bytes_transfered"),
883 result.bytes_transferred ()));
884 ACE_DEBUG ((LM_DEBUG,
885 ACE_TEXT ("%s = %@\n"),
886 ACE_TEXT ("act"),
887 result.act ()));
888 ACE_DEBUG ((LM_DEBUG,
889 ACE_TEXT ("%s = %d\n"),
890 ACE_TEXT ("success"),
891 result.success ()));
892 ACE_DEBUG ((LM_DEBUG,
893 ACE_TEXT ("%s = %@\n"),
894 ACE_TEXT ("completion_key"),
895 result.completion_key ()));
896 ACE_DEBUG ((LM_DEBUG,
897 ACE_TEXT ("%s = %d\n"),
898 ACE_TEXT ("error"),
899 result.error ()));
900 ACE_DEBUG ((LM_DEBUG,
901 ACE_TEXT ("%s = %s\n"),
902 ACE_TEXT ("message_block"),
903 mb.rd_ptr ()));
904 ACE_DEBUG ((LM_DEBUG,
905 ACE_TEXT ("**** end of message ****************\n")));
907 else if (result.error () != 0)
909 ACE_Log_Priority prio;
910 #if defined (ACE_WIN32)
911 if (result.error () == ERROR_OPERATION_ABORTED)
912 prio = LM_DEBUG;
913 #else
914 if (result.error () == ECANCELED)
915 prio = LM_DEBUG;
916 #endif /* ACE_WIN32 */
917 else
918 prio = LM_ERROR;
919 ACE_Log_Msg::instance ()->errnum (result.error ());
920 ACE_Log_Msg::instance ()->log (prio,
921 ACE_TEXT ("(%t) Server %d; %p\n"),
922 this->id_,
923 ACE_TEXT ("read"));
925 else if (loglevel > 0)
927 ACE_DEBUG ((LM_DEBUG,
928 ACE_TEXT ("(%t) Server %d: read %B bytes\n"),
929 this->id_,
930 result.bytes_transferred ()));
933 if (result.error () == 0 && result.bytes_transferred () > 0)
935 this->total_rcv_ += result.bytes_transferred ();
937 if (this->initiate_write_stream (mb,
938 result.bytes_transferred ()) == 0)
940 if (duplex != 0) // Initiate new read from the stream.
941 this->initiate_read_stream ();
944 else
945 mb.release ();
947 --this->io_count_;
948 if (this->io_count_ > 0)
949 return;
951 delete this;
954 void
955 Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
958 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
960 ACE_Message_Block & mb = result.message_block ();
962 if (loglevel > 1)
964 LogLocker log_lock;
966 //mb.rd_ptr () [0] = '\0';
967 mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
969 ACE_DEBUG ((LM_DEBUG,
970 ACE_TEXT ("(%t) **** Server %d: handle_write_stream() ****\n"),
971 this->id_));
972 ACE_DEBUG ((LM_DEBUG,
973 ACE_TEXT ("%s = %B\n"),
974 ACE_TEXT ("bytes_to_write"),
975 result.bytes_to_write ()));
976 ACE_DEBUG ((LM_DEBUG,
977 ACE_TEXT ("%s = %d\n"),
978 ACE_TEXT ("handle"),
979 result.handle ()));
980 ACE_DEBUG ((LM_DEBUG,
981 ACE_TEXT ("%s = %B\n"),
982 ACE_TEXT ("bytes_transfered"),
983 result.bytes_transferred ()));
984 ACE_DEBUG ((LM_DEBUG,
985 ACE_TEXT ("%s = %@\n"),
986 ACE_TEXT ("act"),
987 result.act ()));
988 ACE_DEBUG ((LM_DEBUG,
989 ACE_TEXT ("%s = %d\n"),
990 ACE_TEXT ("success"),
991 result.success ()));
992 ACE_DEBUG ((LM_DEBUG,
993 ACE_TEXT ("%s = %@\n"),
994 ACE_TEXT ("completion_key"),
995 result.completion_key ()));
996 ACE_DEBUG ((LM_DEBUG,
997 ACE_TEXT ("%s = %d\n"),
998 ACE_TEXT ("error"),
999 result.error ()));
1000 ACE_DEBUG ((LM_DEBUG,
1001 ACE_TEXT ("%s = %s\n"),
1002 ACE_TEXT ("message_block"),
1003 mb.rd_ptr ()));
1004 ACE_DEBUG ((LM_DEBUG,
1005 ACE_TEXT ("**** end of message ****************\n")));
1007 else if (result.error () != 0)
1009 ACE_Log_Priority prio;
1010 #if defined (ACE_WIN32)
1011 if (result.error () == ERROR_OPERATION_ABORTED)
1012 prio = LM_DEBUG;
1013 #else
1014 if (result.error () == ECANCELED)
1015 prio = LM_DEBUG;
1016 #endif /* ACE_WIN32 */
1017 else
1018 prio = LM_ERROR;
1019 ACE_Log_Msg::instance ()->errnum (result.error ());
1020 ACE_Log_Msg::instance ()->log (prio,
1021 ACE_TEXT ("(%t) Server %d; %p\n"),
1022 this->id_,
1023 ACE_TEXT ("write"));
1025 else if (loglevel > 0)
1027 ACE_DEBUG ((LM_DEBUG,
1028 ACE_TEXT ("(%t) Server %d: wrote %B bytes ok\n"),
1029 this->id_,
1030 result.bytes_transferred ()));
1033 mb.release ();
1035 if (result.error () == 0 && result.bytes_transferred () > 0)
1037 this->total_snd_ += result.bytes_transferred ();
1039 if (duplex == 0)
1040 this->initiate_read_stream ();
1043 --this->io_count_;
1044 if (this->io_count_ > 0)
1045 return;
1047 delete this;
1050 // *******************************************
1051 // Connector
1052 // *******************************************
1054 class Connector : public ACE_Asynch_Connector<Client>
1056 public:
1057 Connector (TestData *tester);
1058 virtual ~Connector (void);
1060 int start (const ACE_INET_Addr &addr, int num);
1062 // Virtual from ACE_Asynch_Connector
1063 Client *make_handler (void);
1065 private:
1066 TestData *tester_;
1069 // *************************************************************
1071 Connector::Connector (TestData *tester)
1072 : tester_ (tester)
1076 Connector::~Connector (void)
1078 this->cancel ();
1081 Client *
1082 Connector::make_handler (void)
1084 return this->tester_->client_up ();
1089 Connector::start (const ACE_INET_Addr& addr, int num)
1091 if (num > MAX_CLIENTS)
1092 num = MAX_CLIENTS;
1094 if (num < 0)
1095 num = 1;
1097 int rc = 0;
1099 // int open ( int pass_addresses = 0,
1100 // ACE_Proactor *proactor = 0,
1101 // int validate_new_connection = 0 );
1103 if (this->open (1, 0, 1) != 0)
1105 ACE_ERROR ((LM_ERROR,
1106 ACE_TEXT ("(%t) %p\n"),
1107 ACE_TEXT ("Connector::open failed")));
1108 return rc;
1111 for (; rc < num; rc++)
1113 if (this->connect (addr) != 0)
1115 ACE_ERROR ((LM_ERROR,
1116 ACE_TEXT ("(%t) %p\n"),
1117 ACE_TEXT ("Connector::connect failed")));
1118 break;
1121 return rc;
1125 Client::Client ()
1127 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
1130 Client::Client (TestData *tester, int id)
1131 : tester_ (tester),
1132 id_ (id),
1133 handle_ (ACE_INVALID_HANDLE),
1134 io_count_ (0),
1135 stop_writing_ (0),
1136 flg_cancel_ (0),
1137 total_snd_ (0),
1138 total_rcv_ (0),
1139 total_w_ (0),
1140 total_r_ (0)
1144 Client::~Client (void)
1146 ACE_DEBUG ((LM_DEBUG,
1147 ACE_TEXT ("(%t) Client %d dtor; %d sends (%B bytes); ")
1148 ACE_TEXT ("%d recvs (%B bytes)\n"),
1149 this->id_,
1150 this->total_w_, this->total_snd_,
1151 this->total_r_, this->total_rcv_));
1152 if (this->io_count_ != 0)
1153 ACE_ERROR ((LM_WARNING,
1154 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1155 this->id_,
1156 this->io_count_));
1158 // This test bounces data back and forth between Clients and Servers.
1159 // Therefore, if there was significantly more data in one direction, that's
1160 // a problem. Remember, the byte counts are unsigned values.
1161 int issue_data_warning = 0;
1162 if (this->total_snd_ > this->total_rcv_)
1164 if (this->total_rcv_ == 0)
1165 issue_data_warning = 1;
1166 else if (this->total_snd_ / this->total_rcv_ > 2)
1167 issue_data_warning = 1;
1169 else
1171 if (this->total_snd_ == 0)
1172 issue_data_warning = 1;
1173 else if (this->total_rcv_ / this->total_snd_ > 2)
1174 issue_data_warning = 1;
1176 if (issue_data_warning)
1177 ACE_DEBUG ((LM_WARNING,
1178 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1180 if (this->tester_ != 0)
1181 this->tester_->client_done (this);
1183 this->id_ = -1;
1184 this->handle_= ACE_INVALID_HANDLE;
1185 if (this->handle_ != ACE_INVALID_HANDLE)
1187 ACE_OS::closesocket (this->handle_);
1189 this->handle_= ACE_INVALID_HANDLE;
1192 void
1193 Client::cancel ()
1195 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1197 this->flg_cancel_ = 1;
1198 this->ws_.cancel ();
1199 this->rs_.cancel ();
1200 return;
1203 void
1204 Client::close ()
1206 // This must be called with the lock_ held.
1207 ACE_DEBUG ((LM_DEBUG,
1208 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1209 this->id_, this->io_count_));
1210 ACE_OS::shutdown (this->handle_, ACE_SHUTDOWN_WRITE);
1211 this->stop_writing_ = 1;
1212 return;
1216 void
1217 Client::addresses (const ACE_INET_Addr& /* peer */, const ACE_INET_Addr& local)
1219 ACE_TCHAR str[256];
1220 if (0 == local.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
1221 ACE_DEBUG ((LM_DEBUG,
1222 ACE_TEXT ("(%t) Client %d connected on %s\n"),
1223 this->id_,
1224 str));
1225 else
1226 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Client %d %p\n"),
1227 this->id_,
1228 ACE_TEXT ("addr_to_string")));
1229 return;
1233 void
1234 Client::open (ACE_HANDLE handle, ACE_Message_Block &)
1237 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1239 // Don't buffer serial sends.
1240 this->handle_ = handle;
1241 int nodelay = 1;
1242 ACE_SOCK_Stream option_setter (handle);
1243 if (option_setter.set_option (ACE_IPPROTO_TCP,
1244 TCP_NODELAY,
1245 &nodelay,
1246 sizeof (nodelay)))
1247 ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
1249 // Open ACE_Asynch_Write_Stream
1250 if (this->ws_.open (*this, this->handle_) == -1)
1251 ACE_ERROR ((LM_ERROR,
1252 ACE_TEXT ("(%t) %p\n"),
1253 ACE_TEXT ("Client::ACE_Asynch_Write_Stream::open")));
1255 // Open ACE_Asynch_Read_Stream
1256 else if (this->rs_.open (*this, this->handle_) == -1)
1257 ACE_ERROR ((LM_ERROR,
1258 ACE_TEXT ("(%t) %p\n"),
1259 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::open")));
1261 else if (this->initiate_write_stream () == 0)
1263 if (duplex != 0) // Start an asynchronous read
1264 this->initiate_read_stream ();
1267 if (this->io_count_ > 0)
1268 return;
1270 delete this;
1274 Client::initiate_write_stream (void)
1276 if (this->flg_cancel_ != 0 ||
1277 this->stop_writing_ ||
1278 this->handle_ == ACE_INVALID_HANDLE)
1279 return -1;
1281 static const size_t complete_message_length = ACE_OS::strlen (complete_message);
1283 #if defined (ACE_WIN32)
1285 ACE_Message_Block *mb1 = 0,
1286 *mb2 = 0,
1287 *mb3 = 0;
1289 // No need to allocate +1 for proper printing - the memory includes it already
1290 ACE_NEW_RETURN (mb1,
1291 ACE_Message_Block ((char *)complete_message,
1292 complete_message_length),
1293 -1);
1295 ACE_NEW_RETURN (mb2,
1296 ACE_Message_Block ((char *)complete_message,
1297 complete_message_length),
1298 -1);
1300 ACE_NEW_RETURN (mb3,
1301 ACE_Message_Block ((char *)complete_message,
1302 complete_message_length),
1303 -1);
1305 mb1->wr_ptr (complete_message_length);
1306 mb2->wr_ptr (complete_message_length);
1307 mb3->wr_ptr (complete_message_length);
1309 // chain them together
1310 mb1->cont (mb2);
1311 mb2->cont (mb3);
1313 if (this->ws_.writev (*mb1, mb1->total_length ()) == -1)
1315 mb1->release ();
1316 ACE_ERROR_RETURN((LM_ERROR,
1317 ACE_TEXT ("(%t) %p\n"),
1318 ACE_TEXT ("Client::ACE_Asynch_Stream::writev")),
1319 -1);
1321 #else /* ACE_WIN32 */
1323 ACE_Message_Block *mb = 0;
1325 // No need to allocate +1 for proper printing - the memory includes it already
1326 ACE_NEW_RETURN (mb,
1327 ACE_Message_Block (complete_message, complete_message_length),
1328 -1);
1329 mb->wr_ptr (complete_message_length);
1331 if (this->ws_.write (*mb, mb->length ()) == -1)
1333 mb->release ();
1334 #if defined (ACE_WIN32)
1335 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
1336 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
1337 ACE_ERROR_RETURN ((LM_DEBUG,
1338 ACE_TEXT ("(%t) Client %d, peer gone\n"),
1339 this->id_),
1340 -1);
1341 #endif /* ACE_WIN32 */
1342 ACE_ERROR_RETURN((LM_ERROR,
1343 ACE_TEXT ("(%t) Client %d, %p\n"),
1344 this->id_,
1345 ACE_TEXT ("write")),
1346 -1);
1348 #endif /* ACE_WIN32 */
1350 this->io_count_++;
1351 this->total_w_++;
1352 return 0;
1356 Client::initiate_read_stream (void)
1358 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
1359 return -1;
1361 static const size_t complete_message_length =
1362 ACE_OS::strlen (complete_message);
1364 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
1365 ACE_Message_Block *mb1 = 0,
1366 *mb2 = 0,
1367 *mb3 = 0,
1368 *mb4 = 0,
1369 *mb5 = 0,
1370 *mb6 = 0;
1372 // We allocate +1 only for proper printing - we can just set the last byte
1373 // to '\0' before printing out
1374 ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1);
1375 ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1);
1376 ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1);
1378 // Let allocate memory for one more triplet,
1379 // This improves performance
1380 // as we can receive more the than one block at once
1381 // Generally, we can receive more triplets ....
1382 ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1);
1383 ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1);
1384 ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1);
1386 mb1->cont (mb2);
1387 mb2->cont (mb3);
1389 mb3->cont (mb4);
1390 mb4->cont (mb5);
1391 mb5->cont (mb6);
1394 // hide last byte in each message block, reserving it for later to set '\0'
1395 // for proper printouts
1396 mb1->size (mb1->size () - 1);
1397 mb2->size (mb2->size () - 1);
1398 mb3->size (mb3->size () - 1);
1400 mb4->size (mb4->size () - 1);
1401 mb5->size (mb5->size () - 1);
1402 mb6->size (mb6->size () - 1);
1404 // Inititiate read
1405 if (this->rs_.readv (*mb1, mb1->total_size () - 1) == -1)
1407 mb1->release ();
1408 ACE_ERROR_RETURN ((LM_ERROR,
1409 ACE_TEXT ("(%t) %p\n"),
1410 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1411 -1);
1413 #else /* ACE_HAS_WIN32_OVERLAPPED_IO */
1415 // Try to read more chunks
1416 size_t blksize = ( complete_message_length > BUFSIZ ) ?
1417 complete_message_length : BUFSIZ;
1419 ACE_Message_Block *mb = 0;
1421 // We allocate +1 only for proper printing - we can just set the last byte
1422 // to '\0' before printing out
1423 ACE_NEW_RETURN (mb,
1424 ACE_Message_Block (blksize + 1),
1425 -1);
1427 // Inititiate read
1428 if (this->rs_.read (*mb, mb->size () - 1) == -1)
1430 mb->release ();
1431 #if defined (ACE_WIN32)
1432 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
1433 // a 0-byte read as we would if underlying calls used WSARecv.
1434 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
1435 ACE_ERROR_RETURN ((LM_DEBUG,
1436 ACE_TEXT ("(%t) Client %d, peer closed\n"),
1437 this->id_),
1438 -1);
1439 #endif /* ACE_WIN32 */
1440 ACE_ERROR_RETURN ((LM_ERROR,
1441 ACE_TEXT ("(%t) Client %d, %p\n"),
1442 this->id_,
1443 ACE_TEXT ("read")),
1444 -1);
1446 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
1448 this->io_count_++;
1449 this->total_r_++;
1450 return 0;
// Handles the result of an asynchronous write on this Client.
//
// Under lock_ it
//  - logs the result: every result field plus the message text when
//    loglevel > 1, otherwise the error (if any), otherwise a one-line
//    byte count when loglevel > 0;
//  - releases the message block carried by the result;
//  - on a successful transfer of at least one byte, adds the byte count
//    to total_snd_, closes the session once xfer_limit is reached, and
//    then initiates the next write (duplex) or a read (half-duplex);
//  - decrements io_count_, and deletes this object once no operation
//    remains outstanding.
1453 void
1454 Client::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
1457 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1459 ACE_Message_Block & mb = result.message_block ();
// Verbose dump of the result; log_lock is held for the whole dump.
1461 if (loglevel > 1)
1463 LogLocker log_lock;
1465 ACE_DEBUG ((LM_DEBUG,
1466 ACE_TEXT ("(%t) **** Client %d: handle_write_stream() ****\n"),
1467 this->id_));
1468 ACE_DEBUG ((LM_DEBUG,
1469 ACE_TEXT ("%s = %B\n"),
1470 ACE_TEXT ("bytes_to_write"),
1471 result.bytes_to_write ()));
1472 ACE_DEBUG ((LM_DEBUG,
1473 ACE_TEXT ("%s = %d\n"),
1474 ACE_TEXT ("handle"),
1475 result.handle ()));
1476 ACE_DEBUG ((LM_DEBUG,
1477 ACE_TEXT ("%s = %B\n"),
1478 ACE_TEXT ("bytes_transfered"),
1479 result.bytes_transferred ()));
1480 ACE_DEBUG ((LM_DEBUG,
1481 ACE_TEXT ("%s = %@\n"),
1482 ACE_TEXT ("act"),
1483 result.act ()));
1484 ACE_DEBUG ((LM_DEBUG,
1485 ACE_TEXT ("%s = %d\n"),
1486 ACE_TEXT ("success"),
1487 result.success ()));
1488 ACE_DEBUG ((LM_DEBUG,
1489 ACE_TEXT ("%s = %@\n"),
1490 ACE_TEXT ("completion_key"),
1491 result.completion_key ()));
1492 ACE_DEBUG ((LM_DEBUG,
1493 ACE_TEXT ("%s = %d\n"),
1494 ACE_TEXT ("error"),
1495 result.error ()));
1497 #if defined (ACE_WIN32)
// Walk the block chain, printing each part that took part in the
// transfer; bytes_transferred counts down the bytes still unaccounted for.
1498 size_t bytes_transferred = result.bytes_transferred ();
1499 char index = 0;
1500 for (ACE_Message_Block* mb_i = &mb;
1501 (mb_i != 0) && (bytes_transferred > 0);
1502 mb_i = mb_i->cont ())
1504 // write 0 at string end for proper printout (if end of mb,
1505 // it's 0 already)
// NOTE(review): initiate_write_stream() builds these blocks directly on
// complete_message, so after a short write this store presumably lands
// inside the shared message text - confirm that this is acceptable.
1506 mb_i->rd_ptr()[0] = '\0';
1508 size_t len = mb_i->rd_ptr () - mb_i->base ();
1510 // move rd_ptr backwards as required for printout
1511 if (len >= bytes_transferred)
1513 mb_i->rd_ptr (0 - bytes_transferred);
1514 bytes_transferred = 0;
1516 else
1518 mb_i->rd_ptr (0 - len);
1519 bytes_transferred -= len;
1522 ++index;
1523 ACE_DEBUG ((LM_DEBUG,
1524 ACE_TEXT ("%s%d = %s\n"),
1525 ACE_TEXT ("message_block, part "),
1526 index,
1527 mb_i->rd_ptr ()));
1529 #else /* ACE_WIN32 */
1530 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1531 mb.rd_ptr()[0] = '\0';
1532 // move rd_ptr backwards as required for printout
// The negated unsigned byte count is meant as a backwards step of
// bytes_transferred bytes.
1533 mb.rd_ptr (- result.bytes_transferred ());
1534 ACE_DEBUG ((LM_DEBUG,
1535 ACE_TEXT ("%s = %s\n"),
1536 ACE_TEXT ("message_block"),
1537 mb.rd_ptr ()));
1538 #endif /* ACE_WIN32 */
1540 ACE_DEBUG ((LM_DEBUG,
1541 ACE_TEXT ("**** end of message ****************\n")));
// Not verbose: report a failed write.  A cancelled operation is only
// logged at debug priority, any other error at error priority.
1543 else if (result.error () != 0)
1545 ACE_Log_Priority prio;
1546 #if defined (ACE_WIN32)
1547 if (result.error () == ERROR_OPERATION_ABORTED)
1548 prio = LM_DEBUG;
1549 #else
1550 if (result.error () == ECANCELED)
1551 prio = LM_DEBUG;
1552 #endif /* ACE_WIN32 */
1553 else
1554 prio = LM_ERROR;
// Store the error number so that %p prints the matching error text.
1555 ACE_Log_Msg::instance ()->errnum (result.error ());
1556 ACE_Log_Msg::instance ()->log (prio,
1557 ACE_TEXT ("(%t) Client %d; %p\n"),
1558 this->id_,
1559 ACE_TEXT ("write"));
1561 else if (loglevel > 0)
1563 ACE_DEBUG ((LM_DEBUG,
1564 ACE_TEXT ("(%t) Client %d: wrote %B bytes ok\n"),
1565 this->id_,
1566 result.bytes_transferred ()));
// The block is no longer needed once the result has been logged.
1569 mb.release ();
1571 if (result.error () == 0 && result.bytes_transferred () > 0)
1573 this->total_snd_ += result.bytes_transferred ();
1574 if (this->total_snd_ >= xfer_limit)
1576 ACE_DEBUG ((LM_DEBUG,
1577 ACE_TEXT ("(%t) Client %d sent %B, limit %B\n"),
1578 this->id_, this->total_snd_, xfer_limit));
1579 this->close ();
1581 if (duplex != 0) // full duplex, continue write
// Only keep writing while fewer than 32 KiB more have been sent than
// received.
1583 if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
1584 this->initiate_write_stream ();
1586 else // half-duplex read reply, after read we will start write
1587 this->initiate_read_stream ();
// This operation is finished; destroy the Client once nothing else is
// outstanding.
1590 --this->io_count_;
1591 if (this->io_count_ > 0)
1592 return;
1594 delete this;
// Handles the result of an asynchronous read on this Client.
//
// Under lock_ it
//  - logs the result: every result field plus the received text when
//    loglevel > 1, otherwise the error (if any), otherwise a one-line
//    byte count when loglevel > 0;
//  - releases the message block carried by the result;
//  - on a successful transfer of at least one byte, adds the byte count
//    to total_rcv_ and initiates the next read (duplex, or writing has
//    been stopped) or the next write (half-duplex);
//  - decrements io_count_, and deletes this object once no operation
//    remains outstanding.
1597 void
1598 Client::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
1601 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1603 ACE_Message_Block & mb = result.message_block ();
// Verbose dump of the result; log_lock is held for the whole dump.
1605 if (loglevel > 1)
1607 LogLocker log_lock;
1609 ACE_DEBUG ((LM_DEBUG,
1610 ACE_TEXT ("(%t) **** Client %d: handle_read_stream() ****\n"),
1611 this->id_));
1612 ACE_DEBUG ((LM_DEBUG,
1613 ACE_TEXT ("%s = %B\n"),
1614 ACE_TEXT ("bytes_to_read"),
1615 result.bytes_to_read ()));
1616 ACE_DEBUG ((LM_DEBUG,
1617 ACE_TEXT ("%s = %d\n"),
1618 ACE_TEXT ("handle"),
1619 result.handle ()));
1620 ACE_DEBUG ((LM_DEBUG,
1621 ACE_TEXT ("%s = %B\n"),
1622 ACE_TEXT ("bytes_transfered"),
1623 result.bytes_transferred ()));
1624 ACE_DEBUG ((LM_DEBUG,
1625 ACE_TEXT ("%s = %@\n"),
1626 ACE_TEXT ("act"),
1627 result.act ()));
1628 ACE_DEBUG ((LM_DEBUG,
1629 ACE_TEXT ("%s = %d\n"),
1630 ACE_TEXT ("success"),
1631 result.success ()));
1632 ACE_DEBUG ((LM_DEBUG,
1633 ACE_TEXT ("%s = %@\n"),
1634 ACE_TEXT ("completion_key"),
1635 result.completion_key ()));
1636 ACE_DEBUG ((LM_DEBUG,
1637 ACE_TEXT ("%s = %d\n"),
1638 ACE_TEXT ("error"),
1639 result.error ()));
1641 #if defined (ACE_WIN32)
// Print every block of the chain as its own part.
1642 char index = 0;
1643 for (ACE_Message_Block* mb_i = &mb;
1644 mb_i != 0;
1645 mb_i = mb_i->cont ())
1647 ++index;
1648 // write 0 at string end for proper printout
// This uses the byte that initiate_read_stream() hid at the end of
// each block.
1649 mb_i->wr_ptr()[0] = '\0';
1651 ACE_DEBUG ((LM_DEBUG,
1652 ACE_TEXT ("%s%d = %s\n"),
1653 ACE_TEXT ("message_block, part "),
1654 index,
1655 mb_i->rd_ptr ()));
1657 #else /* ACE_WIN32 */
1658 // write 0 at string end for proper printout
// initiate_read_stream() allocates one byte more than it offers to the
// read, so this index stays inside the block.
1659 mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
1660 ACE_DEBUG ((LM_DEBUG,
1661 ACE_TEXT ("%s = %s\n"),
1662 ACE_TEXT ("message_block"),
1663 mb.rd_ptr ()));
1664 #endif /* ACE_WIN32 */
1666 ACE_DEBUG ((LM_DEBUG,
1667 ACE_TEXT ("**** end of message ****************\n")));
// Not verbose: report a failed read.  A cancelled operation is only
// logged at debug priority, any other error at error priority.
1669 else if (result.error () != 0)
1671 ACE_Log_Priority prio;
1672 #if defined (ACE_WIN32)
1673 if (result.error () == ERROR_OPERATION_ABORTED)
1674 prio = LM_DEBUG;
1675 #else
1676 if (result.error () == ECANCELED)
1677 prio = LM_DEBUG;
1678 #endif /* ACE_WIN32 */
1679 else
1680 prio = LM_ERROR;
// Store the error number so that %p prints the matching error text.
1681 ACE_Log_Msg::instance ()->errnum (result.error ());
1682 ACE_Log_Msg::instance ()->log (prio,
1683 ACE_TEXT ("(%t) Client %d; %p\n"),
1684 this->id_,
1685 ACE_TEXT ("read"));
1687 else if (loglevel > 0)
1689 ACE_DEBUG ((LM_DEBUG,
1690 ACE_TEXT ("(%t) Client %d: read %B bytes ok\n"),
1691 this->id_,
1692 result.bytes_transferred ()));
// The block is no longer needed once the result has been logged.
1695 mb.release ();
1697 if (result.error () == 0 && result.bytes_transferred () > 0)
1699 this->total_rcv_ += result.bytes_transferred ();
1701 if (duplex != 0 || this->stop_writing_) // full duplex, continue read
1702 this->initiate_read_stream ();
1703 else // half-duplex write, after write we will start read
1704 this->initiate_write_stream ();
// This operation is finished; destroy the Client once nothing else is
// outstanding.
1707 --this->io_count_;
1708 if (this->io_count_ > 0)
1709 return;
1711 delete this;
1714 // *************************************************************
1715 // Configuration helpers
1716 // *************************************************************
1718 print_usage (int /* argc */, ACE_TCHAR *argv[])
1720 ACE_ERROR
1721 ((LM_ERROR,
1722 ACE_TEXT ("\nusage: %s")
1723 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1724 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1725 ACE_TEXT ("\n a AIOCB")
1726 ACE_TEXT ("\n i SIG")
1727 ACE_TEXT ("\n c CB")
1728 ACE_TEXT ("\n s SUN")
1729 ACE_TEXT ("\n d default")
1730 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1731 ACE_TEXT ("\n-h <host> for Client mode")
1732 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1733 ACE_TEXT ("\n-p <port to listen/connect>")
1734 ACE_TEXT ("\n-c <number of client instances>")
1735 ACE_TEXT ("\n-b run client and server at the same time")
1736 ACE_TEXT ("\n f file")
1737 ACE_TEXT ("\n c console")
1738 ACE_TEXT ("\n-v log level")
1739 ACE_TEXT ("\n 0 - log errors and highlights")
1740 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1741 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1742 ACE_TEXT ("\n-x max transfer byte count per Client")
1743 ACE_TEXT ("\n-u show this message")
1744 ACE_TEXT ("\n"),
1745 argv[0]
1747 return -1;
1750 static int
1751 set_proactor_type (const ACE_TCHAR *ptype)
1753 if (!ptype)
1754 return 0;
1756 switch (ACE_OS::ace_toupper (*ptype))
1758 case 'D':
1759 proactor_type = DEFAULT;
1760 return 1;
1761 case 'A':
1762 proactor_type = AIOCB;
1763 return 1;
1764 case 'I':
1765 proactor_type = SIG;
1766 return 1;
1767 #if defined (sun)
1768 case 'S':
1769 proactor_type = SUN;
1770 return 1;
1771 #endif /* sun */
1772 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
1773 case 'C':
1774 proactor_type = CB;
1775 return 1;
1776 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
1777 default:
1778 break;
1780 return 0;
1783 static int
1784 parse_args (int argc, ACE_TCHAR *argv[])
1786 // First, set up all the defaults then let any args change them.
1787 both = 1; // client and server simultaneosly
1788 duplex = 1; // full duplex is on
1789 host = ACE_LOCALHOST; // server to connect
1790 port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
1791 max_aio_operations = 512; // POSIX Proactor params
1792 proactor_type = DEFAULT; // Proactor type = default
1793 threads = 3; // size of Proactor thread pool
1794 clients = 10; // number of clients
1795 loglevel = 0; // log level : only errors and highlights
1796 // Default transfer limit 50 messages per Sender
1797 xfer_limit = 50 * ACE_OS::strlen (complete_message);
1799 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
1800 # if defined (ACE_LINUX)
1801 duplex = 0;
1802 #endif
1804 if (argc == 1) // no arguments , so one button test
1805 return 0;
1807 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
1808 int c;
1810 while ((c = get_opt ()) != EOF)
1812 switch (c)
1814 case 'x': // xfer limit
1815 xfer_limit = static_cast<size_t> (ACE_OS::atoi (get_opt.opt_arg ()));
1816 if (xfer_limit == 0)
1817 xfer_limit = 1; // Bare minimum.
1818 break;
1819 case 'b': // both client and server
1820 both = 1;
1821 break;
1822 case 'v': // log level
1823 loglevel = ACE_OS::atoi (get_opt.opt_arg ());
1824 break;
1825 case 'd': // duplex
1826 duplex = ACE_OS::atoi (get_opt.opt_arg ());
1827 break;
1828 case 'h': // host for sender
1829 host = get_opt.opt_arg ();
1830 break;
1831 case 'p': // port number
1832 port = ACE_OS::atoi (get_opt.opt_arg ());
1833 break;
1834 case 'n': // thread pool size
1835 threads = ACE_OS::atoi (get_opt.opt_arg ());
1836 break;
1837 case 'c': // number of clients
1838 clients = ACE_OS::atoi (get_opt.opt_arg ());
1839 if (clients > MAX_CLIENTS)
1840 clients = MAX_CLIENTS;
1841 break;
1842 case 'o': // max number of aio for proactor
1843 max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
1844 break;
1845 case 't': // Proactor Type
1846 if (set_proactor_type (get_opt.opt_arg ()))
1847 break;
1848 return print_usage (argc, argv);
1849 case 'u':
1850 default:
1851 return print_usage (argc, argv);
1852 } // switch
1853 } // while
1855 if (proactor_type == SUN && threads > 1)
1857 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sun aiowait is not thread-safe; ")
1858 ACE_TEXT ("changing to 1 thread\n")));
1859 threads = 1;
1862 return 0;
// Test entry point used when threads and asynchronous I/O are available.
//
// Parses the arguments, starts the proactor thread pool task, opens the
// Acceptor and/or starts the Connector, waits until the TestData object
// reports that testing is done, and finally stops all sessions and the
// thread pool.
//
// Returns -1 when parse_args() fails, otherwise 0.
1866 run_main (int argc, ACE_TCHAR *argv[])
1868 ACE_START_TEST (ACE_TEXT ("Proactor_Test"));
1870 if (::parse_args (argc, argv) == -1)
1871 return -1;
// NOTE(review): disable_signal() is defined elsewhere in this file;
// presumably it disables the given signal range - confirm there.
1873 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
1874 disable_signal (SIGPIPE, SIGPIPE);
1876 MyTask task1;
1877 TestData test;
// Everything below the next line only runs if the thread pool started.
1879 if (task1.start (threads, proactor_type, max_aio_operations) == 0)
1881 Acceptor acceptor (&test);
1882 Connector connector (&test);
1883 ACE_INET_Addr addr (port);
// NOTE(review): rc is assigned below but never read in the visible code,
// so the wait loop runs even if neither the acceptor nor the connector
// started - confirm that testing_done() terminates in that case.
1885 int rc = 0;
1887 if (both != 0 || host == 0) // Acceptor
1889 // Simplify, initial read with zero size
1890 if (acceptor.open (addr, 0, 1) == 0)
1891 rc = 1;
// Connector side: taken when running both roles or when a host is set.
1894 if (both != 0 || host != 0)
1896 if (host == 0)
1897 host = ACE_LOCALHOST;
1899 if (addr.set (port, host, 1, addr.get_type ()) == -1)
1900 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
1901 else
1902 rc += connector.start (addr, clients);
1905 // Wait a few seconds to let things get going, then poll til
1906 // all sessions are done. Note that when we exit this scope, the
1907 // Acceptor and Connector will be destroyed, which should prevent
1908 // further connections and also test how well destroyed handlers
1909 // are handled.
1910 ACE_OS::sleep (3);
1912 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
// Poll once per second until the test data reports completion.
1913 while (!test.testing_done ())
1914 ACE_OS::sleep (1);
1916 test.stop_all ();
1918 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
1919 task1.stop ();
1921 ACE_END_TEST;
1923 return 0;
1926 #else
1929 run_main (int, ACE_TCHAR *[])
1931 ACE_START_TEST (ACE_TEXT ("Proactor_Test"));
1933 ACE_DEBUG ((LM_INFO,
1934 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
1935 ACE_TEXT ("Proactor_Test will not be run.\n")));
1937 ACE_END_TEST;
1939 return 0;
1942 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */