Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Thread_Pool_Reactor_Resume_Test.cpp
blob3770f3d63ec9a7b4ef29877dc5824fa04ee56391
2 //=============================================================================
3 /**
4 * @file Thread_Pool_Reactor_Resume_Test.cpp
6 * This program is an additional torture test of thread pool
7 * reactors. This test is based on Thread_Pool_Reactor_Test.cpp
8 * in $ACE_ROOT/tests. This test differs from the other one
9 * slightly. The TP reactor is instantiated with the
10 * with a value of 1 for the <resume_flag> argument. The server
11 * threads during the handle_input call resumes the handle that
12 * would have been suspended by the reactor.
14 * Usage: Thread_Pool_Reactor_Test [-r <hostname:port#>]
15 * [-s <server thr#>] [-c <client thr#>] [-d <delay>]
16 * [-i <client conn attempt#>] [-n <client request# per conn>]
18 * Default value:
19 * <hostname:port#>: ACE_DEFAULT_RENDEZVOUS
20 * <server thr#>: ACE_MAX_THREADS
21 * <client thr#>: ACE_MAX_ITERATIONS
22 * <client conn attempt#>: ACE_MAX_ITERATIONS
23 * <client req# per conn>: ACE_MAX_THREADS
24 * <delay>: 50 usec
26 * @author Balachandran Natarajan <bala@cs.wustl.edu>
28 //=============================================================================
31 #include "test_config.h"
32 #include "ace/OS_NS_string.h"
33 #include "ace/OS_NS_unistd.h"
34 #include "ace/Get_Opt.h"
35 #include "ace/SOCK_Connector.h"
36 #include "ace/SOCK_Acceptor.h"
37 #include "ace/Acceptor.h"
38 #include "ace/Thread_Manager.h"
39 #include "ace/TP_Reactor.h"
43 #if defined (ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
45 #include "Thread_Pool_Reactor_Resume_Test.h"
46 typedef ACE_Strategy_Acceptor <Request_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR;
48 // Accepting end point. This is actually "localhost:10010", but some
49 // platform couldn't resolve the name so we use the IP address
50 // directly here.
51 static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010");
53 // Total number of server threads.
54 static size_t svr_thrno = ACE_MAX_THREADS;
57 // Default network parameters (MAX_BINDS and system buffers) are too small
58 // for full test on some platforms; add platforms that can't handle too many
59 // connection simultaneously here.
60 #if defined (ACE_VXWORKS) || defined (ACE_HAS_PHARLAP)
61 #define ACE_LOAD_FACTOR /2
62 #else
63 #define ACE_LOAD_FACTOR
64 #endif
66 // Total number of client threads.
67 static size_t cli_thrno = ACE_MAX_THREADS ACE_LOAD_FACTOR;
69 // Total connection attempts of a client thread.
70 static size_t cli_conn_no = ACE_MAX_ITERATIONS ACE_LOAD_FACTOR;
72 // Total requests a client thread sends.
73 static size_t cli_req_no = ACE_MAX_THREADS ACE_LOAD_FACTOR;
75 // Delay before a thread sending the next request (in msec.)
76 static int req_delay = 50;
78 static void
79 parse_arg (int argc, ACE_TCHAR *argv[])
81 //FUZZ: disable check_for_lack_ACE_OS
82 ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("r:s:c:d:i:n:"));
84 int c;
86 while ((c = getopt ()) != -1)
88 //FUZZ: enable check_for_lack_ACE_OS
89 switch (c)
91 case 'r': // hostname:port
92 rendezvous = getopt.opt_arg ();
93 break;
94 case 's':
95 svr_thrno = ACE_OS::atoi (getopt.opt_arg ());
96 break;
97 case 'c':
98 cli_thrno = ACE_OS::atoi (getopt.opt_arg ());
99 break;
100 case 'd':
101 req_delay = ACE_OS::atoi (getopt.opt_arg ());
102 break;
103 case 'i':
104 cli_conn_no = ACE_OS::atoi (getopt.opt_arg ());
105 break;
106 case 'n':
107 cli_req_no = ACE_OS::atoi (getopt.opt_arg ());
108 break;
109 default:
110 ACE_ERROR ((LM_ERROR,
111 "Usage: Thread_Pool_Reactor_Resume_Test [-r <hostname:port#>]"
112 "\t[-s <server thr#>] [-c <client thr#>] [-d <delay>]"
113 "\t[-i <client conn attempt#>]"
114 "[-n <client request# per conn>]\n"));
115 break;
120 Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr)
121 : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr),
122 nr_msgs_rcvd_(0)
124 // Enable reference counting.
125 this->reference_counting_policy ().value
126 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
128 // Make sure we use TP_Reactor with this class (that's the whole
129 // point, right?)
130 this->reactor (ACE_Reactor::instance ());
134 Request_Handler::open (void *arg)
136 // Open base class.
137 int result =
138 ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>::open (arg);
140 // Return on error.
141 if (result == -1)
142 return -1;
144 // Else we have successfully registered with the Reactor. Give our
145 // ownership to the Reactor.
146 this->remove_reference ();
148 // Return result.
149 return result;
152 Request_Handler::~Request_Handler (void)
157 Request_Handler::resume_handler (void)
159 ACE_DEBUG ((LM_DEBUG,
160 ACE_TEXT ("(%t) resume_handler () called\n")));
161 return 1;
165 Request_Handler::handle_input (ACE_HANDLE fd)
167 ACE_TCHAR buffer[BUFSIZ];
168 ACE_TCHAR len = 0;
169 ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR));
171 if (result > 0
172 && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR))
173 == static_cast<ssize_t> (len * sizeof (ACE_TCHAR)))
175 ++this->nr_msgs_rcvd_;
177 // Now the handle_input method has done what it can do, namely
178 // read the data from the socket we can just resume the handler
179 // at this point
180 ACE_DEBUG ((LM_DEBUG,
181 "(%t) svr input; fd: 0x%x; input: %s\n",
183 buffer));
184 if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0)
185 ACE_Reactor::instance()->end_reactor_event_loop ();
187 this->reactor ()->resume_handler (fd);
188 return 0;
190 else
192 ACE_DEBUG ((LM_DEBUG,
193 "(%t) Errno is %d and result is %d\n",
194 ACE_ERRNO_GET, result));
195 ACE_DEBUG ((LM_DEBUG,
196 "(%t) Request_Handler: 0x%x peer closed (0x%x)\n",
197 this, fd));
199 return -1;
203 Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
205 ACE_DEBUG ((LM_DEBUG,
206 "(%t) svr close; fd: 0x%x, rcvd %d msgs\n",
208 this->nr_msgs_rcvd_));
209 if (this->nr_msgs_rcvd_ != cli_req_no)
210 ACE_ERROR((LM_ERROR,
211 ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"),
212 this,
213 cli_req_no,
214 this->nr_msgs_rcvd_));
216 return 0;
219 static int
220 reactor_event_hook (ACE_Reactor *)
222 ACE_DEBUG ((LM_DEBUG,
223 "(%t) handling events ....\n"));
225 return 0;
228 static ACE_THR_FUNC_RETURN
229 svr_worker (void *)
231 // Server thread function.
232 int result =
233 ACE_Reactor::instance ()->run_reactor_event_loop (&reactor_event_hook);
235 if (result == -1)
236 ACE_ERROR_RETURN ((LM_ERROR,
237 ACE_TEXT ("(%t) %p\n"),
238 ACE_TEXT ("Error handling events")),
241 ACE_DEBUG ((LM_DEBUG,
242 "(%t) I am done handling events. Bye, bye\n"));
244 return 0;
247 static ACE_THR_FUNC_RETURN
248 cli_worker (void *arg)
250 // Client thread function.
251 ACE_INET_Addr addr (rendezvous);
252 ACE_SOCK_Stream stream;
253 ACE_SOCK_Connector connect;
254 ACE_Time_Value delay (0, req_delay);
255 size_t len = * reinterpret_cast<ACE_TCHAR *> (arg);
257 for (size_t i = 0 ; i < cli_conn_no; i++)
259 if (connect.connect (stream, addr) < 0)
261 ACE_ERROR ((LM_ERROR,
262 ACE_TEXT ("(%t) %p\n"),
263 ACE_TEXT ("connect")));
264 continue;
267 for (size_t j = 0; j < cli_req_no; j++)
269 ACE_DEBUG ((LM_DEBUG,
270 "(%t) conn_worker handle = %x, req = %d\n",
271 stream.get_handle (),
272 j+1));
273 if (stream.send_n (arg,
274 (len + 1) * sizeof (ACE_TCHAR)) == -1)
276 ACE_ERROR ((LM_ERROR,
277 ACE_TEXT ("(%t) %p\n"),
278 ACE_TEXT ("send_n")));
279 continue;
281 ACE_OS::sleep (delay);
284 stream.close ();
287 return 0;
290 static ACE_THR_FUNC_RETURN
291 worker (void *)
293 ACE_OS::sleep (3);
294 const ACE_TCHAR *msg = ACE_TEXT ("Message from Connection worker");
295 ACE_TCHAR buf [BUFSIZ];
296 buf[0] = static_cast<ACE_TCHAR> ((ACE_OS::strlen (msg) + 1));
297 ACE_OS::strcpy (&buf[1], msg);
299 ACE_INET_Addr addr (rendezvous);
301 ACE_DEBUG((LM_DEBUG,
302 "(%t) Spawning %d client threads...\n",
303 cli_thrno));
304 int grp = ACE_Thread_Manager::instance ()->spawn_n (cli_thrno,
305 &cli_worker,
306 buf);
307 ACE_TEST_ASSERT (grp != -1);
309 ACE_Thread_Manager::instance ()->wait_grp (grp);
311 ACE_DEBUG ((LM_DEBUG,
312 "(%t) Client threads done; shutting down...\n"));
313 ACE_SOCK_Stream stream;
314 ACE_SOCK_Connector connect;
316 if (connect.connect (stream, addr) == -1)
317 ACE_ERROR ((LM_ERROR,
318 ACE_TEXT ("(%t) %p Error while connecting\n"),
319 ACE_TEXT ("connect")));
321 const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown");
323 ACE_DEBUG ((LM_DEBUG,
324 "shutdown stream handle = %x\n",
325 stream.get_handle ()));
327 if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1)
328 ACE_ERROR ((LM_ERROR,
329 ACE_TEXT ("(%t) %p\n"),
330 ACE_TEXT ("send_n")));
332 ACE_DEBUG ((LM_DEBUG,
333 "Sent message of length = %d\n",
334 ACE_OS::strlen (sbuf)));
335 stream.close ();
337 return 0;
341 run_main (int argc, ACE_TCHAR *argv[])
343 ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Resume_Test"));
344 parse_arg (argc, argv);
346 // Changed the default
347 ACE_TP_Reactor sr;
350 ACE_Reactor new_reactor (&sr);
351 ACE_Reactor::instance (&new_reactor);
353 ACCEPTOR acceptor;
354 ACE_INET_Addr accept_addr (rendezvous);
356 if (acceptor.open (accept_addr) == -1)
357 ACE_ERROR_RETURN ((LM_ERROR,
358 ACE_TEXT ("%p\n"),
359 ACE_TEXT ("open")),
362 ACE_DEBUG((LM_DEBUG,
363 ACE_TEXT ("(%t) Spawning %d server threads...\n"),
364 svr_thrno));
365 ACE_Thread_Manager::instance ()->spawn_n (svr_thrno,
366 svr_worker);
367 ACE_Thread_Manager::instance ()->spawn (worker);
369 ACE_Thread_Manager::instance ()->wait ();
371 ACE_END_TEST;
372 return 0;
375 #else
377 run_main (int, ACE_TCHAR *[])
379 ACE_START_TEST (ACE_TEXT ("Thread_Pool_Reactor_Resume_Test"));
381 ACE_ERROR ((LM_INFO,
382 "threads/accept not supported on this platform\n"));
384 ACE_END_TEST;
385 return 0;
387 #endif /* ACE_HAS_THREADS */