2 //=============================================================================
4 * @file Reactor_Fairness_Test.cpp
6 * This test checks how fairly the <ACE_Reactor> implementations
7 * dispatch input across several connections. The Select, TP, Dev_Poll
8 * and WFMO reactors are each tested where available.
10 * @author Irfan Pyarali <irfan@cs.wustl.edu>
12 //=============================================================================
14 #include "test_config.h"
15 #include "Reactor_Fairness_Test.h"
16 #include "ace/Get_Opt.h"
17 #include "ace/SOCK_Connector.h"
18 #include "ace/SOCK_Acceptor.h"
19 #include "ace/Acceptor.h"
20 #include "ace/Reactor.h"
21 #include "ace/Dev_Poll_Reactor.h"
22 #include "ace/WFMO_Reactor.h"
23 #include "ace/Select_Reactor.h"
24 #include "ace/TP_Reactor.h"
25 #include "ace/Auto_Ptr.h"
26 #include "ace/Numeric_Limits.h"
27 #include "ace/Signal.h"
28 #include "ace/Atomic_Op.h"
29 #include "ace/Thread_Mutex.h"
31 #if defined (ACE_HAS_THREADS)
// Fixed payload used on every connection: the sender writes it on each
// send() and the read handler sizes each recv() by its length.
const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz";
// Number of connections the sender opens for a test pass.  This is the
// default; run_main may replace it with a value from the command line.
int opt_nconnections = 5;
40 // How many seconds to run the test on each reactor
// How many threads run the reactor event loop when a thread pool is
// requested; without a thread pool a single thread runs the loop.
int opt_reactor_threads = 3;
46 // Extra debug messages
49 ACE_Atomic_Op
<ACE_Thread_Mutex
, int> reactor_thread_nr
= 0;
51 // Class to collect and report on data handling for each test pass.
54 using report_map
= ACE_Array_Map
<ACE_HANDLE
, unsigned int>;
57 void reset (int n_connections
) // Reset for next run
60 nr_conns
= n_connections
;
63 void report (ACE_HANDLE h
, unsigned int chunks
)
65 std::pair
<ACE_HANDLE
, unsigned int> newval (h
, chunks
);
66 reports
.insert (newval
);
69 // Return 1 if this looks like a failure wrt fairness.
70 int analyze_reports ()
73 ACE_TEXT ("Results (%d entries):\n"),
75 unsigned int max_chunks
= 0;
76 unsigned int min_chunks
= ACE_Numeric_Limits
<unsigned int>::max();
77 for (report_map::iterator iter
= reports
.begin();
78 iter
!= reports
.end ();
82 ACE_TEXT (" handle %d: %u\n"),
83 (*iter
).first
, (*iter
).second
));
84 if ((*iter
).second
> max_chunks
)
85 max_chunks
= (*iter
).second
;
86 if ((*iter
).second
< min_chunks
)
87 min_chunks
= (*iter
).second
;
89 if ((max_chunks
- min_chunks
) > max_chunks
/ 10)
90 ACE_ERROR_RETURN ((LM_ERROR
,
91 ACE_TEXT ("Too much unfairness (max %u, min %u)\n"),
101 // Handle incoming data
103 Read_Handler::handle_input (ACE_HANDLE h
)
106 ssize_t result
= this->peer ().recv (buf
, ACE_OS::strlen(ACE_ALPHABET
));
109 ACE_TEXT ("(%t) Read_Handler::handle_input h %d, result %b\n"),
116 ACE_DEBUG ((LM_DEBUG
,
117 ACE_TEXT ("(%t) Read_Handler::handle_input: h %d: %C\n"),
125 if (errno
== EWOULDBLOCK
)
129 ACE_ERROR ((LM_ERROR
,
130 ACE_TEXT ("handle_input: h %d: %p (errno: %d)\n"),
131 h
, ACE_TEXT ("recv"), ACE_ERRNO_GET
));
133 // This will cause handle_close to get called.
140 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Handle %d closing\n"), h
));
141 // This will cause handle_close to get called.
148 // Handle connection shutdown.
151 Read_Handler::handle_close (ACE_HANDLE handle
,
152 ACE_Reactor_Mask
/*close_mask*/)
154 ACE_DEBUG ((LM_DEBUG
,
155 ACE_TEXT ("(%t) Read_Handler handle %d close; %u chunks\n"),
157 results
.report (handle
, this->chunks_in
);
164 // Pump data as fast as possible to all the sockets.
168 ACE_DEBUG ((LM_DEBUG
,
169 ACE_TEXT ("(%t) running sender\n")));
171 // Ensure an error, not a signal, on broken pipe.
172 ACE_Sig_Action
no_sigpipe ((ACE_SignalHandler
) SIG_IGN
);
173 ACE_Sig_Action original_action
;
174 no_sigpipe
.register_action (SIGPIPE
, &original_action
);
176 ACE_INET_Addr
*connection_addr
=
177 reinterpret_cast<ACE_INET_Addr
*> (arg
);
181 // Automagic memory cleanup.
182 ACE_SOCK_Stream
*temp_socks
= 0;
183 ACE_NEW_RETURN (temp_socks
,
184 ACE_SOCK_Stream
[opt_nconnections
],
186 ACE_Auto_Basic_Array_Ptr
<ACE_SOCK_Stream
> socks (temp_socks
);
188 // Establish all <opt_nconnections> connections before sending data.
189 ACE_SOCK_Connector c
;
190 for (i
= 0; i
< opt_nconnections
; i
++)
192 if (c
.connect (socks
[i
], *connection_addr
) == -1)
194 if (errno
!= ECONNREFUSED
|| i
== 0)
196 ACE_ERROR ((LM_ERROR
,
197 ACE_TEXT ("(%t) conn %d %p\n"),
198 ACE_TEXT ("connect")));
204 socks
[i
].enable (ACE_NONBLOCK
);
206 if (i
< opt_nconnections
)
209 // Keep blasting data on all possible connections until this thread
210 // is canceled. If we manage to overrun the receiver on all sockets,
211 // sleep a bit for the receivers to catch up.
212 ACE_thread_t me
= ACE_Thread::self ();
213 ACE_Thread_Manager
*tm
= ACE_Thread_Manager::instance ();
214 size_t send_cnt
= ACE_OS::strlen (ACE_ALPHABET
);
216 while (!tm
->testcancel (me
) && !fail
)
218 bool sent_something
= false;
219 for (i
= 0; i
< opt_nconnections
; i
++)
221 ssize_t cnt
= socks
[i
].send (ACE_ALPHABET
, send_cnt
);
223 ACE_DEBUG ((LM_DEBUG
,
224 ACE_TEXT ("(%t) h %d sent %b\n"),
225 socks
[i
].get_handle(),
229 sent_something
= true;
232 if (errno
== EWOULDBLOCK
)
234 ACE_ERROR ((LM_ERROR
,
235 ACE_TEXT ("(%t) %p; giving up\n"),
236 ACE_TEXT ("sender")));
240 if (!fail
&& !sent_something
)
242 ACE_DEBUG ((LM_DEBUG
,
243 ACE_TEXT ("(%t) Full sockets... pausing...\n")));
245 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Resuming sending.\n")));
249 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Done sending.\n")));
250 for (i
= 0; i
< opt_nconnections
; i
++)
256 reactor_loop (void *p
)
258 ACE_Reactor
*r
= reinterpret_cast<ACE_Reactor
*> (p
);
259 int me
= reactor_thread_nr
++;
260 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Reactor loop %d starting...\n"), me
));
262 r
->owner (ACE_Thread::self ());
263 if (r
->run_reactor_event_loop () == -1)
264 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("reactor")));
266 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) reactor thread %d ending\n"), me
));
271 run (ACE_Reactor_Impl
&ri
, const ACE_TCHAR
*what
, bool tp
= true)
273 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Starting test with %s\n"), what
));
276 ACE_Thread_Manager
*tm
= ACE_Thread_Manager::instance ();
277 ACE_Acceptor
<Read_Handler
, ACE_SOCK_ACCEPTOR
> acceptor
;
279 // Bind acceptor to any port and then find out what the port was.
280 ACE_INET_Addr server_addr
;
281 ACE_INET_Addr
local_addr (ACE_sap_any_cast (const ACE_INET_Addr
&));
282 if (acceptor
.open (local_addr
, &r
) == -1
283 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
285 ACE_ERROR ((LM_ERROR
,
286 ACE_TEXT ("(%t) %p\n"),
287 ACE_TEXT ("acceptor open")));
291 ACE_DEBUG ((LM_DEBUG
,
292 ACE_TEXT ("(%t) starting server at port %d\n"),
293 server_addr
.get_port_number ()));
295 reactor_thread_nr
= 0; // Reset for new set
296 if (-1 == tm
->spawn_n (tp
? opt_reactor_threads
: 1, reactor_loop
, &r
))
298 ACE_ERROR ((LM_ERROR
,
300 ACE_TEXT ("reactor thread spawn")));
305 ACE_INET_Addr
connection_addr (server_addr
.get_port_number (),
306 ACE_DEFAULT_SERVER_HOST
);
308 int sender_grp
= tm
->spawn (sender
, &connection_addr
);
309 if (-1 == sender_grp
)
311 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("sender spawn")));
315 ACE_OS::sleep (opt_secs
);
316 tm
->cancel_grp (sender_grp
);
318 r
.end_reactor_event_loop ();
320 ACE_DEBUG ((LM_DEBUG
,
321 ACE_TEXT ("(%t) waiting for the test threads...\n")));
326 run_main (int argc
, ACE_TCHAR
*argv
[])
328 ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test"));
330 //FUZZ: disable check_for_lack_ACE_OS
331 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT ("c:s:t:d"), 1);
332 for (int c
; (c
= getopt ()) != -1; )
333 //FUZZ: enable check_for_lack_ACE_OS
337 opt_nconnections
= ACE_OS::atoi (getopt
.opt_arg ());
340 opt_secs
= ACE_OS::atoi (getopt
.opt_arg ());
343 opt_reactor_threads
= ACE_OS::atoi (getopt
.opt_arg ());
350 // Run the test once for each reactor type available.
352 results
.reset (opt_nconnections
);
354 ACE_Select_Reactor r
;
355 run (r
, ACE_TEXT ("Select Reactor"), false); // No thread pool
357 fails
+= results
.analyze_reports ();
359 results
.reset (opt_nconnections
);
362 run (r
, ACE_TEXT ("TP Reactor"));
364 fails
+= results
.analyze_reports ();
366 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
367 results
.reset (opt_nconnections
);
369 ACE_Dev_Poll_Reactor r
;
370 run (r
, ACE_TEXT ("Dev_Poll Reactor"));
372 fails
+= results
.analyze_reports ();
373 #endif /* ACE_HAS_EVENT_POLL || ACE_HAS_DEV_POLL */
375 #if defined (ACE_WIN32)
376 results
.reset (opt_nconnections
);
379 run (r
, ACE_TEXT ("WFMO Reactor"));
381 fails
+= results
.analyze_reports ();
382 #endif /* ACE_WIN32 */
390 run_main (int, ACE_TCHAR
*[])
392 ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test"));
395 ACE_TEXT ("threads not supported on this platform\n")));
400 #endif /* ACE_HAS_THREADS */