Doxygen changes
[ACE_TAO.git] / ACE / tests / TP_Reactor_Test.cpp
blob50e34740ada8aa7028d53bc60324bdd7fbb5d209
2 //=============================================================================
3 /**
4 * @file TP_Reactor_Test.cpp
6 * This program illustrates how the <ACE_TP_Reactor> can be used to
7 * implement an application that does various operations.
8 * usage: TP_Reactor_Test
9 * -n number threads in the TP_Reactor thread pool
10 * -d duplex mode 1 (full-duplex) vs. 0 (half-duplex)
11 * -p port to listen(Server)/connect(Client)
12 * -h host to connect (Client mode)
13 * -s number of sender's instances ( Client mode)
14 * -b run client and server (both modes ) at the same time
15 * -v log level
16 * 0 - log all messages
17 * 1 - log only errors and unusual cases
18 * -i time to run in seconds
19 * -u show this message
21 * The main differences between Thread_Pool_Reactor_Test.cpp and
22 * this test are:
24 * 1. Thread_Pool_Reactor_Test.cpp tests only handle_input()
25 * events on the server, whereas this one tests both handle_input() and
26 * handle_output() on both server and client, i.e., the receiver
27 * and sender are completely event-driven.
29 * 2. The receiver and sender in this test can work in full duplex
30 * mode, i.e., input and output events are processed independently.
31 * Half-duplex mode (request-reply) is also supported.
33 * This test is therefore a bit more stressful than
34 * Thread_Pool_Reactor_Test.cpp for the ACE_TP_Reactor since the same
35 * thread pool is shared between client and server.
37 * This test is a "twin" of Proactor_Test.cpp, so it can help
38 * developers write solutions that work with either the Reactor or the Proactor.
40 * @author Alexander Libman <alibman@ihug.com.au>
41 * @author <alexl@rumblgroup.com>
43 //=============================================================================
46 #include "test_config.h"
48 #if defined(ACE_HAS_THREADS)
50 #include "TP_Reactor_Test.h"
52 #include "ace/Signal.h"
53 #include "ace/Service_Config.h"
54 #include "ace/Get_Opt.h"
56 #include "ace/Reactor.h"
57 #include "ace/TP_Reactor.h"
58 #include "ace/OS_NS_signal.h"
59 #include "ace/OS_NS_stdio.h"
60 #include "ace/OS_NS_string.h"
61 #include "ace/OS_NS_unistd.h"
62 #include "ace/Synch_Traits.h"
63 #include "ace/Thread_Semaphore.h"
65 // Some debug helper functions
66 static int disable_signal (int sigmin, int sigmax);
68 // both: 0 run client or server / depends on host
69 // != 0 run client and server
70 static int both = 0;
72 // Host that we're connecting to.
73 static const ACE_TCHAR *host = 0;
75 // number of Senders instances
76 static int senders = 1;
78 // duplex mode: == 0 half-duplex
79 // != 0 full duplex
80 static int duplex = 0;
82 // number threads in the TP_Reactor thread pool
83 static int threads = 1;
85 // Port that we're receiving connections on.
86 static u_short port = ACE_DEFAULT_SERVER_PORT;
88 // Log options
89 static int loglevel = 1; // 0 full , 1 only errors
91 static const size_t MIN_TIME = 1; // min 1 sec
92 static const size_t MAX_TIME = 3600; // max 1 hour
93 static u_int seconds = 2; // default time to run - 2 seconds
95 static char data[] =
96 "GET / HTTP/1.1\r\n"
97 "Accept: */*\r\n"
98 "Accept-Language: C++\r\n"
99 "Accept-Encoding: gzip, deflate\r\n"
100 "User-Agent: TPReactor_Test/1.0 (non-compatible)\r\n"
101 "Connection: Keep-Alive\r\n"
102 "\r\n" ;
104 // *************************************************************
106 class LogLocker
108 public:
110 LogLocker () { ACE_LOG_MSG->acquire (); }
111 virtual ~LogLocker () { ACE_LOG_MSG->release (); }
113 // *************************************************************
116 * @class MyTask
118 * MyTask plays role for TP_Reactor threads pool
120 * MyTask is ACE_Task resposible for:
121 * 1. Creation and deletion of TP_Reactor and TP_Reactor thread pool
122 * 2. Running TP_Reactor event loop
124 class MyTask : public ACE_Task<ACE_MT_SYNCH>
126 public:
127 MyTask (void): sem_ ((unsigned int) 0),
128 my_reactor_ (0) {}
130 virtual ~MyTask () { stop (); }
132 virtual int svc (void);
134 int start (int num_threads);
135 int stop (void);
137 private:
138 int create_reactor (void);
139 int delete_reactor (void);
141 ACE_SYNCH_RECURSIVE_MUTEX lock_;
142 ACE_Thread_Semaphore sem_;
143 ACE_Reactor *my_reactor_;
147 MyTask::create_reactor (void)
149 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
150 monitor,
151 this->lock_,
152 -1);
154 ACE_TEST_ASSERT (this->my_reactor_ == 0);
156 ACE_TP_Reactor * pImpl = 0;
158 ACE_NEW_RETURN (pImpl,ACE_TP_Reactor, -1);
160 ACE_NEW_RETURN (my_reactor_,
161 ACE_Reactor (pImpl ,1),
162 -1);
164 ACE_DEBUG ((LM_DEBUG,
165 ACE_TEXT (" (%t) Create TP_Reactor\n")));
167 ACE_Reactor::instance (this->my_reactor_);
169 this->reactor (my_reactor_);
171 return 0;
175 MyTask::delete_reactor (void)
177 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
178 monitor,
179 this->lock_,
180 -1);
182 ACE_DEBUG ((LM_DEBUG,
183 ACE_TEXT (" (%t) Delete TP_Reactor\n")));
185 delete this->my_reactor_;
186 ACE_Reactor::instance ((ACE_Reactor *) 0);
187 this->my_reactor_ = 0;
188 this->reactor (0);
190 return 0;
194 MyTask::start (int num_threads)
196 if (this->create_reactor () == -1)
197 ACE_ERROR_RETURN ((LM_ERROR,
198 ACE_TEXT ("%p.\n"),
199 ACE_TEXT ("unable to create reactor")),
200 -1);
202 if (this->activate (THR_NEW_LWP, num_threads) == -1)
203 ACE_ERROR_RETURN ((LM_ERROR,
204 ACE_TEXT ("%p.\n"),
205 ACE_TEXT ("unable to activate thread pool")),
206 -1);
208 for (; num_threads > 0 ; num_threads--)
209 sem_.acquire ();
211 return 0;
216 MyTask::stop (void)
218 if (this->my_reactor_ != 0)
220 ACE_DEBUG ((LM_DEBUG,
221 ACE_TEXT ("End TP_Reactor event loop\n")));
223 ACE_Reactor::instance()->end_reactor_event_loop ();
226 if (this->wait () == -1)
227 ACE_ERROR ((LM_ERROR,
228 ACE_TEXT ("%p.\n"),
229 ACE_TEXT ("unable to stop thread pool")));
231 if (this->delete_reactor () == -1)
232 ACE_ERROR ((LM_ERROR,
233 ACE_TEXT ("%p.\n"),
234 ACE_TEXT ("unable to delete reactor")));
236 return 0;
// Body of every pool thread: blocks SIGPIPE for the thread, tells
// start() that the thread is up, then keeps running the reactor event
// loop until the reactor reports that the loop is done.
int
MyTask::svc (void)
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask started\n")));

  disable_signal (SIGPIPE, SIGPIPE);

  // Signal that this thread is ready.
  sem_.release (1);

  for (;;)
    {
      if (ACE_Reactor::instance()->reactor_event_loop_done () != 0)
        break;

      ACE_Reactor::instance()->run_reactor_event_loop ();
    }

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask finished\n")));
  return 0;
}
256 // *************************************************************
258 Acceptor::Acceptor (void)
259 : ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> ((ACE_Reactor *) 0),
260 sessions_ (0),
261 total_snd_(0),
262 total_rcv_(0),
263 total_w_ (0),
264 total_r_ (0)
266 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
268 for (size_t i = 0; i < MAX_RECEIVERS; ++i)
269 this->list_receivers_[i] =0;
272 Acceptor::~Acceptor (void)
274 this->reactor (0);
275 stop ();
278 void
279 Acceptor::stop (void)
281 // this method can be called only after reactor event loop id done
282 // in all threads
284 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
286 for (size_t i = 0; i < MAX_RECEIVERS; ++i)
288 delete this->list_receivers_[i];
289 this->list_receivers_[i] =0;
293 void
294 Acceptor::on_new_receiver (Receiver &rcvr)
296 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
297 this->sessions_++;
298 this->list_receivers_[rcvr.index_] = & rcvr;
299 ACE_DEBUG ((LM_DEBUG,
300 "Receiver::CTOR sessions_=%d\n",
301 this->sessions_));
304 void
305 Acceptor::on_delete_receiver (Receiver &rcvr)
307 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
309 this->sessions_--;
311 this->total_snd_ += rcvr.get_total_snd ();
312 this->total_rcv_ += rcvr.get_total_rcv ();
313 this->total_w_ += rcvr.get_total_w ();
314 this->total_r_ += rcvr.get_total_r ();
316 if (rcvr.index_ < MAX_RECEIVERS
317 && this->list_receivers_[rcvr.index_] == &rcvr)
318 this->list_receivers_[rcvr.index_] = 0;
320 ACE_TCHAR bufs [256];
321 ACE_TCHAR bufr [256];
323 ACE_OS::snprintf (bufs, 256, ACE_TEXT ("%ld(%ld)"),
324 rcvr.get_total_snd (),
325 rcvr.get_total_w ());
327 ACE_OS::snprintf (bufr, 256, ACE_TEXT ("%ld(%ld)"),
328 rcvr.get_total_rcv (),
329 rcvr.get_total_r ());
331 ACE_DEBUG ((LM_DEBUG,
332 ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
333 rcvr.index_,
334 bufs,
335 bufr,
336 this->sessions_));
340 Acceptor::start (const ACE_INET_Addr &addr)
342 if (ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR>::open (addr,
343 ACE_Reactor::instance (),
344 ACE_NONBLOCK) < 0)
345 ACE_ERROR_RETURN ((LM_ERROR,
346 ACE_TEXT("%p\n"),
347 ACE_TEXT("Acceptor::start () - open failed")),
349 return 1;
353 Acceptor::make_svc_handler (Receiver *&sh)
355 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
357 if (sessions_ >= MAX_RECEIVERS)
358 return -1;
360 for (size_t i = 0; i < MAX_RECEIVERS; ++i)
361 if (this->list_receivers_ [i] == 0)
363 ACE_NEW_RETURN (sh,
364 Receiver (this , i),
365 -1);
366 return 0;
368 return -1;
371 // *************************************************************
373 Receiver::Receiver (Acceptor * acceptor, size_t index)
374 : acceptor_ (acceptor),
375 index_ (index),
376 flg_mask_ (ACE_Event_Handler::NULL_MASK),
377 total_snd_(0),
378 total_rcv_(0),
379 total_w_ (0),
380 total_r_ (0)
382 if (acceptor_ != 0)
383 acceptor_->on_new_receiver (*this);
387 Receiver::~Receiver (void)
389 this->reactor (0);
390 if (acceptor_ != 0)
391 acceptor_->on_delete_receiver (*this);
393 this->index_ = 0;
395 for (; ;)
397 ACE_Time_Value tv = ACE_Time_Value::zero;
398 ACE_Message_Block *mb = 0;
400 if (this->getq (mb, &tv) < 0)
401 break;
403 ACE_Message_Block::release (mb);
408 Receiver::check_destroy (void)
410 if (flg_mask_ == ACE_Event_Handler::NULL_MASK)
411 return -1;
413 return 0;
417 Receiver::open (void *)
419 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
421 ACE_Reactor *TPReactor = ACE_Reactor::instance ();
423 this->reactor (TPReactor);
425 flg_mask_ = ACE_Event_Handler::NULL_MASK ;
427 if (TPReactor->register_handler (this, flg_mask_) == -1)
428 return -1;
430 initiate_io (ACE_Event_Handler::READ_MASK);
432 return check_destroy ();
436 Receiver::initiate_io (ACE_Reactor_Mask mask)
438 if (ACE_BIT_ENABLED (flg_mask_, mask))
439 return 0;
441 if (ACE_Reactor::instance ()->schedule_wakeup (this, mask) == -1)
442 return -1;
444 ACE_SET_BITS (flg_mask_, mask);
445 return 0;
449 Receiver::terminate_io (ACE_Reactor_Mask mask)
451 if (ACE_BIT_DISABLED (flg_mask_, mask))
452 return 0;
454 if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
455 return -1;
457 ACE_CLR_BITS (flg_mask_, mask);
458 return 0;
462 Receiver::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
464 ACE_Reactor *TPReactor = ACE_Reactor::instance ();
466 TPReactor->remove_handler (this,
467 ACE_Event_Handler::ALL_EVENTS_MASK |
468 ACE_Event_Handler::DONT_CALL); // Don't call handle_close
469 this->reactor (0);
470 this->destroy ();
471 return 0;
475 Receiver::handle_input (ACE_HANDLE h)
477 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
479 ACE_Message_Block *mb = 0;
480 ACE_NEW_RETURN (mb,
481 ACE_Message_Block (BUFSIZ),
482 -1);
484 int err = 0;
485 ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1);
487 this->total_r_++;
489 if (res >= 0)
491 mb->wr_ptr (res);
492 this->total_rcv_ += res;
494 else
495 err = errno ;
497 mb->wr_ptr ()[0] = '\0';
499 if (loglevel == 0 || res <= 0 || err!= 0)
501 LogLocker log_lock;
503 ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_input () SessionId=%d****\n", index_));
504 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_to_read", BUFSIZ));
505 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "handle", h));
506 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_transferred", res));
507 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "error", err));
508 ACE_DEBUG ((LM_DEBUG, "%C = %s\n", "message_block", mb->rd_ptr ()));
509 ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
512 if (err == EWOULDBLOCK)
514 err=0;
515 res=0;
516 return check_destroy ();
519 if (err !=0 || res <= 0)
521 ACE_Message_Block::release (mb);
522 return -1;
525 ACE_Time_Value tv = ACE_Time_Value::zero;
527 int qcount = this->putq (mb, & tv);
529 if (qcount <= 0) // failed to putq
531 ACE_Message_Block::release (mb);
532 return -1 ;
535 int rc = 0;
537 if (duplex == 0) // half-duplex , stop read
538 rc = this->terminate_io (ACE_Event_Handler::READ_MASK);
539 else // full duplex
541 if (qcount >= 20 ) // flow control, stop read
542 rc = this->terminate_io (ACE_Event_Handler::READ_MASK);
543 else
544 rc = this->initiate_io (ACE_Event_Handler::READ_MASK);
547 if (rc == -1)
548 return -1;
550 //initiate write
551 if (this->initiate_io (ACE_Event_Handler::WRITE_MASK) != 0)
552 return -1;
554 return check_destroy ();
558 Receiver::handle_output (ACE_HANDLE h)
560 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
562 ACE_Time_Value tv = ACE_Time_Value::zero;
563 ACE_Message_Block *mb = 0;
565 int err = 0;
566 ssize_t res = 0;
567 size_t bytes = 0;
569 int qcount = this->getq (mb, &tv);
571 if (mb != 0) // qcount >= 0)
573 bytes = mb->length ();
574 res = this->peer ().send (mb->rd_ptr (), bytes);
576 this->total_w_++;
578 if (res < 0)
579 err = errno ;
580 else
581 this->total_snd_ += res;
584 if (loglevel == 0 || res <= 0 || err!= 0)
586 LogLocker log_lock;
588 ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_output () SessionId=%d****\n", index_));
589 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_to_write", bytes));
590 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "handle", h));
591 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_transferred", res));
592 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "error", err));
593 ACE_DEBUG ((LM_DEBUG, "%C = %s\n", "message_block", mb->rd_ptr ()));
594 ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
598 ACE_Message_Block::release (mb);
600 if (err != 0 || res < 0)
601 return -1;
603 if (qcount <= 0) // no more message blocks in queue
605 if (this->terminate_io (ACE_Event_Handler::WRITE_MASK) != 0)
606 return -1;
608 if (this->initiate_io (ACE_Event_Handler::READ_MASK) != 0)
609 return -1;
612 return check_destroy ();
615 // *************************************************************
617 Connector::Connector (void)
618 : ACE_Connector<Sender,ACE_SOCK_CONNECTOR> ((ACE_Reactor *) 0),
619 sessions_ (0),
620 total_snd_(0),
621 total_rcv_(0),
622 total_w_ (0),
623 total_r_ (0)
625 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
627 for (size_t i = 0; i < MAX_SENDERS; ++i)
628 this->list_senders_[i] = 0;
631 Connector::~Connector (void)
633 this->reactor (0);
634 stop ();
637 void
638 Connector::stop ()
640 // this method can be called only
641 // after reactor event loop id done
642 // in all threads
644 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
646 for (size_t i = 0; i < MAX_SENDERS; ++i)
648 delete this->list_senders_[i];
649 this->list_senders_[i] =0;
653 void
654 Connector::on_new_sender (Sender & sndr)
656 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
657 this->sessions_++;
658 this->list_senders_[sndr.index_] = &sndr;
659 ACE_DEBUG ((LM_DEBUG,
660 "Sender::CTOR sessions_=%d\n",
661 this->sessions_));
664 void
665 Connector::on_delete_sender (Sender & sndr)
667 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, this->mutex_);
669 this->sessions_--;
670 this->total_snd_ += sndr.get_total_snd();
671 this->total_rcv_ += sndr.get_total_rcv();
672 this->total_w_ += sndr.get_total_w();
673 this->total_r_ += sndr.get_total_r();
675 if (sndr.index_ < MAX_SENDERS
676 && this->list_senders_[sndr.index_] == &sndr)
677 this->list_senders_[sndr.index_] = 0;
679 ACE_TCHAR bufs [256];
680 ACE_TCHAR bufr [256];
682 ACE_OS::snprintf (bufs, 256, ACE_TEXT ("%ld(%ld)"),
683 sndr.get_total_snd (),
684 sndr.get_total_w ());
686 ACE_OS::snprintf (bufr, 256, ACE_TEXT ("%ld(%ld)"),
687 sndr.get_total_rcv (),
688 sndr.get_total_r ());
690 ACE_DEBUG ((LM_DEBUG,
691 ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
692 sndr.index_,
693 bufs,
694 bufr,
695 this->sessions_));
700 Connector::start (const ACE_INET_Addr & addr, int num)
703 if (ACE_Connector<Sender,ACE_SOCK_CONNECTOR>::open (ACE_Reactor::instance (),
704 ACE_NONBLOCK) < 0)
705 ACE_ERROR_RETURN
706 ((LM_ERROR,
707 ACE_TEXT("%p\n"),
708 ACE_TEXT("Connector::start () - open failed")),
711 int rc = 0;
713 for (int i = 0 ; i < num ; i++)
715 Sender * sender = 0;
717 if (ACE_Connector<Sender,ACE_SOCK_CONNECTOR>::connect (sender, addr) < 0)
718 ACE_ERROR_RETURN
719 ((LM_ERROR,
720 ACE_TEXT("%p\n"),
721 ACE_TEXT("Connector::start () - connect failed")),
722 rc);
725 return rc;
729 Connector::make_svc_handler (Sender * & sh)
731 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
733 if (sessions_ >= MAX_SENDERS)
734 return -1;
736 for (size_t i = 0; i < MAX_SENDERS; ++i)
737 if (this->list_senders_ [i] == 0)
739 ACE_NEW_RETURN (sh,
740 Sender (this , i),
741 -1);
742 return 0;
745 return -1;
748 // *************************************************************
750 Sender::Sender (Connector* connector, size_t index)
751 : connector_ (connector),
752 index_ (index),
753 flg_mask_ (ACE_Event_Handler::NULL_MASK),
754 total_snd_(0),
755 total_rcv_(0),
756 total_w_ (0),
757 total_r_ (0)
759 if (connector_ != 0)
760 connector_->on_new_sender (*this);
762 ACE_OS::snprintf (send_buf_, 1024, "%s", data);
766 Sender::~Sender (void)
768 this->reactor (0);
769 if (connector_ != 0)
770 connector_->on_delete_sender (*this);
772 this->index_ = 0;
774 for (; ;)
776 ACE_Time_Value tv = ACE_Time_Value::zero;
777 ACE_Message_Block *mb = 0;
779 if (this->getq (mb, &tv) < 0)
780 break;
782 ACE_Message_Block::release (mb);
787 Sender::check_destroy (void)
789 if (flg_mask_ == ACE_Event_Handler::NULL_MASK)
790 return -1;
792 return 0;
795 int Sender::open (void *)
797 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
799 ACE_Reactor * TPReactor = ACE_Reactor::instance ();
801 this->reactor (TPReactor);
803 flg_mask_ = ACE_Event_Handler::NULL_MASK ;
805 if (TPReactor->register_handler (this,flg_mask_) == -1)
806 return -1;
808 if (this->initiate_write () == -1)
809 return -1;
811 if (duplex != 0)
812 initiate_io (ACE_Event_Handler::READ_MASK);
814 return check_destroy ();
818 Sender::initiate_write (void)
820 if ( this->msg_queue ()->message_count () < 20) // flow control
822 size_t nbytes = ACE_OS::strlen (send_buf_);
824 ACE_Message_Block *mb = 0;
825 ACE_NEW_RETURN (mb,
826 ACE_Message_Block (nbytes+8),
827 -1);
829 mb->init (send_buf_, nbytes);
830 mb->rd_ptr (mb->base ());
831 mb->wr_ptr (mb->base ());
832 mb->wr_ptr (nbytes);
834 ACE_Time_Value tv = ACE_Time_Value::zero;
836 int qcount =this->putq (mb, & tv);
838 if (qcount <= 0)
840 ACE_Message_Block::release (mb);
841 return -1;
845 return initiate_io (ACE_Event_Handler::WRITE_MASK);
849 Sender::initiate_io (ACE_Reactor_Mask mask)
851 if (ACE_BIT_ENABLED (flg_mask_, mask))
852 return 0;
854 if (ACE_Reactor::instance ()->schedule_wakeup (this, mask) == -1)
855 return -1;
857 ACE_SET_BITS (flg_mask_, mask);
858 return 0;
862 Sender::terminate_io (ACE_Reactor_Mask mask)
864 if (ACE_BIT_DISABLED (flg_mask_, mask))
865 return 0;
867 if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
868 return -1;
870 ACE_CLR_BITS (flg_mask_, mask);
871 return 0;
875 Sender::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
877 ACE_Reactor * TPReactor = ACE_Reactor::instance ();
879 TPReactor->remove_handler (this,
880 ACE_Event_Handler::ALL_EVENTS_MASK |
881 ACE_Event_Handler::DONT_CALL); // Don't call handle_close
882 this->reactor (0);
883 this->destroy ();
884 return 0;
888 Sender::handle_input (ACE_HANDLE h)
890 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
892 ACE_Message_Block *mb = 0;
893 ACE_NEW_RETURN (mb,
894 ACE_Message_Block (BUFSIZ),
895 -1);
897 int err = 0;
898 ssize_t res = this->peer ().recv (mb->rd_ptr (),
899 BUFSIZ-1);
900 this->total_r_++;
902 if (res >= 0)
904 mb->wr_ptr (res);
905 this->total_rcv_ += res;
907 else
908 err = errno ;
910 mb->wr_ptr ()[0] = '\0';
912 if (loglevel == 0 || res <= 0 || err!= 0)
914 LogLocker log_lock;
916 ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_input () SessionId=%d****\n", index_));
917 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_to_read", BUFSIZ));
918 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "handle", h));
919 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_transferred", res));
920 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "error", err));
921 ACE_DEBUG ((LM_DEBUG, "%C = %s\n", "message_block", mb->rd_ptr ()));
922 ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
925 ACE_Message_Block::release (mb);
927 if (err == EWOULDBLOCK)
929 err=0;
930 res=0;
931 return check_destroy ();
934 if (err !=0 || res <= 0)
935 return -1;
937 int rc = 0;
939 if (duplex != 0) // full duplex, continue read
940 rc = initiate_io (ACE_Event_Handler::READ_MASK);
941 else
942 rc = terminate_io (ACE_Event_Handler::READ_MASK);
944 if (rc != 0)
945 return -1 ;
947 rc = initiate_write ();
948 if (rc != 0)
949 return -1;
951 return check_destroy ();
955 Sender::handle_output (ACE_HANDLE h)
957 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, this->mutex_, -1);
959 ACE_Time_Value tv = ACE_Time_Value::zero;
960 ACE_Message_Block *mb = 0;
962 int err=0;
963 ssize_t res=0;
964 size_t bytes=0;
966 int qcount = this->getq (mb , & tv);
968 if (mb != 0) // qcount >= 0
970 bytes = mb->length ();
971 res = this->peer ().send (mb->rd_ptr (), bytes);
973 this->total_w_++;
975 if (res < 0)
976 err = errno ;
977 else
978 this->total_snd_ += res;
980 if (loglevel == 0 || res <= 0 || err!= 0)
982 LogLocker log_lock;
984 ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_output () SessionId=%d****\n", index_));
985 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_to_write", bytes));
986 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "handle", h));
987 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "bytes_transferred", res));
988 ACE_DEBUG ((LM_DEBUG, "%C = %d\n", "error", err));
989 ACE_DEBUG ((LM_DEBUG, "%C = %s\n", "message_block", mb->rd_ptr ()));
990 ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
994 ACE_Message_Block::release (mb);
996 if (err != 0 || res < 0)
997 return -1;
999 int rc = 0;
1001 if (qcount <= 0) // no more message blocks in queue
1003 if (duplex != 0 && // full duplex, continue write
1004 (this->total_snd_ - this->total_rcv_ ) < 1024*32 ) // flow control
1005 rc = initiate_write ();
1006 else
1007 rc = terminate_io (ACE_Event_Handler::WRITE_MASK);
1009 if (rc == -1)
1010 return -1;
1013 rc = initiate_io (ACE_Event_Handler::READ_MASK);
1014 if (rc == -1)
1015 return -1;
1017 return check_destroy ();
1021 // *************************************************************
1022 // Configuration helpers
1023 // *************************************************************
1025 print_usage (int /* argc */, ACE_TCHAR *argv[])
1027 ACE_ERROR
1028 ((LM_ERROR,
1029 ACE_TEXT ("\nusage: %s")
1030 ACE_TEXT ("\n-n <number threads in the thread pool>")
1031 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1032 ACE_TEXT ("\n-p <port to listen/connect>")
1033 ACE_TEXT ("\n-h <host> for Sender mode")
1034 ACE_TEXT ("\n-s <number of sender's instances>")
1035 ACE_TEXT ("\n-b run client and server at the same time")
1036 ACE_TEXT ("\n-v log level")
1037 ACE_TEXT ("\n 0 - log all messages")
1038 ACE_TEXT ("\n 1 - log only errors and unusual cases")
1039 ACE_TEXT ("\n-i time to run in seconds")
1040 ACE_TEXT ("\n-u show this message")
1041 ACE_TEXT ("\n"),
1042 argv[0]
1044 return -1;
1048 parse_args (int argc, ACE_TCHAR *argv[])
1050 if (argc == 1) // no arguments , so one button test
1052 both = 1; // client and server simultaneosly
1053 duplex = 1; // full duplex is on
1054 host = ACE_LOCALHOST; // server to connect
1055 port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
1056 threads = 3; // size of Proactor thread pool
1057 senders = 20; // number of senders
1058 loglevel = 1; // log level : 0 full/ 1 only errors
1059 seconds = 20; // time to run in seconds
1060 #if defined(SOMAXCONN) // The test is invalid if senders > SOMAXCONN
1061 if(SOMAXCONN < senders)
1062 senders = SOMAXCONN;
1063 #endif
1064 return 0;
1067 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("i:n:p:d:h:s:v:ub"));
1068 int c;
1070 while ((c = get_opt ()) != EOF)
1072 switch (c)
1074 case 'i': // time to run
1075 seconds = ACE_OS::atoi (get_opt.opt_arg());
1076 if (seconds < MIN_TIME)
1077 seconds = MIN_TIME;
1078 if (seconds > MAX_TIME)
1079 seconds = MAX_TIME;
1080 break;
1081 case 'b': // both client and server
1082 both = 1;
1083 break;
1084 case 'v': // log level
1085 loglevel = ACE_OS::atoi (get_opt.opt_arg());
1086 break;
1087 case 'd': // duplex
1088 duplex = ACE_OS::atoi (get_opt.opt_arg());
1089 break;
1090 case 'h': // host for sender
1091 host = get_opt.opt_arg();
1092 break;
1093 case 'p': // port number
1094 port = ACE_OS::atoi (get_opt.opt_arg());
1095 break;
1096 case 'n': // thread pool size
1097 threads = ACE_OS::atoi (get_opt.opt_arg());
1098 break;
1099 case 's': // number of senders
1100 senders = ACE_OS::atoi (get_opt.opt_arg());
1101 if (size_t (senders) > MAX_SENDERS)
1102 senders = MAX_SENDERS;
1103 break;
1104 case 'u':
1105 default:
1106 return print_usage (argc,argv);
1107 } // switch
1108 } // while
1110 return 0;
1113 static int
1114 disable_signal (int sigmin, int sigmax)
1116 #if !defined (ACE_LACKS_UNIX_SIGNALS)
1117 sigset_t signal_set;
1118 if (ACE_OS::sigemptyset (&signal_set) == - 1)
1119 ACE_ERROR ((LM_ERROR,
1120 ACE_TEXT("Error: (%P | %t):%p\n"),
1121 ACE_TEXT("sigemptyset failed")));
1123 for (int i = sigmin; i <= sigmax; i++)
1124 ACE_OS::sigaddset (&signal_set, i);
1126 // Put the <signal_set>.
1127 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
1128 // In multi-threaded application this is not POSIX compliant
1129 // but let's leave it just in case.
1130 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
1131 # else
1132 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
1133 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
1134 ACE_ERROR_RETURN ((LM_ERROR,
1135 ACE_TEXT ("Error: (%P|%t): %p\n"),
1136 ACE_TEXT ("SIG_BLOCK failed")),
1137 -1);
1138 #else
1139 ACE_UNUSED_ARG(sigmin);
1140 ACE_UNUSED_ARG(sigmax);
1141 #endif /* ACE_LACKS_UNIX_SIGNALS */
1143 return 0;
1146 #endif /* ACE_HAS_THREADS */
1149 run_main (int argc, ACE_TCHAR *argv[])
1151 ACE_START_TEST (ACE_TEXT ("TP_Reactor_Test"));
1153 #if defined(ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
1154 if (::parse_args (argc, argv) == -1)
1155 return -1;
1157 disable_signal (SIGPIPE, SIGPIPE);
1159 MyTask task1;
1160 Acceptor acceptor;
1161 Connector connector;
1163 if (task1.start (threads) == 0)
1165 int rc = 0;
1167 ACE_INET_Addr addr (port);
1168 if (both != 0 || host == 0) // Acceptor
1169 rc += acceptor.start (addr);
1171 if (both != 0 || host != 0)
1173 if (host == 0)
1174 host = ACE_LOCALHOST;
1176 if (addr.set (port, host, 1, addr.get_type ()) == -1)
1177 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
1178 rc += connector.start (addr, senders);
1182 if (rc > 0)
1183 ACE_OS::sleep (seconds);
1186 task1.stop ();
1188 ACE_DEBUG ((LM_DEBUG,
1189 ACE_TEXT ("\nNumber of Receivers objects = %d\n")
1190 ACE_TEXT ("\nNumber of Sender objects = %d\n"),
1191 acceptor.get_number_sessions (),
1192 connector.get_number_sessions ()));
1194 // As Reactor event loop now is inactive it is safe to destroy all
1195 // senders
1197 connector.stop ();
1198 acceptor.stop ();
1200 //Print statistic
1201 ACE_TCHAR bufs [256];
1202 ACE_TCHAR bufr [256];
1204 ACE_OS::snprintf (bufs, 256, ACE_TEXT ("%ld(%ld)"),
1205 connector.get_total_snd (),
1206 connector.get_total_w ());
1208 ACE_OS::snprintf (bufr, 256, ACE_TEXT ("%ld(%ld)"),
1209 connector.get_total_rcv (),
1210 connector.get_total_r ());
1212 ACE_DEBUG ((LM_DEBUG,
1213 ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
1214 bufs,
1215 bufr
1218 ACE_OS::snprintf (bufs, 256, ACE_TEXT ("%ld(%ld)"),
1219 acceptor.get_total_snd (),
1220 acceptor.get_total_w ());
1222 ACE_OS::snprintf (bufr, 256, ACE_TEXT ("%ld(%ld)"),
1223 acceptor.get_total_rcv (),
1224 acceptor.get_total_r ());
1226 ACE_DEBUG ((LM_DEBUG,
1227 ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
1228 bufs,
1229 bufr
1232 #else /* ACE_HAS_THREADS */
1233 ACE_UNUSED_ARG( argc );
1234 ACE_UNUSED_ARG( argv );
1235 #endif /* ACE_HAS_THREADS */
1237 ACE_END_TEST;
1239 return 0;