2 //=============================================================================
4 * @file Reactor_Fairness_Test.cpp
6 * This test is used to time the dispatching mechanisms of the
7 * <ACE_Reactor>s. Both the <ACE_WFMO_Reactor> and
8 * <ACE_Select_Reactor> can be tested.
10 * @author Irfan Pyarali <irfan@cs.wustl.edu>
12 //=============================================================================
14 #include "test_config.h"
15 #include "Reactor_Fairness_Test.h"
16 #include "ace/Get_Opt.h"
17 #include "ace/SOCK_Connector.h"
18 #include "ace/SOCK_Acceptor.h"
19 #include "ace/Acceptor.h"
20 #include "ace/Reactor.h"
21 #include "ace/Dev_Poll_Reactor.h"
22 #include "ace/WFMO_Reactor.h"
23 #include "ace/Select_Reactor.h"
24 #include "ace/TP_Reactor.h"
26 #include "ace/Numeric_Limits.h"
27 #include "ace/Signal.h"
28 #include "ace/Atomic_Op.h"
29 #include "ace/Thread_Mutex.h"
31 #if defined (ACE_HAS_THREADS)
34 const char ACE_ALPHABET
[] = "abcdefghijklmnopqrstuvwxyz";
36 // Number of connections to run
37 int opt_nconnections
= 5;
39 // How many seconds to run the test on each reactor
42 // How many thread to run in the reactor loop
43 int opt_reactor_threads
= 3;
45 // Extra debug messages
48 ACE_Atomic_Op
<ACE_Thread_Mutex
, int> reactor_thread_nr
= 0;
50 // Class to collect and report on data handling for each test pass.
53 using report_map
= ACE_Array_Map
<ACE_HANDLE
, unsigned int>;
56 void reset (int n_connections
) // Reset for next run
59 nr_conns
= n_connections
;
62 void report (ACE_HANDLE h
, unsigned int chunks
)
64 std::pair
<ACE_HANDLE
, unsigned int> newval (h
, chunks
);
65 reports
.insert (newval
);
68 // Return 1 if this looks like a failure wrt fairness.
69 int analyze_reports ()
72 ACE_TEXT ("Results (%d entries):\n"),
74 unsigned int max_chunks
= 0;
75 unsigned int min_chunks
= ACE_Numeric_Limits
<unsigned int>::max();
76 for (report_map::iterator iter
= reports
.begin();
77 iter
!= reports
.end ();
81 ACE_TEXT (" handle %d: %u\n"),
82 (*iter
).first
, (*iter
).second
));
83 if ((*iter
).second
> max_chunks
)
84 max_chunks
= (*iter
).second
;
85 if ((*iter
).second
< min_chunks
)
86 min_chunks
= (*iter
).second
;
88 if ((max_chunks
- min_chunks
) > max_chunks
/ 10)
89 ACE_ERROR_RETURN ((LM_ERROR
,
90 ACE_TEXT ("Too much unfairness (max %u, min %u)\n"),
100 // Handle incoming data
102 Read_Handler::handle_input (ACE_HANDLE h
)
105 ssize_t result
= this->peer ().recv (buf
, ACE_OS::strlen(ACE_ALPHABET
));
108 ACE_TEXT ("(%t) Read_Handler::handle_input h %d, result %b\n"),
115 ACE_DEBUG ((LM_DEBUG
,
116 ACE_TEXT ("(%t) Read_Handler::handle_input: h %d: %C\n"),
124 if (errno
== EWOULDBLOCK
)
128 ACE_ERROR ((LM_ERROR
,
129 ACE_TEXT ("handle_input: h %d: %p (errno: %d)\n"),
130 h
, ACE_TEXT ("recv"), ACE_ERRNO_GET
));
132 // This will cause handle_close to get called.
139 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Handle %d closing\n"), h
));
140 // This will cause handle_close to get called.
147 // Handle connection shutdown.
150 Read_Handler::handle_close (ACE_HANDLE handle
,
151 ACE_Reactor_Mask
/*close_mask*/)
153 ACE_DEBUG ((LM_DEBUG
,
154 ACE_TEXT ("(%t) Read_Handler handle %d close; %u chunks\n"),
156 results
.report (handle
, this->chunks_in
);
163 // Pump data as fast as possible to all the sockets.
167 ACE_DEBUG ((LM_DEBUG
,
168 ACE_TEXT ("(%t) running sender\n")));
170 // Ensure an error, not a signal, on broken pipe.
171 ACE_Sig_Action
no_sigpipe ((ACE_SignalHandler
) SIG_IGN
);
172 ACE_Sig_Action original_action
;
173 no_sigpipe
.register_action (SIGPIPE
, &original_action
);
175 ACE_INET_Addr
*connection_addr
=
176 reinterpret_cast<ACE_INET_Addr
*> (arg
);
180 // Automagic memory cleanup.
181 ACE_SOCK_Stream
*temp_socks
= 0;
182 ACE_NEW_RETURN (temp_socks
,
183 ACE_SOCK_Stream
[opt_nconnections
],
185 std::unique_ptr
<ACE_SOCK_Stream
[]> socks (temp_socks
);
187 // Connection all <opt_nconnections> connections before sending data.
188 ACE_SOCK_Connector c
;
189 for (i
= 0; i
< opt_nconnections
; i
++)
191 if (c
.connect (socks
[i
], *connection_addr
) == -1)
193 if (errno
!= ECONNREFUSED
|| i
== 0)
195 ACE_ERROR ((LM_ERROR
,
196 ACE_TEXT ("(%t) conn %d %p\n"),
197 ACE_TEXT ("connect")));
203 socks
[i
].enable (ACE_NONBLOCK
);
205 if (i
< opt_nconnections
)
208 // Keep blasting data on all possible connections until this thread
209 // is canceled. If we manage to overrun the receiver on all sockets,
210 // sleep a bit for the receivers to catch up.
211 ACE_thread_t me
= ACE_Thread::self ();
212 ACE_Thread_Manager
*tm
= ACE_Thread_Manager::instance ();
213 size_t send_cnt
= ACE_OS::strlen (ACE_ALPHABET
);
215 while (!tm
->testcancel (me
) && !fail
)
217 bool sent_something
= false;
218 for (i
= 0; i
< opt_nconnections
; i
++)
220 ssize_t cnt
= socks
[i
].send (ACE_ALPHABET
, send_cnt
);
222 ACE_DEBUG ((LM_DEBUG
,
223 ACE_TEXT ("(%t) h %d sent %b\n"),
224 socks
[i
].get_handle(),
228 sent_something
= true;
231 if (errno
== EWOULDBLOCK
)
233 ACE_ERROR ((LM_ERROR
,
234 ACE_TEXT ("(%t) %p; giving up\n"),
235 ACE_TEXT ("sender")));
239 if (!fail
&& !sent_something
)
241 ACE_DEBUG ((LM_DEBUG
,
242 ACE_TEXT ("(%t) Full sockets... pausing...\n")));
244 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Resuming sending.\n")));
248 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Done sending.\n")));
249 for (i
= 0; i
< opt_nconnections
; i
++)
255 reactor_loop (void *p
)
257 ACE_Reactor
*r
= reinterpret_cast<ACE_Reactor
*> (p
);
258 int me
= reactor_thread_nr
++;
259 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Reactor loop %d starting...\n"), me
));
261 r
->owner (ACE_Thread::self ());
262 if (r
->run_reactor_event_loop () == -1)
263 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("reactor")));
265 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) reactor thread %d ending\n"), me
));
270 run (ACE_Reactor_Impl
&ri
, const ACE_TCHAR
*what
, bool tp
= true)
272 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Starting test with %s\n"), what
));
275 ACE_Thread_Manager
*tm
= ACE_Thread_Manager::instance ();
276 ACE_Acceptor
<Read_Handler
, ACE_SOCK_ACCEPTOR
> acceptor
;
278 // Bind acceptor to any port and then find out what the port was.
279 ACE_INET_Addr server_addr
;
280 ACE_INET_Addr
local_addr (ACE_sap_any_cast (const ACE_INET_Addr
&));
281 if (acceptor
.open (local_addr
, &r
) == -1
282 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
284 ACE_ERROR ((LM_ERROR
,
285 ACE_TEXT ("(%t) %p\n"),
286 ACE_TEXT ("acceptor open")));
290 ACE_DEBUG ((LM_DEBUG
,
291 ACE_TEXT ("(%t) starting server at port %d\n"),
292 server_addr
.get_port_number ()));
294 reactor_thread_nr
= 0; // Reset for new set
295 if (-1 == tm
->spawn_n (tp
? opt_reactor_threads
: 1, reactor_loop
, &r
))
297 ACE_ERROR ((LM_ERROR
,
299 ACE_TEXT ("reactor thread spawn")));
304 ACE_INET_Addr
connection_addr (server_addr
.get_port_number (),
305 ACE_DEFAULT_SERVER_HOST
);
307 int sender_grp
= tm
->spawn (sender
, &connection_addr
);
308 if (-1 == sender_grp
)
310 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("sender spawn")));
314 ACE_OS::sleep (opt_secs
);
315 tm
->cancel_grp (sender_grp
);
317 r
.end_reactor_event_loop ();
319 ACE_DEBUG ((LM_DEBUG
,
320 ACE_TEXT ("(%t) waiting for the test threads...\n")));
325 run_main (int argc
, ACE_TCHAR
*argv
[])
327 ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test"));
329 //FUZZ: disable check_for_lack_ACE_OS
330 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT ("c:s:t:d"), 1);
331 for (int c
; (c
= getopt ()) != -1; )
332 //FUZZ: enble check_for_lack_ACE_OS
336 opt_nconnections
= ACE_OS::atoi (getopt
.opt_arg ());
339 opt_secs
= ACE_OS::atoi (getopt
.opt_arg ());
342 opt_reactor_threads
= ACE_OS::atoi (getopt
.opt_arg ());
349 // Run the test once for each reactor type available.
351 results
.reset (opt_nconnections
);
353 ACE_Select_Reactor r
;
354 run (r
, ACE_TEXT ("Select Reactor"), false); // No thread pool
356 fails
+= results
.analyze_reports ();
358 results
.reset (opt_nconnections
);
361 run (r
, ACE_TEXT ("TP Reactor"));
363 fails
+= results
.analyze_reports ();
365 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
366 results
.reset (opt_nconnections
);
368 ACE_Dev_Poll_Reactor r
;
369 run (r
, ACE_TEXT ("Dev_Poll Reactor"));
371 fails
+= results
.analyze_reports ();
372 #endif /* ACE_HAS_EVENT_POLL || ACE_HAS_DEV_POLL */
374 #if defined (ACE_WIN32)
375 results
.reset (opt_nconnections
);
378 run (r
, ACE_TEXT ("WFMO Reactor"));
380 fails
+= results
.analyze_reports ();
381 #endif /* ACE_WIN32 */
389 run_main (int, ACE_TCHAR
*[])
391 ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test"));
394 ACE_TEXT ("threads not supported on this platform\n")));
399 #endif /* ACE_HAS_THREADS */