2 //=============================================================================
4 * @file Thread_Pool_Reactor_Resume_Test.cpp
6 * This program is an additional torture test of thread pool
7 * reactors. This test is based on Thread_Pool_Reactor_Test.cpp
8 * in $ACE_ROOT/tests. This test differs from the other one
9 * slightly. The TP reactor is instantiated
10 * with a value of 1 for the <resume_flag> argument. The server
11 * threads, during the handle_input call, resume the handle that
12 * would have been suspended by the reactor.
14 * Usage: Thread_Pool_Reactor_Resume_Test [-r <hostname:port#>]
15 * [-s <server thr#>] [-c <client thr#>] [-d <delay>]
16 * [-i <client conn attempt#>] [-n <client request# per conn>]
19 * <hostname:port#>: 127.0.0.1:10010
20 * <server thr#>: ACE_MAX_THREADS
21 * <client thr#>: ACE_MAX_THREADS
22 * <client conn attempt#>: ACE_MAX_ITERATIONS
23 * <client req# per conn>: ACE_MAX_THREADS
26 * @author Balachandran Natarajan <bala@cs.wustl.edu>
28 //=============================================================================
31 #include "test_config.h"
32 #include "ace/OS_NS_string.h"
33 #include "ace/OS_NS_unistd.h"
34 #include "ace/Get_Opt.h"
35 #include "ace/SOCK_Connector.h"
36 #include "ace/SOCK_Acceptor.h"
37 #include "ace/Acceptor.h"
38 #include "ace/Thread_Manager.h"
39 #include "ace/TP_Reactor.h"
42 #if defined (ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
44 #include "Thread_Pool_Reactor_Resume_Test.h"
// Acceptor type used by this test: an ACE_Strategy_Acceptor parameterized on
// the Request_Handler service handler and the ACE_SOCK_Acceptor peer acceptor.
45 using ACCEPTOR
= ACE_Strategy_Acceptor
<Request_Handler
, ACE_SOCK_Acceptor
>;
47 // Accepting end point. This is actually "localhost:10010", but some
48 // platforms can't resolve that name, so the IP address is used instead.
// parse_arg overwrites this pointer with the option argument of 'r'; both
// the client workers and the acceptor build their ACE_INET_Addr from it.
50 static const ACE_TCHAR
*rendezvous
= ACE_TEXT ("127.0.0.1:10010");
52 // Total number of server threads.
// Defaults to ACE_MAX_THREADS; parse_arg may overwrite it with the integer
// value of a command-line option argument. Passed to spawn_n when the
// server threads are started.
53 static size_t svr_thrno
= ACE_MAX_THREADS
;
56 // Default network parameters (MAX_BINDS and system buffers) are too small
57 // for the full test on some platforms; add platforms that can't handle too
58 // many simultaneous connections here.
59 #if defined (ACE_VXWORKS)
60 #define ACE_LOAD_FACTOR /2
62 #define ACE_LOAD_FACTOR
65 // Total number of client threads.
// ACE_LOAD_FACTOR is pasted textually after the default, so a platform that
// defines it as "/2" halves the value; when it is defined empty the default
// is plain ACE_MAX_THREADS. parse_arg may overwrite the value at run time.
66 static size_t cli_thrno
= ACE_MAX_THREADS ACE_LOAD_FACTOR
;
68 // Total connection attempts of a client thread.
// Bounds the outer loop in cli_worker (one connect per iteration). The
// default is ACE_MAX_ITERATIONS with ACE_LOAD_FACTOR pasted textually after
// it; parse_arg may overwrite the value at run time.
69 static size_t cli_conn_no
= ACE_MAX_ITERATIONS ACE_LOAD_FACTOR
;
71 // Number of requests a client thread sends on each connection.
// Bounds the inner send loop in cli_worker, and Request_Handler::handle_close
// compares the per-handler received-message count against it. The default is
// ACE_MAX_THREADS with ACE_LOAD_FACTOR pasted textually after it; parse_arg
// may overwrite the value at run time.
72 static size_t cli_req_no
= ACE_MAX_THREADS ACE_LOAD_FACTOR
;
74 // Delay before a thread sends the next request (in msec.)
// NOTE(review): cli_worker builds its sleep interval as
// ACE_Time_Value (0, req_delay). If the second constructor argument is
// microseconds, as in ACE's (sec, usec) form, the effective unit is usec
// rather than msec -- confirm which one is intended.
75 static int req_delay
= 50;
78 parse_arg (int argc
, ACE_TCHAR
*argv
[])
80 //FUZZ: disable check_for_lack_ACE_OS
81 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT ("r:s:c:d:i:n:"));
85 while ((c
= getopt ()) != -1)
87 //FUZZ: enable check_for_lack_ACE_OS
90 case 'r': // hostname:port
91 rendezvous
= getopt
.opt_arg ();
94 svr_thrno
= ACE_OS::atoi (getopt
.opt_arg ());
97 cli_thrno
= ACE_OS::atoi (getopt
.opt_arg ());
100 req_delay
= ACE_OS::atoi (getopt
.opt_arg ());
103 cli_conn_no
= ACE_OS::atoi (getopt
.opt_arg ());
106 cli_req_no
= ACE_OS::atoi (getopt
.opt_arg ());
109 ACE_ERROR ((LM_ERROR
,
110 "Usage: Thread_Pool_Reactor_Resume_Test [-r <hostname:port#>]"
111 "\t[-s <server thr#>] [-c <client thr#>] [-d <delay>]"
112 "\t[-i <client conn attempt#>]"
113 "[-n <client request# per conn>]\n"));
119 Request_Handler::Request_Handler (ACE_Thread_Manager
*thr_mgr
)
120 : ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_MT_SYNCH
> (thr_mgr
),
123 // Enable reference counting.
124 this->reference_counting_policy ().value
125 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED
);
127 // Make sure we use TP_Reactor with this class (that's the whole
129 this->reactor (ACE_Reactor::instance ());
133 Request_Handler::open (void *arg
)
137 ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_MT_SYNCH
>::open (arg
);
143 // Else we have successfully registered with the Reactor. Give our
144 // ownership to the Reactor.
145 this->remove_reference ();
151 Request_Handler::~Request_Handler ()
156 Request_Handler::resume_handler ()
158 ACE_DEBUG ((LM_DEBUG
,
159 ACE_TEXT ("(%t) resume_handler () called\n")));
164 Request_Handler::handle_input (ACE_HANDLE fd
)
166 ACE_TCHAR buffer
[BUFSIZ
];
168 ssize_t result
= this->peer ().recv (&len
, sizeof (ACE_TCHAR
));
171 && this->peer ().recv_n (buffer
, len
* sizeof (ACE_TCHAR
))
172 == static_cast<ssize_t
> (len
* sizeof (ACE_TCHAR
)))
174 ++this->nr_msgs_rcvd_
;
176 // Now the handle_input method has done what it can do, namely
177 // read the data from the socket we can just resume the handler
179 ACE_DEBUG ((LM_DEBUG
,
180 "(%t) svr input; fd: 0x%x; input: %s\n",
183 if (ACE_OS::strcmp (buffer
, ACE_TEXT ("shutdown")) == 0)
184 ACE_Reactor::instance()->end_reactor_event_loop ();
186 this->reactor ()->resume_handler (fd
);
191 ACE_DEBUG ((LM_DEBUG
,
192 "(%t) Errno is %d and result is %d\n",
193 ACE_ERRNO_GET
, result
));
194 ACE_DEBUG ((LM_DEBUG
,
195 "(%t) Request_Handler: 0x%x peer closed (0x%x)\n",
202 Request_Handler::handle_close (ACE_HANDLE fd
, ACE_Reactor_Mask
)
204 ACE_DEBUG ((LM_DEBUG
,
205 "(%t) svr close; fd: 0x%x, rcvd %d msgs\n",
207 this->nr_msgs_rcvd_
));
208 if (this->nr_msgs_rcvd_
!= cli_req_no
)
210 ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"),
213 this->nr_msgs_rcvd_
));
219 reactor_event_hook (ACE_Reactor
*)
221 ACE_DEBUG ((LM_DEBUG
,
222 "(%t) handling events ....\n"));
227 static ACE_THR_FUNC_RETURN
230 // Server thread function.
232 ACE_Reactor::instance ()->run_reactor_event_loop (&reactor_event_hook
);
235 ACE_ERROR_RETURN ((LM_ERROR
,
236 ACE_TEXT ("(%t) %p\n"),
237 ACE_TEXT ("Error handling events")),
240 ACE_DEBUG ((LM_DEBUG
,
241 "(%t) I am done handling events. Bye, bye\n"));
246 static ACE_THR_FUNC_RETURN
247 cli_worker (void *arg
)
249 // Client thread function.
250 ACE_INET_Addr
addr (rendezvous
);
251 ACE_SOCK_Stream stream
;
252 ACE_SOCK_Connector connect
;
253 ACE_Time_Value
delay (0, req_delay
);
254 size_t len
= * reinterpret_cast<ACE_TCHAR
*> (arg
);
256 for (size_t i
= 0 ; i
< cli_conn_no
; i
++)
258 if (connect
.connect (stream
, addr
) < 0)
260 ACE_ERROR ((LM_ERROR
,
261 ACE_TEXT ("(%t) %p\n"),
262 ACE_TEXT ("connect")));
266 for (size_t j
= 0; j
< cli_req_no
; j
++)
268 ACE_DEBUG ((LM_DEBUG
,
269 "(%t) conn_worker handle = %x, req = %d\n",
270 stream
.get_handle (),
272 if (stream
.send_n (arg
,
273 (len
+ 1) * sizeof (ACE_TCHAR
)) == -1)
275 ACE_ERROR ((LM_ERROR
,
276 ACE_TEXT ("(%t) %p\n"),
277 ACE_TEXT ("send_n")));
280 ACE_OS::sleep (delay
);
289 static ACE_THR_FUNC_RETURN
293 const ACE_TCHAR
*msg
= ACE_TEXT ("Message from Connection worker");
294 ACE_TCHAR buf
[BUFSIZ
];
295 buf
[0] = static_cast<ACE_TCHAR
> ((ACE_OS::strlen (msg
) + 1));
296 ACE_OS::strcpy (&buf
[1], msg
);
298 ACE_INET_Addr
addr (rendezvous
);
301 "(%t) Spawning %d client threads...\n",
303 int grp
= ACE_Thread_Manager::instance ()->spawn_n (cli_thrno
,
306 ACE_TEST_ASSERT (grp
!= -1);
308 ACE_Thread_Manager::instance ()->wait_grp (grp
);
310 ACE_DEBUG ((LM_DEBUG
,
311 "(%t) Client threads done; shutting down...\n"));
312 ACE_SOCK_Stream stream
;
313 ACE_SOCK_Connector connect
;
315 if (connect
.connect (stream
, addr
) == -1)
316 ACE_ERROR ((LM_ERROR
,
317 ACE_TEXT ("(%t) %p Error while connecting\n"),
318 ACE_TEXT ("connect")));
320 const ACE_TCHAR
*sbuf
= ACE_TEXT ("\011shutdown");
322 ACE_DEBUG ((LM_DEBUG
,
323 "shutdown stream handle = %x\n",
324 stream
.get_handle ()));
326 if (stream
.send_n (sbuf
, (ACE_OS::strlen (sbuf
) + 1) * sizeof (ACE_TCHAR
)) == -1)
327 ACE_ERROR ((LM_ERROR
,
328 ACE_TEXT ("(%t) %p\n"),
329 ACE_TEXT ("send_n")));
331 ACE_DEBUG ((LM_DEBUG
,
332 "Sent message of length = %d\n",
333 ACE_OS::strlen (sbuf
)));
340 run_main (int argc
, ACE_TCHAR
*argv
[])
342 ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Resume_Test"));
343 parse_arg (argc
, argv
);
345 // Changed the default
349 ACE_Reactor
new_reactor (&sr
);
350 ACE_Reactor::instance (&new_reactor
);
353 ACE_INET_Addr
accept_addr (rendezvous
);
355 if (acceptor
.open (accept_addr
) == -1)
356 ACE_ERROR_RETURN ((LM_ERROR
,
362 ACE_TEXT ("(%t) Spawning %d server threads...\n"),
364 ACE_Thread_Manager::instance ()->spawn_n (svr_thrno
,
366 ACE_Thread_Manager::instance ()->spawn (worker
);
368 ACE_Thread_Manager::instance ()->wait ();
376 run_main (int, ACE_TCHAR
*[])
378 ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Resume_Test"));
381 "threads/accept not supported on this platform\n"));
386 #endif /* ACE_HAS_THREADS */