Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Thread_Pool_Reactor_Test.cpp
blob15d718d6c1ec15dbba9153e5b39aa14b6da9df0e
2 //=============================================================================
3 /**
4 * @file Thread_Pool_Reactor_Test.cpp
6 * This program is a torture test of thread pool reactors. It
7 * starts by spawning several server threads waiting to handle
8 * events. Several other client threads are spawned right after
9 * to initiate connections to server threads. Each connection
10 * adds a new Svc_Handler into the TP_Reactor and sends out
11 * several "requests" to the server thread. After the connection
12 * is closed, the Svc_Handler is removed from the TP_Reactor.
13 * Each message is treated as a separate request by the server so
14 * two consecutive requests might be serviced by two different
15 * threads.
17 * Usage: Thread_Pool_Reactor_Test [-r <hostname:port#>]
18 * [-s <server thr#>] [-c <client thr#>] [-d <delay>]
19 * [-i <client conn attempt#>] [-n <client request# per conn>]
21 * Default value:
22 * <hostname:port#>: 127.0.0.1:10010
23 * <server thr#>: ACE_MAX_THREADS
24 * <client thr#>: ACE_MAX_THREADS
25 * <client conn attempt#>: ACE_MAX_ITERATIONS
26 * <client req# per conn>: ACE_MAX_THREADS
27 * <delay>: 50 usec
29 * @author Irfan Pyarali <irfan@cs.wustl.edu> and Nanbor Wang <nanbor@cs.wustl.edu>
31 //=============================================================================
34 #include "test_config.h"
35 #include "ace/OS_NS_string.h"
36 #include "ace/OS_NS_unistd.h"
37 #include "ace/Get_Opt.h"
38 #include "ace/SOCK_Connector.h"
39 #include "ace/SOCK_Acceptor.h"
40 #include "ace/Acceptor.h"
41 #include "ace/Thread_Manager.h"
42 #include "ace/TP_Reactor.h"
46 #if defined (ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
48 #include "Thread_Pool_Reactor_Test.h"
49 typedef ACE_Strategy_Acceptor <Request_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR;
51 // Accepting end point. This is actually "localhost:10010", but some
52 // platform couldn't resolve the name so we use the IP address
53 // directly here.
54 static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010");
56 // Total number of server threads.
57 static size_t svr_thrno = ACE_MAX_THREADS;
59 // Default network parameters (MAX_BINDS and system buffers) are too small
60 // for full test on some platforms; add platforms that can't handle too many
61 // connection simultaneously here.
62 #if defined (ACE_VXWORKS) || defined (ACE_HAS_PHARLAP)
63 #define ACE_LOAD_FACTOR /2
64 #else
65 #define ACE_LOAD_FACTOR
66 #endif
68 // Total number of client threads.
69 static size_t cli_thrno = ACE_MAX_THREADS ACE_LOAD_FACTOR;
71 // Total connection attempts of a client thread.
72 static size_t cli_conn_no = ACE_MAX_ITERATIONS ACE_LOAD_FACTOR;
74 // Total requests a client thread sends.
75 static size_t cli_req_no = ACE_MAX_THREADS ACE_LOAD_FACTOR;
77 // Delay before a thread sending the next request (in msec.)
78 static int req_delay = 50;
80 static void
81 parse_arg (int argc, ACE_TCHAR *argv[])
83 //FUZZ: disable check_for_lack_ACE_OS
84 ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("r:s:c:d:i:n:"));
86 int c;
88 while ((c = getopt ()) != -1)
90 //FUZZ: enable check_for_lack_ACE_OS
91 switch (c)
93 case 'r': // hostname:port
94 rendezvous = getopt.opt_arg ();
95 break;
96 case 's':
97 svr_thrno = ACE_OS::atoi (getopt.opt_arg ());
98 break;
99 case 'c':
100 cli_thrno = ACE_OS::atoi (getopt.opt_arg ());
101 break;
102 case 'd':
103 req_delay = ACE_OS::atoi (getopt.opt_arg ());
104 break;
105 case 'i':
106 cli_conn_no = ACE_OS::atoi (getopt.opt_arg ());
107 break;
108 case 'n':
109 cli_req_no = ACE_OS::atoi (getopt.opt_arg ());
110 break;
111 default:
112 ACE_ERROR ((LM_ERROR,
113 "Usage: Thread_Pool_Reactor_Test [-r <hostname:port#>]"
114 "\t[-s <server thr#>] [-c <client thr#>] [-d <delay>]"
115 "\t[-i <client conn attempt#>]"
116 "[-n <client request# per conn>]\n"));
117 break;
122 Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr)
123 : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr),
124 nr_msgs_rcvd_(0)
126 // Make sure we use TP_Reactor with this class (that's the whole
127 // point, right?)
128 this->reactor (ACE_Reactor::instance ());
132 Request_Handler::handle_input (ACE_HANDLE fd)
134 ACE_TCHAR buffer[BUFSIZ];
135 ACE_TCHAR len = 0;
136 ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR));
138 if (result > 0
139 && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR))
140 == static_cast<ssize_t> (len * sizeof (ACE_TCHAR)))
142 ++this->nr_msgs_rcvd_;
144 ACE_DEBUG ((LM_DEBUG,
145 "(%t) svr input; fd: 0x%x; input: %s\n",
147 buffer));
148 if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0)
149 ACE_Reactor::instance()->end_reactor_event_loop ();
150 return 0;
152 else
153 ACE_DEBUG ((LM_DEBUG,
154 "(%t) Request_Handler: 0x%x peer closed (0x%x)\n",
155 this, fd));
156 return -1;
160 Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
162 ACE_DEBUG ((LM_DEBUG,
163 "(%t) svr close; fd: 0x%x, rcvd %d msgs\n",
165 this->nr_msgs_rcvd_));
166 if (this->nr_msgs_rcvd_ != cli_req_no)
167 ACE_ERROR((LM_ERROR,
168 "(%t) Handler 0x%x: Expected %d messages; got %d\n",
169 this,
170 cli_req_no,
171 this->nr_msgs_rcvd_));
172 this->destroy ();
173 return 0;
176 static int
177 reactor_event_hook (ACE_Reactor *)
179 ACE_DEBUG ((LM_DEBUG,
180 "(%t) handling events ....\n"));
182 return 0;
185 static ACE_THR_FUNC_RETURN
186 svr_worker (void *)
188 // Server thread function.
189 int result =
190 ACE_Reactor::instance ()->run_reactor_event_loop (&reactor_event_hook);
192 if (result == -1)
193 ACE_ERROR_RETURN ((LM_ERROR,
194 "(%t) %p\n",
195 "Error handling events"),
198 ACE_DEBUG ((LM_DEBUG,
199 "(%t) I am done handling events. Bye, bye\n"));
201 return 0;
204 static ACE_THR_FUNC_RETURN
205 cli_worker (void *arg)
207 // Client thread function.
208 ACE_INET_Addr addr (rendezvous);
209 ACE_SOCK_Stream stream;
210 ACE_SOCK_Connector connect;
211 ACE_Time_Value delay (0, req_delay);
212 size_t len = * reinterpret_cast<ACE_TCHAR *> (arg);
214 for (size_t i = 0 ; i < cli_conn_no; i++)
216 if (connect.connect (stream, addr) < 0)
218 ACE_ERROR ((LM_ERROR,
219 "(%t) %p\n",
220 "connect"));
221 continue;
224 for (size_t j = 0; j < cli_req_no; j++)
226 ACE_DEBUG ((LM_DEBUG,
227 "(%t) conn_worker handle 0x%x, req %d\n",
228 stream.get_handle (),
229 j+1));
230 if (stream.send_n (arg,
231 (len + 1) * sizeof (ACE_TCHAR)) == -1)
233 ACE_ERROR ((LM_ERROR,
234 "(%t) %p\n",
235 "send_n"));
236 continue;
238 ACE_OS::sleep (delay);
241 stream.close ();
244 return 0;
247 static ACE_THR_FUNC_RETURN
248 worker (void *)
250 ACE_OS::sleep (3);
251 const ACE_TCHAR *msg = ACE_TEXT ("Message from Connection worker");
252 ACE_TCHAR buf [BUFSIZ];
253 buf[0] = static_cast<ACE_TCHAR> ((ACE_OS::strlen (msg) + 1));
254 ACE_OS::strcpy (&buf[1], msg);
256 ACE_INET_Addr addr (rendezvous);
258 ACE_DEBUG((LM_DEBUG,
259 "(%t) Spawning %d client threads...\n",
260 cli_thrno));
261 int grp = ACE_Thread_Manager::instance ()->spawn_n (cli_thrno,
262 &cli_worker,
263 buf);
264 ACE_TEST_ASSERT (grp != -1);
266 ACE_Thread_Manager::instance ()->wait_grp (grp);
268 ACE_DEBUG ((LM_DEBUG,
269 "(%t) Client threads done; shutting down...\n"));
270 ACE_SOCK_Stream stream;
271 ACE_SOCK_Connector connect;
273 if (connect.connect (stream, addr) == -1)
274 ACE_ERROR ((LM_ERROR,
275 "(%t) %p Error while connecting\n",
276 "connect"));
278 const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown");
280 ACE_DEBUG ((LM_DEBUG,
281 "shutdown stream handle = %x\n",
282 stream.get_handle ()));
284 if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1)
285 ACE_ERROR ((LM_ERROR,
286 "(%t) %p\n",
287 "send_n"));
289 stream.close ();
291 return 0;
295 run_main (int argc, ACE_TCHAR *argv[])
297 ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Test"));
298 parse_arg (argc, argv);
300 // Changed the default
301 ACE_TP_Reactor sr;
302 ACE_Reactor new_reactor (&sr);
303 ACE_Reactor::instance (&new_reactor);
305 ACCEPTOR acceptor;
306 ACE_INET_Addr accept_addr (rendezvous);
308 if (acceptor.open (accept_addr) == -1)
309 ACE_ERROR_RETURN ((LM_ERROR,
310 ACE_TEXT ("%p\n"),
311 ACE_TEXT ("open")),
314 ACE_DEBUG((LM_DEBUG,
315 ACE_TEXT ("(%t) Spawning %d server threads...\n"),
316 svr_thrno));
317 ACE_Thread_Manager::instance ()->spawn_n (svr_thrno,
318 svr_worker);
319 ACE_Thread_Manager::instance ()->spawn (worker);
321 ACE_Thread_Manager::instance ()->wait ();
323 ACE_END_TEST;
324 return 0;
327 #else
329 run_main (int, ACE_TCHAR *[])
331 ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Test"));
333 ACE_ERROR ((LM_INFO,
334 "threads/accept not supported on this platform\n"));
336 ACE_END_TEST;
337 return 0;
339 #endif /* ACE_HAS_THREADS */