Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / ACE / tests / TP_Reactor_Test.cpp
blobd81410851ba13b6c20bf823e96dbde5a778ddee4
2 //=============================================================================
3 /**
4 * @file TP_Reactor_Test.cpp
6 * This program illustrates how the <ACE_TP_Reactor> can be used to
7 * implement an application that does various operations.
8 * usage: TP_Reactor_Test
9 * -n number threads in the TP_Reactor thread pool
10 * -d duplex mode 1 (full-duplex) vs. 0 (half-duplex)
11 * -p port to listen(Server)/connect(Client)
12 * -h host to connect (Client mode)
13 * -s number of sender's instances ( Client mode)
14 * -b run client and server (both modes ) at the same time
15 * -v log level
16 * 0 - log all messages
17 * 1 - log only errors and unusual cases
18 * -i time to run in seconds
19 * -u show this message
21 * The main differences between Thread_Pool_Reactor_Test.cpp and
22 * this test are:
24 * 1. Thread_Pool_Reactor_Test.cpp tests only handle_input()
25 * events on the server, whereas this one tests both handle_input() and
26 * handle_output() on both server and client, i.e., the receiver
27 * and sender are completely event-driven.
29 * 2. The receiver and sender in this test can work in full duplex
30 * mode, i.e., input and ouput events are processed independently.
31 * Half-duplex mode (request-reply) is also supported.
33 * This test is therefore a bit more stressful than the
34 * Thread_Pool_Reactor.cpp for the ACE_TP_Reactor since same
35 * thread pool is shared between client and server.
37 * This test is a "twin" of the Proactor_Test.cpp, so it can help for
38 * developers to provide independent of Reactor/Proactor solutions.
40 * @author Alexander Libman <alibman@ihug.com.au>
41 * @author <alexl@rumblgroup.com>
43 //=============================================================================
46 #include "test_config.h"
48 #if defined(ACE_HAS_THREADS)
50 #include "TP_Reactor_Test.h"
52 #include "ace/Signal.h"
53 #include "ace/Service_Config.h"
54 #include "ace/Get_Opt.h"
56 #include "ace/Reactor.h"
57 #include "ace/TP_Reactor.h"
58 #include "ace/OS_NS_signal.h"
59 #include "ace/OS_NS_stdio.h"
60 #include "ace/OS_NS_string.h"
61 #include "ace/OS_NS_unistd.h"
62 #include "ace/Synch_Traits.h"
63 #include "ace/Thread_Semaphore.h"
65 // Some debug helper functions
66 static int disable_signal (int sigmin, int sigmax);
68 // both: 0 run client or server / depends on host
69 // != 0 run client and server
70 static int both = 0;
72 // Host that we're connecting to.
73 static const ACE_TCHAR *host = 0;
75 // number of Senders instances
76 static int senders = 1;
78 // duplex mode: == 0 half-duplex
79 // != 0 full duplex
80 static int duplex = 0;
82 // number threads in the TP_Reactor thread pool
83 static int threads = 1;
85 // Port that we're receiving connections on.
86 static u_short port = ACE_DEFAULT_SERVER_PORT;
88 // Log options
89 static int loglevel = 1; // 0 full , 1 only errors
91 static const size_t MIN_TIME = 1; // min 1 sec
92 static const size_t MAX_TIME = 3600; // max 1 hour
93 static u_int seconds = 2; // default time to run - 2 seconds
95 static char data[] =
96 "GET / HTTP/1.1\r\n"
97 "Accept: */*\r\n"
98 "Accept-Language: C++\r\n"
99 "Accept-Encoding: gzip, deflate\r\n"
100 "User-Agent: TPReactor_Test/1.0 (non-compatible)\r\n"
101 "Connection: Keep-Alive\r\n"
102 "\r\n" ;
104 // *************************************************************
106 class LogLocker
108 public:
109 LogLocker () { ACE_LOG_MSG->acquire (); }
110 virtual ~LogLocker () { ACE_LOG_MSG->release (); }
112 // *************************************************************
115 * @class MyTask
117 * MyTask plays role for TP_Reactor threads pool
119 * MyTask is ACE_Task resposible for:
120 * 1. Creation and deletion of TP_Reactor and TP_Reactor thread pool
121 * 2. Running TP_Reactor event loop
123 class MyTask : public ACE_Task<ACE_MT_SYNCH>
125 public:
126 MyTask (): sem_ ((unsigned int) 0),
127 my_reactor_ (0) {}
129 ~MyTask () override { stop (); }
131 int svc () override;
133 int start (int num_threads);
134 int stop ();
136 private:
137 int create_reactor ();
138 int delete_reactor ();
140 ACE_SYNCH_RECURSIVE_MUTEX lock_;
141 ACE_Thread_Semaphore sem_;
142 ACE_Reactor *my_reactor_;
146 MyTask::create_reactor ()
148 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
149 monitor,
150 this->lock_,
151 -1);
153 ACE_TEST_ASSERT (this->my_reactor_ == 0);
155 ACE_TP_Reactor * pImpl = 0;
157 ACE_NEW_RETURN (pImpl,ACE_TP_Reactor, -1);
159 ACE_NEW_RETURN (my_reactor_,
160 ACE_Reactor (pImpl ,1),
161 -1);
163 ACE_DEBUG ((LM_DEBUG,
164 ACE_TEXT (" (%t) Create TP_Reactor\n")));
166 ACE_Reactor::instance (this->my_reactor_);
168 this->reactor (my_reactor_);
170 return 0;
174 MyTask::delete_reactor ()
176 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
177 monitor,
178 this->lock_,
179 -1);
181 ACE_DEBUG ((LM_DEBUG,
182 ACE_TEXT (" (%t) Delete TP_Reactor\n")));
184 delete this->my_reactor_;
185 ACE_Reactor::instance ((ACE_Reactor *) 0);
186 this->my_reactor_ = 0;
187 this->reactor (0);
189 return 0;
193 MyTask::start (int num_threads)
195 if (this->create_reactor () == -1)
196 ACE_ERROR_RETURN ((LM_ERROR,
197 ACE_TEXT ("%p.\n"),
198 ACE_TEXT ("unable to create reactor")),
199 -1);
201 if (this->activate (THR_NEW_LWP, num_threads) == -1)
202 ACE_ERROR_RETURN ((LM_ERROR,
203 ACE_TEXT ("%p.\n"),
204 ACE_TEXT ("unable to activate thread pool")),
205 -1);
207 for (; num_threads > 0 ; num_threads--)
208 sem_.acquire ();
210 return 0;
215 MyTask::stop ()
217 if (this->my_reactor_ != 0)
219 ACE_DEBUG ((LM_DEBUG,
220 ACE_TEXT ("End TP_Reactor event loop\n")));
222 ACE_Reactor::instance()->end_reactor_event_loop ();
225 if (this->wait () == -1)
226 ACE_ERROR ((LM_ERROR,
227 ACE_TEXT ("%p.\n"),
228 ACE_TEXT ("unable to stop thread pool")));
230 if (this->delete_reactor () == -1)
231 ACE_ERROR ((LM_ERROR,
232 ACE_TEXT ("%p.\n"),
233 ACE_TEXT ("unable to delete reactor")));
235 return 0;
239 MyTask::svc ()
241 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask started\n")));
243 disable_signal (SIGPIPE, SIGPIPE);
245 // signal that we are ready
246 sem_.release (1);
248 while (ACE_Reactor::instance()->reactor_event_loop_done () == 0)
249 ACE_Reactor::instance()->run_reactor_event_loop ();
251 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask finished\n")));
252 return 0;
255 // *************************************************************
257 Acceptor::Acceptor ()
258 : ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> ((ACE_Reactor *) 0),
259 sessions_ (0),
260 total_snd_(0),
261 total_rcv_(0),
262 total_w_ (0),
263 total_r_ (0)
265 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
267 for (size_t i = 0; i < MAX_RECEIVERS; ++i)
268 this->list_receivers_[i] =0;
271 Acceptor::~Acceptor ()
273 this->reactor (0);
274 stop ();
277 void
278 Acceptor::stop ()
280 // this method can be called only after reactor event loop id done
281 // in all threads
283 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
285 for (size_t i = 0; i < MAX_RECEIVERS; ++i)
287 delete this->list_receivers_[i];
288 this->list_receivers_[i] =0;
292 void
293 Acceptor::on_new_receiver (Receiver &rcvr)
295 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
296 this->sessions_++;
297 this->list_receivers_[rcvr.index_] = & rcvr;
298 ACE_DEBUG ((LM_DEBUG,
299 "Receiver::CTOR sessions_=%d\n",
300 this->sessions_));
303 void
304 Acceptor::on_delete_receiver (Receiver &rcvr)
306 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
308 this->sessions_--;
310 this->total_snd_ += rcvr.get_total_snd ();
311 this->total_rcv_ += rcvr.get_total_rcv ();
312 this->total_w_ += rcvr.get_total_w ();
313 this->total_r_ += rcvr.get_total_r ();
315 if (rcvr.index_ < MAX_RECEIVERS
316 && this->list_receivers_[rcvr.index_] == &rcvr)
317 this->list_receivers_[rcvr.index_] = 0;
319 ACE_TCHAR bufs [256];
320 ACE_TCHAR bufr [256];
322 ACE_OS::snprintf (bufs, 256, ACE_TEXT ("%ld(%ld)"),
323 rcvr.get_total_snd (),
324 rcvr.get_total_w ());
326 ACE_OS::snprintf (bufr, 256, ACE_TEXT ("%ld(%ld)"),
327 rcvr.get_total_rcv (),
328 rcvr.get_total_r ());
330 ACE_DEBUG ((LM_DEBUG,
331 ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
332 rcvr.index_,
333 bufs,
334 bufr,
335 this->sessions_));
339 Acceptor::start (const ACE_INET_Addr &addr)
341 if (ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR>::open (addr,
342 ACE_Reactor::instance (),
343 ACE_NONBLOCK) < 0)
344 ACE_ERROR_RETURN ((LM_ERROR,
345 ACE_TEXT("%p\n"),
346 ACE_TEXT("Acceptor::start () - open failed")),
348 return 1;
352 Acceptor::make_svc_handler (Receiver *&sh)
354 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
356 if (sessions_ >= MAX_RECEIVERS)
357 return -1;
359 for (size_t i = 0; i < MAX_RECEIVERS; ++i)
360 if (this->list_receivers_ [i] == 0)
362 ACE_NEW_RETURN (sh,
363 Receiver (this , i),
364 -1);
365 return 0;
367 return -1;
370 // *************************************************************
372 Receiver::Receiver (Acceptor * acceptor, size_t index)
373 : acceptor_ (acceptor),
374 index_ (index),
375 flg_mask_ (ACE_Event_Handler::NULL_MASK),
376 total_snd_(0),
377 total_rcv_(0),
378 total_w_ (0),
379 total_r_ (0)
381 if (acceptor_ != 0)
382 acceptor_->on_new_receiver (*this);
386 Receiver::~Receiver ()
388 this->reactor (0);
389 if (acceptor_ != 0)
390 acceptor_->on_delete_receiver (*this);
392 this->index_ = 0;
394 for (; ;)
396 ACE_Time_Value tv = ACE_Time_Value::zero;
397 ACE_Message_Block *mb = 0;
399 if (this->getq (mb, &tv) < 0)
400 break;
402 ACE_Message_Block::release (mb);
407 Receiver::check_destroy ()
409 if (flg_mask_ == ACE_Event_Handler::NULL_MASK)
410 return -1;
412 return 0;
416 Receiver::open (void *)
418 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
420 ACE_Reactor *TPReactor = ACE_Reactor::instance ();
422 this->reactor (TPReactor);
424 flg_mask_ = ACE_Event_Handler::NULL_MASK ;
426 if (TPReactor->register_handler (this, flg_mask_) == -1)
427 return -1;
429 initiate_io (ACE_Event_Handler::READ_MASK);
431 return check_destroy ();
435 Receiver::initiate_io (ACE_Reactor_Mask mask)
437 if (ACE_BIT_ENABLED (flg_mask_, mask))
438 return 0;
440 if (ACE_Reactor::instance ()->schedule_wakeup (this, mask) == -1)
441 return -1;
443 ACE_SET_BITS (flg_mask_, mask);
444 return 0;
448 Receiver::terminate_io (ACE_Reactor_Mask mask)
450 if (ACE_BIT_DISABLED (flg_mask_, mask))
451 return 0;
453 if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
454 return -1;
456 ACE_CLR_BITS (flg_mask_, mask);
457 return 0;
461 Receiver::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
463 ACE_Reactor *TPReactor = ACE_Reactor::instance ();
465 TPReactor->remove_handler (this,
466 ACE_Event_Handler::ALL_EVENTS_MASK |
467 ACE_Event_Handler::DONT_CALL); // Don't call handle_close
468 this->reactor (0);
469 this->destroy ();
470 return 0;
474 Receiver::handle_input (ACE_HANDLE h)
476 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
478 ACE_Message_Block *mb = 0;
479 ACE_NEW_RETURN (mb,
480 ACE_Message_Block (BUFSIZ),
481 -1);
483 int err = 0;
484 ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1);
486 this->total_r_++;
488 if (res >= 0)
490 mb->wr_ptr (res);
491 this->total_rcv_ += res;
493 else
494 err = errno ;
496 mb->wr_ptr ()[0] = '\0';
498 if (loglevel == 0 || res <= 0 || err!= 0)
500 LogLocker log_lock;
502 ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_input () SessionId=%d****\n", index_));
503 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_to_read", BUFSIZ));
504 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "handle", h));
505 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_transferred", res));
506 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "error", err));
507 ACE_DEBUG ((LM_DEBUG, "%C = %s\n", "message_block", mb->rd_ptr ()));
508 ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
511 if (err == EWOULDBLOCK)
513 err=0;
514 res=0;
515 return check_destroy ();
518 if (err !=0 || res <= 0)
520 ACE_Message_Block::release (mb);
521 return -1;
524 ACE_Time_Value tv = ACE_Time_Value::zero;
526 int qcount = this->putq (mb, & tv);
528 if (qcount <= 0) // failed to putq
530 ACE_Message_Block::release (mb);
531 return -1 ;
534 int rc = 0;
536 if (duplex == 0) // half-duplex , stop read
537 rc = this->terminate_io (ACE_Event_Handler::READ_MASK);
538 else // full duplex
540 if (qcount >= 20 ) // flow control, stop read
541 rc = this->terminate_io (ACE_Event_Handler::READ_MASK);
542 else
543 rc = this->initiate_io (ACE_Event_Handler::READ_MASK);
546 if (rc == -1)
547 return -1;
549 //initiate write
550 if (this->initiate_io (ACE_Event_Handler::WRITE_MASK) != 0)
551 return -1;
553 return check_destroy ();
557 Receiver::handle_output (ACE_HANDLE h)
559 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
561 ACE_Time_Value tv = ACE_Time_Value::zero;
562 ACE_Message_Block *mb = 0;
564 int err = 0;
565 ssize_t res = 0;
566 size_t bytes = 0;
568 int qcount = this->getq (mb, &tv);
570 if (mb != 0) // qcount >= 0)
572 bytes = mb->length ();
573 res = this->peer ().send (mb->rd_ptr (), bytes);
575 this->total_w_++;
577 if (res < 0)
578 err = errno ;
579 else
580 this->total_snd_ += res;
583 if (loglevel == 0 || res <= 0 || err!= 0)
585 LogLocker log_lock;
587 ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_output () SessionId=%d****\n", index_));
588 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_to_write", bytes));
589 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "handle", h));
590 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_transferred", res));
591 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "error", err));
592 ACE_DEBUG ((LM_DEBUG, "%C = %s\n", "message_block", mb->rd_ptr ()));
593 ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
597 ACE_Message_Block::release (mb);
599 if (err != 0 || res < 0)
600 return -1;
602 if (qcount <= 0) // no more message blocks in queue
604 if (this->terminate_io (ACE_Event_Handler::WRITE_MASK) != 0)
605 return -1;
607 if (this->initiate_io (ACE_Event_Handler::READ_MASK) != 0)
608 return -1;
611 return check_destroy ();
614 // *************************************************************
616 Connector::Connector ()
617 : ACE_Connector<Sender,ACE_SOCK_CONNECTOR> ((ACE_Reactor *) 0),
618 sessions_ (0),
619 total_snd_(0),
620 total_rcv_(0),
621 total_w_ (0),
622 total_r_ (0)
624 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
626 for (size_t i = 0; i < MAX_SENDERS; ++i)
627 this->list_senders_[i] = 0;
630 Connector::~Connector ()
632 this->reactor (0);
633 stop ();
636 void
637 Connector::stop ()
639 // this method can be called only
640 // after reactor event loop id done
641 // in all threads
643 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
645 for (size_t i = 0; i < MAX_SENDERS; ++i)
647 delete this->list_senders_[i];
648 this->list_senders_[i] =0;
652 void
653 Connector::on_new_sender (Sender & sndr)
655 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
656 this->sessions_++;
657 this->list_senders_[sndr.index_] = &sndr;
658 ACE_DEBUG ((LM_DEBUG,
659 "Sender::CTOR sessions_=%d\n",
660 this->sessions_));
663 void
664 Connector::on_delete_sender (Sender & sndr)
666 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
668 this->sessions_--;
669 this->total_snd_ += sndr.get_total_snd();
670 this->total_rcv_ += sndr.get_total_rcv();
671 this->total_w_ += sndr.get_total_w();
672 this->total_r_ += sndr.get_total_r();
674 if (sndr.index_ < MAX_SENDERS
675 && this->list_senders_[sndr.index_] == &sndr)
676 this->list_senders_[sndr.index_] = 0;
678 ACE_TCHAR bufs [256];
679 ACE_TCHAR bufr [256];
681 ACE_OS::snprintf (bufs, 256, ACE_TEXT ("%ld(%ld)"),
682 sndr.get_total_snd (),
683 sndr.get_total_w ());
685 ACE_OS::snprintf (bufr, 256, ACE_TEXT ("%ld(%ld)"),
686 sndr.get_total_rcv (),
687 sndr.get_total_r ());
689 ACE_DEBUG ((LM_DEBUG,
690 ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
691 sndr.index_,
692 bufs,
693 bufr,
694 this->sessions_));
698 Connector::start (const ACE_INET_Addr & addr, int num)
700 if (ACE_Connector<Sender,ACE_SOCK_CONNECTOR>::open (ACE_Reactor::instance (),
701 ACE_NONBLOCK) < 0)
702 ACE_ERROR_RETURN
703 ((LM_ERROR,
704 ACE_TEXT("%p\n"),
705 ACE_TEXT("Connector::start () - open failed")),
708 int rc = 0;
710 for (int i = 0 ; i < num ; i++)
712 Sender * sender = 0;
714 if (ACE_Connector<Sender,ACE_SOCK_CONNECTOR>::connect (sender, addr) < 0)
715 ACE_ERROR_RETURN
716 ((LM_ERROR,
717 ACE_TEXT("%p\n"),
718 ACE_TEXT("Connector::start () - connect failed")),
719 rc);
722 return rc;
726 Connector::make_svc_handler (Sender * & sh)
728 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
730 if (sessions_ >= MAX_SENDERS)
731 return -1;
733 for (size_t i = 0; i < MAX_SENDERS; ++i)
734 if (this->list_senders_ [i] == 0)
736 ACE_NEW_RETURN (sh,
737 Sender (this , i),
738 -1);
739 return 0;
742 return -1;
745 // *************************************************************
747 Sender::Sender (Connector* connector, size_t index)
748 : connector_ (connector),
749 index_ (index),
750 flg_mask_ (ACE_Event_Handler::NULL_MASK),
751 total_snd_(0),
752 total_rcv_(0),
753 total_w_ (0),
754 total_r_ (0)
756 if (connector_ != 0)
757 connector_->on_new_sender (*this);
759 ACE_OS::snprintf (send_buf_, 1024, "%s", data);
763 Sender::~Sender ()
765 this->reactor (0);
766 if (connector_ != 0)
767 connector_->on_delete_sender (*this);
769 this->index_ = 0;
771 for (; ;)
773 ACE_Time_Value tv = ACE_Time_Value::zero;
774 ACE_Message_Block *mb = 0;
776 if (this->getq (mb, &tv) < 0)
777 break;
779 ACE_Message_Block::release (mb);
784 Sender::check_destroy ()
786 if (flg_mask_ == ACE_Event_Handler::NULL_MASK)
787 return -1;
789 return 0;
792 int Sender::open (void *)
794 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
796 ACE_Reactor * TPReactor = ACE_Reactor::instance ();
798 this->reactor (TPReactor);
800 flg_mask_ = ACE_Event_Handler::NULL_MASK ;
802 if (TPReactor->register_handler (this,flg_mask_) == -1)
803 return -1;
805 if (this->initiate_write () == -1)
806 return -1;
808 if (duplex != 0)
809 initiate_io (ACE_Event_Handler::READ_MASK);
811 return check_destroy ();
815 Sender::initiate_write ()
817 if ( this->msg_queue ()->message_count () < 20) // flow control
819 size_t nbytes = ACE_OS::strlen (send_buf_);
821 ACE_Message_Block *mb = 0;
822 ACE_NEW_RETURN (mb,
823 ACE_Message_Block (nbytes+8),
824 -1);
826 mb->init (send_buf_, nbytes);
827 mb->rd_ptr (mb->base ());
828 mb->wr_ptr (mb->base ());
829 mb->wr_ptr (nbytes);
831 ACE_Time_Value tv = ACE_Time_Value::zero;
833 int qcount =this->putq (mb, & tv);
835 if (qcount <= 0)
837 ACE_Message_Block::release (mb);
838 return -1;
842 return initiate_io (ACE_Event_Handler::WRITE_MASK);
846 Sender::initiate_io (ACE_Reactor_Mask mask)
848 if (ACE_BIT_ENABLED (flg_mask_, mask))
849 return 0;
851 if (ACE_Reactor::instance ()->schedule_wakeup (this, mask) == -1)
852 return -1;
854 ACE_SET_BITS (flg_mask_, mask);
855 return 0;
859 Sender::terminate_io (ACE_Reactor_Mask mask)
861 if (ACE_BIT_DISABLED (flg_mask_, mask))
862 return 0;
864 if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
865 return -1;
867 ACE_CLR_BITS (flg_mask_, mask);
868 return 0;
872 Sender::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
874 ACE_Reactor * TPReactor = ACE_Reactor::instance ();
876 TPReactor->remove_handler (this,
877 ACE_Event_Handler::ALL_EVENTS_MASK |
878 ACE_Event_Handler::DONT_CALL); // Don't call handle_close
879 this->reactor (0);
880 this->destroy ();
881 return 0;
885 Sender::handle_input (ACE_HANDLE h)
887 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
889 ACE_Message_Block *mb = 0;
890 ACE_NEW_RETURN (mb,
891 ACE_Message_Block (BUFSIZ),
892 -1);
894 int err = 0;
895 ssize_t res = this->peer ().recv (mb->rd_ptr (),
896 BUFSIZ-1);
897 this->total_r_++;
899 if (res >= 0)
901 mb->wr_ptr (res);
902 this->total_rcv_ += res;
904 else
905 err = errno ;
907 mb->wr_ptr ()[0] = '\0';
909 if (loglevel == 0 || res <= 0 || err!= 0)
911 LogLocker log_lock;
913 ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_input () SessionId=%d****\n", index_));
914 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_to_read", BUFSIZ));
915 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "handle", h));
916 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_transferred", res));
917 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "error", err));
918 ACE_DEBUG ((LM_DEBUG, "%C = %s\n", "message_block", mb->rd_ptr ()));
919 ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
922 ACE_Message_Block::release (mb);
924 if (err == EWOULDBLOCK)
926 err=0;
927 res=0;
928 return check_destroy ();
931 if (err !=0 || res <= 0)
932 return -1;
934 int rc = 0;
936 if (duplex != 0) // full duplex, continue read
937 rc = initiate_io (ACE_Event_Handler::READ_MASK);
938 else
939 rc = terminate_io (ACE_Event_Handler::READ_MASK);
941 if (rc != 0)
942 return -1 ;
944 rc = initiate_write ();
945 if (rc != 0)
946 return -1;
948 return check_destroy ();
952 Sender::handle_output (ACE_HANDLE h)
954 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
956 ACE_Time_Value tv = ACE_Time_Value::zero;
957 ACE_Message_Block *mb = 0;
959 int err=0;
960 ssize_t res=0;
961 size_t bytes=0;
963 int qcount = this->getq (mb , & tv);
965 if (mb != 0) // qcount >= 0
967 bytes = mb->length ();
968 res = this->peer ().send (mb->rd_ptr (), bytes);
970 this->total_w_++;
972 if (res < 0)
973 err = errno ;
974 else
975 this->total_snd_ += res;
977 if (loglevel == 0 || res <= 0 || err!= 0)
979 LogLocker log_lock;
981 ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_output () SessionId=%d****\n", index_));
982 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_to_write", bytes));
983 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "handle", h));
984 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_transferred", res));
985 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "error", err));
986 ACE_DEBUG ((LM_DEBUG, "%C = %s\n", "message_block", mb->rd_ptr ()));
987 ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
991 ACE_Message_Block::release (mb);
993 if (err != 0 || res < 0)
994 return -1;
996 int rc = 0;
998 if (qcount <= 0) // no more message blocks in queue
1000 if (duplex != 0 && // full duplex, continue write
1001 (this->total_snd_ - this->total_rcv_ ) < 1024*32 ) // flow control
1002 rc = initiate_write ();
1003 else
1004 rc = terminate_io (ACE_Event_Handler::WRITE_MASK);
1006 if (rc == -1)
1007 return -1;
1010 rc = initiate_io (ACE_Event_Handler::READ_MASK);
1011 if (rc == -1)
1012 return -1;
1014 return check_destroy ();
1018 // *************************************************************
1019 // Configuration helpers
1020 // *************************************************************
1022 print_usage (int /* argc */, ACE_TCHAR *argv[])
1024 ACE_ERROR
1025 ((LM_ERROR,
1026 ACE_TEXT ("\nusage: %s")
1027 ACE_TEXT ("\n-n <number threads in the thread pool>")
1028 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1029 ACE_TEXT ("\n-p <port to listen/connect>")
1030 ACE_TEXT ("\n-h <host> for Sender mode")
1031 ACE_TEXT ("\n-s <number of sender's instances>")
1032 ACE_TEXT ("\n-b run client and server at the same time")
1033 ACE_TEXT ("\n-v log level")
1034 ACE_TEXT ("\n 0 - log all messages")
1035 ACE_TEXT ("\n 1 - log only errors and unusual cases")
1036 ACE_TEXT ("\n-i time to run in seconds")
1037 ACE_TEXT ("\n-u show this message")
1038 ACE_TEXT ("\n"),
1039 argv[0]
1041 return -1;
1045 parse_args (int argc, ACE_TCHAR *argv[])
1047 if (argc == 1) // no arguments , so one button test
1049 both = 1; // client and server simultaneosly
1050 duplex = 1; // full duplex is on
1051 host = ACE_LOCALHOST; // server to connect
1052 port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
1053 threads = 3; // size of Proactor thread pool
1054 senders = 20; // number of senders
1055 loglevel = 1; // log level : 0 full/ 1 only errors
1056 seconds = 20; // time to run in seconds
1057 #if defined(SOMAXCONN) // The test is invalid if senders > SOMAXCONN
1058 if(SOMAXCONN < senders)
1059 senders = SOMAXCONN;
1060 #endif
1061 return 0;
1064 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("i:n:p:d:h:s:v:ub"));
1065 int c;
1067 while ((c = get_opt ()) != EOF)
1069 switch (c)
1071 case 'i': // time to run
1072 seconds = ACE_OS::atoi (get_opt.opt_arg());
1073 if (seconds < MIN_TIME)
1074 seconds = MIN_TIME;
1075 if (seconds > MAX_TIME)
1076 seconds = MAX_TIME;
1077 break;
1078 case 'b': // both client and server
1079 both = 1;
1080 break;
1081 case 'v': // log level
1082 loglevel = ACE_OS::atoi (get_opt.opt_arg());
1083 break;
1084 case 'd': // duplex
1085 duplex = ACE_OS::atoi (get_opt.opt_arg());
1086 break;
1087 case 'h': // host for sender
1088 host = get_opt.opt_arg();
1089 break;
1090 case 'p': // port number
1091 port = ACE_OS::atoi (get_opt.opt_arg());
1092 break;
1093 case 'n': // thread pool size
1094 threads = ACE_OS::atoi (get_opt.opt_arg());
1095 break;
1096 case 's': // number of senders
1097 senders = ACE_OS::atoi (get_opt.opt_arg());
1098 if (size_t (senders) > MAX_SENDERS)
1099 senders = MAX_SENDERS;
1100 break;
1101 case 'u':
1102 default:
1103 return print_usage (argc,argv);
1104 } // switch
1105 } // while
1107 return 0;
1110 static int
1111 disable_signal (int sigmin, int sigmax)
1113 #if !defined (ACE_LACKS_UNIX_SIGNALS)
1114 sigset_t signal_set;
1115 if (ACE_OS::sigemptyset (&signal_set) == - 1)
1116 ACE_ERROR ((LM_ERROR,
1117 ACE_TEXT("Error: (%P | %t):%p\n"),
1118 ACE_TEXT("sigemptyset failed")));
1120 for (int i = sigmin; i <= sigmax; i++)
1121 ACE_OS::sigaddset (&signal_set, i);
1123 // Put the <signal_set>.
1124 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
1125 // In multi-threaded application this is not POSIX compliant
1126 // but let's leave it just in case.
1127 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
1128 # else
1129 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
1130 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
1131 ACE_ERROR_RETURN ((LM_ERROR,
1132 ACE_TEXT ("Error: (%P|%t): %p\n"),
1133 ACE_TEXT ("SIG_BLOCK failed")),
1134 -1);
1135 #else
1136 ACE_UNUSED_ARG(sigmin);
1137 ACE_UNUSED_ARG(sigmax);
1138 #endif /* ACE_LACKS_UNIX_SIGNALS */
1140 return 0;
1143 #endif /* ACE_HAS_THREADS */
1146 run_main (int argc, ACE_TCHAR *argv[])
1148 ACE_START_TEST (ACE_TEXT ("TP_Reactor_Test"));
1150 #if defined(ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
1151 if (::parse_args (argc, argv) == -1)
1152 return -1;
1154 disable_signal (SIGPIPE, SIGPIPE);
1156 MyTask task1;
1157 Acceptor acceptor;
1158 Connector connector;
1160 if (task1.start (threads) == 0)
1162 int rc = 0;
1164 ACE_INET_Addr addr (port);
1165 if (both != 0 || host == 0) // Acceptor
1166 rc += acceptor.start (addr);
1168 if (both != 0 || host != 0)
1170 if (host == 0)
1171 host = ACE_LOCALHOST;
1173 if (addr.set (port, host, 1, addr.get_type ()) == -1)
1174 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
1175 rc += connector.start (addr, senders);
1178 if (rc > 0)
1179 ACE_OS::sleep (seconds);
1182 task1.stop ();
1184 ACE_DEBUG ((LM_DEBUG,
1185 ACE_TEXT ("\nNumber of Receivers objects = %d\n")
1186 ACE_TEXT ("\nNumber of Sender objects = %d\n"),
1187 acceptor.get_number_sessions (),
1188 connector.get_number_sessions ()));
1190 // As Reactor event loop now is inactive it is safe to destroy all
1191 // senders
1193 connector.stop ();
1194 acceptor.stop ();
1196 //Print statistic
1197 ACE_TCHAR bufs [256];
1198 ACE_TCHAR bufr [256];
1200 ACE_OS::snprintf (bufs, 256, ACE_TEXT ("%ld(%ld)"),
1201 connector.get_total_snd (),
1202 connector.get_total_w ());
1204 ACE_OS::snprintf (bufr, 256, ACE_TEXT ("%ld(%ld)"),
1205 connector.get_total_rcv (),
1206 connector.get_total_r ());
1208 ACE_DEBUG ((LM_DEBUG,
1209 ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
1210 bufs,
1211 bufr
1214 ACE_OS::snprintf (bufs, 256, ACE_TEXT ("%ld(%ld)"),
1215 acceptor.get_total_snd (),
1216 acceptor.get_total_w ());
1218 ACE_OS::snprintf (bufr, 256, ACE_TEXT ("%ld(%ld)"),
1219 acceptor.get_total_rcv (),
1220 acceptor.get_total_r ());
1222 ACE_DEBUG ((LM_DEBUG,
1223 ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
1224 bufs,
1225 bufr
1228 #else /* ACE_HAS_THREADS */
1229 ACE_UNUSED_ARG( argc );
1230 ACE_UNUSED_ARG( argv );
1231 #endif /* ACE_HAS_THREADS */
1233 ACE_END_TEST;
1235 return 0;