Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Priority_Reactor_Test.cpp
blob37fd07c07dadd50c6245779d2e2f9c2084e7aab2
2 //=============================================================================
3 /**
4 * @file Priority_Reactor_Test.cpp
6 * This is a test of the <ACE_Priority_Reactor>. The test forks
7 * two processes (for a total of three processes) which connect to
8 * the main process. The clients send data through a connector;
9 * interestingly enough, the acceptor will give more priority to
10 * the second connection, which should always run before the first
11 * one.
13 * The test itself is interesting: it shows how to write very
14 * simple <ACE_Svc_Handler>, <ACE_Connectors> and <ACE_Acceptors>.
16 * @author Carlos O'Ryan <coryan@cs.wustl.edu>
18 //=============================================================================
21 #include "test_config.h"
22 #include "ace/Get_Opt.h"
23 #include "ace/SOCK_Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/Acceptor.h"
26 #include "ace/Handle_Set.h"
27 #include "ace/Connector.h"
28 #include "ace/Auto_Ptr.h"
29 #include "ace/Priority_Reactor.h"
30 #include "Priority_Reactor_Test.h"
31 #include "ace/OS_NS_sys_wait.h"
32 #include "ace/OS_NS_unistd.h"
36 static const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz";
38 // The number of children to run, it can be changed using the -c
39 // option.
40 static int opt_nchildren = 10;
42 // The number of loops per children, it can be changed using the -l
43 // option.
44 static int opt_nloops = 200;
46 // If not set use the normal reactor, it can be changed using the -d
47 // option.
48 static int opt_priority_reactor = 1;
50 // Maximum time to wait for the test termination (-t)
51 static int opt_max_duration = 60;
53 // Maximum number of retries to connect, it can be changed using the
54 // -m option.
55 static int max_retries = 5;
57 typedef ACE_Connector<Write_Handler, ACE_SOCK_CONNECTOR>
58 CONNECTOR;
59 typedef ACE_Acceptor<Read_Handler, ACE_SOCK_ACCEPTOR>
60 ACCEPTOR;
62 int Read_Handler::waiting_ = 0;
63 int Read_Handler::started_ = 0;
65 void
66 Read_Handler::set_countdown (int nchildren)
68 Read_Handler::waiting_ = nchildren;
71 int
72 Read_Handler::get_countdown (void)
74 return Read_Handler::waiting_;
77 int
78 Read_Handler::open (void *)
80 if (this->peer ().enable (ACE_NONBLOCK) == -1)
81 ACE_ERROR_RETURN ((LM_ERROR,
82 ACE_TEXT ("(%P|%t) Read_Handler::open, ")
83 ACE_TEXT ("cannot set non blocking mode")),
84 -1);
86 if (reactor ()->register_handler (this, READ_MASK) == -1)
87 ACE_ERROR_RETURN ((LM_ERROR,
88 ACE_TEXT ("(%P|%t) Read_Handler::open, ")
89 ACE_TEXT ("cannot register handler")),
90 -1);
92 // A number larger than the actual number of priorities, so some
93 // clients are misbehaved, hence pusnished.
94 const int max_priority = 15;
96 this->priority (ACE_Event_Handler::LO_PRIORITY + started_ % max_priority);
97 started_++;
99 ACE_DEBUG ((LM_DEBUG,
100 ACE_TEXT ("(%P|%t) created svc_handler for handle %d ")
101 ACE_TEXT ("with priority %d\n"),
102 get_handle (),
103 priority ()));
104 return 0;
108 Read_Handler::handle_input (ACE_HANDLE h)
110 // ACE_DEBUG((LM_DEBUG,
111 // "(%P|%t) Read_Handler::handle_input (%d)\n", h));
112 ACE_UNUSED_ARG (h);
114 char buf[BUFSIZ];
116 ssize_t result = this->peer ().recv (buf, sizeof (buf));
118 if (result <= 0)
120 if (result < 0 && errno == EWOULDBLOCK)
121 return 0;
123 if (result != 0)
124 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) %p\n"),
125 ACE_TEXT ("Read_Handler::handle_input")));
126 waiting_--;
128 if (waiting_ == 0)
130 ACE_DEBUG ((LM_DEBUG,
131 ACE_TEXT ("Last svc_handler closed, shutting down\n")));
132 ACE_Reactor::instance()->end_reactor_event_loop();
135 ACE_DEBUG ((LM_DEBUG,
136 ACE_TEXT ("(%P|%t) Read_Handler::handle_input closing down\n")));
137 return -1;
140 // ACE_DEBUG((LM_DEBUG,
141 // "(%P|%t) read %d bytes from handle %d, priority %d\n",
142 // result, h, priority ()));
143 return 0;
147 Write_Handler::open (void *)
149 return 0;
153 Write_Handler::svc (void)
155 // Send several short messages, doing pauses between each message.
156 // The number of messages can be controlled from the command line.
157 ACE_Time_Value pause (0, 1000);
159 for (int i = 0; i < opt_nloops; ++i)
161 if (this->peer ().send_n (ACE_ALPHABET,
162 sizeof (ACE_ALPHABET) - 1) == -1)
163 ACE_ERROR ((LM_ERROR,
164 ACE_TEXT ("(%P|%t) %p\n"),
165 ACE_TEXT ("send_n")));
166 ACE_OS::sleep (pause);
169 return 0;
172 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
174 // Execute the client tests.
175 void *
176 client (void *arg)
178 ACE_INET_Addr *connection_addr =
179 reinterpret_cast<ACE_INET_Addr *> (arg);
180 ACE_DEBUG ((LM_DEBUG,
181 ACE_TEXT ("(%P|%t) running client\n")));
182 CONNECTOR connector;
184 Write_Handler *writer = 0;
186 // Do exponential backoff connections
187 ACE_Synch_Options options = ACE_Synch_Options::synch;
189 // Start with one msec timeouts.
190 ACE_Time_Value msec (0, 1000);
191 options.timeout (msec);
193 // Try up to <max_retries> to connect to the server.
194 for (int i = 0; i < max_retries; i++)
196 if (connector.connect (writer,
197 *connection_addr,
198 options) == -1)
200 // Double the timeout...
201 ACE_Time_Value tmp = options.timeout ();
202 tmp += options.timeout ();
203 options.timeout (tmp);
204 writer = 0;
205 ACE_DEBUG ((LM_DEBUG,
206 ACE_TEXT ("(%P|%t) still trying to connect\n")));
208 else
210 // Let the new Svc_Handler to its job...
211 writer->svc ();
213 // then close the connection and release the Svc_Handler.
214 writer->destroy ();
216 ACE_DEBUG ((LM_DEBUG,
217 ACE_TEXT ("(%P|%t) finishing client\n")));
218 return 0;
222 ACE_ERROR ((LM_ERROR,
223 ACE_TEXT ("(%P|%t) failed to connect after %d retries\n"),
224 max_retries));
225 return 0;
228 #endif
231 run_main (int argc, ACE_TCHAR *argv[])
233 ACE_START_TEST (ACE_TEXT ("Priority_Reactor_Test"));
235 //FUZZ: disable check_for_lack_ACE_OS
236 ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("dc:l:m:t:"));
238 for (int c; (c = getopt ()) != -1; )
239 switch (c)
241 //FUZZ: enable check_for_lack_ACE_OS
242 case 'd':
243 opt_priority_reactor = 0;
244 break;
245 case 'c':
246 opt_nchildren = ACE_OS::atoi (getopt.opt_arg ());
247 break;
248 case 'l':
249 opt_nloops = ACE_OS::atoi (getopt.opt_arg ());
250 break;
251 case 'm':
252 max_retries = ACE_OS::atoi (getopt.opt_arg ());
253 break;
254 case 't':
255 opt_max_duration = ACE_OS::atoi (getopt.opt_arg ());
256 break;
257 case '?':
258 default:
259 ACE_ERROR_RETURN ((LM_ERROR,
260 ACE_TEXT ("Usage: Priority_Reactor_Test ")
261 ACE_TEXT (" [-d] (disable priority reactor)\n")
262 ACE_TEXT (" [-c nchildren] (number of threads/processes)\n")
263 ACE_TEXT (" [-l loops] (number of loops per child)\n")
264 ACE_TEXT (" [-m maxretries] (attempts to connect)\n")
265 ACE_TEXT (" [-t max_time] (limits test duration)\n")),
266 -1);
267 ACE_NOTREACHED (break);
270 // Manage Reactor memory automagically.
271 // Note: If opt_priority_reactor is false, the default ACE_Reactor is used
272 // and we don't need to set one up.
273 ACE_Reactor *orig_reactor = 0;
274 auto_ptr<ACE_Reactor> reactor;
276 if (opt_priority_reactor)
278 ACE_Select_Reactor *impl_ptr;
279 ACE_NEW_RETURN (impl_ptr, ACE_Priority_Reactor, -1);
280 auto_ptr<ACE_Select_Reactor> auto_impl (impl_ptr);
282 ACE_Reactor *reactor_ptr;
283 ACE_NEW_RETURN (reactor_ptr, ACE_Reactor (impl_ptr, 1), -1);
284 auto_impl.release (); // ACE_Reactor dtor will take it from here
285 auto_ptr<ACE_Reactor> auto_reactor (reactor_ptr);
286 reactor = auto_reactor;
287 orig_reactor = ACE_Reactor::instance (reactor_ptr);
290 Read_Handler::set_countdown (opt_nchildren);
292 #ifndef ACE_LACKS_ACCEPT
294 ACCEPTOR acceptor;
296 acceptor.priority (ACE_Event_Handler::HI_PRIORITY);
297 ACE_INET_Addr server_addr;
299 // Bind acceptor to any port and then find out what the port was.
300 if (acceptor.open (ACE_sap_any_cast (const ACE_INET_Addr &)) == -1
301 || acceptor.acceptor ().get_local_addr (server_addr) == -1)
302 ACE_ERROR_RETURN ((LM_ERROR,
303 ACE_TEXT ("(%P|%t) %p\n"),
304 ACE_TEXT ("open")),
305 -1);
307 ACE_DEBUG ((LM_DEBUG,
308 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
309 server_addr.get_port_number ()));
311 ACE_INET_Addr connection_addr (server_addr.get_port_number (),
312 ACE_DEFAULT_SERVER_HOST);
314 int i;
316 #if defined (ACE_HAS_THREADS)
317 for (i = 0; i < opt_nchildren; ++i)
319 if (ACE_Thread_Manager::instance ()->spawn
320 (ACE_THR_FUNC (client),
321 (void *) &connection_addr,
322 THR_NEW_LWP | THR_DETACHED) == -1)
323 ACE_ERROR ((LM_ERROR,
324 ACE_TEXT ("(%P|%t) %p\n%a"),
325 ACE_TEXT ("thread create failed"),
326 1));
328 #elif !defined (ACE_LACKS_FORK)
329 for (i = 0; i < opt_nchildren; ++i)
331 switch (ACE_OS::fork ("child"))
333 case -1:
334 ACE_ERROR ((LM_ERROR,
335 ACE_TEXT ("(%P|%t) %p\n%a"),
336 ACE_TEXT ("fork failed"),
337 1));
338 ACE_OS::exit (-1);
339 /* NOTREACHED */
340 case 0:
341 client (&connection_addr);
342 ACE_OS::exit (0);
343 break;
344 /* NOTREACHED */
345 default:
346 break;
347 /* NOTREACHED */
350 #else
351 ACE_ERROR ((LM_INFO,
352 ACE_TEXT ("(%P|%t) ")
353 ACE_TEXT ("only one thread may be run ")
354 ACE_TEXT ("in a process on this platform\n")));
355 #endif /* ACE_HAS_THREADS */
357 ACE_Time_Value tv (opt_max_duration);
359 ACE_Reactor::instance()->register_handler
360 (&acceptor, ACE_Event_Handler::READ_MASK);
361 ACE_Reactor::instance()->run_reactor_event_loop (tv);
363 if (Read_Handler::get_countdown () != 0)
365 ACE_DEBUG ((LM_DEBUG,
366 ACE_TEXT ("(%P|%t) running out of time, ")
367 ACE_TEXT ("probably due to failed connections.\n")));
370 ACE_DEBUG ((LM_DEBUG,
371 ACE_TEXT ("(%P|%t) waiting for the children...\n")));
373 #if defined (ACE_HAS_THREADS)
374 ACE_Thread_Manager::instance ()->wait ();
375 #elif !defined (ACE_WIN32) && !defined (VXWORKS)
376 for (i = 0; i < opt_nchildren; ++i)
378 pid_t pid = ACE_OS::wait();
379 ACE_DEBUG ((LM_DEBUG,
380 ACE_TEXT ("(%P|%t) child %d terminated\n"),
381 pid));
383 #endif /* ACE_HAS_THREADS */
385 #endif // ACE_LACKS_ACCEPT
387 if (orig_reactor != 0)
388 ACE_Reactor::instance (orig_reactor);
390 ACE_END_TEST;
391 return 0;