Merge pull request #2220 from DOCGroup/revert-2217-jwi-inetwraning
[ACE_TAO.git] / ACE / tests / Priority_Reactor_Test.cpp
blob259de386a6994010cd94c55c192d033393cc266f
//=============================================================================
/**
 *  @file    Priority_Reactor_Test.cpp
 *
 *  This is a test of the <ACE_Priority_Reactor>.  The test spawns a
 *  number of clients (threads where available, otherwise forked
 *  processes; ten by default) which connect to the main process.
 *  The clients send data through a connector.  Interestingly enough,
 *  the accepting side gives each new connection a different priority,
 *  so a later connection should be serviced ahead of an earlier one.
 *
 *  The test itself is interesting: it shows how to write very
 *  simple <ACE_Svc_Handler>, <ACE_Connectors> and <ACE_Acceptors>.
 *
 *  @author Carlos O'Ryan <coryan@cs.wustl.edu>
 */
//=============================================================================
19 #include "test_config.h"
20 #include "ace/Get_Opt.h"
21 #include "ace/SOCK_Connector.h"
22 #include "ace/SOCK_Acceptor.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Handle_Set.h"
25 #include "ace/Connector.h"
26 #include <memory>
27 #include "ace/Priority_Reactor.h"
28 #include "Priority_Reactor_Test.h"
30 #include "ace/OS_NS_sys_wait.h"
31 #include "ace/OS_NS_unistd.h"
32 #include <utility>
// File-local test configuration.  Every value below except the payload
// can be overridden from the command line in run_main().
namespace
{
  // Payload written by each client on every iteration (sent without
  // the terminating NUL).
  const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz";

  // How many client children to run (-c).
  int opt_nchildren = 10;

  // How many messages each child sends (-l).
  int opt_nloops = 200;

  // Non-zero selects the ACE_Priority_Reactor; -d clears it so the
  // normal reactor is used instead.
  int opt_priority_reactor = 1;

  // Upper bound, in seconds, on how long the event loop may run (-t).
  int opt_max_duration = 60;

  // How many connection attempts a client makes before giving up (-m).
  int max_retries = 5;
}
56 using CONNECTOR = ACE_Connector<Write_Handler, ACE_SOCK_Connector>;
57 using ACCEPTOR = ACE_Acceptor<Read_Handler, ACE_SOCK_Acceptor>;
59 int Read_Handler::waiting_ = 0;
60 int Read_Handler::started_ = 0;
62 void
63 Read_Handler::set_countdown (int nchildren)
65 Read_Handler::waiting_ = nchildren;
68 int
69 Read_Handler::get_countdown ()
71 return Read_Handler::waiting_;
74 int
75 Read_Handler::open (void *)
77 if (this->peer ().enable (ACE_NONBLOCK) == -1)
78 ACE_ERROR_RETURN ((LM_ERROR,
79 ACE_TEXT ("(%P|%t) Read_Handler::open, ")
80 ACE_TEXT ("cannot set non blocking mode")),
81 -1);
83 if (reactor ()->register_handler (this, READ_MASK) == -1)
84 ACE_ERROR_RETURN ((LM_ERROR,
85 ACE_TEXT ("(%P|%t) Read_Handler::open, ")
86 ACE_TEXT ("cannot register handler")),
87 -1);
89 // A number larger than the actual number of priorities, so some
90 // clients are misbehaved, hence pusnished.
91 const int max_priority = 15;
93 this->priority (ACE_Event_Handler::LO_PRIORITY + started_ % max_priority);
94 started_++;
96 ACE_DEBUG ((LM_DEBUG,
97 ACE_TEXT ("(%P|%t) created svc_handler for handle %d ")
98 ACE_TEXT ("with priority %d\n"),
99 get_handle (),
100 priority ()));
101 return 0;
105 Read_Handler::handle_input (ACE_HANDLE h)
107 // ACE_DEBUG((LM_DEBUG,
108 // "(%P|%t) Read_Handler::handle_input (%d)\n", h));
109 ACE_UNUSED_ARG (h);
111 char buf[BUFSIZ];
113 ssize_t result = this->peer ().recv (buf, sizeof (buf));
115 if (result <= 0)
117 if (result < 0 && errno == EWOULDBLOCK)
118 return 0;
120 if (result != 0)
121 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) %p\n"),
122 ACE_TEXT ("Read_Handler::handle_input")));
123 waiting_--;
125 if (waiting_ == 0)
127 ACE_DEBUG ((LM_DEBUG,
128 ACE_TEXT ("Last svc_handler closed, shutting down\n")));
129 ACE_Reactor::instance()->end_reactor_event_loop();
132 ACE_DEBUG ((LM_DEBUG,
133 ACE_TEXT ("(%P|%t) Read_Handler::handle_input closing down\n")));
134 return -1;
137 // ACE_DEBUG((LM_DEBUG,
138 // "(%P|%t) read %d bytes from handle %d, priority %d\n",
139 // result, h, priority ()));
140 return 0;
144 Write_Handler::open (void *)
146 return 0;
150 Write_Handler::svc ()
152 // Send several short messages, doing pauses between each message.
153 // The number of messages can be controlled from the command line.
154 ACE_Time_Value pause (0, 1000);
156 for (int i = 0; i < opt_nloops; ++i)
158 if (this->peer ().send_n (ACE_ALPHABET,
159 sizeof (ACE_ALPHABET) - 1) == -1)
160 ACE_ERROR ((LM_ERROR,
161 ACE_TEXT ("(%P|%t) %p\n"),
162 ACE_TEXT ("send_n")));
163 ACE_OS::sleep (pause);
166 return 0;
169 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
171 // Execute the client tests.
172 void *
173 client (void *arg)
175 ACE_INET_Addr *connection_addr =
176 reinterpret_cast<ACE_INET_Addr *> (arg);
177 ACE_DEBUG ((LM_DEBUG,
178 ACE_TEXT ("(%P|%t) running client\n")));
179 CONNECTOR connector;
181 Write_Handler *writer = 0;
183 // Do exponential backoff connections
184 ACE_Synch_Options options = ACE_Synch_Options::synch;
186 // Start with one msec timeouts.
187 ACE_Time_Value msec (0, 1000);
188 options.timeout (msec);
190 // Try up to <max_retries> to connect to the server.
191 for (int i = 0; i < max_retries; i++)
193 if (connector.connect (writer,
194 *connection_addr,
195 options) == -1)
197 // Double the timeout...
198 ACE_Time_Value tmp = options.timeout ();
199 tmp += options.timeout ();
200 options.timeout (tmp);
201 writer = 0;
202 ACE_DEBUG ((LM_DEBUG,
203 ACE_TEXT ("(%P|%t) still trying to connect\n")));
205 else
207 // Let the new Svc_Handler to its job...
208 writer->svc ();
210 // then close the connection and release the Svc_Handler.
211 writer->destroy ();
213 ACE_DEBUG ((LM_DEBUG,
214 ACE_TEXT ("(%P|%t) finishing client\n")));
215 return 0;
219 ACE_ERROR ((LM_ERROR,
220 ACE_TEXT ("(%P|%t) failed to connect after %d retries\n"),
221 max_retries));
222 return 0;
225 #endif
228 run_main (int argc, ACE_TCHAR *argv[])
230 ACE_START_TEST (ACE_TEXT ("Priority_Reactor_Test"));
232 //FUZZ: disable check_for_lack_ACE_OS
233 ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("dc:l:m:t:"));
235 for (int c; (c = getopt ()) != -1; )
236 switch (c)
238 //FUZZ: enable check_for_lack_ACE_OS
239 case 'd':
240 opt_priority_reactor = 0;
241 break;
242 case 'c':
243 opt_nchildren = ACE_OS::atoi (getopt.opt_arg ());
244 break;
245 case 'l':
246 opt_nloops = ACE_OS::atoi (getopt.opt_arg ());
247 break;
248 case 'm':
249 max_retries = ACE_OS::atoi (getopt.opt_arg ());
250 break;
251 case 't':
252 opt_max_duration = ACE_OS::atoi (getopt.opt_arg ());
253 break;
254 case '?':
255 default:
256 ACE_ERROR_RETURN ((LM_ERROR,
257 ACE_TEXT ("Usage: Priority_Reactor_Test ")
258 ACE_TEXT (" [-d] (disable priority reactor)\n")
259 ACE_TEXT (" [-c nchildren] (number of threads/processes)\n")
260 ACE_TEXT (" [-l loops] (number of loops per child)\n")
261 ACE_TEXT (" [-m maxretries] (attempts to connect)\n")
262 ACE_TEXT (" [-t max_time] (limits test duration)\n")),
263 -1);
264 ACE_NOTREACHED (break);
267 // Manage Reactor memory automagically.
268 // Note: If opt_priority_reactor is false, the default ACE_Reactor is used
269 // and we don't need to set one up.
270 ACE_Reactor *orig_reactor = 0;
271 std::unique_ptr<ACE_Reactor> reactor;
273 if (opt_priority_reactor)
275 ACE_Select_Reactor *impl_ptr;
276 ACE_NEW_RETURN (impl_ptr, ACE_Priority_Reactor, -1);
277 std::unique_ptr<ACE_Select_Reactor> auto_impl (impl_ptr);
279 ACE_Reactor *reactor_ptr;
280 ACE_NEW_RETURN (reactor_ptr, ACE_Reactor (impl_ptr, 1), -1);
281 auto_impl.release (); // ACE_Reactor dtor will take it from here
282 std::unique_ptr<ACE_Reactor> auto_reactor (reactor_ptr);
283 reactor = std::move(auto_reactor);
284 orig_reactor = ACE_Reactor::instance (reactor_ptr);
287 Read_Handler::set_countdown (opt_nchildren);
289 #ifndef ACE_LACKS_ACCEPT
291 ACCEPTOR acceptor;
293 acceptor.priority (ACE_Event_Handler::HI_PRIORITY);
294 ACE_INET_Addr server_addr;
296 // Bind acceptor to any port and then find out what the port was.
297 if (acceptor.open (ACE_sap_any_cast (const ACE_INET_Addr &)) == -1
298 || acceptor.acceptor ().get_local_addr (server_addr) == -1)
299 ACE_ERROR_RETURN ((LM_ERROR,
300 ACE_TEXT ("(%P|%t) %p\n"),
301 ACE_TEXT ("open")),
302 -1);
304 ACE_DEBUG ((LM_DEBUG,
305 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
306 server_addr.get_port_number ()));
308 ACE_INET_Addr connection_addr (server_addr.get_port_number (),
309 ACE_DEFAULT_SERVER_HOST);
311 int i;
313 #if defined (ACE_HAS_THREADS)
314 for (i = 0; i < opt_nchildren; ++i)
316 if (ACE_Thread_Manager::instance ()->spawn
317 (ACE_THR_FUNC (client),
318 (void *) &connection_addr,
319 THR_NEW_LWP | THR_DETACHED) == -1)
320 ACE_ERROR ((LM_ERROR,
321 ACE_TEXT ("(%P|%t) %p\n%a"),
322 ACE_TEXT ("thread create failed"),
323 1));
325 #elif !defined (ACE_LACKS_FORK)
326 for (i = 0; i < opt_nchildren; ++i)
328 switch (ACE_OS::fork ("child"))
330 case -1:
331 ACE_ERROR ((LM_ERROR,
332 ACE_TEXT ("(%P|%t) %p\n%a"),
333 ACE_TEXT ("fork failed"),
334 1));
335 ACE_OS::exit (-1);
336 /* NOTREACHED */
337 case 0:
338 client (&connection_addr);
339 ACE_OS::exit (0);
340 break;
341 /* NOTREACHED */
342 default:
343 break;
344 /* NOTREACHED */
347 #else
348 ACE_ERROR ((LM_INFO,
349 ACE_TEXT ("(%P|%t) ")
350 ACE_TEXT ("only one thread may be run ")
351 ACE_TEXT ("in a process on this platform\n")));
352 #endif /* ACE_HAS_THREADS */
354 ACE_Time_Value tv (opt_max_duration);
356 ACE_Reactor::instance()->register_handler
357 (&acceptor, ACE_Event_Handler::READ_MASK);
358 ACE_Reactor::instance()->run_reactor_event_loop (tv);
360 if (Read_Handler::get_countdown () != 0)
362 ACE_DEBUG ((LM_DEBUG,
363 ACE_TEXT ("(%P|%t) running out of time, ")
364 ACE_TEXT ("probably due to failed connections.\n")));
367 ACE_DEBUG ((LM_DEBUG,
368 ACE_TEXT ("(%P|%t) waiting for the children...\n")));
370 #if defined (ACE_HAS_THREADS)
371 ACE_Thread_Manager::instance ()->wait ();
372 #elif !defined (ACE_WIN32) && !defined (VXWORKS)
373 for (i = 0; i < opt_nchildren; ++i)
375 pid_t pid = ACE_OS::wait();
376 ACE_DEBUG ((LM_DEBUG,
377 ACE_TEXT ("(%P|%t) child %d terminated\n"),
378 pid));
380 #endif /* ACE_HAS_THREADS */
382 #endif // ACE_LACKS_ACCEPT
384 if (orig_reactor != 0)
385 ACE_Reactor::instance (orig_reactor);
387 ACE_END_TEST;
388 return 0;