2 //=============================================================================
4 * @file TP_Reactor_Test.cpp
6 * This program illustrates how the <ACE_TP_Reactor> can be used to
7 * implement an application that does various operations.
8 * usage: TP_Reactor_Test
9 * -n number of threads in the TP_Reactor thread pool
10 * -d duplex mode 1 (full-duplex) vs. 0 (half-duplex)
11 * -p port to listen(Server)/connect(Client)
12 * -h host to connect (Client mode)
13 * -s number of sender's instances ( Client mode)
14 * -b run client and server (both modes ) at the same time
16 * 0 - log all messages
17 * 1 - log only errors and unusual cases
18 * -i time to run in seconds
19 * -u show this message
21 * The main differences between Thread_Pool_Reactor_Test.cpp and
24 * 1. Thread_Pool_Reactor_Test.cpp tests only handle_input()
25 * events on the server, whereas this one tests both handle_input() and
26 * handle_output() on both server and client, i.e., the receiver
27 * and sender are completely event-driven.
29 * 2. The receiver and sender in this test can work in full duplex
30 * mode, i.e., input and output events are processed independently.
31 * Half-duplex mode (request-reply) is also supported.
33 * This test is therefore a bit more stressful than the
34 * Thread_Pool_Reactor_Test.cpp for the ACE_TP_Reactor since the same
35 * thread pool is shared between client and server.
37 * This test is a "twin" of the Proactor_Test.cpp, so it can help
38 * developers write solutions that are independent of the Reactor/Proactor choice.
40 * @author Alexander Libman <alibman@ihug.com.au>
41 * @author <alexl@rumblgroup.com>
43 //=============================================================================
46 #include "test_config.h"
48 #if defined(ACE_HAS_THREADS)
50 #include "TP_Reactor_Test.h"
52 #include "ace/Signal.h"
53 #include "ace/Service_Config.h"
54 #include "ace/Get_Opt.h"
56 #include "ace/Reactor.h"
57 #include "ace/TP_Reactor.h"
58 #include "ace/OS_NS_signal.h"
59 #include "ace/OS_NS_stdio.h"
60 #include "ace/OS_NS_string.h"
61 #include "ace/OS_NS_unistd.h"
62 #include "ace/Synch_Traits.h"
63 #include "ace/Thread_Semaphore.h"
65 // Some debug helper functions
66 static int disable_signal (int sigmin
, int sigmax
);
68 // both: 0 run client or server / depends on host
69 // != 0 run client and server
72 // Host that we're connecting to.
73 static const ACE_TCHAR
*host
= 0;
75 // number of Senders instances
76 static int senders
= 1;
78 // duplex mode: == 0 half-duplex
80 static int duplex
= 0;
82 // number of threads in the TP_Reactor thread pool
83 static int threads
= 1;
85 // Port that we're receiving connections on.
86 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
89 static int loglevel
= 1; // 0 full , 1 only errors
91 static const size_t MIN_TIME
= 1; // min 1 sec
92 static const size_t MAX_TIME
= 3600; // max 1 hour
93 static u_int seconds
= 2; // default time to run - 2 seconds
98 "Accept-Language: C++\r\n"
99 "Accept-Encoding: gzip, deflate\r\n"
100 "User-Agent: TPReactor_Test/1.0 (non-compatible)\r\n"
101 "Connection: Keep-Alive\r\n"
104 // *************************************************************
109 LogLocker () { ACE_LOG_MSG
->acquire (); }
110 virtual ~LogLocker () { ACE_LOG_MSG
->release (); }
112 // *************************************************************
117 * MyTask plays the role of the TP_Reactor thread pool
119 * MyTask is the ACE_Task responsible for:
120 * 1. Creation and deletion of TP_Reactor and TP_Reactor thread pool
121 * 2. Running TP_Reactor event loop
123 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
126 MyTask (): sem_ ((unsigned int) 0),
129 ~MyTask () override
{ stop (); }
133 int start (int num_threads
);
137 int create_reactor ();
138 int delete_reactor ();
140 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
141 ACE_Thread_Semaphore sem_
;
142 ACE_Reactor
*my_reactor_
;
146 MyTask::create_reactor ()
148 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
153 ACE_TEST_ASSERT (this->my_reactor_
== 0);
155 ACE_TP_Reactor
* pImpl
= 0;
157 ACE_NEW_RETURN (pImpl
,ACE_TP_Reactor
, -1);
159 ACE_NEW_RETURN (my_reactor_
,
160 ACE_Reactor (pImpl
,1),
163 ACE_DEBUG ((LM_DEBUG
,
164 ACE_TEXT (" (%t) Create TP_Reactor\n")));
166 ACE_Reactor::instance (this->my_reactor_
);
168 this->reactor (my_reactor_
);
174 MyTask::delete_reactor ()
176 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
181 ACE_DEBUG ((LM_DEBUG
,
182 ACE_TEXT (" (%t) Delete TP_Reactor\n")));
184 delete this->my_reactor_
;
185 ACE_Reactor::instance ((ACE_Reactor
*) 0);
186 this->my_reactor_
= 0;
193 MyTask::start (int num_threads
)
195 if (this->create_reactor () == -1)
196 ACE_ERROR_RETURN ((LM_ERROR
,
198 ACE_TEXT ("unable to create reactor")),
201 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
202 ACE_ERROR_RETURN ((LM_ERROR
,
204 ACE_TEXT ("unable to activate thread pool")),
207 for (; num_threads
> 0 ; num_threads
--)
217 if (this->my_reactor_
!= 0)
219 ACE_DEBUG ((LM_DEBUG
,
220 ACE_TEXT ("End TP_Reactor event loop\n")));
222 ACE_Reactor::instance()->end_reactor_event_loop ();
225 if (this->wait () == -1)
226 ACE_ERROR ((LM_ERROR
,
228 ACE_TEXT ("unable to stop thread pool")));
230 if (this->delete_reactor () == -1)
231 ACE_ERROR ((LM_ERROR
,
233 ACE_TEXT ("unable to delete reactor")));
241 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" (%t) MyTask started\n")));
243 disable_signal (SIGPIPE
, SIGPIPE
);
245 // signal that we are ready
248 while (ACE_Reactor::instance()->reactor_event_loop_done () == 0)
249 ACE_Reactor::instance()->run_reactor_event_loop ();
251 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" (%t) MyTask finished\n")));
255 // *************************************************************
257 Acceptor::Acceptor ()
258 : ACE_Acceptor
<Receiver
,ACE_SOCK_ACCEPTOR
> ((ACE_Reactor
*) 0),
265 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
267 for (size_t i
= 0; i
< MAX_RECEIVERS
; ++i
)
268 this->list_receivers_
[i
] =0;
271 Acceptor::~Acceptor ()
280 // this method can be called only after reactor event loop is done
283 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
285 for (size_t i
= 0; i
< MAX_RECEIVERS
; ++i
)
287 delete this->list_receivers_
[i
];
288 this->list_receivers_
[i
] =0;
293 Acceptor::on_new_receiver (Receiver
&rcvr
)
295 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
297 this->list_receivers_
[rcvr
.index_
] = & rcvr
;
298 ACE_DEBUG ((LM_DEBUG
,
299 "Receiver::CTOR sessions_=%d\n",
304 Acceptor::on_delete_receiver (Receiver
&rcvr
)
306 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
310 this->total_snd_
+= rcvr
.get_total_snd ();
311 this->total_rcv_
+= rcvr
.get_total_rcv ();
312 this->total_w_
+= rcvr
.get_total_w ();
313 this->total_r_
+= rcvr
.get_total_r ();
315 if (rcvr
.index_
< MAX_RECEIVERS
316 && this->list_receivers_
[rcvr
.index_
] == &rcvr
)
317 this->list_receivers_
[rcvr
.index_
] = 0;
319 ACE_TCHAR bufs
[256];
320 ACE_TCHAR bufr
[256];
322 ACE_OS::snprintf (bufs
, 256, ACE_TEXT ("%ld(%ld)"),
323 rcvr
.get_total_snd (),
324 rcvr
.get_total_w ());
326 ACE_OS::snprintf (bufr
, 256, ACE_TEXT ("%ld(%ld)"),
327 rcvr
.get_total_rcv (),
328 rcvr
.get_total_r ());
330 ACE_DEBUG ((LM_DEBUG
,
331 ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
339 Acceptor::start (const ACE_INET_Addr
&addr
)
341 if (ACE_Acceptor
<Receiver
,ACE_SOCK_ACCEPTOR
>::open (addr
,
342 ACE_Reactor::instance (),
344 ACE_ERROR_RETURN ((LM_ERROR
,
346 ACE_TEXT("Acceptor::start () - open failed")),
352 Acceptor::make_svc_handler (Receiver
*&sh
)
354 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
356 if (sessions_
>= MAX_RECEIVERS
)
359 for (size_t i
= 0; i
< MAX_RECEIVERS
; ++i
)
360 if (this->list_receivers_
[i
] == 0)
370 // *************************************************************
372 Receiver::Receiver (Acceptor
* acceptor
, size_t index
)
373 : acceptor_ (acceptor
),
375 flg_mask_ (ACE_Event_Handler::NULL_MASK
),
382 acceptor_
->on_new_receiver (*this);
386 Receiver::~Receiver ()
390 acceptor_
->on_delete_receiver (*this);
396 ACE_Time_Value tv
= ACE_Time_Value::zero
;
397 ACE_Message_Block
*mb
= 0;
399 if (this->getq (mb
, &tv
) < 0)
402 ACE_Message_Block::release (mb
);
407 Receiver::check_destroy ()
409 if (flg_mask_
== ACE_Event_Handler::NULL_MASK
)
416 Receiver::open (void *)
418 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
420 ACE_Reactor
*TPReactor
= ACE_Reactor::instance ();
422 this->reactor (TPReactor
);
424 flg_mask_
= ACE_Event_Handler::NULL_MASK
;
426 if (TPReactor
->register_handler (this, flg_mask_
) == -1)
429 initiate_io (ACE_Event_Handler::READ_MASK
);
431 return check_destroy ();
435 Receiver::initiate_io (ACE_Reactor_Mask mask
)
437 if (ACE_BIT_ENABLED (flg_mask_
, mask
))
440 if (ACE_Reactor::instance ()->schedule_wakeup (this, mask
) == -1)
443 ACE_SET_BITS (flg_mask_
, mask
);
448 Receiver::terminate_io (ACE_Reactor_Mask mask
)
450 if (ACE_BIT_DISABLED (flg_mask_
, mask
))
453 if (ACE_Reactor::instance ()->cancel_wakeup (this, mask
) == -1)
456 ACE_CLR_BITS (flg_mask_
, mask
);
461 Receiver::handle_close (ACE_HANDLE
, ACE_Reactor_Mask
)
463 ACE_Reactor
*TPReactor
= ACE_Reactor::instance ();
465 TPReactor
->remove_handler (this,
466 ACE_Event_Handler::ALL_EVENTS_MASK
|
467 ACE_Event_Handler::DONT_CALL
); // Don't call handle_close
474 Receiver::handle_input (ACE_HANDLE h
)
476 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
478 ACE_Message_Block
*mb
= 0;
480 ACE_Message_Block (BUFSIZ
),
484 ssize_t res
= this->peer ().recv (mb
->rd_ptr (), BUFSIZ
-1);
491 this->total_rcv_
+= res
;
496 mb
->wr_ptr ()[0] = '\0';
498 if (loglevel
== 0 || res
<= 0 || err
!= 0)
502 ACE_DEBUG ((LM_DEBUG
, "**** Receiver::handle_input () SessionId=%d****\n", index_
));
503 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_to_read", BUFSIZ
));
504 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "handle", h
));
505 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_transferred", res
));
506 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "error", err
));
507 ACE_DEBUG ((LM_DEBUG
, "%C = %s\n", "message_block", mb
->rd_ptr ()));
508 ACE_DEBUG ((LM_DEBUG
, "**** end of message ****************\n"));
511 if (err
== EWOULDBLOCK
)
515 return check_destroy ();
518 if (err
!=0 || res
<= 0)
520 ACE_Message_Block::release (mb
);
524 ACE_Time_Value tv
= ACE_Time_Value::zero
;
526 int qcount
= this->putq (mb
, & tv
);
528 if (qcount
<= 0) // failed to putq
530 ACE_Message_Block::release (mb
);
536 if (duplex
== 0) // half-duplex , stop read
537 rc
= this->terminate_io (ACE_Event_Handler::READ_MASK
);
540 if (qcount
>= 20 ) // flow control, stop read
541 rc
= this->terminate_io (ACE_Event_Handler::READ_MASK
);
543 rc
= this->initiate_io (ACE_Event_Handler::READ_MASK
);
550 if (this->initiate_io (ACE_Event_Handler::WRITE_MASK
) != 0)
553 return check_destroy ();
557 Receiver::handle_output (ACE_HANDLE h
)
559 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
561 ACE_Time_Value tv
= ACE_Time_Value::zero
;
562 ACE_Message_Block
*mb
= 0;
568 int qcount
= this->getq (mb
, &tv
);
570 if (mb
!= 0) // qcount >= 0)
572 bytes
= mb
->length ();
573 res
= this->peer ().send (mb
->rd_ptr (), bytes
);
580 this->total_snd_
+= res
;
583 if (loglevel
== 0 || res
<= 0 || err
!= 0)
587 ACE_DEBUG ((LM_DEBUG
, "**** Receiver::handle_output () SessionId=%d****\n", index_
));
588 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_to_write", bytes
));
589 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "handle", h
));
590 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_transferred", res
));
591 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "error", err
));
592 ACE_DEBUG ((LM_DEBUG
, "%C = %s\n", "message_block", mb
->rd_ptr ()));
593 ACE_DEBUG ((LM_DEBUG
, "**** end of message ****************\n"));
597 ACE_Message_Block::release (mb
);
599 if (err
!= 0 || res
< 0)
602 if (qcount
<= 0) // no more message blocks in queue
604 if (this->terminate_io (ACE_Event_Handler::WRITE_MASK
) != 0)
607 if (this->initiate_io (ACE_Event_Handler::READ_MASK
) != 0)
611 return check_destroy ();
614 // *************************************************************
616 Connector::Connector ()
617 : ACE_Connector
<Sender
,ACE_SOCK_CONNECTOR
> ((ACE_Reactor
*) 0),
624 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
626 for (size_t i
= 0; i
< MAX_SENDERS
; ++i
)
627 this->list_senders_
[i
] = 0;
630 Connector::~Connector ()
639 // this method can be called only
640 // after reactor event loop is done
643 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
645 for (size_t i
= 0; i
< MAX_SENDERS
; ++i
)
647 delete this->list_senders_
[i
];
648 this->list_senders_
[i
] =0;
653 Connector::on_new_sender (Sender
& sndr
)
655 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
657 this->list_senders_
[sndr
.index_
] = &sndr
;
658 ACE_DEBUG ((LM_DEBUG
,
659 "Sender::CTOR sessions_=%d\n",
664 Connector::on_delete_sender (Sender
& sndr
)
666 ACE_GUARD (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
);
669 this->total_snd_
+= sndr
.get_total_snd();
670 this->total_rcv_
+= sndr
.get_total_rcv();
671 this->total_w_
+= sndr
.get_total_w();
672 this->total_r_
+= sndr
.get_total_r();
674 if (sndr
.index_
< MAX_SENDERS
675 && this->list_senders_
[sndr
.index_
] == &sndr
)
676 this->list_senders_
[sndr
.index_
] = 0;
678 ACE_TCHAR bufs
[256];
679 ACE_TCHAR bufr
[256];
681 ACE_OS::snprintf (bufs
, 256, ACE_TEXT ("%ld(%ld)"),
682 sndr
.get_total_snd (),
683 sndr
.get_total_w ());
685 ACE_OS::snprintf (bufr
, 256, ACE_TEXT ("%ld(%ld)"),
686 sndr
.get_total_rcv (),
687 sndr
.get_total_r ());
689 ACE_DEBUG ((LM_DEBUG
,
690 ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
698 Connector::start (const ACE_INET_Addr
& addr
, int num
)
700 if (ACE_Connector
<Sender
,ACE_SOCK_CONNECTOR
>::open (ACE_Reactor::instance (),
705 ACE_TEXT("Connector::start () - open failed")),
710 for (int i
= 0 ; i
< num
; i
++)
714 if (ACE_Connector
<Sender
,ACE_SOCK_CONNECTOR
>::connect (sender
, addr
) < 0)
718 ACE_TEXT("Connector::start () - connect failed")),
726 Connector::make_svc_handler (Sender
* & sh
)
728 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
730 if (sessions_
>= MAX_SENDERS
)
733 for (size_t i
= 0; i
< MAX_SENDERS
; ++i
)
734 if (this->list_senders_
[i
] == 0)
745 // *************************************************************
747 Sender::Sender (Connector
* connector
, size_t index
)
748 : connector_ (connector
),
750 flg_mask_ (ACE_Event_Handler::NULL_MASK
),
757 connector_
->on_new_sender (*this);
759 ACE_OS::snprintf (send_buf_
, 1024, "%s", data
);
767 connector_
->on_delete_sender (*this);
773 ACE_Time_Value tv
= ACE_Time_Value::zero
;
774 ACE_Message_Block
*mb
= 0;
776 if (this->getq (mb
, &tv
) < 0)
779 ACE_Message_Block::release (mb
);
784 Sender::check_destroy ()
786 if (flg_mask_
== ACE_Event_Handler::NULL_MASK
)
792 int Sender::open (void *)
794 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
796 ACE_Reactor
* TPReactor
= ACE_Reactor::instance ();
798 this->reactor (TPReactor
);
800 flg_mask_
= ACE_Event_Handler::NULL_MASK
;
802 if (TPReactor
->register_handler (this,flg_mask_
) == -1)
805 if (this->initiate_write () == -1)
809 initiate_io (ACE_Event_Handler::READ_MASK
);
811 return check_destroy ();
815 Sender::initiate_write ()
817 if ( this->msg_queue ()->message_count () < 20) // flow control
819 size_t nbytes
= ACE_OS::strlen (send_buf_
);
821 ACE_Message_Block
*mb
= 0;
823 ACE_Message_Block (nbytes
+8),
826 mb
->init (send_buf_
, nbytes
);
827 mb
->rd_ptr (mb
->base ());
828 mb
->wr_ptr (mb
->base ());
831 ACE_Time_Value tv
= ACE_Time_Value::zero
;
833 int qcount
=this->putq (mb
, & tv
);
837 ACE_Message_Block::release (mb
);
842 return initiate_io (ACE_Event_Handler::WRITE_MASK
);
846 Sender::initiate_io (ACE_Reactor_Mask mask
)
848 if (ACE_BIT_ENABLED (flg_mask_
, mask
))
851 if (ACE_Reactor::instance ()->schedule_wakeup (this, mask
) == -1)
854 ACE_SET_BITS (flg_mask_
, mask
);
859 Sender::terminate_io (ACE_Reactor_Mask mask
)
861 if (ACE_BIT_DISABLED (flg_mask_
, mask
))
864 if (ACE_Reactor::instance ()->cancel_wakeup (this, mask
) == -1)
867 ACE_CLR_BITS (flg_mask_
, mask
);
872 Sender::handle_close (ACE_HANDLE
, ACE_Reactor_Mask
)
874 ACE_Reactor
* TPReactor
= ACE_Reactor::instance ();
876 TPReactor
->remove_handler (this,
877 ACE_Event_Handler::ALL_EVENTS_MASK
|
878 ACE_Event_Handler::DONT_CALL
); // Don't call handle_close
885 Sender::handle_input (ACE_HANDLE h
)
887 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
889 ACE_Message_Block
*mb
= 0;
891 ACE_Message_Block (BUFSIZ
),
895 ssize_t res
= this->peer ().recv (mb
->rd_ptr (),
902 this->total_rcv_
+= res
;
907 mb
->wr_ptr ()[0] = '\0';
909 if (loglevel
== 0 || res
<= 0 || err
!= 0)
913 ACE_DEBUG ((LM_DEBUG
, "**** Sender::handle_input () SessionId=%d****\n", index_
));
914 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_to_read", BUFSIZ
));
915 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "handle", h
));
916 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_transferred", res
));
917 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "error", err
));
918 ACE_DEBUG ((LM_DEBUG
, "%C = %s\n", "message_block", mb
->rd_ptr ()));
919 ACE_DEBUG ((LM_DEBUG
, "**** end of message ****************\n"));
922 ACE_Message_Block::release (mb
);
924 if (err
== EWOULDBLOCK
)
928 return check_destroy ();
931 if (err
!=0 || res
<= 0)
936 if (duplex
!= 0) // full duplex, continue read
937 rc
= initiate_io (ACE_Event_Handler::READ_MASK
);
939 rc
= terminate_io (ACE_Event_Handler::READ_MASK
);
944 rc
= initiate_write ();
948 return check_destroy ();
952 Sender::handle_output (ACE_HANDLE h
)
954 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex
, locker
, this->mutex_
, -1);
956 ACE_Time_Value tv
= ACE_Time_Value::zero
;
957 ACE_Message_Block
*mb
= 0;
963 int qcount
= this->getq (mb
, & tv
);
965 if (mb
!= 0) // qcount >= 0
967 bytes
= mb
->length ();
968 res
= this->peer ().send (mb
->rd_ptr (), bytes
);
975 this->total_snd_
+= res
;
977 if (loglevel
== 0 || res
<= 0 || err
!= 0)
981 ACE_DEBUG ((LM_DEBUG
, "**** Sender::handle_output () SessionId=%d****\n", index_
));
982 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_to_write", bytes
));
983 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "handle", h
));
984 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "bytes_transferred", res
));
985 ACE_DEBUG ((LM_DEBUG
, "%C = %d\n", "error", err
));
986 ACE_DEBUG ((LM_DEBUG
, "%C = %s\n", "message_block", mb
->rd_ptr ()));
987 ACE_DEBUG ((LM_DEBUG
, "**** end of message ****************\n"));
991 ACE_Message_Block::release (mb
);
993 if (err
!= 0 || res
< 0)
998 if (qcount
<= 0) // no more message blocks in queue
1000 if (duplex
!= 0 && // full duplex, continue write
1001 (this->total_snd_
- this->total_rcv_
) < 1024*32 ) // flow control
1002 rc
= initiate_write ();
1004 rc
= terminate_io (ACE_Event_Handler::WRITE_MASK
);
1010 rc
= initiate_io (ACE_Event_Handler::READ_MASK
);
1014 return check_destroy ();
1018 // *************************************************************
1019 // Configuration helpers
1020 // *************************************************************
1022 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1026 ACE_TEXT ("\nusage: %s")
1027 ACE_TEXT ("\n-n <number threads in the thread pool>")
1028 ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
1029 ACE_TEXT ("\n-p <port to listen/connect>")
1030 ACE_TEXT ("\n-h <host> for Sender mode")
1031 ACE_TEXT ("\n-s <number of sender's instances>")
1032 ACE_TEXT ("\n-b run client and server at the same time")
1033 ACE_TEXT ("\n-v log level")
1034 ACE_TEXT ("\n 0 - log all messages")
1035 ACE_TEXT ("\n 1 - log only errors and unusual cases")
1036 ACE_TEXT ("\n-i time to run in seconds")
1037 ACE_TEXT ("\n-u show this message")
1045 parse_args (int argc
, ACE_TCHAR
*argv
[])
1047 if (argc
== 1) // no arguments , so one button test
1049 both
= 1; // client and server simultaneously
1050 duplex
= 1; // full duplex is on
1051 host
= ACE_LOCALHOST
; // server to connect
1052 port
= ACE_DEFAULT_SERVER_PORT
; // port to connect/listen
1053 threads
= 3; // size of TP_Reactor thread pool
1054 senders
= 20; // number of senders
1055 loglevel
= 1; // log level : 0 full/ 1 only errors
1056 seconds
= 20; // time to run in seconds
1057 #if defined(SOMAXCONN) // The test is invalid if senders > SOMAXCONN
1058 if(SOMAXCONN
< senders
)
1059 senders
= SOMAXCONN
;
1064 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("i:n:p:d:h:s:v:ub"));
1067 while ((c
= get_opt ()) != EOF
)
1071 case 'i': // time to run
1072 seconds
= ACE_OS::atoi (get_opt
.opt_arg());
1073 if (seconds
< MIN_TIME
)
1075 if (seconds
> MAX_TIME
)
1078 case 'b': // both client and server
1081 case 'v': // log level
1082 loglevel
= ACE_OS::atoi (get_opt
.opt_arg());
1085 duplex
= ACE_OS::atoi (get_opt
.opt_arg());
1087 case 'h': // host for sender
1088 host
= get_opt
.opt_arg();
1090 case 'p': // port number
1091 port
= ACE_OS::atoi (get_opt
.opt_arg());
1093 case 'n': // thread pool size
1094 threads
= ACE_OS::atoi (get_opt
.opt_arg());
1096 case 's': // number of senders
1097 senders
= ACE_OS::atoi (get_opt
.opt_arg());
1098 if (size_t (senders
) > MAX_SENDERS
)
1099 senders
= MAX_SENDERS
;
1103 return print_usage (argc
,argv
);
1111 disable_signal (int sigmin
, int sigmax
)
1113 #if !defined (ACE_LACKS_UNIX_SIGNALS)
1114 sigset_t signal_set
;
1115 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
1116 ACE_ERROR ((LM_ERROR
,
1117 ACE_TEXT("Error: (%P | %t):%p\n"),
1118 ACE_TEXT("sigemptyset failed")));
1120 for (int i
= sigmin
; i
<= sigmax
; i
++)
1121 ACE_OS::sigaddset (&signal_set
, i
);
1123 // Put the <signal_set>.
1124 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
1125 // In multi-threaded application this is not POSIX compliant
1126 // but let's leave it just in case.
1127 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
1129 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
1130 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
1131 ACE_ERROR_RETURN ((LM_ERROR
,
1132 ACE_TEXT ("Error: (%P|%t): %p\n"),
1133 ACE_TEXT ("SIG_BLOCK failed")),
1136 ACE_UNUSED_ARG(sigmin
);
1137 ACE_UNUSED_ARG(sigmax
);
1138 #endif /* ACE_LACKS_UNIX_SIGNALS */
1143 #endif /* ACE_HAS_THREADS */
1146 run_main (int argc
, ACE_TCHAR
*argv
[])
1148 ACE_START_TEST (ACE_TEXT ("TP_Reactor_Test"));
1150 #if defined(ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
1151 if (::parse_args (argc
, argv
) == -1)
1154 disable_signal (SIGPIPE
, SIGPIPE
);
1158 Connector connector
;
1160 if (task1
.start (threads
) == 0)
1164 ACE_INET_Addr
addr (port
);
1165 if (both
!= 0 || host
== 0) // Acceptor
1166 rc
+= acceptor
.start (addr
);
1168 if (both
!= 0 || host
!= 0)
1171 host
= ACE_LOCALHOST
;
1173 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
1174 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), host
));
1175 rc
+= connector
.start (addr
, senders
);
1179 ACE_OS::sleep (seconds
);
1184 ACE_DEBUG ((LM_DEBUG
,
1185 ACE_TEXT ("\nNumber of Receivers objects = %d\n")
1186 ACE_TEXT ("\nNumber of Sender objects = %d\n"),
1187 acceptor
.get_number_sessions (),
1188 connector
.get_number_sessions ()));
1190 // As Reactor event loop now is inactive it is safe to destroy all
1197 ACE_TCHAR bufs
[256];
1198 ACE_TCHAR bufr
[256];
1200 ACE_OS::snprintf (bufs
, 256, ACE_TEXT ("%ld(%ld)"),
1201 connector
.get_total_snd (),
1202 connector
.get_total_w ());
1204 ACE_OS::snprintf (bufr
, 256, ACE_TEXT ("%ld(%ld)"),
1205 connector
.get_total_rcv (),
1206 connector
.get_total_r ());
1208 ACE_DEBUG ((LM_DEBUG
,
1209 ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
1214 ACE_OS::snprintf (bufs
, 256, ACE_TEXT ("%ld(%ld)"),
1215 acceptor
.get_total_snd (),
1216 acceptor
.get_total_w ());
1218 ACE_OS::snprintf (bufr
, 256, ACE_TEXT ("%ld(%ld)"),
1219 acceptor
.get_total_rcv (),
1220 acceptor
.get_total_r ());
1222 ACE_DEBUG ((LM_DEBUG
,
1223 ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
1228 #else /* ACE_HAS_THREADS */
1229 ACE_UNUSED_ARG( argc
);
1230 ACE_UNUSED_ARG( argv
);
1231 #endif /* ACE_HAS_THREADS */