Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / tests / Proactor_Test_IPV6.cpp
blob0a4e4e84c18703929e59a773b4c2c975cc7165df
1 // ============================================================================
2 /**
3 * @file Proactor_Test_IPV6.cpp
5 * This program illustrates how the ACE_Proactor can be used to
6 * implement an application that does various asynchronous
7 * operations.
9 * @author Alexander Libman <alibman@baltimore.com>
10 * @author Brian Buesker <bbuesker@qualcomm.com> - modified for IPv6 operation
12 // ============================================================================
14 #include "test_config.h"
16 #if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
17 // This only works on Win32 platforms and on Unix platforms
18 // supporting POSIX aio calls.
20 #include "ace/Signal.h"
22 #include "ace/Service_Config.h"
23 #include "ace/INET_Addr.h"
24 #include "ace/SOCK_Connector.h"
25 #include "ace/SOCK_Acceptor.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Object_Manager.h"
28 #include "ace/Get_Opt.h"
30 #include "ace/Proactor.h"
31 #include "ace/Asynch_Acceptor.h"
32 #include "ace/Asynch_Connector.h"
33 #include "ace/Task.h"
34 #include "ace/Thread_Semaphore.h"
35 #include "ace/OS_NS_ctype.h"
36 #include "ace/OS_NS_errno.h"
37 #include "ace/OS_NS_signal.h"
38 #include "ace/OS_NS_string.h"
39 #include "ace/OS_NS_unistd.h"
40 #include "ace/OS_NS_sys_socket.h"
41 #include "ace/os_include/netinet/os_tcp.h"
43 #include "ace/Atomic_Op.h"
44 #include "ace/Synch_Traits.h"
46 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
48 # include "ace/WIN32_Proactor.h"
50 #elif defined (ACE_HAS_AIO_CALLS)
52 # include "ace/POSIX_Proactor.h"
53 # include "ace/POSIX_CB_Proactor.h"
55 #endif /* defined (ACE_HAS_WIN32_OVERLAPPED_IO) */
57 #include "Proactor_Test.h"
59 // Proactor Type (UNIX only, Win32 ignored)
60 using ProactorType = enum { DEFAULT = 0, AIOCB, SIG, CB };
61 static ProactorType proactor_type = DEFAULT;
63 // POSIX : > 0 max number aio operations proactor,
64 static size_t max_aio_operations = 0;
66 // both: 0 run client or server / depends on host
67 // != 0 run client and server
68 static int both = 0;
70 // Host that we're connecting to.
71 static const ACE_TCHAR *host = 0;
73 // number of Client instances
74 static int clients = 1;
75 const int MAX_CLIENTS = 1000;
76 const int MAX_SERVERS = 1000;
78 // duplex mode: == 0 half-duplex
79 // != 0 full duplex
80 static int duplex = 0;
82 // number threads in the Proactor thread pool
83 static int threads = 1;
85 // Port that we're receiving connections on.
86 static u_short port = ACE_DEFAULT_SERVER_PORT;
88 // Log options
89 static int loglevel; // 0 full , 1 only errors
91 static size_t xfer_limit; // Number of bytes for Client to send.
93 static char complete_message[] =
94 "GET / HTTP/1.1\r\n"
95 "Accept: */*\r\n"
96 "Accept-Language: C++\r\n"
97 "Accept-Encoding: gzip, deflate\r\n"
98 "User-Agent: Proactor_Test_IPv6/1.0 (non-compatible)\r\n"
99 "Connection: Keep-Alive\r\n"
100 "\r\n";
102 class LogLocker
104 public:
105 LogLocker () { ACE_LOG_MSG->acquire (); }
106 virtual ~LogLocker () { ACE_LOG_MSG->release (); }
109 #if defined (ACE_HAS_IPV6)
111 // Function to remove signals from the signal mask.
112 static int
113 disable_signal (int sigmin, int sigmax)
115 #if !defined (ACE_LACKS_UNIX_SIGNALS)
116 sigset_t signal_set;
117 if (ACE_OS::sigemptyset (&signal_set) == - 1)
118 ACE_ERROR ((LM_ERROR,
119 ACE_TEXT ("Error: (%P|%t):%p\n"),
120 ACE_TEXT ("sigemptyset failed")));
122 for (int i = sigmin; i <= sigmax; i++)
123 ACE_OS::sigaddset (&signal_set, i);
125 // Put the <signal_set>.
126 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
127 // In multi-threaded application this is not POSIX compliant
128 // but let's leave it just in case.
129 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
130 # else
131 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
132 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
133 ACE_ERROR_RETURN ((LM_ERROR,
134 ACE_TEXT ("Error: (%P|%t): %p\n"),
135 ACE_TEXT ("SIG_BLOCK failed")),
136 -1);
137 #else
138 ACE_UNUSED_ARG (sigmin);
139 ACE_UNUSED_ARG (sigmax);
140 #endif /* ACE_LACKS_UNIX_SIGNALS */
142 return 0;
145 // *************************************************************
146 // MyTask is the ACE_Task responsible for:
147 // 1. creation and deletion of
148 // Proactor and Proactor thread pool
149 // 2. running Proactor event loop
150 // *************************************************************
153 * @class MyTask
155 * MyTask plays role for Proactor threads pool
157 * MyTask is ACE_Task resposible for:
158 * 1. Creation and deletion of Proactor and Proactor thread pool
159 * 2. Running Proactor event loop
161 class MyTask : public ACE_Task<ACE_MT_SYNCH>
163 public:
164 MyTask ():
165 lock_ (),
166 sem_ ((unsigned int) 0),
167 proactor_(0) {}
169 ~MyTask() override
171 (void) this->stop ();
172 (void) this->delete_proactor();
175 int svc () override;
177 int start (int num_threads,
178 ProactorType type_proactor,
179 size_t max_op );
180 int stop ();
182 private:
183 int create_proactor (ProactorType type_proactor,
184 size_t max_op);
185 int delete_proactor ();
187 ACE_SYNCH_RECURSIVE_MUTEX lock_;
188 ACE_Thread_Semaphore sem_;
189 ACE_Proactor * proactor_;
193 MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
195 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
196 monitor,
197 this->lock_,
198 -1);
200 ACE_TEST_ASSERT (this->proactor_ == 0);
202 #if defined (ACE_WIN32)
204 ACE_UNUSED_ARG (type_proactor);
205 ACE_UNUSED_ARG (max_op);
207 ACE_WIN32_Proactor *proactor_impl = 0;
209 ACE_NEW_RETURN (proactor_impl,
210 ACE_WIN32_Proactor,
211 -1);
213 ACE_DEBUG ((LM_DEBUG,
214 ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
216 #elif defined (ACE_HAS_AIO_CALLS)
218 ACE_POSIX_Proactor * proactor_impl = 0;
220 switch (type_proactor)
222 case AIOCB:
223 ACE_NEW_RETURN (proactor_impl,
224 ACE_POSIX_AIOCB_Proactor (max_op),
225 -1);
226 ACE_DEBUG ((LM_DEBUG,
227 ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
228 break;
230 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
231 case SIG:
232 ACE_NEW_RETURN (proactor_impl,
233 ACE_POSIX_SIG_Proactor (max_op),
234 -1);
235 ACE_DEBUG ((LM_DEBUG,
236 ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
237 break;
238 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
240 # if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
241 case CB:
242 ACE_NEW_RETURN (proactor_impl,
243 ACE_POSIX_CB_Proactor (max_op),
244 -1);
245 ACE_DEBUG ((LM_DEBUG,
246 ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
247 break;
248 # endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
250 default:
251 ACE_DEBUG ((LM_DEBUG,
252 ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
253 break;
256 #endif // (ACE_WIN32)
258 // always delete implementation 1 , not !(proactor_impl == 0)
259 ACE_NEW_RETURN (this->proactor_,
260 ACE_Proactor (proactor_impl, 1 ),
261 -1);
262 // Set new singleton and delete it in close_singleton()
263 ACE_Proactor::instance (this->proactor_, 1);
264 return 0;
268 MyTask::delete_proactor ()
270 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
271 monitor,
272 this->lock_,
273 -1);
275 ACE_DEBUG ((LM_DEBUG,
276 ACE_TEXT ("(%t) Delete Proactor\n")));
278 ACE_Proactor::close_singleton ();
279 this->proactor_ = 0;
281 return 0;
285 MyTask::start (int num_threads,
286 ProactorType type_proactor,
287 size_t max_op)
289 if (this->create_proactor (type_proactor, max_op) == -1)
290 ACE_ERROR_RETURN ((LM_ERROR,
291 ACE_TEXT ("%p.\n"),
292 ACE_TEXT ("unable to create proactor")),
293 -1);
295 if (this->activate (THR_NEW_LWP, num_threads) == -1)
296 ACE_ERROR_RETURN ((LM_ERROR,
297 ACE_TEXT ("%p.\n"),
298 ACE_TEXT ("unable to activate thread pool")),
299 -1);
301 for (; num_threads > 0; num_threads--)
303 sem_.acquire ();
306 return 0;
311 MyTask::stop ()
313 if (this->proactor_ != 0)
315 ACE_DEBUG ((LM_DEBUG,
316 ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
318 ACE_Proactor::end_event_loop ();
321 if (this->wait () == -1)
322 ACE_ERROR ((LM_ERROR,
323 ACE_TEXT ("%p.\n"),
324 ACE_TEXT ("unable to stop thread pool")));
326 return 0;
330 MyTask::svc ()
332 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n")));
334 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
335 disable_signal (SIGPIPE, SIGPIPE);
337 // signal that we are ready
338 sem_.release (1);
340 ACE_Proactor::run_event_loop ();
342 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n")));
343 return 0;
345 #endif /* ACE_HAS_IPV6 */
347 // TestData collects and reports on test-related transfer and connection
348 // statistics.
349 class TestData
351 public:
352 TestData ();
353 bool testing_done ();
354 Server *server_up ();
355 Client *client_up ();
356 void server_done (Server *s);
357 void client_done (Client *c);
358 void stop_all ();
359 void report ();
361 private:
362 struct Local_Stats
364 // Track number of sessions that report start, and those that report
365 // their end (and stats).
366 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_up_;
367 ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_down_;
369 // Total read and write bytes for all sessions.
370 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_cnt_;
371 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_cnt_;
372 // Total read and write operations issues for all sessions.
373 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_ops_;
374 ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_ops_;
375 } servers_, clients_;
377 ACE_SYNCH_MUTEX list_lock_;
378 Server *server_list_[MAX_SERVERS];
379 Client *client_list_[MAX_CLIENTS];
382 TestData::TestData ()
384 int i;
385 for (i = 0; i < MAX_SERVERS; ++i)
386 this->server_list_[i] = 0;
387 for (i = 0; i < MAX_CLIENTS; ++i)
388 this->client_list_[i] = 0;
391 bool
392 TestData::testing_done ()
394 int svr_up = this->servers_.sessions_up_.value ();
395 int svr_dn = this->servers_.sessions_down_.value ();
396 int clt_up = this->clients_.sessions_up_.value ();
397 int clt_dn = this->clients_.sessions_down_.value ();
399 if (svr_up == 0 && clt_up == 0) // No connections up yet
400 return false;
402 return (svr_dn >= svr_up && clt_dn >= clt_up);
405 Server *
406 TestData::server_up ()
408 ++this->servers_.sessions_up_;
409 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
411 for (int i = 0; i < MAX_SERVERS; ++i)
413 if (this->server_list_[i] == 0)
415 ACE_NEW_RETURN (this->server_list_[i], Server (this, i), 0);
416 ACE_DEBUG ((LM_DEBUG,
417 ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
419 this->servers_.sessions_up_.value (),
420 this->servers_.sessions_down_.value ()));
421 return this->server_list_[i];
424 return 0;
427 Client *
428 TestData::client_up ()
430 ++this->clients_.sessions_up_;
431 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
433 for (int i = 0; i < MAX_CLIENTS; ++i)
435 if (this->client_list_[i] == 0)
437 ACE_NEW_RETURN (this->client_list_[i], Client (this, i), 0);
438 ACE_DEBUG ((LM_DEBUG,
439 ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
441 this->clients_.sessions_up_.value (),
442 this->clients_.sessions_down_.value ()));
443 return this->client_list_[i];
446 return 0;
449 void
450 TestData::server_done (Server *s)
452 this->servers_.w_cnt_ += s->get_total_snd ();
453 this->servers_.r_cnt_ += s->get_total_rcv ();
454 this->servers_.w_ops_ += s->get_total_w ();
455 this->servers_.r_ops_ += s->get_total_r ();
456 ++this->servers_.sessions_down_;
457 ACE_DEBUG ((LM_DEBUG,
458 ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
459 s->id (),
460 this->servers_.sessions_up_.value (),
461 this->servers_.sessions_down_.value ()));
463 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
464 int i;
465 for (i = 0; i < MAX_SERVERS; ++i)
467 if (this->server_list_[i] == s)
469 if (s->id () != i)
470 ACE_ERROR ((LM_ERROR,
471 ACE_TEXT ("Server %d is pos %d in list\n"),
472 s->id (),
473 i));
474 this->server_list_[i] = 0;
475 break;
478 if (i >= MAX_SERVERS)
479 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Server %@ done but not listed\n"), s));
481 return;
484 void
485 TestData::client_done (Client *c)
487 this->clients_.w_cnt_ += c->get_total_snd ();
488 this->clients_.r_cnt_ += c->get_total_rcv ();
489 this->clients_.w_ops_ += c->get_total_w ();
490 this->clients_.r_ops_ += c->get_total_r ();
491 ++this->clients_.sessions_down_;
492 ACE_DEBUG ((LM_DEBUG,
493 ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
494 c->id (),
495 this->clients_.sessions_up_.value (),
496 this->clients_.sessions_down_.value ()));
498 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
499 int i;
500 for (i = 0; i < MAX_CLIENTS; ++i)
502 if (this->client_list_[i] == c)
504 if (c->id () != i)
505 ACE_ERROR ((LM_ERROR,
506 ACE_TEXT ("Client %d is pos %d in list\n"),
507 c->id (),
508 i));
509 this->client_list_[i] = 0;
510 break;
513 if (i >= MAX_CLIENTS)
514 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Client %@ done but not listed\n"), c));
516 return;
519 void
520 TestData::stop_all ()
522 int i;
524 // Lock and cancel everything. Then release the lock, possibly allowing
525 // cleanups, then grab it again and delete all Servers and Clients.
527 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
528 for (i = 0; i < MAX_CLIENTS; ++i)
530 if (this->client_list_[i] != 0)
531 this->client_list_[i]->cancel ();
534 for (i = 0; i < MAX_SERVERS; ++i)
536 if (this->server_list_[i] != 0)
537 this->server_list_[i]->cancel ();
541 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
542 for (i = 0; i < MAX_CLIENTS; ++i)
544 if (this->client_list_[i] != 0)
545 delete this->client_list_[i];
548 for (i = 0; i < MAX_SERVERS; ++i)
550 if (this->server_list_[i] != 0)
551 delete this->server_list_[i];
556 void
557 TestData::report ()
559 // Print statistics
560 ACE_TCHAR bufs [256];
561 ACE_TCHAR bufr [256];
563 ACE_OS::snprintf (bufs, 256,
564 ACE_SIZE_T_FORMAT_SPECIFIER
565 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
566 this->clients_.w_cnt_.value (),
567 this->clients_.w_ops_.value ());
569 ACE_OS::snprintf (bufr, 256,
570 ACE_SIZE_T_FORMAT_SPECIFIER
571 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
572 this->clients_.r_cnt_.value (),
573 this->clients_.r_ops_.value ());
575 ACE_DEBUG ((LM_DEBUG,
576 ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
577 bufs,
578 bufr));
580 ACE_OS::snprintf (bufs, 256,
581 ACE_SIZE_T_FORMAT_SPECIFIER
582 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
583 this->servers_.w_cnt_.value (),
584 this->servers_.w_ops_.value ());
586 ACE_OS::snprintf (bufr, 256,
587 ACE_SIZE_T_FORMAT_SPECIFIER
588 ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
589 this->servers_.r_cnt_.value (),
590 this->servers_.r_ops_.value ());
592 ACE_DEBUG ((LM_DEBUG,
593 ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
594 bufs,
595 bufr));
597 if (this->clients_.w_cnt_.value () == 0 ||
598 this->clients_.r_cnt_.value () == 0 ||
599 this->servers_.w_cnt_.value () == 0 ||
600 this->servers_.r_cnt_.value () == 0 )
601 ACE_ERROR ((LM_ERROR, ACE_TEXT ("It appears that this test didn't ")
602 ACE_TEXT ("really do anything. Something is very wrong.\n")));
605 class Acceptor : public ACE_Asynch_Acceptor<Server>
607 public:
608 Acceptor (TestData *tester);
609 ~Acceptor () override;
611 // Virtual from ACE_Asynch_Acceptor
612 Server *make_handler () override;
614 private:
615 TestData *tester_;
618 // *************************************************************
619 Acceptor::Acceptor (TestData *tester)
620 : tester_ (tester)
624 Acceptor::~Acceptor ()
626 this->cancel ();
629 Server *
630 Acceptor::make_handler ()
632 return this->tester_->server_up ();
635 // ***************************************************
636 Server::Server ()
638 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
641 Server::Server (TestData *tester, int id)
642 : tester_ (tester),
643 id_ (id),
644 handle_ (ACE_INVALID_HANDLE),
645 io_count_ (0),
646 flg_cancel_(0),
647 total_snd_(0),
648 total_rcv_(0),
649 total_w_ (0),
650 total_r_ (0)
654 Server::~Server ()
656 ACE_DEBUG ((LM_DEBUG,
657 ACE_TEXT ("(%t) Server %d dtor; %d sends (%d bytes); ")
658 ACE_TEXT ("%d recvs (%d bytes)\n"),
659 this->id_,
660 this->total_w_, this->total_snd_,
661 this->total_r_, this->total_rcv_));
662 if (this->io_count_ != 0)
663 ACE_ERROR ((LM_WARNING,
664 ACE_TEXT ("(%t) Server %d deleted with ")
665 ACE_TEXT ("%d I/O outstanding\n"),
666 this->id_,
667 this->io_count_));
669 // This test bounces data back and forth between Clients and Servers.
670 // Therefore, if there was significantly more data in one direction, that's
671 // a problem. Remember, the byte counts are unsigned values.
672 int issue_data_warning = 0;
673 if (this->total_snd_ > this->total_rcv_)
675 if (this->total_rcv_ == 0)
676 issue_data_warning = 1;
677 else if (this->total_snd_ / this->total_rcv_ > 2)
678 issue_data_warning = 1;
680 else
682 if (this->total_snd_ == 0)
683 issue_data_warning = 1;
684 else if (this->total_rcv_ / this->total_snd_ > 2)
685 issue_data_warning = 1;
687 if (issue_data_warning)
688 ACE_DEBUG ((LM_WARNING,
689 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
691 if (this->tester_ != 0)
692 this->tester_->server_done (this);
694 if (this->handle_ != ACE_INVALID_HANDLE)
695 ACE_OS::closesocket (this->handle_);
697 this->id_ = -1;
698 this->handle_= ACE_INVALID_HANDLE;
701 void
702 Server::cancel ()
704 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
706 this->flg_cancel_ = 1;
707 this->ws_.cancel ();
708 this->rs_.cancel ();
709 return;
713 void
714 Server::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
716 ACE_TCHAR str[256];
717 if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
718 ACE_DEBUG ((LM_DEBUG,
719 ACE_TEXT ("(%t) Server %d connection from %s\n"),
720 this->id_,
721 str));
722 else
723 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Server %d %p\n"),
724 this->id_,
725 ACE_TEXT ("addr_to_string")));
726 return;
730 void
731 Server::open (ACE_HANDLE handle, ACE_Message_Block &)
734 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
736 // Don't buffer serial sends.
737 this->handle_ = handle;
738 int nodelay = 1;
739 ACE_SOCK_Stream option_setter (handle);
740 if (-1 == option_setter.set_option (ACE_IPPROTO_TCP,
741 TCP_NODELAY,
742 &nodelay,
743 sizeof (nodelay)))
744 ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
746 if (this->ws_.open (*this, this->handle_) == -1)
747 ACE_ERROR ((LM_ERROR,
748 ACE_TEXT ("(%t) %p\n"),
749 ACE_TEXT ("Server::ACE_Asynch_Write_Stream::open")));
750 else if (this->rs_.open (*this, this->handle_) == -1)
751 ACE_ERROR ((LM_ERROR,
752 ACE_TEXT ("(%t) %p\n"),
753 ACE_TEXT ("Server::ACE_Asynch_Read_Stream::open")));
754 else
755 this->initiate_read_stream ();
757 if (this->io_count_ > 0)
758 return;
760 delete this;
764 Server::initiate_read_stream ()
766 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
767 return -1;
769 ACE_Message_Block *mb = 0;
770 ACE_NEW_RETURN (mb,
771 ACE_Message_Block (1024), //BUFSIZ + 1),
772 -1);
774 // Inititiate read
775 if (this->rs_.read (*mb, mb->size () - 1) == -1)
777 mb->release ();
778 #if defined (ACE_WIN32)
779 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
780 // a 0-byte read as we would if underlying calls used WSARecv.
781 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
782 ACE_ERROR_RETURN ((LM_DEBUG,
783 ACE_TEXT ("(%t) Server %d, peer closed\n"),
784 this->id_),
785 -1);
786 #endif /* ACE_WIN32 */
787 ACE_ERROR_RETURN ((LM_ERROR,
788 ACE_TEXT ("(%t) Server %d, %p\n"),
789 this->id_,
790 ACE_TEXT ("read")),
791 -1);
794 this->io_count_++;
795 this->total_r_++;
796 return 0;
800 Server::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
802 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
804 mb.release ();
805 return -1;
808 if (nbytes == 0)
810 mb.release ();
811 ACE_ERROR_RETURN((LM_ERROR,
812 ACE_TEXT ("(%t) Server::ACE_Asynch_Write_Stream::write nbytes <0 ")),
813 -1);
816 if (this->ws_.write (mb, nbytes) == -1)
818 mb.release ();
819 #if defined (ACE_WIN32)
820 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
821 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
822 ACE_ERROR_RETURN ((LM_DEBUG,
823 ACE_TEXT ("(%t) Server %d, peer gone\n"),
824 this->id_),
825 -1);
826 #endif /* ACE_WIN32 */
827 ACE_ERROR_RETURN((LM_ERROR,
828 ACE_TEXT ("(%t) Server %d, %p\n"),
829 this->id_,
830 ACE_TEXT ("write")),
831 -1);
834 this->io_count_++;
835 this->total_w_++;
836 return 0;
839 void
840 Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
843 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
845 ACE_Message_Block & mb = result.message_block ();
847 // Reset pointers.
848 mb.rd_ptr ()[result.bytes_transferred ()] = '\0';
850 if (loglevel > 1)
852 LogLocker log_lock;
854 ACE_DEBUG ((LM_DEBUG,
855 ACE_TEXT ("(%t) **** Server %d: handle_read_stream() ****\n"),
856 this->id_));
857 ACE_DEBUG ((LM_DEBUG,
858 ACE_TEXT ("%s = %d\n"),
859 ACE_TEXT ("bytes_to_read"),
860 result.bytes_to_read ()));
861 ACE_DEBUG ((LM_DEBUG,
862 ACE_TEXT ("%s = %d\n"),
863 ACE_TEXT ("handle"),
864 result.handle ()));
865 ACE_DEBUG ((LM_DEBUG,
866 ACE_TEXT ("%s = %d\n"),
867 ACE_TEXT ("bytes_transfered"),
868 result.bytes_transferred ()));
869 ACE_DEBUG ((LM_DEBUG,
870 ACE_TEXT ("%s = %@\n"),
871 ACE_TEXT ("act"),
872 result.act ()));
873 ACE_DEBUG ((LM_DEBUG,
874 ACE_TEXT ("%s = %d\n"),
875 ACE_TEXT ("success"),
876 result.success ()));
877 ACE_DEBUG ((LM_DEBUG,
878 ACE_TEXT ("%s = %@\n"),
879 ACE_TEXT ("completion_key"),
880 result.completion_key ()));
881 ACE_DEBUG ((LM_DEBUG,
882 ACE_TEXT ("%s = %d\n"),
883 ACE_TEXT ("error"),
884 result.error ()));
885 ACE_DEBUG ((LM_DEBUG,
886 ACE_TEXT ("%s = %s\n"),
887 ACE_TEXT ("message_block"),
888 mb.rd_ptr ()));
889 ACE_DEBUG ((LM_DEBUG,
890 ACE_TEXT ("**** end of message ****************\n")));
892 else if (result.error () != 0)
894 ACE_Log_Priority prio;
895 #if defined (ACE_WIN32)
896 if (result.error () == ERROR_OPERATION_ABORTED)
897 prio = LM_DEBUG;
898 #else
899 if (result.error () == ECANCELED)
900 prio = LM_DEBUG;
901 #endif /* ACE_WIN32 */
902 else
903 prio = LM_ERROR;
904 ACE_Log_Msg::instance ()->errnum (result.error ());
905 ACE_Log_Msg::instance ()->log (prio,
906 ACE_TEXT ("(%t) Server %d; %p\n"),
907 this->id_,
908 ACE_TEXT ("read"));
910 else if (loglevel > 0)
912 ACE_DEBUG ((LM_DEBUG,
913 ACE_TEXT ("(%t) Server %d: read %d bytes\n"),
914 this->id_,
915 result.bytes_transferred ()));
918 if (result.error () == 0 && result.bytes_transferred () > 0)
920 this->total_rcv_ += result.bytes_transferred ();
922 if (this->initiate_write_stream (mb,
923 result.bytes_transferred ()) == 0)
925 if (duplex != 0) // Initiate new read from the stream.
926 this->initiate_read_stream ();
929 else
930 mb.release ();
932 --this->io_count_;
933 if (this->io_count_ > 0)
934 return;
936 delete this;
939 void
940 Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
943 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
945 ACE_Message_Block & mb = result.message_block ();
947 if (loglevel > 1)
949 LogLocker log_lock;
951 //mb.rd_ptr () [0] = '\0';
952 mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
954 ACE_DEBUG ((LM_DEBUG,
955 ACE_TEXT ("(%t) **** Server %d: handle_write_stream() ****\n"),
956 this->id_));
957 ACE_DEBUG ((LM_DEBUG,
958 ACE_TEXT ("%s = %d\n"),
959 ACE_TEXT ("bytes_to_write"),
960 result.bytes_to_write ()));
961 ACE_DEBUG ((LM_DEBUG,
962 ACE_TEXT ("%s = %d\n"),
963 ACE_TEXT ("handle"),
964 result.handle ()));
965 ACE_DEBUG ((LM_DEBUG,
966 ACE_TEXT ("%s = %d\n"),
967 ACE_TEXT ("bytes_transfered"),
968 result.bytes_transferred ()));
969 ACE_DEBUG ((LM_DEBUG,
970 ACE_TEXT ("%s = %@\n"),
971 ACE_TEXT ("act"),
972 result.act ()));
973 ACE_DEBUG ((LM_DEBUG,
974 ACE_TEXT ("%s = %d\n"),
975 ACE_TEXT ("success"),
976 result.success ()));
977 ACE_DEBUG ((LM_DEBUG,
978 ACE_TEXT ("%s = %@\n"),
979 ACE_TEXT ("completion_key"),
980 result.completion_key ()));
981 ACE_DEBUG ((LM_DEBUG,
982 ACE_TEXT ("%s = %d\n"),
983 ACE_TEXT ("error"),
984 result.error ()));
985 ACE_DEBUG ((LM_DEBUG,
986 ACE_TEXT ("%s = %s\n"),
987 ACE_TEXT ("message_block"),
988 mb.rd_ptr ()));
989 ACE_DEBUG ((LM_DEBUG,
990 ACE_TEXT ("**** end of message ****************\n")));
992 else if (result.error () != 0)
994 ACE_Log_Priority prio;
995 #if defined (ACE_WIN32)
996 if (result.error () == ERROR_OPERATION_ABORTED)
997 prio = LM_DEBUG;
998 #else
999 if (result.error () == ECANCELED)
1000 prio = LM_DEBUG;
1001 #endif /* ACE_WIN32 */
1002 else
1003 prio = LM_ERROR;
1004 ACE_Log_Msg::instance ()->errnum (result.error ());
1005 ACE_Log_Msg::instance ()->log (prio,
1006 ACE_TEXT ("(%t) Server %d; %p\n"),
1007 this->id_,
1008 ACE_TEXT ("write"));
1010 else if (loglevel > 0)
1012 ACE_DEBUG ((LM_DEBUG,
1013 ACE_TEXT ("(%t) Server %d: wrote %d bytes ok\n"),
1014 this->id_,
1015 result.bytes_transferred ()));
1018 mb.release ();
1020 if (result.error () == 0 && result.bytes_transferred () > 0)
1022 this->total_snd_ += result.bytes_transferred ();
1024 if (duplex == 0)
1025 this->initiate_read_stream ();
1028 --this->io_count_;
1029 if (this->io_count_ > 0)
1030 return;
1032 delete this;
1035 // *******************************************
1036 // Connector
1037 // *******************************************
1039 class Connector : public ACE_Asynch_Connector<Client>
1041 public:
1042 Connector (TestData *tester);
1043 ~Connector () override;
1045 int start (const ACE_INET_Addr &addr, int num);
1047 // Virtual from ACE_Asynch_Connector
1048 Client *make_handler () override;
1050 private:
1051 TestData *tester_;
1054 // *************************************************************
1056 Connector::Connector (TestData *tester)
1057 : tester_ (tester)
1061 Connector::~Connector ()
1063 this->cancel ();
1066 Client *
1067 Connector::make_handler ()
1069 return this->tester_->client_up ();
1074 Connector::start (const ACE_INET_Addr& addr, int num)
1076 if (num > MAX_CLIENTS)
1077 num = MAX_CLIENTS;
1079 if (num < 0)
1080 num = 1;
1082 int rc = 0;
1084 // int open ( int pass_addresses = 0,
1085 // ACE_Proactor *proactor = 0,
1086 // int validate_new_connection = 0 );
1088 if (this->open (1, 0, 1) != 0)
1090 ACE_ERROR ((LM_ERROR,
1091 ACE_TEXT ("(%t) %p\n"),
1092 ACE_TEXT ("Connector::open failed")));
1093 return rc;
1096 for (; rc < num; rc++)
1098 ACE_INET_Addr localAddr;
1099 if (this->connect (addr, localAddr) != 0)
1101 ACE_ERROR ((LM_ERROR,
1102 ACE_TEXT ("(%t) %p\n"),
1103 ACE_TEXT ("Connector::connect failed for IPv6")));
1104 break;
1107 return rc;
1111 Client::Client ()
1113 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
1116 Client::Client (TestData *tester, int id)
1117 : tester_ (tester),
1118 id_ (id),
1119 handle_ (ACE_INVALID_HANDLE),
1120 io_count_ (0),
1121 stop_writing_ (0),
1122 flg_cancel_ (0),
1123 total_snd_ (0),
1124 total_rcv_ (0),
1125 total_w_ (0),
1126 total_r_ (0)
1130 Client::~Client ()
1132 ACE_DEBUG ((LM_DEBUG,
1133 ACE_TEXT ("(%t) Client %d dtor; %d sends (%d bytes); ")
1134 ACE_TEXT ("%d recvs (%d bytes)\n"),
1135 this->id_,
1136 this->total_w_, this->total_snd_,
1137 this->total_r_, this->total_rcv_));
1138 if (this->io_count_ != 0)
1139 ACE_ERROR ((LM_WARNING,
1140 ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
1141 this->id_,
1142 this->io_count_));
1144 // This test bounces data back and forth between Clients and Servers.
1145 // Therefore, if there was significantly more data in one direction, that's
1146 // a problem. Remember, the byte counts are unsigned values.
1147 int issue_data_warning = 0;
1148 if (this->total_snd_ > this->total_rcv_)
1150 if (this->total_rcv_ == 0)
1151 issue_data_warning = 1;
1152 else if (this->total_snd_ / this->total_rcv_ > 2)
1153 issue_data_warning = 1;
1155 else
1157 if (this->total_snd_ == 0)
1158 issue_data_warning = 1;
1159 else if (this->total_rcv_ / this->total_snd_ > 2)
1160 issue_data_warning = 1;
1162 if (issue_data_warning)
1163 ACE_DEBUG ((LM_WARNING,
1164 ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
1166 if (this->tester_ != 0)
1167 this->tester_->client_done (this);
1169 this->id_ = -1;
1170 this->handle_= ACE_INVALID_HANDLE;
1171 if (this->handle_ != ACE_INVALID_HANDLE)
1173 ACE_OS::closesocket (this->handle_);
1175 this->handle_= ACE_INVALID_HANDLE;
1178 void
1179 Client::cancel ()
1181 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1183 this->flg_cancel_ = 1;
1184 this->ws_.cancel ();
1185 this->rs_.cancel ();
1186 return;
1189 void
1190 Client::close ()
1192 // This must be called with the lock_ held.
1193 ACE_DEBUG ((LM_DEBUG,
1194 ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
1195 this->id_, this->io_count_));
1196 ACE_OS::shutdown (this->handle_, ACE_SHUTDOWN_WRITE);
1197 this->stop_writing_ = 1;
1198 return;
1202 void
1203 Client::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr& local)
1205 char my_name[256];
1206 char peer_name[256];
1207 ACE_TCHAR local_str[256];
1208 ACE_INET_Addr addr ((u_short) 0, host);
1210 // This checks to make sure the peer address given to us matches what
1211 // we expect it to be.
1212 if (0 != peer.get_host_addr (peer_name, sizeof (peer_name)))
1214 if (0 != addr.get_host_addr (my_name, sizeof (my_name)))
1216 if (0 != ACE_OS::strncmp (peer_name, my_name, sizeof (my_name)))
1218 ACE_ERROR
1219 ((LM_ERROR,
1220 ACE_TEXT ("(%t) Sender %d peer address (%C) does not ")
1221 ACE_TEXT ("match host address (%C)\n"),
1222 this->id_,
1223 peer_name, my_name));
1224 return;
1227 else
1229 ACE_ERROR
1230 ((LM_ERROR,
1231 ACE_TEXT ("(%t) Sender %d unable to convert host addr\n"),
1232 this->id_));
1233 return;
1236 else
1238 ACE_ERROR ((LM_ERROR,
1239 ACE_TEXT ("(%t) Sender %d unable to convert peer addr\n"),
1240 this->id_));
1241 return;
1244 if (0 == local.addr_to_string (local_str,
1245 sizeof (local_str) / sizeof (ACE_TCHAR)))
1246 ACE_DEBUG ((LM_DEBUG,
1247 ACE_TEXT ("(%t) Client %d connected on %s\n"),
1248 this->id_,
1249 local_str));
1250 else
1251 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Client %d %p\n"),
1252 this->id_,
1253 ACE_TEXT ("addr_to_string")));
1254 return;
1258 void
1259 Client::open (ACE_HANDLE handle, ACE_Message_Block &)
1262 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1264 // Don't buffer serial sends.
1265 this->handle_ = handle;
1266 int nodelay = 1;
1267 ACE_SOCK_Stream option_setter (handle);
1268 if (option_setter.set_option (ACE_IPPROTO_TCP,
1269 TCP_NODELAY,
1270 &nodelay,
1271 sizeof (nodelay)))
1272 ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
1274 // Open ACE_Asynch_Write_Stream
1275 if (this->ws_.open (*this, this->handle_) == -1)
1276 ACE_ERROR ((LM_ERROR,
1277 ACE_TEXT ("(%t) %p\n"),
1278 ACE_TEXT ("Client::ACE_Asynch_Write_Stream::open")));
1280 // Open ACE_Asynch_Read_Stream
1281 else if (this->rs_.open (*this, this->handle_) == -1)
1282 ACE_ERROR ((LM_ERROR,
1283 ACE_TEXT ("(%t) %p\n"),
1284 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::open")));
1286 else if (this->initiate_write_stream () == 0)
1288 if (duplex != 0) // Start an asynchronous read
1289 this->initiate_read_stream ();
1292 if (this->io_count_ > 0)
1293 return;
1295 delete this;
1299 Client::initiate_write_stream ()
1301 if (this->flg_cancel_ != 0 ||
1302 this->stop_writing_ ||
1303 this->handle_ == ACE_INVALID_HANDLE)
1304 return -1;
1306 static const size_t complete_message_length = ACE_OS::strlen (complete_message);
1308 #if defined (ACE_WIN32)
1309 ACE_Message_Block *mb1 = 0,
1310 *mb2 = 0,
1311 *mb3 = 0;
1313 // No need to allocate +1 for proper printing - the memory includes it already
1314 ACE_NEW_RETURN (mb1,
1315 ACE_Message_Block ((char *)complete_message,
1316 complete_message_length),
1317 -1);
1319 ACE_NEW_RETURN (mb2,
1320 ACE_Message_Block ((char *)complete_message,
1321 complete_message_length),
1322 -1);
1324 ACE_NEW_RETURN (mb3,
1325 ACE_Message_Block ((char *)complete_message,
1326 complete_message_length),
1327 -1);
1329 mb1->wr_ptr (complete_message_length);
1330 mb2->wr_ptr (complete_message_length);
1331 mb3->wr_ptr (complete_message_length);
1333 // chain them together
1334 mb1->cont (mb2);
1335 mb2->cont (mb3);
1337 if (this->ws_.writev (*mb1, mb1->total_length ()) == -1)
1339 mb1->release ();
1340 ACE_ERROR_RETURN((LM_ERROR,
1341 ACE_TEXT ("(%t) %p\n"),
1342 ACE_TEXT ("Client::ACE_Asynch_Stream::writev")),
1343 -1);
1345 #else /* defined (ACE_WIN32) */
1347 ACE_Message_Block *mb = 0;
1349 // No need to allocate +1 for proper printing - the memory includes it already
1350 ACE_NEW_RETURN (mb,
1351 ACE_Message_Block (complete_message, complete_message_length),
1352 -1);
1353 mb->wr_ptr (complete_message_length);
1355 if (this->ws_.write (*mb, mb->length ()) == -1)
1357 mb->release ();
1358 #if defined (ACE_WIN32)
1359 // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
1360 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
1361 ACE_ERROR_RETURN ((LM_DEBUG,
1362 ACE_TEXT ("(%t) Client %d, peer gone\n"),
1363 this->id_),
1364 -1);
1365 #endif /* ACE_WIN32 */
1366 ACE_ERROR_RETURN((LM_ERROR,
1367 ACE_TEXT ("(%t) Client %d, %p\n"),
1368 this->id_,
1369 ACE_TEXT ("write")),
1370 -1);
1372 #endif /* defined (ACE_WIN32) */
1374 this->io_count_++;
1375 this->total_w_++;
1376 return 0;
1380 Client::initiate_read_stream ()
1382 if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
1383 return -1;
1385 static const size_t complete_message_length =
1386 ACE_OS::strlen (complete_message);
1388 #if defined (ACE_WIN32)
1389 ACE_Message_Block *mb1 = 0,
1390 *mb2 = 0,
1391 *mb3 = 0,
1392 *mb4 = 0,
1393 *mb5 = 0,
1394 *mb6 = 0;
1396 // We allocate +1 only for proper printing - we can just set the last byte
1397 // to '\0' before printing out
1398 ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1);
1399 ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1);
1400 ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1);
1402 // Let allocate memory for one more triplet,
1403 // This improves performance
1404 // as we can receive more the than one block at once
1405 // Generally, we can receive more triplets ....
1406 ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1);
1407 ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1);
1408 ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1);
1410 mb1->cont (mb2);
1411 mb2->cont (mb3);
1413 mb3->cont (mb4);
1414 mb4->cont (mb5);
1415 mb5->cont (mb6);
1418 // hide last byte in each message block, reserving it for later to set '\0'
1419 // for proper printouts
1420 mb1->size (mb1->size () - 1);
1421 mb2->size (mb2->size () - 1);
1422 mb3->size (mb3->size () - 1);
1424 mb4->size (mb4->size () - 1);
1425 mb5->size (mb5->size () - 1);
1426 mb6->size (mb6->size () - 1);
1428 // Inititiate read
1429 if (this->rs_.readv (*mb1, mb1->total_size () - 1) == -1)
1431 mb1->release ();
1432 ACE_ERROR_RETURN ((LM_ERROR,
1433 ACE_TEXT ("(%t) %p\n"),
1434 ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
1435 -1);
1437 #else /* defined (ACE_WIN32) */
1439 // Try to read more chunks
1440 size_t blksize = ( complete_message_length > BUFSIZ ) ?
1441 complete_message_length : BUFSIZ;
1443 ACE_Message_Block *mb = 0;
1445 // We allocate +1 only for proper printing - we can just set the last byte
1446 // to '\0' before printing out
1447 ACE_NEW_RETURN (mb,
1448 ACE_Message_Block (blksize + 1),
1449 -1);
1451 // Inititiate read
1452 if (this->rs_.read (*mb, mb->size () - 1) == -1)
1454 mb->release ();
1455 #if defined (ACE_WIN32)
1456 // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
1457 // a 0-byte read as we would if underlying calls used WSARecv.
1458 if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
1459 ACE_ERROR_RETURN ((LM_DEBUG,
1460 ACE_TEXT ("(%t) Client %d, peer closed\n"),
1461 this->id_),
1462 -1);
1463 #endif /* ACE_WIN32 */
1464 ACE_ERROR_RETURN ((LM_ERROR,
1465 ACE_TEXT ("(%t) Client %d, %p\n"),
1466 this->id_,
1467 ACE_TEXT ("read")),
1468 -1);
1470 #endif /* defined (ACE_WIN32) */
1472 this->io_count_++;
1473 this->total_r_++;
1474 return 0;
1477 void
1478 Client::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
1481 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1483 ACE_Message_Block & mb = result.message_block ();
1485 if (loglevel > 1)
1487 LogLocker log_lock;
1489 ACE_DEBUG ((LM_DEBUG,
1490 ACE_TEXT ("(%t) **** Client %d: handle_write_stream() ****\n"),
1491 this->id_));
1492 ACE_DEBUG ((LM_DEBUG,
1493 ACE_TEXT ("%s = %d\n"),
1494 ACE_TEXT ("bytes_to_write"),
1495 result.bytes_to_write ()));
1496 ACE_DEBUG ((LM_DEBUG,
1497 ACE_TEXT ("%s = %d\n"),
1498 ACE_TEXT ("handle"),
1499 result.handle ()));
1500 ACE_DEBUG ((LM_DEBUG,
1501 ACE_TEXT ("%s = %d\n"),
1502 ACE_TEXT ("bytes_transfered"),
1503 result.bytes_transferred ()));
1504 ACE_DEBUG ((LM_DEBUG,
1505 ACE_TEXT ("%s = %@\n"),
1506 ACE_TEXT ("act"),
1507 result.act ()));
1508 ACE_DEBUG ((LM_DEBUG,
1509 ACE_TEXT ("%s = %d\n"),
1510 ACE_TEXT ("success"),
1511 result.success ()));
1512 ACE_DEBUG ((LM_DEBUG,
1513 ACE_TEXT ("%s = %@\n"),
1514 ACE_TEXT ("completion_key"),
1515 result.completion_key ()));
1516 ACE_DEBUG ((LM_DEBUG,
1517 ACE_TEXT ("%s = %d\n"),
1518 ACE_TEXT ("error"),
1519 result.error ()));
1521 #if defined (ACE_WIN32)
1522 size_t bytes_transferred = result.bytes_transferred ();
1523 char index = 0;
1524 for (ACE_Message_Block* mb_i = &mb;
1525 (mb_i != 0) && (bytes_transferred > 0);
1526 mb_i = mb_i->cont ())
1528 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1529 mb_i->rd_ptr()[0] = '\0';
1531 size_t len = mb_i->rd_ptr () - mb_i->base ();
1533 // move rd_ptr backwards as required for printout
1534 if (len >= bytes_transferred)
1536 mb_i->rd_ptr (0 - bytes_transferred);
1537 bytes_transferred = 0;
1539 else
1541 mb_i->rd_ptr (0 - len);
1542 bytes_transferred -= len;
1545 ++index;
1546 ACE_DEBUG ((LM_DEBUG,
1547 ACE_TEXT ("%s%d = %s\n"),
1548 ACE_TEXT ("message_block, part "),
1549 index,
1550 mb_i->rd_ptr ()));
1552 #else /* defined (ACE_WIN32) */
1553 // write 0 at string end for proper printout (if end of mb, it's 0 already)
1554 mb.rd_ptr()[0] = '\0';
1555 // move rd_ptr backwards as required for printout
1556 mb.rd_ptr (- result.bytes_transferred ());
1557 ACE_DEBUG ((LM_DEBUG,
1558 ACE_TEXT ("%s = %s\n"),
1559 ACE_TEXT ("message_block"),
1560 mb.rd_ptr ()));
1561 #endif /* defined (ACE_WIN32) */
1563 ACE_DEBUG ((LM_DEBUG,
1564 ACE_TEXT ("**** end of message ****************\n")));
1566 else if (result.error () != 0)
1568 ACE_Log_Priority prio;
1569 #if defined (ACE_WIN32)
1570 if (result.error () == ERROR_OPERATION_ABORTED)
1571 prio = LM_DEBUG;
1572 #else
1573 if (result.error () == ECANCELED)
1574 prio = LM_DEBUG;
1575 #endif /* ACE_WIN32 */
1576 else
1577 prio = LM_ERROR;
1578 ACE_Log_Msg::instance ()->errnum (result.error ());
1579 ACE_Log_Msg::instance ()->log (prio,
1580 ACE_TEXT ("(%t) Client %d; %p\n"),
1581 this->id_,
1582 ACE_TEXT ("write"));
1584 else if (loglevel > 0)
1586 ACE_DEBUG ((LM_DEBUG,
1587 ACE_TEXT ("(%t) Client %d: wrote %d bytes ok\n"),
1588 this->id_,
1589 result.bytes_transferred ()));
1592 mb.release ();
1594 if (result.error () == 0 && result.bytes_transferred () > 0)
1596 this->total_snd_ += result.bytes_transferred ();
1597 if (this->total_snd_ >= xfer_limit)
1599 ACE_DEBUG ((LM_DEBUG,
1600 ACE_TEXT ("(%t) Client %d sent %d, limit %d\n"),
1601 this->id_, this->total_snd_, xfer_limit));
1602 this->close ();
1604 if (duplex != 0) // full duplex, continue write
1606 if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
1607 this->initiate_write_stream ();
1609 else // half-duplex read reply, after read we will start write
1610 this->initiate_read_stream ();
1613 --this->io_count_;
1614 if (this->io_count_ > 0)
1615 return;
1617 delete this;
1620 void
1621 Client::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
1624 ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
1626 ACE_Message_Block & mb = result.message_block ();
1628 if (loglevel > 1)
1630 LogLocker log_lock;
1632 ACE_DEBUG ((LM_DEBUG,
1633 ACE_TEXT ("(%t) **** Client %d: handle_read_stream() ****\n"),
1634 this->id_));
1635 ACE_DEBUG ((LM_DEBUG,
1636 ACE_TEXT ("%s = %d\n"),
1637 ACE_TEXT ("bytes_to_read"),
1638 result.bytes_to_read ()));
1639 ACE_DEBUG ((LM_DEBUG,
1640 ACE_TEXT ("%s = %d\n"),
1641 ACE_TEXT ("handle"),
1642 result.handle ()));
1643 ACE_DEBUG ((LM_DEBUG,
1644 ACE_TEXT ("%s = %d\n"),
1645 ACE_TEXT ("bytes_transfered"),
1646 result.bytes_transferred ()));
1647 ACE_DEBUG ((LM_DEBUG,
1648 ACE_TEXT ("%s = %@\n"),
1649 ACE_TEXT ("act"),
1650 result.act ()));
1651 ACE_DEBUG ((LM_DEBUG,
1652 ACE_TEXT ("%s = %d\n"),
1653 ACE_TEXT ("success"),
1654 result.success ()));
1655 ACE_DEBUG ((LM_DEBUG,
1656 ACE_TEXT ("%s = %@\n"),
1657 ACE_TEXT ("completion_key"),
1658 result.completion_key ()));
1659 ACE_DEBUG ((LM_DEBUG,
1660 ACE_TEXT ("%s = %d\n"),
1661 ACE_TEXT ("error"),
1662 result.error ()));
1664 #if defined (ACE_WIN32)
1665 char index = 0;
1666 for (ACE_Message_Block* mb_i = &mb;
1667 mb_i != 0;
1668 mb_i = mb_i->cont ())
1670 ++index;
1671 // write 0 at string end for proper printout
1672 mb_i->wr_ptr()[0] = '\0';
1674 ACE_DEBUG ((LM_DEBUG,
1675 ACE_TEXT ("%s%d = %s\n"),
1676 ACE_TEXT ("message_block, part "),
1677 index,
1678 mb_i->rd_ptr ()));
1680 #else /* ACE_WIN32 */
1681 // write 0 at string end for proper printout
1682 mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
1683 ACE_DEBUG ((LM_DEBUG,
1684 ACE_TEXT ("%s = %s\n"),
1685 ACE_TEXT ("message_block"),
1686 mb.rd_ptr ()));
1687 #endif /* ACE_WIN32 */
1689 ACE_DEBUG ((LM_DEBUG,
1690 ACE_TEXT ("**** end of message ****************\n")));
1692 else if (result.error () != 0)
1694 ACE_Log_Priority prio;
1695 #if defined (ACE_WIN32)
1696 if (result.error () == ERROR_OPERATION_ABORTED)
1697 prio = LM_DEBUG;
1698 #else
1699 if (result.error () == ECANCELED)
1700 prio = LM_DEBUG;
1701 #endif /* ACE_WIN32 */
1702 else
1703 prio = LM_ERROR;
1704 ACE_Log_Msg::instance ()->errnum (result.error ());
1705 ACE_Log_Msg::instance ()->log (prio,
1706 ACE_TEXT ("(%t) Client %d; %p\n"),
1707 this->id_,
1708 ACE_TEXT ("read"));
1710 else if (loglevel > 0)
1712 ACE_DEBUG ((LM_DEBUG,
1713 ACE_TEXT ("(%t) Client %d: read %d bytes ok\n"),
1714 this->id_,
1715 result.bytes_transferred ()));
1718 mb.release ();
1720 if (result.error () == 0 && result.bytes_transferred () > 0)
1722 this->total_rcv_ += result.bytes_transferred ();
1724 if (duplex != 0 || this->stop_writing_) // full duplex, continue read
1725 this->initiate_read_stream ();
1726 else // half-duplex write, after write we will start read
1727 this->initiate_write_stream ();
1730 --this->io_count_;
1731 if (this->io_count_ > 0)
1732 return;
1734 delete this;
1737 // *************************************************************
1738 // Configuration helpers
1739 // *************************************************************
1741 print_usage (int /* argc */, ACE_TCHAR *argv[])
1743 ACE_ERROR
1744 ((LM_ERROR,
1745 ACE_TEXT ("\nusage: %s")
1746 ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
1747 ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
1748 ACE_TEXT ("\n a AIOCB")
1749 ACE_TEXT ("\n i SIG")
1750 ACE_TEXT ("\n c CB")
1751 ACE_TEXT ("\n d default")
1752 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1753 ACE_TEXT ("\n-h <host> for Client mode")
1754 ACE_TEXT ("\n-n <number threads for Proactor pool>")
1755 ACE_TEXT ("\n-p <port to listen/connect>")
1756 ACE_TEXT ("\n-c <number of client instances>")
1757 ACE_TEXT ("\n-b run client and server at the same time")
1758 ACE_TEXT ("\n f file")
1759 ACE_TEXT ("\n c console")
1760 ACE_TEXT ("\n-v log level")
1761 ACE_TEXT ("\n 0 - log errors and highlights")
1762 ACE_TEXT ("\n 1 - log level 0 plus progress information")
1763 ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
1764 ACE_TEXT ("\n-x max transfer byte count per Client")
1765 ACE_TEXT ("\n-u show this message")
1766 ACE_TEXT ("\n"),
1767 argv[0]
1769 return -1;
1772 static int
1773 set_proactor_type (const ACE_TCHAR *ptype)
1775 if (!ptype)
1776 return 0;
1778 switch (ACE_OS::ace_toupper (*ptype))
1780 case 'D':
1781 proactor_type = DEFAULT;
1782 return 1;
1783 case 'A':
1784 proactor_type = AIOCB;
1785 return 1;
1786 case 'I':
1787 proactor_type = SIG;
1788 return 1;
1789 #if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
1790 case 'C':
1791 proactor_type = CB;
1792 return 1;
1793 #endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
1794 default:
1795 break;
1797 return 0;
1800 static int
1801 parse_args (int argc, ACE_TCHAR *argv[])
1803 // First, set up all the defaults then let any args change them.
1804 both = 1; // client and server simultaneosly
1805 duplex = 1; // full duplex is on
1806 #if defined (ACE_HAS_IPV6)
1807 host = ACE_IPV6_LOCALHOST; // server to connect (IPv6 localhost)
1808 #else
1809 host = ACE_LOCALHOST;
1810 #endif /*ACE_HAS_IPV6*/
1811 port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
1812 max_aio_operations = 512; // POSIX Proactor params
1813 proactor_type = DEFAULT; // Proactor type = default
1814 threads = 3; // size of Proactor thread pool
1815 clients = 10; // number of clients
1816 loglevel = 0; // log level : only errors and highlights
1817 // Default transfer limit 50 messages per Sender
1818 xfer_limit = 50 * ACE_OS::strlen (complete_message);
1820 // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
1821 # if defined (ACE_LINUX)
1822 duplex = 0;
1823 #endif
1825 if (argc == 1) // no arguments , so one button test
1826 return 0;
1828 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
1829 int c;
1831 while ((c = get_opt ()) != EOF)
1833 switch (c)
1835 case 'x': // xfer limit
1836 xfer_limit = static_cast<size_t> (ACE_OS::atoi (get_opt.opt_arg ()));
1837 if (xfer_limit == 0)
1838 xfer_limit = 1; // Bare minimum.
1839 break;
1840 case 'b': // both client and server
1841 both = 1;
1842 break;
1843 case 'v': // log level
1844 loglevel = ACE_OS::atoi (get_opt.opt_arg ());
1845 break;
1846 case 'd': // duplex
1847 duplex = ACE_OS::atoi (get_opt.opt_arg ());
1848 break;
1849 case 'h': // host for sender
1850 host = get_opt.opt_arg ();
1851 break;
1852 case 'p': // port number
1853 port = ACE_OS::atoi (get_opt.opt_arg ());
1854 break;
1855 case 'n': // thread pool size
1856 threads = ACE_OS::atoi (get_opt.opt_arg ());
1857 break;
1858 case 'c': // number of clients
1859 clients = ACE_OS::atoi (get_opt.opt_arg ());
1860 if (clients > MAX_CLIENTS)
1861 clients = MAX_CLIENTS;
1862 break;
1863 case 'o': // max number of aio for proactor
1864 max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
1865 break;
1866 case 't': // Proactor Type
1867 if (set_proactor_type (get_opt.opt_arg ()))
1868 break;
1869 return print_usage (argc, argv);
1870 case 'u':
1871 default:
1872 return print_usage (argc, argv);
1873 } // switch
1874 } // while
1876 return 0;
1880 run_main (int argc, ACE_TCHAR *argv[])
1882 ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPV6"));
1884 if (::parse_args (argc, argv) == -1)
1885 return -1;
1887 #if defined (ACE_HAS_IPV6)
1888 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
1889 disable_signal (SIGPIPE, SIGPIPE);
1891 MyTask task1;
1892 TestData test;
1894 if (task1.start (threads, proactor_type, max_aio_operations) == 0)
1896 Acceptor acceptor (&test);
1897 Connector connector (&test);
1898 ACE_INET_Addr addr (port, "::");
1900 int rc = 0;
1902 if (both != 0 || host == 0) // Acceptor
1904 // Simplify, initial read with zero size
1905 if (acceptor.open (addr, 0, 1) == 0)
1906 rc = 1;
1909 if (both != 0 || host != 0)
1911 if (host == 0)
1912 host = ACE_IPV6_LOCALHOST;
1914 if (addr.set (port, host, 1, addr.get_type ()) == -1)
1915 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
1916 else
1917 rc += connector.start (addr, clients);
1920 // Wait a few seconds to let things get going, then poll til
1921 // all sessions are done. Note that when we exit this scope, the
1922 // Acceptor and Connector will be destroyed, which should prevent
1923 // further connections and also test how well destroyed handlers
1924 // are handled.
1925 ACE_OS::sleep (3);
1927 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
1928 while (!test.testing_done ())
1929 ACE_OS::sleep (1);
1931 test.stop_all ();
1933 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
1934 task1.stop ();
1936 #endif /* ACE_HAS_IPV6 */
1938 ACE_END_TEST;
1940 return 0;
1943 #else
1946 run_main (int, ACE_TCHAR *[])
1948 ACE_START_TEST (ACE_TEXT ("Proactor_Test_IPV6"));
1950 ACE_DEBUG ((LM_INFO,
1951 ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
1952 ACE_TEXT ("Proactor_Test_IPV6 will not be run.\n")));
1954 ACE_END_TEST;
1956 return 0;
1959 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */