Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / ACE / tests / Proactor_Test.cpp
blob6657c096a158b02ba899163ea7d364766e3e4080
1 // ============================================================================
2 /**
3 * @file Proactor_Test.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that does various asynchronous
7 * operations.
9 * @author Alexander Libman <alibman@baltimore.com>
11 // ============================================================================
13 #include "test_config.h"
15 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
16 // This only works on Win32 platforms and on Unix platforms
17 // supporting POSIX aio calls.
19 #include "ace/Signal.h"
21 #include "ace/Service_Config.h"
22 #include "ace/INET_Addr.h"
23 #include "ace/SOCK_Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/SOCK_Stream.h"
26 #include "ace/Object_Manager.h"
27 #include "ace/Get_Opt.h"
29 #include "ace/Proactor.h"
30 #include "ace/Asynch_Acceptor.h"
31 #include "ace/Asynch_Connector.h"
32 #include "ace/Task.h"
33 #include "ace/Thread_Semaphore.h"
34 #include "ace/OS_NS_ctype.h"
35 #include "ace/OS_NS_errno.h"
36 #include "ace/OS_NS_signal.h"
37 #include "ace/OS_NS_string.h"
38 #include "ace/OS_NS_unistd.h"
39 #include "ace/OS_NS_sys_socket.h"
40 #include "ace/os_include/netinet/os_tcp.h"
42 #include "ace/Atomic_Op.h"
43 #include "ace/Synch_Traits.h"
45 #if defined (ACE_WIN32)
47 # include "ace/WIN32_Proactor.h"
49 #elif defined (ACE_HAS_AIO_CALLS)
51 # include "ace/POSIX_Proactor.h"
52 # include "ace/POSIX_CB_Proactor.h"
54 #endif /* ACE_WIN32 */
56 #include "Proactor_Test.h"
59 // Proactor Type (UNIX only, Win32 ignored)
60 using ProactorType = enum { DEFAULT = 0, AIOCB, SIG, CB };
61 static ProactorType proactor_type = DEFAULT;
63 // POSIX : > 0 max number aio operations proactor,
64 static size_t max_aio_operations = 0;
66 // both: 0 run client or server / depends on host
67 // != 0 run client and server
68 static int both = 0;
70 // Host that we're connecting to.
71 static const ACE_TCHAR *host = 0;
73 // number of Client instances
74 static int clients = 1;
75 const int MAX_CLIENTS = 1000;
76 const int MAX_SERVERS = 1000;
78 // duplex mode: == 0 half-duplex
79 // != 0 full duplex
80 static int duplex = 0;
82 // number threads in the Proactor thread pool
83 static int threads = 1;
85 // Port that we're receiving connections on.
86 static u_short port = ACE_DEFAULT_SERVER_PORT;
88 // Log options
89 static int loglevel; // 0 full , 1 only errors
91 static size_t xfer_limit; // Number of bytes for Client to send.
93 static char complete_message[] =
94 "GET / HTTP/1.1\r\n"
95 "Accept: */*\r\n"
96 "Accept-Language: C++\r\n"
97 "Accept-Encoding: gzip, deflate\r\n"
98 "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
99 "Connection: Keep-Alive\r\n"
100 "\r\n";
102 class LogLocker
104 public:
105 LogLocker () { ACE_LOG_MSG->acquire (); }
106 virtual ~LogLocker () { ACE_LOG_MSG->release (); }
110 // Function to remove signals from the signal mask.
111 static int
112 disable_signal (int sigmin, int sigmax)
114 #if !defined (ACE_LACKS_UNIX_SIGNALS)
115 sigset_t signal_set;
116 if (ACE_OS::sigemptyset (&signal_set) == - 1)
117 ACE_ERROR ((LM_ERROR,
118 ACE_TEXT ("Error: (%P|%t):%p\n"),
119 ACE_TEXT ("sigemptyset failed")));
121 for (int i = sigmin; i <= sigmax; i++)
122 ACE_OS::sigaddset (&signal_set, i);
124 // Put the <signal_set>.
125 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
126 // In multi-threaded application this is not POSIX compliant
127 // but let's leave it just in case.
128 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
129 # else
130 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
131 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
132 ACE_ERROR_RETURN ((LM_ERROR,
133 ACE_TEXT ("Error: (%P|%t): %p\n"),
134 ACE_TEXT ("SIG_BLOCK failed")),
135 -1);
136 #else
137 ACE_UNUSED_ARG (sigmin);
138 ACE_UNUSED_ARG (sigmax);
139 #endif /* ACE_LACKS_UNIX_SIGNALS */
141 return 0;
145 // *************************************************************
146 // MyTask is ACE_Task resposible for :
147 // 1. creation and deletion of
148 // Proactor and Proactor thread pool
149 // 2. running Proactor event loop
150 // *************************************************************
153 * @class MyTask
155 * MyTask plays role for Proactor threads pool
157 * MyTask is ACE_Task resposible for:
158 * 1. Creation and deletion of Proactor and Proactor thread pool
159 * 2. Running Proactor event loop
161 class MyTask : public ACE_Task<ACE_MT_SYNCH>
163 public:
164 MyTask ():
165 lock_ (),
166 sem_ ((unsigned int) 0),
167 proactor_(0) {}
169 ~MyTask() override
171 (void) this->stop ();
172 (void) this->delete_proactor();
175 int svc () override;
177 int start (int num_threads,
178 ProactorType type_proactor,
179 size_t max_op );
180 int stop ();
182 private:
183 int create_proactor (ProactorType type_proactor,
184 size_t max_op);
185 int delete_proactor ();
187 ACE_SYNCH_RECURSIVE_MUTEX lock_;
188 ACE_Thread_Semaphore sem_;
189 ACE_Proactor * proactor_;
193 MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
195 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
196 monitor,
197 this->lock_,
198 -1);
200 ACE_TEST_ASSERT (this->proactor_ == 0);
202 #if defined (ACE_WIN32)
204 ACE_UNUSED_ARG (type_proactor);
205 ACE_UNUSED_ARG (max_op);
207 ACE_WIN32_Proactor *proactor_impl = 0;
209 ACE_NEW_RETURN (proactor_impl,
210 ACE_WIN32_Proactor,
211 -1);
213 ACE_DEBUG ((LM_DEBUG,
214 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
216 #elif defined (ACE_HAS_AIO_CALLS)
218 ACE_POSIX_Proactor * proactor_impl = 0;
220 switch (type_proactor)
222 case AIOCB:
223 ACE_NEW_RETURN (proactor_impl,
224 ACE_POSIX_AIOCB_Proactor (max_op),
225 -1);
226 ACE_DEBUG ((LM_DEBUG,
227 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
228 break;
230 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
231 case SIG:
232 ACE_NEW_RETURN (proactor_impl,
233 ACE_POSIX_SIG_Proactor (max_op),
234 -1);
235 ACE_DEBUG ((LM_DEBUG,
236 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
237 break;
238 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
240 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
241 case CB:
242 ACE_NEW_RETURN (proactor_impl,
243 ACE_POSIX_CB_Proactor (max_op),
244 -1);
245 ACE_DEBUG ((LM_DEBUG,
246 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
247 break;
248 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
250 default:
251 ACE_DEBUG ((LM_DEBUG,
252 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
253 break;
256 #endif /* ACE_WIN32 */
258 // always delete implementation 1 , not !(proactor_impl == 0)
259 ACE_NEW_RETURN (this->proactor_,
260 ACE_Proactor (proactor_impl, 1 ),
261 -1);
262 // Set new singleton and delete it in close_singleton()
263 ACE_Proactor::instance (this->proactor_, 1);
264 return 0;
268 MyTask::delete_proactor ()
270 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
271 monitor,
272 this->lock_,
273 -1);
275 ACE_DEBUG ((LM_DEBUG,
276 ACE_TEXT ("(%t) Delete Proactor\n")));
278 ACE_Proactor::close_singleton ();
279 this->proactor_ = 0;
281 return 0;
285 MyTask::start (int num_threads,
286 ProactorType type_proactor,
287 size_t max_op)
289 if (this->create_proactor (type_proactor, max_op) == -1)
290 ACE_ERROR_RETURN ((LM_ERROR,
291 ACE_TEXT ("%p.\n"),
292 ACE_TEXT ("unable to create proactor")),
293 -1);
295 if (this->activate (THR_NEW_LWP, num_threads) == -1)
296 ACE_ERROR_RETURN ((LM_ERROR,
297 ACE_TEXT ("%p.\n"),
298 ACE_TEXT ("unable to activate thread pool")),
299 -1);
301 for (; num_threads > 0; num_threads--)
303 sem_.acquire ();
306 return 0;
311 MyTask::stop ()
313 if (this->proactor_ != 0)
315 ACE_DEBUG ((LM_DEBUG,
316 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
318 this->proactor_->proactor_end_event_loop ();
321 if (this->wait () == -1)
322 ACE_ERROR ((LM_ERROR,
323 ACE_TEXT ("%p.\n"),
324 ACE_TEXT ("unable to stop thread pool")));
326 return 0;
330 MyTask::svc ()
332 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n")));
334 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
335 disable_signal (SIGPIPE, SIGPIPE);
337 // signal that we are ready
338 sem_.release (1);
340 ACE_Proactor::run_event_loop ();
342 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n")));
343 return 0;
347 // TestData collects and reports on test-related transfer and connection
348 // statistics.
349 class TestData
351 public:
352 TestData ();
353 bool testing_done ();
354 Server *server_up ();
355 Client *client_up ();
356 void server_done (Server *s);
357 void client_done (Client *c);
358 void stop_all ();
359 void report ();
361 private:
362 struct Local_Stats
364 // Track number of sessions that report start, and those that report
365 // their end (and stats).
366 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_up_;
367 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_down_;
369 // Total read and write bytes for all sessions.
370 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_cnt_;
371 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_cnt_;
372 // Total read and write operations issues for all sessions.
373 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_ops_;
374 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_ops_;
375 } servers_, clients_;
377 ACE_SYNCH_MUTEX list_lock_;
378 Server *server_list_[MAX_SERVERS];
379 Client *client_list_[MAX_CLIENTS];
382 TestData::TestData ()
384 int i;
385 for (i = 0; i < MAX_SERVERS; ++i)
386 this->server_list_[i] = 0;
387 for (i = 0; i < MAX_CLIENTS; ++i)
388 this->client_list_[i] = 0;
391 bool
392 TestData::testing_done ()
394 int svr_up = this->servers_.sessions_up_.value ();
395 int svr_dn = this->servers_.sessions_down_.value ();
396 int clt_up = this->clients_.sessions_up_.value ();
397 int clt_dn = this->clients_.sessions_down_.value ();
399 if (svr_up == 0 && clt_up == 0) // No connections up yet
400 return false;
402 return (svr_dn >= svr_up && clt_dn >= clt_up);
405 Server *
406 TestData::server_up ()
408 ++this->servers_.sessions_up_;
409 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
411 for (int i = 0; i < MAX_SERVERS; ++i)
413 if (this->server_list_[i] == 0)
415 ACE_NEW_RETURN (this->server_list_[i], Server (this, i), 0);
416 ACE_DEBUG ((LM_DEBUG,
417 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
419 this->servers_.sessions_up_.value (),
420 this->servers_.sessions_down_.value ()));
421 return this->server_list_[i];
424 return 0;
427 Client *
428 TestData::client_up ()
430 ++this->clients_.sessions_up_;
431 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
433 for (int i = 0; i < MAX_CLIENTS; ++i)
435 if (this->client_list_[i] == 0)
437 ACE_NEW_RETURN (this->client_list_[i], Client (this, i), 0);
438 ACE_DEBUG ((LM_DEBUG,
439 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
441 this->clients_.sessions_up_.value (),
442 this->clients_.sessions_down_.value ()));
443 return this->client_list_[i];
446 return 0;
449 void
450 TestData::server_done (Server *s)
452 this->servers_.w_cnt_ += s->get_total_snd ();
453 this->servers_.r_cnt_ += s->get_total_rcv ();
454 this->servers_.w_ops_ += s->get_total_w ();
455 this->servers_.r_ops_ += s->get_total_r ();
456 ++this->servers_.sessions_down_;
457 ACE_DEBUG ((LM_DEBUG,
458 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
459 s->id (),
460 this->servers_.sessions_up_.value (),
461 this->servers_.sessions_down_.value ()));
463 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
464 int i;
465 for (i = 0; i < MAX_SERVERS; ++i)
467 if (this->server_list_[i] == s)
469 if (s->id () != i)
470 ACE_ERROR ((LM_ERROR,
471 ACE_TEXT ("Server %d is pos %d in list\n"),
472 s->id (),
473 i));
474 this->server_list_[i] = 0;
475 break;
478 if (i >= MAX_SERVERS)
479 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Server %@ done but not listed\n"), s));
481 return;
484 void
485 TestData::client_done (Client *c)
487 this->clients_.w_cnt_ += c->get_total_snd ();
488 this->clients_.r_cnt_ += c->get_total_rcv ();
489 this->clients_.w_ops_ += c->get_total_w ();
490 this->clients_.r_ops_ += c->get_total_r ();
491 ++this->clients_.sessions_down_;
492 ACE_DEBUG ((LM_DEBUG,
493 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
494 c->id (),
495 this->clients_.sessions_up_.value (),
496 this->clients_.sessions_down_.value ()));
498 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
499 int i;
500 for (i = 0; i < MAX_CLIENTS; ++i)
502 if (this->client_list_[i] == c)
504 if (c->id () != i)
505 ACE_ERROR ((LM_ERROR,
506 ACE_TEXT ("Client %d is pos %d in list\n"),
507 c->id (),
508 i));
509 this->client_list_[i] = 0;
510 break;
513 if (i >= MAX_CLIENTS)
514 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Client %@ done but not listed\n"), c));
516 return;
519 void
520 TestData::stop_all ()
522 int i;
524 // Lock and cancel everything. Then release the lock, possibly allowing
525 // cleanups, then grab it again and delete all Servers and Clients.
527 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
528 for (i = 0; i < MAX_CLIENTS; ++i)
530 if (this->client_list_[i] != 0)
531 this->client_list_[i]->cancel ();
534 for (i = 0; i < MAX_SERVERS; ++i)
536 if (this->server_list_[i] != 0)
537 this->server_list_[i]->cancel ();
541 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
542 for (i = 0; i < MAX_CLIENTS; ++i)
544 if (this->client_list_[i] != 0)
545 delete this->client_list_[i];
548 for (i = 0; i < MAX_SERVERS; ++i)
550 if (this->server_list_[i] != 0)
551 delete this->server_list_[i];
556 void
557 TestData::report ()
559 // Print statistics
560 ACE_TCHAR bufs [256];
561 ACE_TCHAR bufr [256];
563 ACE_OS::snprintf (bufs, 256,
564 ACE_SIZE_T_FORMAT_SPECIFIER
565 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
566 this->clients_.w_cnt_.value (),
567 this->clients_.w_ops_.value ());
569 ACE_OS::snprintf (bufr, 256,
570 ACE_SIZE_T_FORMAT_SPECIFIER
571 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
572 this->clients_.r_cnt_.value (),
573 this->clients_.r_ops_.value ());
575 ACE_DEBUG ((LM_DEBUG,
576 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
577 bufs,
578 bufr));
580 ACE_OS::snprintf (bufs, 256,
581 ACE_SIZE_T_FORMAT_SPECIFIER
582 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
583 this->servers_.w_cnt_.value (),
584 this->servers_.w_ops_.value ());
586 ACE_OS::snprintf (bufr, 256,
587 ACE_SIZE_T_FORMAT_SPECIFIER
588 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
589 this->servers_.r_cnt_.value (),
590 this->servers_.r_ops_.value ());
592 ACE_DEBUG ((LM_DEBUG,
593 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
594 bufs,
595 bufr));
597 if (this->clients_.w_cnt_.value () == 0 ||
598 this->clients_.r_cnt_.value () == 0 ||
599 this->servers_.w_cnt_.value () == 0 ||
600 this->servers_.r_cnt_.value () == 0 )
601 ACE_ERROR ((LM_ERROR, ACE_TEXT ("It appears that this test didn't ")
602 ACE_TEXT ("really do anything. Something is very wrong.\n")));
606 class Acceptor : public ACE_Asynch_Acceptor<Server>
608 public:
609 Acceptor (TestData *tester);
610 ~Acceptor () override;
612 // Virtual from ACE_Asynch_Acceptor
613 Server *make_handler () override;
615 private:
616 TestData *tester_;
619 // *************************************************************
620 Acceptor::Acceptor (TestData *tester)
621 : tester_ (tester)
625 Acceptor::~Acceptor ()
627 this->cancel ();
630 Server *
631 Acceptor::make_handler ()
633 return this->tester_->server_up ();
636 // ***************************************************
637 Server::Server ()
639 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
642 Server::Server (TestData *tester, int id)
643 : tester_ (tester),
644 id_ (id),
645 handle_ (ACE_INVALID_HANDLE),
646 io_count_ (0),
647 flg_cancel_(0),
648 total_snd_(0),
649 total_rcv_(0),
650 total_w_ (0),
651 total_r_ (0)
655 Server::~Server ()
657 ACE_DEBUG ((LM_DEBUG,
658 ACE_TEXT ("(%t) Server %d dtor; %d sends (%B bytes); ")
659 ACE_TEXT ("%d recvs (%B bytes)\n"),
660 this->id_,
661 this->total_w_, this->total_snd_,
662 this->total_r_, this->total_rcv_));
663 if (this->io_count_ != 0)
664 ACE_ERROR ((LM_WARNING,
665 ACE_TEXT ("(%t) Server %d deleted with ")
666 ACE_TEXT ("%d I/O outstanding\n"),
667 this->id_,
668 this->io_count_));
670 // This test bounces data back and forth between Clients and Servers.
671 // Therefore, if there was significantly more data in one direction, that's
672 // a problem. Remember, the byte counts are unsigned values.
673 int issue_data_warning = 0;
674 if (this->total_snd_ > this->total_rcv_)
676 if (this->total_rcv_ == 0)
677 issue_data_warning = 1;
678 else if (this->total_snd_ / this->total_rcv_ > 2)
679 issue_data_warning = 1;
681 else
683 if (this->total_snd_ == 0)
684 issue_data_warning = 1;
685 else if (this->total_rcv_ / this->total_snd_ > 2)
686 issue_data_warning = 1;
688 if (issue_data_warning)
689 ACE_DEBUG ((LM_WARNING,
690 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
692 if (this->tester_ != 0)
693 this->tester_->server_done (this);
695 if (this->handle_ != ACE_INVALID_HANDLE)
696 ACE_OS::closesocket (this->handle_);
698 this->id_ = -1;
699 this->handle_= ACE_INVALID_HANDLE;
702 void
703 Server::cancel ()
705 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
707 this->flg_cancel_ = 1;
708 this->ws_.cancel ();
709 this->rs_.cancel ();
710 return;
714 void
715 Server::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
717 ACE_TCHAR str[256];
718 if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
719 ACE_DEBUG ((LM_DEBUG,
720 ACE_TEXT ("(%t) Server %d connection from %s\n"),
721 this->id_,
722 str));
723 else
724 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Server %d %p\n"),
725 this->id_,
726 ACE_TEXT ("addr_to_string")));
727 return;
731 void
732 Server::open (ACE_HANDLE handle, ACE_Message_Block &)
735 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
737 // Don't buffer serial sends.
738 this->handle_ = handle;
739 int nodelay = 1;
740 ACE_SOCK_Stream option_setter (handle);
741 if (-1 == option_setter.set_option (ACE_IPPROTO_TCP,
742 TCP_NODELAY,
743 &nodelay,
744 sizeof (nodelay)))
745 ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
747 if (this->ws_.open (*this, this->handle_) == -1)
748 ACE_ERROR ((LM_ERROR,
749 ACE_TEXT ("(%t) %p\n"),
750 ACE_TEXT ("Server::ACE_Asynch_Write_Stream::open")));
751 else if (this->rs_.open (*this, this->handle_) == -1)
752 ACE_ERROR ((LM_ERROR,
753 ACE_TEXT ("(%t) %p\n"),
754 ACE_TEXT ("Server::ACE_Asynch_Read_Stream::open")));
755 else
756 this->initiate_read_stream ();
758 if (this->io_count_ > 0)
759 return;
761 delete this;
765 Server::initiate_read_stream ()
767 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
768 return -1;
770 ACE_Message_Block *mb = 0;
771 ACE_NEW_RETURN (mb,
772 ACE_Message_Block (1024), //BUFSIZ + 1),
773 -1);
775 // Inititiate read
776 if (this->rs_.read (*mb, mb->size () - 1) == -1)
778 mb->release ();
779 #if defined (ACE_WIN32)
780 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
781 // a 0-byte read as we would if underlying calls used WSARecv.
782 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
783 ACE_ERROR_RETURN ((LM_DEBUG,
784 ACE_TEXT ("(%t) Server %d, peer closed\n"),
785 this->id_),
786 -1);
787 #endif /* ACE_WIN32 */
788 ACE_ERROR_RETURN ((LM_ERROR,
789 ACE_TEXT ("(%t) Server %d, %p\n"),
790 this->id_,
791 ACE_TEXT ("read")),
792 -1);
795 this->io_count_++;
796 this->total_r_++;
797 return 0;
801 Server::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
803 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
805 mb.release ();
806 return -1;
809 if (nbytes == 0)
811 mb.release ();
812 ACE_ERROR_RETURN((LM_ERROR,
813 ACE_TEXT ("(%t) Server::ACE_Asynch_Write_Stream::write nbytes <0 ")),
814 -1);
817 if (this->ws_.write (mb, nbytes) == -1)
819 mb.release ();
820 #if defined (ACE_WIN32)
821 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
822 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
823 ACE_ERROR_RETURN ((LM_DEBUG,
824 ACE_TEXT ("(%t) Server %d, peer gone\n"),
825 this->id_),
826 -1);
827 #endif /* ACE_WIN32 */
828 ACE_ERROR_RETURN((LM_ERROR,
829 ACE_TEXT ("(%t) Server %d, %p\n"),
830 this->id_,
831 ACE_TEXT ("write")),
832 -1);
835 this->io_count_++;
836 this->total_w_++;
837 return 0;
840 void
841 Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
844 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
846 ACE_Message_Block & mb = result.message_block ();
848 // Reset pointers.
849 mb.rd_ptr ()[result.bytes_transferred ()] = '\0';
851 if (loglevel > 1)
853 LogLocker log_lock;
855 ACE_DEBUG ((LM_DEBUG,
856 ACE_TEXT ("(%t) **** Server %d: handle_read_stream() ****\n"),
857 this->id_));
858 ACE_DEBUG ((LM_DEBUG,
859 ACE_TEXT ("%s = %B\n"),
860 ACE_TEXT ("bytes_to_read"),
861 result.bytes_to_read ()));
862 ACE_DEBUG ((LM_DEBUG,
863 ACE_TEXT ("%s = %d\n"),
864 ACE_TEXT ("handle"),
865 result.handle ()));
866 ACE_DEBUG ((LM_DEBUG,
867 ACE_TEXT ("%s = %B\n"),
868 ACE_TEXT ("bytes_transfered"),
869 result.bytes_transferred ()));
870 ACE_DEBUG ((LM_DEBUG,
871 ACE_TEXT ("%s = %@\n"),
872 ACE_TEXT ("act"),
873 result.act ()));
874 ACE_DEBUG ((LM_DEBUG,
875 ACE_TEXT ("%s = %d\n"),
876 ACE_TEXT ("success"),
877 result.success ()));
878 ACE_DEBUG ((LM_DEBUG,
879 ACE_TEXT ("%s = %@\n"),
880 ACE_TEXT ("completion_key"),
881 result.completion_key ()));
882 ACE_DEBUG ((LM_DEBUG,
883 ACE_TEXT ("%s = %d\n"),
884 ACE_TEXT ("error"),
885 result.error ()));
886 ACE_DEBUG ((LM_DEBUG,
887 ACE_TEXT ("%s = %s\n"),
888 ACE_TEXT ("message_block"),
889 mb.rd_ptr ()));
890 ACE_DEBUG ((LM_DEBUG,
891 ACE_TEXT ("**** end of message ****************\n")));
893 else if (result.error () != 0)
895 ACE_Log_Priority prio;
896 #if defined (ACE_WIN32)
897 if (result.error () == ERROR_OPERATION_ABORTED)
898 prio = LM_DEBUG;
899 #else
900 if (result.error () == ECANCELED)
901 prio = LM_DEBUG;
902 #endif /* ACE_WIN32 */
903 else
904 prio = LM_ERROR;
905 ACE_Log_Msg::instance ()->errnum (result.error ());
906 ACE_Log_Msg::instance ()->log (prio,
907 ACE_TEXT ("(%t) Server %d; %p\n"),
908 this->id_,
909 ACE_TEXT ("read"));
911 else if (loglevel > 0)
913 ACE_DEBUG ((LM_DEBUG,
914 ACE_TEXT ("(%t) Server %d: read %B bytes\n"),
915 this->id_,
916 result.bytes_transferred ()));
919 if (result.error () == 0 && result.bytes_transferred () > 0)
921 this->total_rcv_ += result.bytes_transferred ();
923 if (this->initiate_write_stream (mb,
924 result.bytes_transferred ()) == 0)
926 if (duplex != 0) // Initiate new read from the stream.
927 this->initiate_read_stream ();
930 else
931 mb.release ();
933 --this->io_count_;
934 if (this->io_count_ > 0)
935 return;
937 delete this;
940 void
941 Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
944 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
946 ACE_Message_Block & mb = result.message_block ();
948 if (loglevel > 1)
950 LogLocker log_lock;
952 //mb.rd_ptr () [0] = '\0';
953 mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
955 ACE_DEBUG ((LM_DEBUG,
956 ACE_TEXT ("(%t) **** Server %d: handle_write_stream() ****\n"),
957 this->id_));
958 ACE_DEBUG ((LM_DEBUG,
959 ACE_TEXT ("%s = %B\n"),
960 ACE_TEXT ("bytes_to_write"),
961 result.bytes_to_write ()));
962 ACE_DEBUG ((LM_DEBUG,
963 ACE_TEXT ("%s = %d\n"),
964 ACE_TEXT ("handle"),
965 result.handle ()));
966 ACE_DEBUG ((LM_DEBUG,
967 ACE_TEXT ("%s = %B\n"),
968 ACE_TEXT ("bytes_transfered"),
969 result.bytes_transferred ()));
970 ACE_DEBUG ((LM_DEBUG,
971 ACE_TEXT ("%s = %@\n"),
972 ACE_TEXT ("act"),
973 result.act ()));
974 ACE_DEBUG ((LM_DEBUG,
975 ACE_TEXT ("%s = %d\n"),
976 ACE_TEXT ("success"),
977 result.success ()));
978 ACE_DEBUG ((LM_DEBUG,
979 ACE_TEXT ("%s = %@\n"),
980 ACE_TEXT ("completion_key"),
981 result.completion_key ()));
982 ACE_DEBUG ((LM_DEBUG,
983 ACE_TEXT ("%s = %d\n"),
984 ACE_TEXT ("error"),
985 result.error ()));
986 ACE_DEBUG ((LM_DEBUG,
987 ACE_TEXT ("%s = %s\n"),
988 ACE_TEXT ("message_block"),
989 mb.rd_ptr ()));
990 ACE_DEBUG ((LM_DEBUG,
991 ACE_TEXT ("**** end of message ****************\n")));
993 else if (result.error () != 0)
995 ACE_Log_Priority prio;
996 #if defined (ACE_WIN32)
997 if (result.error () == ERROR_OPERATION_ABORTED)
998 prio = LM_DEBUG;
999 #else
1000 if (result.error () == ECANCELED)
1001 prio = LM_DEBUG;
1002 #endif /* ACE_WIN32 */
1003 else
1004 prio = LM_ERROR;
1005 ACE_Log_Msg::instance ()->errnum (result.error ());
1006 ACE_Log_Msg::instance ()->log (prio,
1007 ACE_TEXT ("(%t) Server %d; %p\n"),
1008 this->id_,
1009 ACE_TEXT ("write"));
1011 else if (loglevel > 0)
1013 ACE_DEBUG ((LM_DEBUG,
1014 ACE_TEXT ("(%t) Server %d: wrote %B bytes ok\n"),
1015 this->id_,
1016 result.bytes_transferred ()));
1019 mb.release ();
1021 if (result.error () == 0 && result.bytes_transferred () > 0)
1023 this->total_snd_ += result.bytes_transferred ();
1025 if (duplex == 0)
1026 this->initiate_read_stream ();
1029 --this->io_count_;
1030 if (this->io_count_ > 0)
1031 return;
1033 delete this;
1036 // *******************************************
1037 // Connector
1038 // *******************************************
1040 class Connector : public ACE_Asynch_Connector<Client>
1042 public:
1043 Connector (TestData *tester);
1044 ~Connector () override;
1046 int start (const ACE_INET_Addr &addr, int num);
1048 // Virtual from ACE_Asynch_Connector
1049 Client *make_handler () override;
1051 private:
1052 TestData *tester_;
1055 // *************************************************************
1057 Connector::Connector (TestData *tester)
1058 : tester_ (tester)
1062 Connector::~Connector ()
1064 this->cancel ();
1067 Client *
1068 Connector::make_handler ()
1070 return this->tester_->client_up ();
1075 Connector::start (const ACE_INET_Addr& addr, int num)
1077 if (num > MAX_CLIENTS)
1078 num = MAX_CLIENTS;
1080 if (num < 0)
1081 num = 1;
1083 int rc = 0;
1085 // int open ( int pass_addresses = 0,
1086 // ACE_Proactor *proactor = 0,
1087 // int validate_new_connection = 0 );
1089 if (this->open (1, 0, 1) != 0)
1091 ACE_ERROR ((LM_ERROR,
1092 ACE_TEXT ("(%t) %p\n"),
1093 ACE_TEXT ("Connector::open failed")));
1094 return rc;
1097 for (; rc < num; rc++)
1099 if (this->connect (addr) != 0)
1101 ACE_ERROR ((LM_ERROR,
1102 ACE_TEXT ("(%t) %p\n"),
1103 ACE_TEXT ("Connector::connect failed")));
1104 break;
1107 return rc;
1111 Client::Client ()
1113 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
1116 Client::Client (TestData *tester, int id)
1117 : tester_ (tester),
1118 id_ (id),
1119 handle_ (ACE_INVALID_HANDLE),
1120 io_count_ (0),
1121 stop_writing_ (0),
1122 flg_cancel_ (0),
1123 total_snd_ (0),
1124 total_rcv_ (0),
1125 total_w_ (0),
1126 total_r_ (0)
1130 Client::~Client ()
1132 ACE_DEBUG ((LM_DEBUG,
1133 ACE_TEXT ("(%t) Client %d dtor; %d sends (%B bytes); ")
1134 ACE_TEXT ("%d recvs (%B bytes)\n"),
1135 this->id_,
1136 this->total_w_, this->total_snd_,
1137 this->total_r_, this->total_rcv_));
1138 if (this->io_count_ != 0)
1139 ACE_ERROR ((LM_WARNING,
1140 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1141 this->id_,
1142 this->io_count_));
1144 // This test bounces data back and forth between Clients and Servers.
1145 // Therefore, if there was significantly more data in one direction, that's
1146 // a problem. Remember, the byte counts are unsigned values.
1147 int issue_data_warning = 0;
1148 if (this->total_snd_ > this->total_rcv_)
1150 if (this->total_rcv_ == 0)
1151 issue_data_warning = 1;
1152 else if (this->total_snd_ / this->total_rcv_ > 2)
1153 issue_data_warning = 1;
1155 else
1157 if (this->total_snd_ == 0)
1158 issue_data_warning = 1;
1159 else if (this->total_rcv_ / this->total_snd_ > 2)
1160 issue_data_warning = 1;
1162 if (issue_data_warning)
1163 ACE_DEBUG ((LM_WARNING,
1164 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1166 if (this->tester_ != 0)
1167 this->tester_->client_done (this);
1169 this->id_ = -1;
1170 this->handle_= ACE_INVALID_HANDLE;
1171 if (this->handle_ != ACE_INVALID_HANDLE)
1173 ACE_OS::closesocket (this->handle_);
1175 this->handle_= ACE_INVALID_HANDLE;
1178 void
1179 Client::cancel ()
1181 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1183 this->flg_cancel_ = 1;
1184 this->ws_.cancel ();
1185 this->rs_.cancel ();
1186 return;
1189 void
1190 Client::close ()
1192 // This must be called with the lock_ held.
1193 ACE_DEBUG ((LM_DEBUG,
1194 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1195 this->id_, this->io_count_));
1196 ACE_OS::shutdown (this->handle_, ACE_SHUTDOWN_WRITE);
1197 this->stop_writing_ = 1;
1198 return;
1202 void
1203 Client::addresses (const ACE_INET_Addr& /* peer */, const ACE_INET_Addr& local)
1205 ACE_TCHAR str[256];
1206 if (0 == local.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
1207 ACE_DEBUG ((LM_DEBUG,
1208 ACE_TEXT ("(%t) Client %d connected on %s\n"),
1209 this->id_,
1210 str));
1211 else
1212 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Client %d %p\n"),
1213 this->id_,
1214 ACE_TEXT ("addr_to_string")));
1215 return;
1219 void
1220 Client::open (ACE_HANDLE handle, ACE_Message_Block &)
1223 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1225 // Don't buffer serial sends.
1226 this->handle_ = handle;
1227 int nodelay = 1;
1228 ACE_SOCK_Stream option_setter (handle);
1229 if (option_setter.set_option (ACE_IPPROTO_TCP,
1230 TCP_NODELAY,
1231 &nodelay,
1232 sizeof (nodelay)))
1233 ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
1235 // Open ACE_Asynch_Write_Stream
1236 if (this->ws_.open (*this, this->handle_) == -1)
1237 ACE_ERROR ((LM_ERROR,
1238 ACE_TEXT ("(%t) %p\n"),
1239 ACE_TEXT ("Client::ACE_Asynch_Write_Stream::open")));
1241 // Open ACE_Asynch_Read_Stream
1242 else if (this->rs_.open (*this, this->handle_) == -1)
1243 ACE_ERROR ((LM_ERROR,
1244 ACE_TEXT ("(%t) %p\n"),
1245 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::open")));
1247 else if (this->initiate_write_stream () == 0)
1249 if (duplex != 0) // Start an asynchronous read
1250 this->initiate_read_stream ();
1253 if (this->io_count_ > 0)
1254 return;
1256 delete this;
1260 Client::initiate_write_stream ()
1262 if (this->flg_cancel_ != 0 ||
1263 this->stop_writing_ ||
1264 this->handle_ == ACE_INVALID_HANDLE)
1265 return -1;
1267 static const size_t complete_message_length = ACE_OS::strlen (complete_message);
1269 #if defined (ACE_WIN32)
1271 ACE_Message_Block *mb1 = 0,
1272 *mb2 = 0,
1273 *mb3 = 0;
1275 // No need to allocate +1 for proper printing - the memory includes it already
1276 ACE_NEW_RETURN (mb1,
1277 ACE_Message_Block ((char *)complete_message,
1278 complete_message_length),
1279 -1);
1281 ACE_NEW_RETURN (mb2,
1282 ACE_Message_Block ((char *)complete_message,
1283 complete_message_length),
1284 -1);
1286 ACE_NEW_RETURN (mb3,
1287 ACE_Message_Block ((char *)complete_message,
1288 complete_message_length),
1289 -1);
1291 mb1->wr_ptr (complete_message_length);
1292 mb2->wr_ptr (complete_message_length);
1293 mb3->wr_ptr (complete_message_length);
1295 // chain them together
1296 mb1->cont (mb2);
1297 mb2->cont (mb3);
1299 if (this->ws_.writev (*mb1, mb1->total_length ()) == -1)
1301 mb1->release ();
1302 ACE_ERROR_RETURN((LM_ERROR,
1303 ACE_TEXT ("(%t) %p\n"),
1304 ACE_TEXT ("Client::ACE_Asynch_Stream::writev")),
1305 -1);
1307 #else /* ACE_WIN32 */
1309 ACE_Message_Block *mb = 0;
1311 // No need to allocate +1 for proper printing - the memory includes it already
1312 ACE_NEW_RETURN (mb,
1313 ACE_Message_Block (complete_message, complete_message_length),
1314 -1);
1315 mb->wr_ptr (complete_message_length);
1317 if (this->ws_.write (*mb, mb->length ()) == -1)
1319 mb->release ();
1320 #if defined (ACE_WIN32)
1321 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
1322 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
1323 ACE_ERROR_RETURN ((LM_DEBUG,
1324 ACE_TEXT ("(%t) Client %d, peer gone\n"),
1325 this->id_),
1326 -1);
1327 #endif /* ACE_WIN32 */
1328 ACE_ERROR_RETURN((LM_ERROR,
1329 ACE_TEXT ("(%t) Client %d, %p\n"),
1330 this->id_,
1331 ACE_TEXT ("write")),
1332 -1);
1334 #endif /* ACE_WIN32 */
1336 this->io_count_++;
1337 this->total_w_++;
1338 return 0;
1342 Client::initiate_read_stream ()
1344 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
1345 return -1;
1347 static const size_t complete_message_length =
1348 ACE_OS::strlen (complete_message);
1350 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
1351 ACE_Message_Block *mb1 = 0,
1352 *mb2 = 0,
1353 *mb3 = 0,
1354 *mb4 = 0,
1355 *mb5 = 0,
1356 *mb6 = 0;
1358 // We allocate +1 only for proper printing - we can just set the last byte
1359 // to '\0' before printing out
1360 ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1);
1361 ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1);
1362 ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1);
1364 // Let allocate memory for one more triplet,
1365 // This improves performance
1366 // as we can receive more the than one block at once
1367 // Generally, we can receive more triplets ....
1368 ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1);
1369 ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1);
1370 ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1);
1372 mb1->cont (mb2);
1373 mb2->cont (mb3);
1375 mb3->cont (mb4);
1376 mb4->cont (mb5);
1377 mb5->cont (mb6);
1380 // hide last byte in each message block, reserving it for later to set '\0'
1381 // for proper printouts
1382 mb1->size (mb1->size () - 1);
1383 mb2->size (mb2->size () - 1);
1384 mb3->size (mb3->size () - 1);
1386 mb4->size (mb4->size () - 1);
1387 mb5->size (mb5->size () - 1);
1388 mb6->size (mb6->size () - 1);
1390 // Inititiate read
1391 if (this->rs_.readv (*mb1, mb1->total_size () - 1) == -1)
1393 mb1->release ();
1394 ACE_ERROR_RETURN ((LM_ERROR,
1395 ACE_TEXT ("(%t) %p\n"),
1396 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1397 -1);
1399 #else /* ACE_HAS_WIN32_OVERLAPPED_IO */
1401 // Try to read more chunks
1402 size_t blksize = ( complete_message_length > BUFSIZ ) ?
1403 complete_message_length : BUFSIZ;
1405 ACE_Message_Block *mb = 0;
1407 // We allocate +1 only for proper printing - we can just set the last byte
1408 // to '\0' before printing out
1409 ACE_NEW_RETURN (mb,
1410 ACE_Message_Block (blksize + 1),
1411 -1);
1413 // Inititiate read
1414 if (this->rs_.read (*mb, mb->size () - 1) == -1)
1416 mb->release ();
1417 #if defined (ACE_WIN32)
1418 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
1419 // a 0-byte read as we would if underlying calls used WSARecv.
1420 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
1421 ACE_ERROR_RETURN ((LM_DEBUG,
1422 ACE_TEXT ("(%t) Client %d, peer closed\n"),
1423 this->id_),
1424 -1);
1425 #endif /* ACE_WIN32 */
1426 ACE_ERROR_RETURN ((LM_ERROR,
1427 ACE_TEXT ("(%t) Client %d, %p\n"),
1428 this->id_,
1429 ACE_TEXT ("read")),
1430 -1);
1432 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
1434 this->io_count_++;
1435 this->total_r_++;
1436 return 0;
1439 void
1440 Client::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
1443 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1445 ACE_Message_Block & mb = result.message_block ();
1447 if (loglevel > 1)
1449 LogLocker log_lock;
1451 ACE_DEBUG ((LM_DEBUG,
1452 ACE_TEXT ("(%t) **** Client %d: handle_write_stream() ****\n"),
1453 this->id_));
1454 ACE_DEBUG ((LM_DEBUG,
1455 ACE_TEXT ("%s = %B\n"),
1456 ACE_TEXT ("bytes_to_write"),
1457 result.bytes_to_write ()));
1458 ACE_DEBUG ((LM_DEBUG,
1459 ACE_TEXT ("%s = %d\n"),
1460 ACE_TEXT ("handle"),
1461 result.handle ()));
1462 ACE_DEBUG ((LM_DEBUG,
1463 ACE_TEXT ("%s = %B\n"),
1464 ACE_TEXT ("bytes_transfered"),
1465 result.bytes_transferred ()));
1466 ACE_DEBUG ((LM_DEBUG,
1467 ACE_TEXT ("%s = %@\n"),
1468 ACE_TEXT ("act"),
1469 result.act ()));
1470 ACE_DEBUG ((LM_DEBUG,
1471 ACE_TEXT ("%s = %d\n"),
1472 ACE_TEXT ("success"),
1473 result.success ()));
1474 ACE_DEBUG ((LM_DEBUG,
1475 ACE_TEXT ("%s = %@\n"),
1476 ACE_TEXT ("completion_key"),
1477 result.completion_key ()));
1478 ACE_DEBUG ((LM_DEBUG,
1479 ACE_TEXT ("%s = %d\n"),
1480 ACE_TEXT ("error"),
1481 result.error ()));
1483 #if defined (ACE_WIN32)
1484 size_t bytes_transferred = result.bytes_transferred ();
1485 char index = 0;
1486 for (ACE_Message_Block* mb_i = &mb;
1487 (mb_i != 0) && (bytes_transferred > 0);
1488 mb_i = mb_i->cont ())
1490 // write 0 at string end for proper printout (if end of mb,
1491 // it's 0 already)
1492 mb_i->rd_ptr()[0] = '\0';
1494 size_t len = mb_i->rd_ptr () - mb_i->base ();
1496 // move rd_ptr backwards as required for printout
1497 if (len >= bytes_transferred)
1499 mb_i->rd_ptr (0 - bytes_transferred);
1500 bytes_transferred = 0;
1502 else
1504 mb_i->rd_ptr (0 - len);
1505 bytes_transferred -= len;
1508 ++index;
1509 ACE_DEBUG ((LM_DEBUG,
1510 ACE_TEXT ("%s%d = %s\n"),
1511 ACE_TEXT ("message_block, part "),
1512 index,
1513 mb_i->rd_ptr ()));
1515 #else /* ACE_WIN32 */
1516 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1517 mb.rd_ptr()[0] = '\0';
1518 // move rd_ptr backwards as required for printout
1519 mb.rd_ptr (- result.bytes_transferred ());
1520 ACE_DEBUG ((LM_DEBUG,
1521 ACE_TEXT ("%s = %s\n"),
1522 ACE_TEXT ("message_block"),
1523 mb.rd_ptr ()));
1524 #endif /* ACE_WIN32 */
1526 ACE_DEBUG ((LM_DEBUG,
1527 ACE_TEXT ("**** end of message ****************\n")));
1529 else if (result.error () != 0)
1531 ACE_Log_Priority prio;
1532 #if defined (ACE_WIN32)
1533 if (result.error () == ERROR_OPERATION_ABORTED)
1534 prio = LM_DEBUG;
1535 #else
1536 if (result.error () == ECANCELED)
1537 prio = LM_DEBUG;
1538 #endif /* ACE_WIN32 */
1539 else
1540 prio = LM_ERROR;
1541 ACE_Log_Msg::instance ()->errnum (result.error ());
1542 ACE_Log_Msg::instance ()->log (prio,
1543 ACE_TEXT ("(%t) Client %d; %p\n"),
1544 this->id_,
1545 ACE_TEXT ("write"));
1547 else if (loglevel > 0)
1549 ACE_DEBUG ((LM_DEBUG,
1550 ACE_TEXT ("(%t) Client %d: wrote %B bytes ok\n"),
1551 this->id_,
1552 result.bytes_transferred ()));
1555 mb.release ();
1557 if (result.error () == 0 && result.bytes_transferred () > 0)
1559 this->total_snd_ += result.bytes_transferred ();
1560 if (this->total_snd_ >= xfer_limit)
1562 ACE_DEBUG ((LM_DEBUG,
1563 ACE_TEXT ("(%t) Client %d sent %B, limit %B\n"),
1564 this->id_, this->total_snd_, xfer_limit));
1565 this->close ();
1567 if (duplex != 0) // full duplex, continue write
1569 if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
1570 this->initiate_write_stream ();
1572 else // half-duplex read reply, after read we will start write
1573 this->initiate_read_stream ();
1576 --this->io_count_;
1577 if (this->io_count_ > 0)
1578 return;
1580 delete this;
1583 void
1584 Client::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
1587 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1589 ACE_Message_Block & mb = result.message_block ();
1591 if (loglevel > 1)
1593 LogLocker log_lock;
1595 ACE_DEBUG ((LM_DEBUG,
1596 ACE_TEXT ("(%t) **** Client %d: handle_read_stream() ****\n"),
1597 this->id_));
1598 ACE_DEBUG ((LM_DEBUG,
1599 ACE_TEXT ("%s = %B\n"),
1600 ACE_TEXT ("bytes_to_read"),
1601 result.bytes_to_read ()));
1602 ACE_DEBUG ((LM_DEBUG,
1603 ACE_TEXT ("%s = %d\n"),
1604 ACE_TEXT ("handle"),
1605 result.handle ()));
1606 ACE_DEBUG ((LM_DEBUG,
1607 ACE_TEXT ("%s = %B\n"),
1608 ACE_TEXT ("bytes_transfered"),
1609 result.bytes_transferred ()));
1610 ACE_DEBUG ((LM_DEBUG,
1611 ACE_TEXT ("%s = %@\n"),
1612 ACE_TEXT ("act"),
1613 result.act ()));
1614 ACE_DEBUG ((LM_DEBUG,
1615 ACE_TEXT ("%s = %d\n"),
1616 ACE_TEXT ("success"),
1617 result.success ()));
1618 ACE_DEBUG ((LM_DEBUG,
1619 ACE_TEXT ("%s = %@\n"),
1620 ACE_TEXT ("completion_key"),
1621 result.completion_key ()));
1622 ACE_DEBUG ((LM_DEBUG,
1623 ACE_TEXT ("%s = %d\n"),
1624 ACE_TEXT ("error"),
1625 result.error ()));
1627 #if defined (ACE_WIN32)
1628 char index = 0;
1629 for (ACE_Message_Block* mb_i = &mb;
1630 mb_i != 0;
1631 mb_i = mb_i->cont ())
1633 ++index;
1634 // write 0 at string end for proper printout
1635 mb_i->wr_ptr()[0] = '\0';
1637 ACE_DEBUG ((LM_DEBUG,
1638 ACE_TEXT ("%s%d = %s\n"),
1639 ACE_TEXT ("message_block, part "),
1640 index,
1641 mb_i->rd_ptr ()));
1643 #else /* ACE_WIN32 */
1644 // write 0 at string end for proper printout
1645 mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
1646 ACE_DEBUG ((LM_DEBUG,
1647 ACE_TEXT ("%s = %s\n"),
1648 ACE_TEXT ("message_block"),
1649 mb.rd_ptr ()));
1650 #endif /* ACE_WIN32 */
1652 ACE_DEBUG ((LM_DEBUG,
1653 ACE_TEXT ("**** end of message ****************\n")));
1655 else if (result.error () != 0)
1657 ACE_Log_Priority prio;
1658 #if defined (ACE_WIN32)
1659 if (result.error () == ERROR_OPERATION_ABORTED)
1660 prio = LM_DEBUG;
1661 #else
1662 if (result.error () == ECANCELED)
1663 prio = LM_DEBUG;
1664 #endif /* ACE_WIN32 */
1665 else
1666 prio = LM_ERROR;
1667 ACE_Log_Msg::instance ()->errnum (result.error ());
1668 ACE_Log_Msg::instance ()->log (prio,
1669 ACE_TEXT ("(%t) Client %d; %p\n"),
1670 this->id_,
1671 ACE_TEXT ("read"));
1673 else if (loglevel > 0)
1675 ACE_DEBUG ((LM_DEBUG,
1676 ACE_TEXT ("(%t) Client %d: read %B bytes ok\n"),
1677 this->id_,
1678 result.bytes_transferred ()));
1681 mb.release ();
1683 if (result.error () == 0 && result.bytes_transferred () > 0)
1685 this->total_rcv_ += result.bytes_transferred ();
1687 if (duplex != 0 || this->stop_writing_) // full duplex, continue read
1688 this->initiate_read_stream ();
1689 else // half-duplex write, after write we will start read
1690 this->initiate_write_stream ();
1693 --this->io_count_;
1694 if (this->io_count_ > 0)
1695 return;
1697 delete this;
1700 // *************************************************************
1701 // Configuration helpers
1702 // *************************************************************
1704 print_usage (int /* argc */, ACE_TCHAR *argv[])
1706 ACE_ERROR
1707 ((LM_ERROR,
1708 ACE_TEXT ("\nusage: %s")
1709 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1710 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1711 ACE_TEXT ("\n a AIOCB")
1712 ACE_TEXT ("\n i SIG")
1713 ACE_TEXT ("\n c CB")
1714 ACE_TEXT ("\n d default")
1715 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1716 ACE_TEXT ("\n-h <host> for Client mode")
1717 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1718 ACE_TEXT ("\n-p <port to listen/connect>")
1719 ACE_TEXT ("\n-c <number of client instances>")
1720 ACE_TEXT ("\n-b run client and server at the same time")
1721 ACE_TEXT ("\n f file")
1722 ACE_TEXT ("\n c console")
1723 ACE_TEXT ("\n-v log level")
1724 ACE_TEXT ("\n 0 - log errors and highlights")
1725 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1726 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1727 ACE_TEXT ("\n-x max transfer byte count per Client")
1728 ACE_TEXT ("\n-u show this message")
1729 ACE_TEXT ("\n"),
1730 argv[0]
1732 return -1;
1735 static int
1736 set_proactor_type (const ACE_TCHAR *ptype)
1738 if (!ptype)
1739 return 0;
1741 switch (ACE_OS::ace_toupper (*ptype))
1743 case 'D':
1744 proactor_type = DEFAULT;
1745 return 1;
1746 case 'A':
1747 proactor_type = AIOCB;
1748 return 1;
1749 case 'I':
1750 proactor_type = SIG;
1751 return 1;
1752 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
1753 case 'C':
1754 proactor_type = CB;
1755 return 1;
1756 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
1757 default:
1758 break;
1760 return 0;
1763 static int
1764 parse_args (int argc, ACE_TCHAR *argv[])
1766 // First, set up all the defaults then let any args change them.
1767 both = 1; // client and server simultaneosly
1768 duplex = 1; // full duplex is on
1769 host = ACE_LOCALHOST; // server to connect
1770 port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
1771 max_aio_operations = 512; // POSIX Proactor params
1772 proactor_type = DEFAULT; // Proactor type = default
1773 threads = 3; // size of Proactor thread pool
1774 clients = 10; // number of clients
1775 loglevel = 0; // log level : only errors and highlights
1776 // Default transfer limit 50 messages per Sender
1777 xfer_limit = 50 * ACE_OS::strlen (complete_message);
1779 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
1780 # if defined (ACE_LINUX)
1781 duplex = 0;
1782 #endif
1784 if (argc == 1) // no arguments , so one button test
1785 return 0;
1787 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
1788 int c;
1790 while ((c = get_opt ()) != EOF)
1792 switch (c)
1794 case 'x': // xfer limit
1795 xfer_limit = static_cast<size_t> (ACE_OS::atoi (get_opt.opt_arg ()));
1796 if (xfer_limit == 0)
1797 xfer_limit = 1; // Bare minimum.
1798 break;
1799 case 'b': // both client and server
1800 both = 1;
1801 break;
1802 case 'v': // log level
1803 loglevel = ACE_OS::atoi (get_opt.opt_arg ());
1804 break;
1805 case 'd': // duplex
1806 duplex = ACE_OS::atoi (get_opt.opt_arg ());
1807 break;
1808 case 'h': // host for sender
1809 host = get_opt.opt_arg ();
1810 break;
1811 case 'p': // port number
1812 port = ACE_OS::atoi (get_opt.opt_arg ());
1813 break;
1814 case 'n': // thread pool size
1815 threads = ACE_OS::atoi (get_opt.opt_arg ());
1816 break;
1817 case 'c': // number of clients
1818 clients = ACE_OS::atoi (get_opt.opt_arg ());
1819 if (clients > MAX_CLIENTS)
1820 clients = MAX_CLIENTS;
1821 break;
1822 case 'o': // max number of aio for proactor
1823 max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
1824 break;
1825 case 't': // Proactor Type
1826 if (set_proactor_type (get_opt.opt_arg ()))
1827 break;
1828 return print_usage (argc, argv);
1829 case 'u':
1830 default:
1831 return print_usage (argc, argv);
1832 } // switch
1833 } // while
1835 return 0;
1839 run_main (int argc, ACE_TCHAR *argv[])
1841 ACE_START_TEST (ACE_TEXT ("Proactor_Test"));
1843 if (::parse_args (argc, argv) == -1)
1844 return -1;
1846 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
1847 disable_signal (SIGPIPE, SIGPIPE);
1849 MyTask task1;
1850 TestData test;
1852 if (task1.start (threads, proactor_type, max_aio_operations) == 0)
1854 Acceptor acceptor (&test);
1855 Connector connector (&test);
1856 ACE_INET_Addr addr (port);
1858 int rc = 0;
1860 if (both != 0 || host == 0) // Acceptor
1862 // Simplify, initial read with zero size
1863 if (acceptor.open (addr, 0, 1) == 0)
1864 rc = 1;
1867 if (both != 0 || host != 0)
1869 if (host == 0)
1870 host = ACE_LOCALHOST;
1872 if (addr.set (port, host, 1, addr.get_type ()) == -1)
1873 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
1874 else
1875 rc += connector.start (addr, clients);
1878 // Wait a few seconds to let things get going, then poll til
1879 // all sessions are done. Note that when we exit this scope, the
1880 // Acceptor and Connector will be destroyed, which should prevent
1881 // further connections and also test how well destroyed handlers
1882 // are handled.
1883 ACE_OS::sleep (3);
1885 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
1886 while (!test.testing_done ())
1887 ACE_OS::sleep (1);
1889 test.stop_all ();
1891 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
1892 task1.stop ();
1894 ACE_END_TEST;
1896 return 0;
1899 #else
1902 run_main (int, ACE_TCHAR *[])
1904 ACE_START_TEST (ACE_TEXT ("Proactor_Test"));
1906 ACE_DEBUG ((LM_INFO,
1907 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
1908 ACE_TEXT ("Proactor_Test will not be run.\n")));
1910 ACE_END_TEST;
1912 return 0;
1915 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */