2 //=============================================================================
4 * @file TP_Reactor_Test.cpp
6 * This program illustrates how the <ACE_TP_Reactor> can be used to
7 * implement an application that does various operations.
8 * usage: TP_Reactor_Test
9 * -n number threads in the TP_Reactor thread pool
10 * -d duplex mode 1 (full-duplex) vs. 0 (half-duplex)
11 * -p port to listen(Server)/connect(Client)
12 * -h host to connect (Client mode)
13 * -s number of sender's instances ( Client mode)
14 * -b run client and server (both modes ) at the same time
16 * 0 - log all messages
17 * 1 - log only errors and unusual cases
18 * -i time to run in seconds
19 * -u show this message
21 * The main differences between Thread_Pool_Reactor_Test.cpp and
24 * 1. Thread_Pool_Reactor_Test.cpp tests only handle_input()
25 * events on the server, whereas this one tests both handle_input() and
26 * handle_output() on both server and client, i.e., the receiver
27 * and sender are completely event-driven.
29 * 2. The receiver and sender in this test can work in full duplex
30 * mode, i.e., input and output events are processed independently.
31 * Half-duplex mode (request-reply) is also supported.
33 * This test is therefore a bit more stressful for the ACE_TP_Reactor
34 * than Thread_Pool_Reactor_Test.cpp, since the same
35 * thread pool is shared between client and server.
37 * This test is a "twin" of Proactor_Test.cpp, so it can help
38 * developers write solutions that are independent of the Reactor/Proactor choice.
40 * @author Alexander Libman <alibman@ihug.com.au>
41 * @author <alexl@rumblgroup.com>
43 //=============================================================================
46 #include "test_config.h"
48 #if defined(ACE_HAS_THREADS)
50 #include "TP_Reactor_Test.h"
52 #include "ace/Signal.h"
53 #include "ace/Service_Config.h"
54 #include "ace/Get_Opt.h"
56 #include "ace/Reactor.h"
57 #include "ace/TP_Reactor.h"
58 #include "ace/OS_NS_signal.h"
59 #include "ace/OS_NS_stdio.h"
60 #include "ace/OS_NS_string.h"
61 #include "ace/OS_NS_unistd.h"
62 #include "ace/Synch_Traits.h"
63 #include "ace/Thread_Semaphore.h"
65 // Some debug helper functions
// Forward declaration; the definition appears near the end of this file and
// iterates over the signal numbers sigmin..sigmax inclusive.
66 static int disable_signal (int sigmin
, int sigmax
);
// Test configuration. parse_args() assigns these from the command line;
// the initializers below are the values in effect when an option is absent.
68 // both: 0 run client or server / depends on host
69 // != 0 run client and server
72 // Host that we're connecting to.
73 static const ACE_TCHAR
*host
= 0;
75 // number of Senders instances
76 static int senders
= 1;
78 // duplex mode: == 0 half-duplex
// (a non-zero value is treated as full-duplex by the Sender/Receiver handlers)
80 static int duplex
= 0;
82 // number threads in the TP_Reactor thread pool
83 static int threads
= 1;
85 // Port that we're receiving connections on.
86 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
// Log verbosity; the handlers dump every transfer only when this is 0.
89 static int loglevel
= 1; // 0 full , 1 only errors
// Bounds that parse_args() compares `seconds` against when handling -i.
91 static const size_t MIN_TIME
= 1; // min 1 sec
92 static const size_t MAX_TIME
= 3600; // max 1 hour
93 static u_int seconds
= 2; // default time to run - 2 seconds
98 "Accept-Language: C++\r\n"
99 "Accept-Encoding: gzip, deflate\r\n"
100 "User-Agent: TPReactor_Test/1.0 (non-compatible)\r\n"
101 "Connection: Keep-Alive\r\n"
104 // *************************************************************
110 LogLocker () { ACE_LOG_MSG
->acquire (); }
111 virtual ~LogLocker () { ACE_LOG_MSG
->release (); }
113 // *************************************************************
118 * MyTask plays the role of the TP_Reactor thread pool
120 * MyTask is an ACE_Task responsible for:
121 * 1. Creation and deletion of TP_Reactor and TP_Reactor thread pool
122 * 2. Running TP_Reactor event loop
// Task derived from ACE_Task<ACE_MT_SYNCH>. It declares svc(), start(), and the
// create_reactor()/delete_reactor() pair that manage my_reactor_.
124 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
// The semaphore starts with a count of 0.
127 MyTask (void): sem_ ((unsigned int) 0),
// The destructor calls stop().
130 virtual ~MyTask () { stop (); }
132 virtual int svc (void);
// num_threads is passed to activate() in the definition.
134 int start (int num_threads
);
138 int create_reactor (void);
139 int delete_reactor (void);
141 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
142 ACE_Thread_Semaphore sem_
;
// Reactor allocated by create_reactor() and freed by delete_reactor().
143 ACE_Reactor
*my_reactor_
;
// Allocates an ACE_TP_Reactor, wraps it in a new ACE_Reactor stored in
// my_reactor_, installs that reactor via ACE_Reactor::instance() and sets it as
// this task's reactor. Asserts that my_reactor_ is 0 on entry. The
// ACE_TP_Reactor allocation passes -1 as the ACE_NEW_RETURN failure value.
// NOTE(review): the second ACE_Reactor constructor argument (1) presumably
// makes the reactor own pImpl -- confirm against the ACE_Reactor documentation.
147 MyTask::create_reactor (void)
149 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
154 ACE_TEST_ASSERT (this->my_reactor_
== 0);
156 ACE_TP_Reactor
* pImpl
= 0;
158 ACE_NEW_RETURN (pImpl
,ACE_TP_Reactor
, -1);
160 ACE_NEW_RETURN (my_reactor_
,
161 ACE_Reactor (pImpl
,1),
164 ACE_DEBUG ((LM_DEBUG
,
165 ACE_TEXT (" (%t) Create TP_Reactor\n")));
// Install the new reactor as the one returned by ACE_Reactor::instance().
167 ACE_Reactor::instance (this->my_reactor_
);
169 this->reactor (my_reactor_
);
// Deletes my_reactor_, passes a null pointer to ACE_Reactor::instance(), and
// clears my_reactor_.
// NOTE(review): the reactor is deleted before the ACE_Reactor::instance()
// pointer is reset -- confirm nothing can call ACE_Reactor::instance() in between.
175 MyTask::delete_reactor (void)
177 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
182 ACE_DEBUG ((LM_DEBUG
,
183 ACE_TEXT (" (%t) Delete TP_Reactor\n")));
185 delete this->my_reactor_
;
186 ACE_Reactor::instance ((ACE_Reactor
*) 0);
187 this->my_reactor_
= 0;
// Creates the reactor, then activates num_threads threads with THR_NEW_LWP.
// Either failure is reported through ACE_ERROR_RETURN. The final loop counts
// num_threads down to zero, once per activated thread.
// NOTE(review): the loop body is not visible here; presumably it waits for each
// thread's "ready" signal -- confirm.
194 MyTask::start (int num_threads
)
196 if (this->create_reactor () == -1)
197 ACE_ERROR_RETURN ((LM_ERROR
,
199 ACE_TEXT ("unable to create reactor")),
202 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
203 ACE_ERROR_RETURN ((LM_ERROR
,
205 ACE_TEXT ("unable to activate thread pool")),
208 for (; num_threads
> 0 ; num_threads
--)
218 if (this->my_reactor_
!= 0)
220 ACE_DEBUG ((LM_DEBUG
,
221 ACE_TEXT ("End TP_Reactor event loop\n")));
223 ACE_Reactor::instance()->end_reactor_event_loop ();
226 if (this->wait () == -1)
227 ACE_ERROR ((LM_ERROR
,
229 ACE_TEXT ("unable to stop thread pool")));
231 if (this->delete_reactor () == -1)
232 ACE_ERROR ((LM_ERROR
,
234 ACE_TEXT ("unable to delete reactor")));
242 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" (%t) MyTask started\n")));
244 disable_signal (SIGPIPE
, SIGPIPE
);
246 // signal that we are ready
249 while (ACE_Reactor::instance()->reactor_event_loop_done () == 0)
250 ACE_Reactor::instance()->run_reactor_event_loop ();
252 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" (%t) MyTask finished\n")));
256 // *************************************************************
// Constructs the ACE_Acceptor<Receiver, ACE_SOCK_ACCEPTOR> base with a null
// reactor pointer and, holding mutex_, sets every list_receivers_ slot to 0.
258 Acceptor::Acceptor (void)
259 : ACE_Acceptor
<Receiver
,ACE_SOCK_ACCEPTOR
> ((ACE_Reactor
*) 0),
266 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
268 for (size_t i
= 0; i
< MAX_RECEIVERS
; ++i
)
269 this->list_receivers_
[i
] =0;
272 Acceptor::~Acceptor (void)
// Holding mutex_, deletes every Receiver still recorded in list_receivers_ and
// resets each slot to 0.
279 Acceptor::stop (void)
281 // this method can be called only after the reactor event loop is done
284 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
286 for (size_t i
= 0; i
< MAX_RECEIVERS
; ++i
)
288 delete this->list_receivers_
[i
];
289 this->list_receivers_
[i
] =0;
// Called from the Receiver constructor. Holding mutex_, records rcvr in
// list_receivers_ at rcvr.index_ and logs the session count.
// NOTE(review): rcvr.index_ is used as an array index without a bounds check,
// whereas on_delete_receiver() tests index_ < MAX_RECEIVERS first -- confirm
// that every caller passes a valid index.
294 Acceptor::on_new_receiver (Receiver
&rcvr
)
296 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
298 this->list_receivers_
[rcvr
.index_
] = & rcvr
;
299 ACE_DEBUG ((LM_DEBUG
,
300 "Receiver::CTOR sessions_=%d\n",
// Called from the Receiver destructor. Holding mutex_, adds the receiver's
// four counters to the acceptor totals, clears its list_receivers_ slot if the
// slot still points at rcvr, and logs a per-receiver summary.
305 Acceptor::on_delete_receiver (Receiver
&rcvr
)
307 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
311 this->total_snd_
+= rcvr
.get_total_snd ();
312 this->total_rcv_
+= rcvr
.get_total_rcv ();
313 this->total_w_
+= rcvr
.get_total_w ();
314 this->total_r_
+= rcvr
.get_total_r ();
// Clear the slot only when the index is in range and the slot holds this receiver.
316 if (rcvr
.index_
< MAX_RECEIVERS
317 && this->list_receivers_
[rcvr
.index_
] == &rcvr
)
318 this->list_receivers_
[rcvr
.index_
] = 0;
// Format "snd(w)" and "rcv(r)" strings for the log line below.
// NOTE(review): "%ld" assumes the get_total_*() accessors return long --
// their declarations are not visible here; confirm.
320 ACE_TCHAR bufs
[256];
321 ACE_TCHAR bufr
[256];
323 ACE_OS::snprintf (bufs
, 256, ACE_TEXT ("%ld(%ld)"),
324 rcvr
.get_total_snd (),
325 rcvr
.get_total_w ());
327 ACE_OS::snprintf (bufr
, 256, ACE_TEXT ("%ld(%ld)"),
328 rcvr
.get_total_rcv (),
329 rcvr
.get_total_r ());
331 ACE_DEBUG ((LM_DEBUG
,
332 ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
// Opens the base ACE_Acceptor on addr using the reactor returned by
// ACE_Reactor::instance(); a failed open is reported through ACE_ERROR_RETURN.
340 Acceptor::start (const ACE_INET_Addr
&addr
)
342 if (ACE_Acceptor
<Receiver
,ACE_SOCK_ACCEPTOR
>::open (addr
,
343 ACE_Reactor::instance (),
345 ACE_ERROR_RETURN ((LM_ERROR
,
347 ACE_TEXT("Acceptor::start () - open failed")),
// Holding mutex_ (returns -1 if the guard cannot be acquired), compares
// sessions_ against MAX_RECEIVERS and then scans list_receivers_ for a slot
// that is 0.
// NOTE(review): the statements that create the Receiver and assign sh are not
// visible here; presumably the first free slot index is used -- confirm.
353 Acceptor::make_svc_handler (Receiver
*&sh
)
355 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
357 if (sessions_
>= MAX_RECEIVERS
)
360 for (size_t i
= 0; i
< MAX_RECEIVERS
; ++i
)
361 if (this->list_receivers_
[i
] == 0)
371 // *************************************************************
// Stores the owning acceptor, starts with flg_mask_ set to NULL_MASK, and
// registers this object with the acceptor through on_new_receiver().
373 Receiver::Receiver (Acceptor
* acceptor
, size_t index
)
374 : acceptor_ (acceptor
),
376 flg_mask_ (ACE_Event_Handler::NULL_MASK
),
383 acceptor_
->on_new_receiver (*this);
// Reports this receiver to the acceptor through on_delete_receiver(), then
// dequeues message blocks with a zero timeout and releases them.
// NOTE(review): the loop construct around getq()/release() is not visible
// here; presumably it repeats until getq() fails -- confirm.
387 Receiver::~Receiver (void)
391 acceptor_
->on_delete_receiver (*this);
397 ACE_Time_Value tv
= ACE_Time_Value::zero
;
398 ACE_Message_Block
*mb
= 0;
400 if (this->getq (mb
, &tv
) < 0)
403 ACE_Message_Block::release (mb
);
// Tests whether flg_mask_ is NULL_MASK, i.e. whether no I/O mask bit is
// currently set by initiate_io().
// NOTE(review): the return values are not visible here; presumably a non-zero
// result tells the caller to close the handler -- confirm.
408 Receiver::check_destroy (void)
410 if (flg_mask_
== ACE_Event_Handler::NULL_MASK
)
// Holding mutex_ (returns -1 if the guard fails), binds this handler to the
// reactor from ACE_Reactor::instance(), registers it with an empty mask,
// requests READ events through initiate_io(), and returns check_destroy().
417 Receiver::open (void *)
419 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
421 ACE_Reactor
*TPReactor
= ACE_Reactor::instance ();
423 this->reactor (TPReactor
);
// Register with NULL_MASK first; event bits are added later by initiate_io().
425 flg_mask_
= ACE_Event_Handler::NULL_MASK
;
427 if (TPReactor
->register_handler (this, flg_mask_
) == -1)
430 initiate_io (ACE_Event_Handler::READ_MASK
);
432 return check_destroy ();
// Enables reactor events for `mask`. It first tests whether the bits are
// already set in flg_mask_, then calls schedule_wakeup() on the reactor and
// sets the bits in flg_mask_.
// NOTE(review): the early-return statements are not visible here; presumably
// an already-enabled mask returns success and a failed schedule_wakeup()
// returns -1 -- confirm.
436 Receiver::initiate_io (ACE_Reactor_Mask mask
)
438 if (ACE_BIT_ENABLED (flg_mask_
, mask
))
441 if (ACE_Reactor::instance ()->schedule_wakeup (this, mask
) == -1)
444 ACE_SET_BITS (flg_mask_
, mask
);
// Counterpart of initiate_io(). It tests whether the `mask` bits are already
// clear in flg_mask_, then calls cancel_wakeup() on the reactor and clears the
// bits in flg_mask_.
449 Receiver::terminate_io (ACE_Reactor_Mask mask
)
451 if (ACE_BIT_DISABLED (flg_mask_
, mask
))
454 if (ACE_Reactor::instance ()->cancel_wakeup (this, mask
) == -1)
457 ACE_CLR_BITS (flg_mask_
, mask
);
// Removes this handler from the reactor for all event types. DONT_CALL is
// included so that remove_handler() does not call handle_close() again.
462 Receiver::handle_close (ACE_HANDLE
, ACE_Reactor_Mask
)
464 ACE_Reactor
*TPReactor
= ACE_Reactor::instance ();
466 TPReactor
->remove_handler (this,
467 ACE_Event_Handler::ALL_EVENTS_MASK
|
468 ACE_Event_Handler::DONT_CALL
); // Don't call handle_close
// Read callback. Holding mutex_ (returns -1 if the guard fails), it:
//  - receives up to BUFSIZ-1 bytes from the peer into a message block of
//    size BUFSIZ and adds the result to total_rcv_;
//  - writes a NUL at wr_ptr() so the data can be logged as a string;
//  - logs the transfer when loglevel is 0, res <= 0, or err != 0;
//  - on EWOULDBLOCK returns check_destroy();
//  - on any other error or res <= 0 releases the block;
//  - otherwise enqueues the block with a zero timeout, adjusts READ interest
//    (half-duplex stops reading; a queue count >= 20 stops reading as flow
//    control) and requests WRITE events.
// NOTE(review): several statements, including the else branches, are not
// visible here, so the exact ordering of the steps above should be confirmed.
// NOTE(review): `res` is ssize_t and `h` is ACE_HANDLE, yet both are logged
// with "%d"; the "message_block" line passes rd_ptr() to "%s" -- check these
// against the ACE_Log_Msg format specifiers.
475 Receiver::handle_input (ACE_HANDLE h
)
477 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
479 ACE_Message_Block
*mb
= 0;
481 ACE_Message_Block (BUFSIZ
),
485 ssize_t res
= this->peer ().recv (mb
->rd_ptr (), BUFSIZ
-1);
492 this->total_rcv_
+= res
;
497 mb
->wr_ptr ()[0] = '\0';
499 if (loglevel
== 0 || res
<= 0 || err
!= 0)
503 ACE_DEBUG ((LM_DEBUG
, "**** Receiver::handle_input () SessionId=%d****\n", index_
));
504 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_to_read", BUFSIZ
));
505 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "handle", h
));
506 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_transferred", res
));
507 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "error", err
));
508 ACE_DEBUG ((LM_DEBUG
, "%C = %s\n", "message_block", mb
->rd_ptr ()));
509 ACE_DEBUG ((LM_DEBUG
, "**** end of message ****************\n"));
512 if (err
== EWOULDBLOCK
)
516 return check_destroy ();
519 if (err
!=0 || res
<= 0)
521 ACE_Message_Block::release (mb
);
// Hand the received block to the message queue without blocking.
525 ACE_Time_Value tv
= ACE_Time_Value::zero
;
527 int qcount
= this->putq (mb
, & tv
);
529 if (qcount
<= 0) // failed to putq
531 ACE_Message_Block::release (mb
);
537 if (duplex
== 0) // half-duplex , stop read
538 rc
= this->terminate_io (ACE_Event_Handler::READ_MASK
);
541 if (qcount
>= 20 ) // flow control, stop read
542 rc
= this->terminate_io (ACE_Event_Handler::READ_MASK
);
544 rc
= this->initiate_io (ACE_Event_Handler::READ_MASK
);
551 if (this->initiate_io (ACE_Event_Handler::WRITE_MASK
) != 0)
554 return check_destroy ();
// Write callback. Holding mutex_ (returns -1 if the guard fails), it dequeues
// one message block with a zero timeout. If a block was obtained, it sends
// length() bytes from rd_ptr() to the peer, adds the result to total_snd_,
// logs under the same conditions as handle_input(), and releases the block.
// When the queue is empty (qcount <= 0) it stops WRITE events and requests
// READ events. Returns check_destroy().
// NOTE(review): the error-return statements are not visible here.
// NOTE(review): `res` and `bytes` are logged with "%d"; their declarations are
// not visible here -- confirm the specifiers match their types.
558 Receiver::handle_output (ACE_HANDLE h
)
560 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
562 ACE_Time_Value tv
= ACE_Time_Value::zero
;
563 ACE_Message_Block
*mb
= 0;
569 int qcount
= this->getq (mb
, &tv
);
571 if (mb
!= 0) // qcount >= 0)
573 bytes
= mb
->length ();
574 res
= this->peer ().send (mb
->rd_ptr (), bytes
);
581 this->total_snd_
+= res
;
584 if (loglevel
== 0 || res
<= 0 || err
!= 0)
588 ACE_DEBUG ((LM_DEBUG
, "**** Receiver::handle_output () SessionId=%d****\n", index_
));
589 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_to_write", bytes
));
590 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "handle", h
));
591 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_transferred", res
));
592 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "error", err
));
593 ACE_DEBUG ((LM_DEBUG
, "%C = %s\n", "message_block", mb
->rd_ptr ()));
594 ACE_DEBUG ((LM_DEBUG
, "**** end of message ****************\n"));
598 ACE_Message_Block::release (mb
);
600 if (err
!= 0 || res
< 0)
603 if (qcount
<= 0) // no more message blocks in queue
605 if (this->terminate_io (ACE_Event_Handler::WRITE_MASK
) != 0)
608 if (this->initiate_io (ACE_Event_Handler::READ_MASK
) != 0)
612 return check_destroy ();
615 // *************************************************************
// Constructs the ACE_Connector<Sender, ACE_SOCK_CONNECTOR> base with a null
// reactor pointer and, holding mutex_, sets every list_senders_ slot to 0.
617 Connector::Connector (void)
618 : ACE_Connector
<Sender
,ACE_SOCK_CONNECTOR
> ((ACE_Reactor
*) 0),
625 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
627 for (size_t i
= 0; i
< MAX_SENDERS
; ++i
)
628 this->list_senders_
[i
] = 0;
631 Connector::~Connector (void)
640 // this method can be called only
641 // after the reactor event loop is done
644 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
646 for (size_t i
= 0; i
< MAX_SENDERS
; ++i
)
648 delete this->list_senders_
[i
];
649 this->list_senders_
[i
] =0;
// Called from the Sender constructor. Holding mutex_, records sndr in
// list_senders_ at sndr.index_ and logs the session count.
// NOTE(review): sndr.index_ is used as an array index without a bounds check,
// whereas on_delete_sender() tests index_ < MAX_SENDERS first -- confirm that
// every caller passes a valid index.
654 Connector::on_new_sender (Sender
& sndr
)
656 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
658 this->list_senders_
[sndr
.index_
] = &sndr
;
659 ACE_DEBUG ((LM_DEBUG
,
660 "Sender::CTOR sessions_=%d\n",
// Called from the Sender destructor. Holding mutex_, adds the sender's four
// counters to the connector totals, clears its list_senders_ slot if the slot
// still points at sndr, and logs a per-sender summary.
665 Connector::on_delete_sender (Sender
& sndr
)
667 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
670 this->total_snd_
+= sndr
.get_total_snd();
671 this->total_rcv_
+= sndr
.get_total_rcv();
672 this->total_w_
+= sndr
.get_total_w();
673 this->total_r_
+= sndr
.get_total_r();
// Clear the slot only when the index is in range and the slot holds this sender.
675 if (sndr
.index_
< MAX_SENDERS
676 && this->list_senders_
[sndr
.index_
] == &sndr
)
677 this->list_senders_
[sndr
.index_
] = 0;
// Format "snd(w)" and "rcv(r)" strings for the log line below.
// NOTE(review): "%ld" assumes the get_total_*() accessors return long --
// their declarations are not visible here; confirm.
679 ACE_TCHAR bufs
[256];
680 ACE_TCHAR bufr
[256];
682 ACE_OS::snprintf (bufs
, 256, ACE_TEXT ("%ld(%ld)"),
683 sndr
.get_total_snd (),
684 sndr
.get_total_w ());
686 ACE_OS::snprintf (bufr
, 256, ACE_TEXT ("%ld(%ld)"),
687 sndr
.get_total_rcv (),
688 sndr
.get_total_r ());
690 ACE_DEBUG ((LM_DEBUG
,
691 ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
// Opens the base ACE_Connector on the reactor from ACE_Reactor::instance(),
// then makes `num` connect() calls to addr, one per loop iteration. The error
// strings for a failed open and a failed connect are visible.
// NOTE(review): the declaration of `sender` and the failure paths are not
// visible here -- confirm whether a failed connect aborts the loop.
700 Connector::start (const ACE_INET_Addr
& addr
, int num
)
703 if (ACE_Connector
<Sender
,ACE_SOCK_CONNECTOR
>::open (ACE_Reactor::instance (),
708 ACE_TEXT("Connector::start () - open failed")),
713 for (int i
= 0 ; i
< num
; i
++)
717 if (ACE_Connector
<Sender
,ACE_SOCK_CONNECTOR
>::connect (sender
, addr
) < 0)
721 ACE_TEXT("Connector::start () - connect failed")),
// Holding mutex_ (returns -1 if the guard cannot be acquired), compares
// sessions_ against MAX_SENDERS and then scans list_senders_ for a slot that
// is 0.
// NOTE(review): the statements that create the Sender and assign sh are not
// visible here; presumably the first free slot index is used -- confirm.
729 Connector::make_svc_handler (Sender
* & sh
)
731 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
733 if (sessions_
>= MAX_SENDERS
)
736 for (size_t i
= 0; i
< MAX_SENDERS
; ++i
)
737 if (this->list_senders_
[i
] == 0)
748 // *************************************************************
// Stores the owning connector, starts with flg_mask_ set to NULL_MASK,
// registers this object with the connector through on_new_sender(), and
// copies `data` into send_buf_ (snprintf limit 1024).
// NOTE(review): the declared size of send_buf_ is not visible here; confirm it
// is at least 1024.
750 Sender::Sender (Connector
* connector
, size_t index
)
751 : connector_ (connector
),
753 flg_mask_ (ACE_Event_Handler::NULL_MASK
),
760 connector_
->on_new_sender (*this);
762 ACE_OS::snprintf (send_buf_
, 1024, "%s", data
);
// Reports this sender to the connector through on_delete_sender(), then
// dequeues message blocks with a zero timeout and releases them.
// NOTE(review): the loop construct around getq()/release() is not visible
// here; presumably it repeats until getq() fails -- confirm.
766 Sender::~Sender (void)
770 connector_
->on_delete_sender (*this);
776 ACE_Time_Value tv
= ACE_Time_Value::zero
;
777 ACE_Message_Block
*mb
= 0;
779 if (this->getq (mb
, &tv
) < 0)
782 ACE_Message_Block::release (mb
);
// Tests whether flg_mask_ is NULL_MASK, i.e. whether no I/O mask bit is
// currently set by initiate_io().
// NOTE(review): the return values are not visible here; presumably a non-zero
// result tells the caller to close the handler -- confirm.
787 Sender::check_destroy (void)
789 if (flg_mask_
== ACE_Event_Handler::NULL_MASK
)
// Holding mutex_ (returns -1 if the guard fails), binds this handler to the
// reactor from ACE_Reactor::instance(), registers it with an empty mask,
// queues the first outgoing message through initiate_write(), requests READ
// events, and returns check_destroy().
795 int Sender::open (void *)
797 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
799 ACE_Reactor
* TPReactor
= ACE_Reactor::instance ();
801 this->reactor (TPReactor
);
// Register with NULL_MASK first; event bits are added later by initiate_io().
803 flg_mask_
= ACE_Event_Handler::NULL_MASK
;
805 if (TPReactor
->register_handler (this,flg_mask_
) == -1)
808 if (this->initiate_write () == -1)
812 initiate_io (ACE_Event_Handler::READ_MASK
);
814 return check_destroy ();
// While the message queue holds fewer than 20 blocks (flow control), builds a
// message block over send_buf_ (strlen(send_buf_) bytes) and enqueues it with
// a zero timeout. Returns the result of requesting WRITE events.
// NOTE(review): the block is allocated with nbytes+8 bytes and then init()
// points it at send_buf_; confirm in the ACE_Message_Block documentation that
// init() releases the earlier allocation.
// NOTE(review): rd_ptr and wr_ptr are both set to base(); a statement that
// advances wr_ptr by nbytes is presumably among the lines not visible here --
// confirm.
818 Sender::initiate_write (void)
820 if ( this->msg_queue ()->message_count () < 20) // flow control
822 size_t nbytes
= ACE_OS::strlen (send_buf_
);
824 ACE_Message_Block
*mb
= 0;
826 ACE_Message_Block (nbytes
+8),
829 mb
->init (send_buf_
, nbytes
);
830 mb
->rd_ptr (mb
->base ());
831 mb
->wr_ptr (mb
->base ());
834 ACE_Time_Value tv
= ACE_Time_Value::zero
;
836 int qcount
=this->putq (mb
, & tv
);
840 ACE_Message_Block::release (mb
);
845 return initiate_io (ACE_Event_Handler::WRITE_MASK
);
// Enables reactor events for `mask`. It first tests whether the bits are
// already set in flg_mask_, then calls schedule_wakeup() on the reactor and
// sets the bits in flg_mask_.
// NOTE(review): the early-return statements are not visible here; presumably
// an already-enabled mask returns success and a failed schedule_wakeup()
// returns -1 -- confirm.
849 Sender::initiate_io (ACE_Reactor_Mask mask
)
851 if (ACE_BIT_ENABLED (flg_mask_
, mask
))
854 if (ACE_Reactor::instance ()->schedule_wakeup (this, mask
) == -1)
857 ACE_SET_BITS (flg_mask_
, mask
);
// Counterpart of initiate_io(). It tests whether the `mask` bits are already
// clear in flg_mask_, then calls cancel_wakeup() on the reactor and clears the
// bits in flg_mask_.
862 Sender::terminate_io (ACE_Reactor_Mask mask
)
864 if (ACE_BIT_DISABLED (flg_mask_
, mask
))
867 if (ACE_Reactor::instance ()->cancel_wakeup (this, mask
) == -1)
870 ACE_CLR_BITS (flg_mask_
, mask
);
// Removes this handler from the reactor for all event types. DONT_CALL is
// included so that remove_handler() does not call handle_close() again.
875 Sender::handle_close (ACE_HANDLE
, ACE_Reactor_Mask
)
877 ACE_Reactor
* TPReactor
= ACE_Reactor::instance ();
879 TPReactor
->remove_handler (this,
880 ACE_Event_Handler::ALL_EVENTS_MASK
|
881 ACE_Event_Handler::DONT_CALL
); // Don't call handle_close
// Read callback. Holding mutex_ (returns -1 if the guard fails), it receives
// from the peer into a message block of size BUFSIZ, adds the result to
// total_rcv_, writes a NUL at wr_ptr() for logging, logs when loglevel is 0,
// res <= 0, or err != 0, and releases the block (the data is not queued).
// On EWOULDBLOCK it returns check_destroy(). Otherwise full-duplex mode keeps
// READ enabled, the other branch disables READ, and a new write is started
// through initiate_write().
// NOTE(review): the else branches and error returns are not visible here, so
// confirm which of the rc assignments are alternatives.
// NOTE(review): `res` is ssize_t and `h` is ACE_HANDLE, yet both are logged
// with "%d" -- check against the ACE_Log_Msg format specifiers.
888 Sender::handle_input (ACE_HANDLE h
)
890 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
892 ACE_Message_Block
*mb
= 0;
894 ACE_Message_Block (BUFSIZ
),
898 ssize_t res
= this->peer ().recv (mb
->rd_ptr (),
905 this->total_rcv_
+= res
;
910 mb
->wr_ptr ()[0] = '\0';
912 if (loglevel
== 0 || res
<= 0 || err
!= 0)
916 ACE_DEBUG ((LM_DEBUG
, "**** Sender::handle_input () SessionId=%d****\n", index_
));
917 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_to_read", BUFSIZ
));
918 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "handle", h
));
919 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_transferred", res
));
920 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "error", err
));
921 ACE_DEBUG ((LM_DEBUG
, "%C = %s\n", "message_block", mb
->rd_ptr ()));
922 ACE_DEBUG ((LM_DEBUG
, "**** end of message ****************\n"));
925 ACE_Message_Block::release (mb
);
927 if (err
== EWOULDBLOCK
)
931 return check_destroy ();
934 if (err
!=0 || res
<= 0)
939 if (duplex
!= 0) // full duplex, continue read
940 rc
= initiate_io (ACE_Event_Handler::READ_MASK
);
942 rc
= terminate_io (ACE_Event_Handler::READ_MASK
);
947 rc
= initiate_write ();
951 return check_destroy ();
// Write callback. Holding mutex_ (returns -1 if the guard fails), it dequeues
// one message block with a zero timeout. If a block was obtained, it sends
// length() bytes from rd_ptr() to the peer, adds the result to total_snd_,
// logs under the same conditions as handle_input(), and releases the block.
// When the queue is empty (qcount <= 0): in full-duplex mode, and while
// total_snd_ - total_rcv_ is below 32 KiB (flow control), another write is
// queued; the other branch disables WRITE events. READ events are then
// requested, and check_destroy() is returned.
// NOTE(review): the else branches and error returns are not visible here, so
// confirm which of the rc assignments are alternatives.
955 Sender::handle_output (ACE_HANDLE h
)
957 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
959 ACE_Time_Value tv
= ACE_Time_Value::zero
;
960 ACE_Message_Block
*mb
= 0;
966 int qcount
= this->getq (mb
, & tv
);
968 if (mb
!= 0) // qcount >= 0
970 bytes
= mb
->length ();
971 res
= this->peer ().send (mb
->rd_ptr (), bytes
);
978 this->total_snd_
+= res
;
980 if (loglevel
== 0 || res
<= 0 || err
!= 0)
984 ACE_DEBUG ((LM_DEBUG
, "**** Sender::handle_output () SessionId=%d****\n", index_
));
985 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_to_write", bytes
));
986 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "handle", h
));
987 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_transferred", res
));
988 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "error", err
));
989 ACE_DEBUG ((LM_DEBUG
, "%C = %s\n", "message_block", mb
->rd_ptr ()));
990 ACE_DEBUG ((LM_DEBUG
, "**** end of message ****************\n"));
994 ACE_Message_Block::release (mb
);
996 if (err
!= 0 || res
< 0)
1001 if (qcount
<= 0) // no more message blocks in queue
1003 if (duplex
!= 0 && // full duplex, continue write
1004 (this->total_snd_
- this->total_rcv_
) < 1024*32 ) // flow control
1005 rc
= initiate_write ();
1007 rc
= terminate_io (ACE_Event_Handler::WRITE_MASK
);
1013 rc
= initiate_io (ACE_Event_Handler::READ_MASK
);
1017 return check_destroy ();
1021 // *************************************************************
1022 // Configuration helpers
1023 // *************************************************************
// Emits the command-line help text; argc is unused. The single "%s" in the
// format is the program name.
// NOTE(review): the argument list is not visible here; presumably argv[0] is
// passed for "%s" -- confirm.
1025 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1029 ACE_TEXT ("\nusage: %s")
1030 ACE_TEXT ("\n-n <number threads in the thread pool>")
1031 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1032 ACE_TEXT ("\n-p <port to listen/connect>")
1033 ACE_TEXT ("\n-h <host> for Sender mode")
1034 ACE_TEXT ("\n-s <number of sender's instances>")
1035 ACE_TEXT ("\n-b run client and server at the same time")
1036 ACE_TEXT ("\n-v log level")
1037 ACE_TEXT ("\n 0 - log all messages")
1038 ACE_TEXT ("\n 1 - log only errors and unusual cases")
1039 ACE_TEXT ("\n-i time to run in seconds")
1040 ACE_TEXT ("\n-u show this message")
// Fills the file-level configuration variables from the command line.
// With no arguments (argc == 1) it selects a built-in scenario: client and
// server together, full duplex, localhost, the default port, 3 threads,
// 20 senders (capped at SOMAXCONN when that macro is defined), errors-only
// logging and a 20 second run. Otherwise it walks the options
// "i:n:p:d:h:s:v:ub" with ACE_Get_Opt. The final visible statement returns
// print_usage().
// NOTE(review): -n (threads), -p (port) and -v (loglevel) are taken from
// atoi() with no visible range check, and port is narrowed to u_short --
// confirm this is acceptable.
// NOTE(review): -i stores atoi() into the unsigned `seconds` before it is
// compared with MIN_TIME/MAX_TIME, so a negative argument wraps to a large
// value first.
1048 parse_args (int argc
, ACE_TCHAR
*argv
[])
1050 if (argc
== 1) // no arguments , so one button test
1052 both
= 1; // client and server simultaneously
1053 duplex
= 1; // full duplex is on
1054 host
= ACE_LOCALHOST
; // server to connect
1055 port
= ACE_DEFAULT_SERVER_PORT
; // port to connect/listen
1056 threads
= 3; // size of TP_Reactor thread pool
1057 senders
= 20; // number of senders
1058 loglevel
= 1; // log level : 0 full/ 1 only errors
1059 seconds
= 20; // time to run in seconds
1060 #if defined(SOMAXCONN) // The test is invalid if senders > SOMAXCONN
1061 if(SOMAXCONN
< senders
)
1062 senders
= SOMAXCONN
;
1067 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("i:n:p:d:h:s:v:ub"));
1070 while ((c
= get_opt ()) != EOF
)
1074 case 'i': // time to run
1075 seconds
= ACE_OS::atoi (get_opt
.opt_arg());
1076 if (seconds
< MIN_TIME
)
1078 if (seconds
> MAX_TIME
)
1081 case 'b': // both client and server
1084 case 'v': // log level
1085 loglevel
= ACE_OS::atoi (get_opt
.opt_arg());
1088 duplex
= ACE_OS::atoi (get_opt
.opt_arg());
1090 case 'h': // host for sender
1091 host
= get_opt
.opt_arg();
1093 case 'p': // port number
1094 port
= ACE_OS::atoi (get_opt
.opt_arg());
1096 case 'n': // thread pool size
1097 threads
= ACE_OS::atoi (get_opt
.opt_arg());
1099 case 's': // number of senders
1100 senders
= ACE_OS::atoi (get_opt
.opt_arg());
// The size_t cast makes a negative count compare as very large, so it is also capped.
1101 if (size_t (senders
) > MAX_SENDERS
)
1102 senders
= MAX_SENDERS
;
1106 return print_usage (argc
,argv
);
// Blocks every signal number from sigmin to sigmax inclusive. It builds a
// sigset_t and applies it with SIG_BLOCK, using sigprocmask() when
// ACE_LACKS_PTHREAD_THR_SIGSETMASK is defined and thr_sigsetmask() otherwise.
// When ACE_LACKS_UNIX_SIGNALS is defined the arguments are only marked unused.
// A failed mask call is reported through ACE_ERROR_RETURN.
// NOTE(review): a sigemptyset() failure is only logged; execution continues
// with signal_set in an unspecified state.
// NOTE(review): the return value of sigaddset() is ignored.
1114 disable_signal (int sigmin
, int sigmax
)
1116 #if !defined (ACE_LACKS_UNIX_SIGNALS)
1117 sigset_t signal_set
;
1118 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
1119 ACE_ERROR ((LM_ERROR
,
1120 ACE_TEXT("Error: (%P | %t):%p\n"),
1121 ACE_TEXT("sigemptyset failed")));
1123 for (int i
= sigmin
; i
<= sigmax
; i
++)
1124 ACE_OS::sigaddset (&signal_set
, i
);
1126 // Put the <signal_set>.
1127 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
1128 // In multi-threaded application this is not POSIX compliant
1129 // but let's leave it just in case.
1130 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
1132 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
1133 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
1134 ACE_ERROR_RETURN ((LM_ERROR
,
1135 ACE_TEXT ("Error: (%P|%t): %p\n"),
1136 ACE_TEXT ("SIG_BLOCK failed")),
1139 ACE_UNUSED_ARG(sigmin
);
1140 ACE_UNUSED_ARG(sigmax
);
1141 #endif /* ACE_LACKS_UNIX_SIGNALS */
1146 #endif /* ACE_HAS_THREADS */
// Test entry point. When threads and accept() are available it parses the
// arguments, blocks SIGPIPE, and starts the task's thread pool. It starts the
// acceptor when `both` is set or no host was given, and the connector when
// `both` is set or a host was given. It then sleeps for `seconds`, logs the
// number of receiver and sender sessions, and logs the byte totals collected
// by the connector and the acceptor. In other builds the arguments are only
// marked unused.
// NOTE(review): a failed addr.set() is only logged; connector.start() is still
// called with that address.
1149 run_main (int argc
, ACE_TCHAR
*argv
[])
1151 ACE_START_TEST (ACE_TEXT ("TP_Reactor_Test"));
1153 #if defined(ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
1154 if (::parse_args (argc
, argv
) == -1)
1157 disable_signal (SIGPIPE
, SIGPIPE
);
1161 Connector connector
;
1163 if (task1
.start (threads
) == 0)
1167 ACE_INET_Addr
addr (port
);
1168 if (both
!= 0 || host
== 0) // Acceptor
1169 rc
+= acceptor
.start (addr
);
1171 if (both
!= 0 || host
!= 0)
1174 host
= ACE_LOCALHOST
;
1176 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
1177 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), host
));
1178 rc
+= connector
.start (addr
, senders
);
// Let the test run for the configured duration.
1183 ACE_OS::sleep (seconds
);
1188 ACE_DEBUG ((LM_DEBUG
,
1189 ACE_TEXT ("\nNumber of Receivers objects = %d\n")
1190 ACE_TEXT ("\nNumber of Sender objects = %d\n"),
1191 acceptor
.get_number_sessions (),
1192 connector
.get_number_sessions ()));
1194 // As Reactor event loop now is inactive it is safe to destroy all
// Report the totals as "bytes(operations)" pairs, connector first.
1201 ACE_TCHAR bufs
[256];
1202 ACE_TCHAR bufr
[256];
1204 ACE_OS::snprintf (bufs
, 256, ACE_TEXT ("%ld(%ld)"),
1205 connector
.get_total_snd (),
1206 connector
.get_total_w ());
1208 ACE_OS::snprintf (bufr
, 256, ACE_TEXT ("%ld(%ld)"),
1209 connector
.get_total_rcv (),
1210 connector
.get_total_r ());
1212 ACE_DEBUG ((LM_DEBUG
,
1213 ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
1218 ACE_OS::snprintf (bufs
, 256, ACE_TEXT ("%ld(%ld)"),
1219 acceptor
.get_total_snd (),
1220 acceptor
.get_total_w ());
1222 ACE_OS::snprintf (bufr
, 256, ACE_TEXT ("%ld(%ld)"),
1223 acceptor
.get_total_rcv (),
1224 acceptor
.get_total_r ());
1226 ACE_DEBUG ((LM_DEBUG
,
1227 ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
1232 #else /* ACE_HAS_THREADS && !ACE_LACKS_ACCEPT */
1233 ACE_UNUSED_ARG( argc
);
1234 ACE_UNUSED_ARG( argv
);
1235 #endif /* ACE_HAS_THREADS && !ACE_LACKS_ACCEPT */