Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / ACE / tests / Reactor_Fairness_Test.cpp
blobf84ceafd4495c93afb8815d1d3d392aa7b08f17d
2 //=============================================================================
3 /**
4 * @file Reactor_Fairness_Test.cpp
6 * This test is used to time the dispatching mechanisms of the
7 * <ACE_Reactor>s. Both the <ACE_WFMO_Reactor> and
8 * <ACE_Select_Reactor> can be tested.
10 * @author Irfan Pyarali <irfan@cs.wustl.edu>
12 //=============================================================================
14 #include "test_config.h"
15 #include "Reactor_Fairness_Test.h"
16 #include "ace/Get_Opt.h"
17 #include "ace/SOCK_Connector.h"
18 #include "ace/SOCK_Acceptor.h"
19 #include "ace/Acceptor.h"
20 #include "ace/Reactor.h"
21 #include "ace/Dev_Poll_Reactor.h"
22 #include "ace/WFMO_Reactor.h"
23 #include "ace/Select_Reactor.h"
24 #include "ace/TP_Reactor.h"
25 #include <memory>
26 #include "ace/Numeric_Limits.h"
27 #include "ace/Signal.h"
28 #include "ace/Atomic_Op.h"
29 #include "ace/Thread_Mutex.h"
31 #if defined (ACE_HAS_THREADS)
33 namespace {
34 const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz";
36 // Number of connections to run
37 int opt_nconnections = 5;
39 // How many seconds to run the test on each reactor
40 int opt_secs = 30;
42 // How many thread to run in the reactor loop
43 int opt_reactor_threads = 3;
45 // Extra debug messages
46 int opt_debug = 0;
48 ACE_Atomic_Op<ACE_Thread_Mutex, int> reactor_thread_nr = 0;
50 // Class to collect and report on data handling for each test pass.
51 struct Result_Set {
52 int nr_conns;
53 using report_map = ACE_Array_Map<ACE_HANDLE, unsigned int>;
54 report_map reports;
56 void reset (int n_connections) // Reset for next run
58 reports.clear ();
59 nr_conns = n_connections;
62 void report (ACE_HANDLE h, unsigned int chunks)
64 std::pair<ACE_HANDLE, unsigned int> newval (h, chunks);
65 reports.insert (newval);
68 // Return 1 if this looks like a failure wrt fairness.
69 int analyze_reports ()
71 ACE_DEBUG ((LM_DEBUG,
72 ACE_TEXT ("Results (%d entries):\n"),
73 reports.size()));
74 unsigned int max_chunks = 0;
75 unsigned int min_chunks = ACE_Numeric_Limits<unsigned int>::max();
76 for (report_map::iterator iter = reports.begin();
77 iter != reports.end ();
78 ++iter)
80 ACE_DEBUG ((LM_DEBUG,
81 ACE_TEXT (" handle %d: %u\n"),
82 (*iter).first, (*iter).second));
83 if ((*iter).second > max_chunks)
84 max_chunks = (*iter).second;
85 if ((*iter).second < min_chunks)
86 min_chunks = (*iter).second;
88 if ((max_chunks - min_chunks) > max_chunks / 10)
89 ACE_ERROR_RETURN ((LM_ERROR,
90 ACE_TEXT ("Too much unfairness (max %u, min %u)\n"),
91 max_chunks,
92 min_chunks),
93 1);
94 return 0;
97 Result_Set results;
100 // Handle incoming data
102 Read_Handler::handle_input (ACE_HANDLE h)
104 char buf[BUFSIZ];
105 ssize_t result = this->peer ().recv (buf, ACE_OS::strlen(ACE_ALPHABET));
106 if (opt_debug)
107 ACE_DEBUG((LM_DEBUG,
108 ACE_TEXT ("(%t) Read_Handler::handle_input h %d, result %b\n"),
109 h, result));
110 if (result > 0)
112 if (opt_debug)
114 buf[result] = 0;
115 ACE_DEBUG ((LM_DEBUG,
116 ACE_TEXT ("(%t) Read_Handler::handle_input: h %d: %C\n"),
118 buf));
120 ++this->chunks_in;
122 else if (result < 0)
124 if (errno == EWOULDBLOCK)
125 return 0;
126 else
128 ACE_ERROR ((LM_ERROR,
129 ACE_TEXT ("handle_input: h %d: %p (errno: %d)\n"),
130 h, ACE_TEXT ("recv"), ACE_ERRNO_GET));
132 // This will cause handle_close to get called.
133 return -1;
136 else // result == 0
138 if (opt_debug)
139 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Handle %d closing\n"), h));
140 // This will cause handle_close to get called.
141 return -1;
144 return 0;
147 // Handle connection shutdown.
150 Read_Handler::handle_close (ACE_HANDLE handle,
151 ACE_Reactor_Mask /*close_mask*/)
153 ACE_DEBUG ((LM_DEBUG,
154 ACE_TEXT ("(%t) Read_Handler handle %d close; %u chunks\n"),
155 handle, chunks_in));
156 results.report (handle, this->chunks_in);
158 // Shutdown
159 this->destroy ();
160 return 0;
163 // Pump data as fast as possible to all the sockets.
164 ACE_THR_FUNC_RETURN
165 sender (void *arg)
167 ACE_DEBUG ((LM_DEBUG,
168 ACE_TEXT ("(%t) running sender\n")));
170 // Ensure an error, not a signal, on broken pipe.
171 ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
172 ACE_Sig_Action original_action;
173 no_sigpipe.register_action (SIGPIPE, &original_action);
175 ACE_INET_Addr *connection_addr =
176 reinterpret_cast<ACE_INET_Addr *> (arg);
178 int i;
180 // Automagic memory cleanup.
181 ACE_SOCK_Stream *temp_socks = 0;
182 ACE_NEW_RETURN (temp_socks,
183 ACE_SOCK_Stream [opt_nconnections],
185 std::unique_ptr <ACE_SOCK_Stream[]> socks (temp_socks);
187 // Connection all <opt_nconnections> connections before sending data.
188 ACE_SOCK_Connector c;
189 for (i = 0; i < opt_nconnections; i++)
191 if (c.connect (socks[i], *connection_addr) == -1)
193 if (errno != ECONNREFUSED || i == 0)
195 ACE_ERROR ((LM_ERROR,
196 ACE_TEXT ("(%t) conn %d %p\n"),
197 ACE_TEXT ("connect")));
198 while (--i >= 0)
199 socks[i].close ();
200 break;
203 socks[i].enable (ACE_NONBLOCK);
205 if (i < opt_nconnections)
206 return 0;
208 // Keep blasting data on all possible connections until this thread
209 // is canceled. If we manage to overrun the receiver on all sockets,
210 // sleep a bit for the receivers to catch up.
211 ACE_thread_t me = ACE_Thread::self ();
212 ACE_Thread_Manager *tm = ACE_Thread_Manager::instance ();
213 size_t send_cnt = ACE_OS::strlen (ACE_ALPHABET);
214 bool fail = false;
215 while (!tm->testcancel (me) && !fail)
217 bool sent_something = false;
218 for (i = 0; i < opt_nconnections; i++)
220 ssize_t cnt = socks[i].send (ACE_ALPHABET, send_cnt);
221 if (opt_debug)
222 ACE_DEBUG ((LM_DEBUG,
223 ACE_TEXT ("(%t) h %d sent %b\n"),
224 socks[i].get_handle(),
225 cnt));
226 if (cnt > 0)
228 sent_something = true;
229 continue;
231 if (errno == EWOULDBLOCK)
232 continue;
233 ACE_ERROR ((LM_ERROR,
234 ACE_TEXT ("(%t) %p; giving up\n"),
235 ACE_TEXT ("sender")));
236 fail = true;
237 break;
239 if (!fail && !sent_something)
241 ACE_DEBUG ((LM_DEBUG,
242 ACE_TEXT ("(%t) Full sockets... pausing...\n")));
243 ACE_OS::sleep (1);
244 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Resuming sending.\n")));
248 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Done sending.\n")));
249 for (i = 0; i < opt_nconnections; i++)
250 socks[i].close ();
251 return 0;
254 ACE_THR_FUNC_RETURN
255 reactor_loop (void *p)
257 ACE_Reactor *r = reinterpret_cast<ACE_Reactor *> (p);
258 int me = reactor_thread_nr++;
259 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Reactor loop %d starting...\n"), me));
260 if (me == 0)
261 r->owner (ACE_Thread::self ());
262 if (r->run_reactor_event_loop () == -1)
263 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("reactor")));
264 else
265 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) reactor thread %d ending\n"), me));
266 return 0;
269 void
270 run (ACE_Reactor_Impl &ri, const ACE_TCHAR *what, bool tp = true)
272 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Starting test with %s\n"), what));
274 ACE_Reactor r (&ri);
275 ACE_Thread_Manager *tm = ACE_Thread_Manager::instance ();
276 ACE_Acceptor<Read_Handler, ACE_SOCK_ACCEPTOR> acceptor;
278 // Bind acceptor to any port and then find out what the port was.
279 ACE_INET_Addr server_addr;
280 ACE_INET_Addr local_addr (ACE_sap_any_cast (const ACE_INET_Addr &));
281 if (acceptor.open (local_addr, &r) == -1
282 || acceptor.acceptor ().get_local_addr (server_addr) == -1)
284 ACE_ERROR ((LM_ERROR,
285 ACE_TEXT ("(%t) %p\n"),
286 ACE_TEXT ("acceptor open")));
287 return;
290 ACE_DEBUG ((LM_DEBUG,
291 ACE_TEXT ("(%t) starting server at port %d\n"),
292 server_addr.get_port_number ()));
294 reactor_thread_nr = 0; // Reset for new set
295 if (-1 == tm->spawn_n (tp ? opt_reactor_threads : 1, reactor_loop, &r))
297 ACE_ERROR ((LM_ERROR,
298 ACE_TEXT ("%p\n"),
299 ACE_TEXT ("reactor thread spawn")));
300 acceptor.close();
301 return;
304 ACE_INET_Addr connection_addr (server_addr.get_port_number (),
305 ACE_DEFAULT_SERVER_HOST);
307 int sender_grp = tm->spawn (sender, &connection_addr);
308 if (-1 == sender_grp)
310 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("sender spawn")));
312 else
314 ACE_OS::sleep (opt_secs);
315 tm->cancel_grp (sender_grp);
317 r.end_reactor_event_loop ();
319 ACE_DEBUG ((LM_DEBUG,
320 ACE_TEXT ("(%t) waiting for the test threads...\n")));
321 tm->wait ();
325 run_main (int argc, ACE_TCHAR *argv[])
327 ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test"));
329 //FUZZ: disable check_for_lack_ACE_OS
330 ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("c:s:t:d"), 1);
331 for (int c; (c = getopt ()) != -1; )
332 //FUZZ: enble check_for_lack_ACE_OS
333 switch (c)
335 case 'c':
336 opt_nconnections = ACE_OS::atoi (getopt.opt_arg ());
337 break;
338 case 's':
339 opt_secs = ACE_OS::atoi (getopt.opt_arg ());
340 break;
341 case 't':
342 opt_reactor_threads = ACE_OS::atoi (getopt.opt_arg ());
343 break;
344 case 'd':
345 opt_debug = 1;
346 break;
349 // Run the test once for each reactor type available.
350 int fails = 0;
351 results.reset (opt_nconnections);
353 ACE_Select_Reactor r;
354 run (r, ACE_TEXT ("Select Reactor"), false); // No thread pool
356 fails += results.analyze_reports ();
358 results.reset (opt_nconnections);
360 ACE_TP_Reactor r;
361 run (r, ACE_TEXT ("TP Reactor"));
363 fails += results.analyze_reports ();
365 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
366 results.reset (opt_nconnections);
368 ACE_Dev_Poll_Reactor r;
369 run (r, ACE_TEXT ("Dev_Poll Reactor"));
371 fails += results.analyze_reports ();
372 #endif /* ACE_HAS_EVENT_POLL || ACE_HAS_DEV_POLL */
374 #if defined (ACE_WIN32)
375 results.reset (opt_nconnections);
377 ACE_WFMO_Reactor r;
378 run (r, ACE_TEXT ("WFMO Reactor"));
380 fails += results.analyze_reports ();
381 #endif /* ACE_WIN32 */
383 ACE_END_TEST;
384 return fails;
387 #else
389 run_main (int, ACE_TCHAR *[])
391 ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test"));
393 ACE_ERROR ((LM_INFO,
394 ACE_TEXT ("threads not supported on this platform\n")));
396 ACE_END_TEST;
397 return 0;
399 #endif /* ACE_HAS_THREADS */