ACE+TAO-7_0_8
[ACE_TAO.git] / ACE / tests / Reactor_Fairness_Test.cpp
blob426adc3e269c26831b89c81d3bdf6ac40cff4d03
//=============================================================================
/**
 *  @file    Reactor_Fairness_Test.cpp
 *
 *  This test is used to time the dispatching mechanisms of the
 *  <ACE_Reactor>s. Both the <ACE_WFMO_Reactor> and
 *  <ACE_Select_Reactor> can be tested.
 *
 *  @author Irfan Pyarali <irfan@cs.wustl.edu>
 */
//=============================================================================
14 #include "test_config.h"
15 #include "Reactor_Fairness_Test.h"
16 #include "ace/Get_Opt.h"
17 #include "ace/SOCK_Connector.h"
18 #include "ace/SOCK_Acceptor.h"
19 #include "ace/Acceptor.h"
20 #include "ace/Reactor.h"
21 #include "ace/Dev_Poll_Reactor.h"
22 #include "ace/WFMO_Reactor.h"
23 #include "ace/Select_Reactor.h"
24 #include "ace/TP_Reactor.h"
25 #include "ace/Auto_Ptr.h"
26 #include "ace/Numeric_Limits.h"
27 #include "ace/Signal.h"
28 #include "ace/Atomic_Op.h"
29 #include "ace/Thread_Mutex.h"
31 #if defined (ACE_HAS_THREADS)
33 namespace {
35 const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz";
37 // Number of connections to run
38 int opt_nconnections = 5;
40 // How many seconds to run the test on each reactor
41 int opt_secs = 30;
43 // How many thread to run in the reactor loop
44 int opt_reactor_threads = 3;
46 // Extra debug messages
47 int opt_debug = 0;
49 ACE_Atomic_Op<ACE_Thread_Mutex, int> reactor_thread_nr = 0;
51 // Class to collect and report on data handling for each test pass.
52 struct Result_Set {
53 int nr_conns;
54 using report_map = ACE_Array_Map<ACE_HANDLE, unsigned int>;
55 report_map reports;
57 void reset (int n_connections) // Reset for next run
59 reports.clear ();
60 nr_conns = n_connections;
63 void report (ACE_HANDLE h, unsigned int chunks)
65 std::pair<ACE_HANDLE, unsigned int> newval (h, chunks);
66 reports.insert (newval);
69 // Return 1 if this looks like a failure wrt fairness.
70 int analyze_reports ()
72 ACE_DEBUG ((LM_DEBUG,
73 ACE_TEXT ("Results (%d entries):\n"),
74 reports.size()));
75 unsigned int max_chunks = 0;
76 unsigned int min_chunks = ACE_Numeric_Limits<unsigned int>::max();
77 for (report_map::iterator iter = reports.begin();
78 iter != reports.end ();
79 ++iter)
81 ACE_DEBUG ((LM_DEBUG,
82 ACE_TEXT (" handle %d: %u\n"),
83 (*iter).first, (*iter).second));
84 if ((*iter).second > max_chunks)
85 max_chunks = (*iter).second;
86 if ((*iter).second < min_chunks)
87 min_chunks = (*iter).second;
89 if ((max_chunks - min_chunks) > max_chunks / 10)
90 ACE_ERROR_RETURN ((LM_ERROR,
91 ACE_TEXT ("Too much unfairness (max %u, min %u)\n"),
92 max_chunks,
93 min_chunks),
94 1);
95 return 0;
98 Result_Set results;
101 // Handle incoming data
103 Read_Handler::handle_input (ACE_HANDLE h)
105 char buf[BUFSIZ];
106 ssize_t result = this->peer ().recv (buf, ACE_OS::strlen(ACE_ALPHABET));
107 if (opt_debug)
108 ACE_DEBUG((LM_DEBUG,
109 ACE_TEXT ("(%t) Read_Handler::handle_input h %d, result %b\n"),
110 h, result));
111 if (result > 0)
113 if (opt_debug)
115 buf[result] = 0;
116 ACE_DEBUG ((LM_DEBUG,
117 ACE_TEXT ("(%t) Read_Handler::handle_input: h %d: %C\n"),
119 buf));
121 ++this->chunks_in;
123 else if (result < 0)
125 if (errno == EWOULDBLOCK)
126 return 0;
127 else
129 ACE_ERROR ((LM_ERROR,
130 ACE_TEXT ("handle_input: h %d: %p (errno: %d)\n"),
131 h, ACE_TEXT ("recv"), ACE_ERRNO_GET));
133 // This will cause handle_close to get called.
134 return -1;
137 else // result == 0
139 if (opt_debug)
140 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Handle %d closing\n"), h));
141 // This will cause handle_close to get called.
142 return -1;
145 return 0;
148 // Handle connection shutdown.
151 Read_Handler::handle_close (ACE_HANDLE handle,
152 ACE_Reactor_Mask /*close_mask*/)
154 ACE_DEBUG ((LM_DEBUG,
155 ACE_TEXT ("(%t) Read_Handler handle %d close; %u chunks\n"),
156 handle, chunks_in));
157 results.report (handle, this->chunks_in);
159 // Shutdown
160 this->destroy ();
161 return 0;
164 // Pump data as fast as possible to all the sockets.
165 ACE_THR_FUNC_RETURN
166 sender (void *arg)
168 ACE_DEBUG ((LM_DEBUG,
169 ACE_TEXT ("(%t) running sender\n")));
171 // Ensure an error, not a signal, on broken pipe.
172 ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
173 ACE_Sig_Action original_action;
174 no_sigpipe.register_action (SIGPIPE, &original_action);
176 ACE_INET_Addr *connection_addr =
177 reinterpret_cast<ACE_INET_Addr *> (arg);
179 int i;
181 // Automagic memory cleanup.
182 ACE_SOCK_Stream *temp_socks = 0;
183 ACE_NEW_RETURN (temp_socks,
184 ACE_SOCK_Stream [opt_nconnections],
186 ACE_Auto_Basic_Array_Ptr <ACE_SOCK_Stream> socks (temp_socks);
188 // Connection all <opt_nconnections> connections before sending data.
189 ACE_SOCK_Connector c;
190 for (i = 0; i < opt_nconnections; i++)
192 if (c.connect (socks[i], *connection_addr) == -1)
194 if (errno != ECONNREFUSED || i == 0)
196 ACE_ERROR ((LM_ERROR,
197 ACE_TEXT ("(%t) conn %d %p\n"),
198 ACE_TEXT ("connect")));
199 while (--i >= 0)
200 socks[i].close ();
201 break;
204 socks[i].enable (ACE_NONBLOCK);
206 if (i < opt_nconnections)
207 return 0;
209 // Keep blasting data on all possible connections until this thread
210 // is canceled. If we manage to overrun the receiver on all sockets,
211 // sleep a bit for the receivers to catch up.
212 ACE_thread_t me = ACE_Thread::self ();
213 ACE_Thread_Manager *tm = ACE_Thread_Manager::instance ();
214 size_t send_cnt = ACE_OS::strlen (ACE_ALPHABET);
215 bool fail = false;
216 while (!tm->testcancel (me) && !fail)
218 bool sent_something = false;
219 for (i = 0; i < opt_nconnections; i++)
221 ssize_t cnt = socks[i].send (ACE_ALPHABET, send_cnt);
222 if (opt_debug)
223 ACE_DEBUG ((LM_DEBUG,
224 ACE_TEXT ("(%t) h %d sent %b\n"),
225 socks[i].get_handle(),
226 cnt));
227 if (cnt > 0)
229 sent_something = true;
230 continue;
232 if (errno == EWOULDBLOCK)
233 continue;
234 ACE_ERROR ((LM_ERROR,
235 ACE_TEXT ("(%t) %p; giving up\n"),
236 ACE_TEXT ("sender")));
237 fail = true;
238 break;
240 if (!fail && !sent_something)
242 ACE_DEBUG ((LM_DEBUG,
243 ACE_TEXT ("(%t) Full sockets... pausing...\n")));
244 ACE_OS::sleep (1);
245 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Resuming sending.\n")));
249 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Done sending.\n")));
250 for (i = 0; i < opt_nconnections; i++)
251 socks[i].close ();
252 return 0;
255 ACE_THR_FUNC_RETURN
256 reactor_loop (void *p)
258 ACE_Reactor *r = reinterpret_cast<ACE_Reactor *> (p);
259 int me = reactor_thread_nr++;
260 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Reactor loop %d starting...\n"), me));
261 if (me == 0)
262 r->owner (ACE_Thread::self ());
263 if (r->run_reactor_event_loop () == -1)
264 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("reactor")));
265 else
266 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) reactor thread %d ending\n"), me));
267 return 0;
270 void
271 run (ACE_Reactor_Impl &ri, const ACE_TCHAR *what, bool tp = true)
273 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Starting test with %s\n"), what));
275 ACE_Reactor r (&ri);
276 ACE_Thread_Manager *tm = ACE_Thread_Manager::instance ();
277 ACE_Acceptor<Read_Handler, ACE_SOCK_ACCEPTOR> acceptor;
279 // Bind acceptor to any port and then find out what the port was.
280 ACE_INET_Addr server_addr;
281 ACE_INET_Addr local_addr (ACE_sap_any_cast (const ACE_INET_Addr &));
282 if (acceptor.open (local_addr, &r) == -1
283 || acceptor.acceptor ().get_local_addr (server_addr) == -1)
285 ACE_ERROR ((LM_ERROR,
286 ACE_TEXT ("(%t) %p\n"),
287 ACE_TEXT ("acceptor open")));
288 return;
291 ACE_DEBUG ((LM_DEBUG,
292 ACE_TEXT ("(%t) starting server at port %d\n"),
293 server_addr.get_port_number ()));
295 reactor_thread_nr = 0; // Reset for new set
296 if (-1 == tm->spawn_n (tp ? opt_reactor_threads : 1, reactor_loop, &r))
298 ACE_ERROR ((LM_ERROR,
299 ACE_TEXT ("%p\n"),
300 ACE_TEXT ("reactor thread spawn")));
301 acceptor.close();
302 return;
305 ACE_INET_Addr connection_addr (server_addr.get_port_number (),
306 ACE_DEFAULT_SERVER_HOST);
308 int sender_grp = tm->spawn (sender, &connection_addr);
309 if (-1 == sender_grp)
311 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("sender spawn")));
313 else
315 ACE_OS::sleep (opt_secs);
316 tm->cancel_grp (sender_grp);
318 r.end_reactor_event_loop ();
320 ACE_DEBUG ((LM_DEBUG,
321 ACE_TEXT ("(%t) waiting for the test threads...\n")));
322 tm->wait ();
326 run_main (int argc, ACE_TCHAR *argv[])
328 ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test"));
330 //FUZZ: disable check_for_lack_ACE_OS
331 ACE_Get_Opt getopt (argc, argv, ACE_TEXT ("c:s:t:d"), 1);
332 for (int c; (c = getopt ()) != -1; )
333 //FUZZ: enble check_for_lack_ACE_OS
334 switch (c)
336 case 'c':
337 opt_nconnections = ACE_OS::atoi (getopt.opt_arg ());
338 break;
339 case 's':
340 opt_secs = ACE_OS::atoi (getopt.opt_arg ());
341 break;
342 case 't':
343 opt_reactor_threads = ACE_OS::atoi (getopt.opt_arg ());
344 break;
345 case 'd':
346 opt_debug = 1;
347 break;
350 // Run the test once for each reactor type available.
351 int fails = 0;
352 results.reset (opt_nconnections);
354 ACE_Select_Reactor r;
355 run (r, ACE_TEXT ("Select Reactor"), false); // No thread pool
357 fails += results.analyze_reports ();
359 results.reset (opt_nconnections);
361 ACE_TP_Reactor r;
362 run (r, ACE_TEXT ("TP Reactor"));
364 fails += results.analyze_reports ();
366 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
367 results.reset (opt_nconnections);
369 ACE_Dev_Poll_Reactor r;
370 run (r, ACE_TEXT ("Dev_Poll Reactor"));
372 fails += results.analyze_reports ();
373 #endif /* ACE_HAS_EVENT_POLL || ACE_HAS_DEV_POLL */
375 #if defined (ACE_WIN32)
376 results.reset (opt_nconnections);
378 ACE_WFMO_Reactor r;
379 run (r, ACE_TEXT ("WFMO Reactor"));
381 fails += results.analyze_reports ();
382 #endif /* ACE_WIN32 */
384 ACE_END_TEST;
385 return fails;
388 #else
390 run_main (int, ACE_TCHAR *[])
392 ACE_START_TEST (ACE_TEXT ("Reactor_Fairness_Test"));
394 ACE_ERROR ((LM_INFO,
395 ACE_TEXT ("threads not supported on this platform\n")));
397 ACE_END_TEST;
398 return 0;
400 #endif /* ACE_HAS_THREADS */