1 //=============================================================================
3 * @file Priority_Reactor_Test.cpp
5 * This is a test of the <ACE_Priority_Reactor>. The test forks
6 * two processes (for a total of three processes) which connect to
7 * the main process. The clients send data through a connector;
8 * interestingly enough, the acceptor will give more priority to
9 * the second connection, which should always run before the first one.
12 * The test itself is interesting: it shows how to write very
13 * simple <ACE_Svc_Handler>, <ACE_Connectors> and <ACE_Acceptors>.
15 * @author Carlos O'Ryan <coryan@cs.wustl.edu>
17 //=============================================================================
19 #include "test_config.h"
20 #include "ace/Get_Opt.h"
21 #include "ace/SOCK_Connector.h"
22 #include "ace/SOCK_Acceptor.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Handle_Set.h"
25 #include "ace/Connector.h"
27 #include "ace/Priority_Reactor.h"
28 #include "Priority_Reactor_Test.h"
30 #include "ace/OS_NS_sys_wait.h"
31 #include "ace/OS_NS_unistd.h"
// Payload that Write_Handler::svc sends on every loop iteration; the
// send length is sizeof (ACE_ALPHABET) - 1, i.e. without the trailing NUL.
35 static const char ACE_ALPHABET
[] = "abcdefghijklmnopqrstuvwxyz";
37 // The number of children to run; it can be changed using the -c
// option.  Also used as the initial Read_Handler countdown in run_main.
39 static int opt_nchildren
= 10;
41 // The number of loops per child; it can be changed using the -l
// option.  Each loop of Write_Handler::svc sends ACE_ALPHABET once.
43 static int opt_nloops
= 200;
45 // If not set use the normal reactor; it can be changed using the -d
// option (the usage text describes -d as "disable priority reactor").
47 static int opt_priority_reactor
= 1;
49 // Maximum time to wait for the test termination (-t).  The value is
// handed to the ACE_Time_Value that bounds the reactor event loop in
// run_main -- presumably seconds; confirm against ACE_Time_Value.
50 static int opt_max_duration
= 60;
52 // Maximum number of retries to connect; it can be changed using the
// -m option.
54 static int max_retries
= 5;
// Connector type used by the clients: an ACE_Connector parameterised on
// Write_Handler (the service handler) and ACE_SOCK_Connector.
56 using CONNECTOR
= ACE_Connector
<Write_Handler
, ACE_SOCK_Connector
>;
// Acceptor type used by the server side: an ACE_Acceptor parameterised on
// Read_Handler (the service handler) and ACE_SOCK_Acceptor.
57 using ACCEPTOR
= ACE_Acceptor
<Read_Handler
, ACE_SOCK_Acceptor
>;
// Class-wide countdown: written by set_countdown() and read back by
// get_countdown().  run_main treats a non-zero value after the event loop
// as "ran out of time".
59 int Read_Handler::waiting_
= 0;
// Class-wide counter that Read_Handler::open folds into the priority it
// assigns to each new handler.
// NOTE(review): the statement that updates it is not visible in this
// excerpt -- confirm where it is incremented.
60 int Read_Handler::started_
= 0;
// Stores <nchildren> in the class-wide Read_Handler::waiting_ counter.
// run_main calls this with opt_nchildren before starting the clients.
63 Read_Handler::set_countdown (int nchildren
)
65 Read_Handler::waiting_
= nchildren
;
// Returns the current value of the class-wide Read_Handler::waiting_
// counter.
69 Read_Handler::get_countdown ()
71 return Read_Handler::waiting_
;
// Activation hook for a newly accepted connection: switches the peer
// stream to non-blocking mode, registers this handler with its reactor for
// READ_MASK events, and assigns the handler a priority derived from
// started_.  The void* argument is unnamed and unused in the visible code.
// NOTE(review): the return statements and the tail of each logging macro
// are not visible in this excerpt.
75 Read_Handler::open (void *)
// Non-blocking mode is required because handle_input tests for
// EWOULDBLOCK after recv().
77 if (this->peer ().enable (ACE_NONBLOCK
) == -1)
78 ACE_ERROR_RETURN ((LM_ERROR
,
79 ACE_TEXT ("(%P|%t) Read_Handler::open, ")
80 ACE_TEXT ("cannot set non blocking mode")),
83 if (reactor ()->register_handler (this, READ_MASK
) == -1)
84 ACE_ERROR_RETURN ((LM_ERROR
,
85 ACE_TEXT ("(%P|%t) Read_Handler::open, ")
86 ACE_TEXT ("cannot register handler")),
89 // A number larger than the actual number of priorities, so some
90 // clients misbehave and are hence punished.
91 const int max_priority
= 15;
// Note the precedence: '%' binds tighter than '+', so the priority is
// LO_PRIORITY + (started_ % max_priority).
93 this->priority (ACE_Event_Handler::LO_PRIORITY
+ started_
% max_priority
);
97 ACE_TEXT ("(%P|%t) created svc_handler for handle %d ")
98 ACE_TEXT ("with priority %d\n"),
// Reactor callback for readable data on handle <h>: performs one recv()
// on the peer stream into a local buffer and inspects the result.
// NOTE(review): the buffer declaration, the branch structure and the
// return statements are not visible in this excerpt, so the exact mapping
// of the messages below to recv() outcomes should be confirmed against the
// full file.
105 Read_Handler::handle_input (ACE_HANDLE h
)
107 // ACE_DEBUG((LM_DEBUG,
108 // "(%P|%t) Read_Handler::handle_input (%d)\n", h));
113 ssize_t result
= this->peer ().recv (buf
, sizeof (buf
));
// The peer was put in non-blocking mode by open(), so a negative result
// with errno == EWOULDBLOCK is tested separately from other failures.
117 if (result
< 0 && errno
== EWOULDBLOCK
)
121 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) %p\n"),
122 ACE_TEXT ("Read_Handler::handle_input")));
// When the last handler closes, the singleton reactor's event loop is
// asked to end; presumably guarded by the waiting_ countdown reaching
// zero (the guarding condition is not visible here).
127 ACE_DEBUG ((LM_DEBUG
,
128 ACE_TEXT ("Last svc_handler closed, shutting down\n")));
129 ACE_Reactor::instance()->end_reactor_event_loop();
132 ACE_DEBUG ((LM_DEBUG
,
133 ACE_TEXT ("(%P|%t) Read_Handler::handle_input closing down\n")));
137 // ACE_DEBUG((LM_DEBUG,
138 // "(%P|%t) read %d bytes from handle %d, priority %d\n",
139 // result, h, priority ()));
// Activation hook for the client-side handler; the void* argument is
// unnamed.
// NOTE(review): the body is not visible in this excerpt.
144 Write_Handler::open (void *)
// Client-side work loop: sends ACE_ALPHABET to the peer opt_nloops times,
// sleeping for <pause> after each send.
// NOTE(review): the return statement is not visible in this excerpt.
150 Write_Handler::svc ()
152 // Send several short messages, doing pauses between each message.
153 // The number of messages can be controlled from the command line.
// (0, 1000): the same pair is described as "one msec" where the connect
// timeout is set up in the client function.
154 ACE_Time_Value
pause (0, 1000);
156 for (int i
= 0; i
< opt_nloops
; ++i
)
// sizeof - 1 so that the string's trailing NUL is not transmitted.
// A failed send_n is only logged in the visible code; no early exit from
// the loop is shown.
158 if (this->peer ().send_n (ACE_ALPHABET
,
159 sizeof (ACE_ALPHABET
) - 1) == -1)
160 ACE_ERROR ((LM_ERROR
,
161 ACE_TEXT ("(%P|%t) %p\n"),
162 ACE_TEXT ("send_n")));
163 ACE_OS::sleep (pause
);
169 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
// Client entry point, run once per child thread or process (see the
// spawn/fork code in run_main, which passes the address of an
// ACE_INET_Addr).  It tries to connect a Write_Handler to that address,
// doubling the connect timeout after each failed attempt, for at most
// max_retries attempts.
// NOTE(review): the function signature, the connector declaration, the
// remaining connect() arguments and the success/failure branch structure
// are not visible in this excerpt.
171 // Execute the client tests.
// <arg> is the opaque thread argument; run_main passes &connection_addr.
175 ACE_INET_Addr
*connection_addr
=
176 reinterpret_cast<ACE_INET_Addr
*> (arg
);
177 ACE_DEBUG ((LM_DEBUG
,
178 ACE_TEXT ("(%P|%t) running client\n")));
// Starts null and is passed to connect() below -- presumably the connector
// creates the handler when given a null pointer; confirm against
// ACE_Connector.
181 Write_Handler
*writer
= 0;
183 // Do exponential backoff connections
184 ACE_Synch_Options options
= ACE_Synch_Options::synch
;
186 // Start with one msec timeouts.
187 ACE_Time_Value
msec (0, 1000);
188 options
.timeout (msec
);
190 // Try up to <max_retries> to connect to the server.
191 for (int i
= 0; i
< max_retries
; i
++)
193 if (connector
.connect (writer
,
197 // Double the timeout...
198 ACE_Time_Value tmp
= options
.timeout ();
199 tmp
+= options
.timeout ();
200 options
.timeout (tmp
);
202 ACE_DEBUG ((LM_DEBUG
,
203 ACE_TEXT ("(%P|%t) still trying to connect\n")));
207 // Let the new Svc_Handler do its job...
210 // then close the connection and release the Svc_Handler.
213 ACE_DEBUG ((LM_DEBUG
,
214 ACE_TEXT ("(%P|%t) finishing client\n")));
// Logged when the retry loop ends without a successful connect (the
// arguments of this call are cut off in this excerpt).
219 ACE_ERROR ((LM_ERROR
,
220 ACE_TEXT ("(%P|%t) failed to connect after %d retries\n"),
// Test entry point.  Visible steps, in order:
//   1. parse the -d/-c/-l/-m/-t options into the file-scope settings;
//   2. if opt_priority_reactor is set, build an ACE_Reactor on top of an
//      ACE_Priority_Reactor and install it as the singleton, remembering
//      the previous singleton;
//   3. set the Read_Handler countdown to opt_nchildren;
//   4. open the acceptor on any port and start opt_nchildren clients
//      (threads if ACE_HAS_THREADS, otherwise forked processes);
//   5. run the reactor event loop for at most opt_max_duration;
//   6. wait for the children and restore the original reactor singleton.
// NOTE(review): braces, switch/case labels, several declarations (e.g.
// <acceptor>, <i>) and the final return are not visible in this excerpt.
228 run_main (int argc
, ACE_TCHAR
*argv
[])
230 ACE_START_TEST (ACE_TEXT ("Priority_Reactor_Test"));
232 //FUZZ: disable check_for_lack_ACE_OS
233 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT ("dc:l:m:t:"));
// Iterate until the option parser reports -1 (no more options).
235 for (int c
; (c
= getopt ()) != -1; )
238 //FUZZ: enable check_for_lack_ACE_OS
// The case labels are not visible here; judging by the option string and
// the usage text, the assignments below presumably belong to -d, -c, -l,
// -m and -t respectively.
240 opt_priority_reactor
= 0;
243 opt_nchildren
= ACE_OS::atoi (getopt
.opt_arg ());
246 opt_nloops
= ACE_OS::atoi (getopt
.opt_arg ());
249 max_retries
= ACE_OS::atoi (getopt
.opt_arg ());
252 opt_max_duration
= ACE_OS::atoi (getopt
.opt_arg ());
// Presumably the default case for an unrecognised option: print the usage
// text and return (the return value is cut off in this excerpt).
256 ACE_ERROR_RETURN ((LM_ERROR
,
257 ACE_TEXT ("Usage: Priority_Reactor_Test ")
258 ACE_TEXT (" [-d] (disable priority reactor)\n")
259 ACE_TEXT (" [-c nchildren] (number of threads/processes)\n")
260 ACE_TEXT (" [-l loops] (number of loops per child)\n")
261 ACE_TEXT (" [-m maxretries] (attempts to connect)\n")
262 ACE_TEXT (" [-t max_time] (limits test duration)\n")),
264 ACE_NOTREACHED (break);
267 // Manage Reactor memory automagically.
268 // Note: If opt_priority_reactor is false, the default ACE_Reactor is used
269 // and we don't need to set one up.
// orig_reactor stays null unless a replacement singleton is installed
// below; it is restored at the end of the function.
270 ACE_Reactor
*orig_reactor
= 0;
271 std::unique_ptr
<ACE_Reactor
> reactor
;
273 if (opt_priority_reactor
)
275 ACE_Select_Reactor
*impl_ptr
;
276 ACE_NEW_RETURN (impl_ptr
, ACE_Priority_Reactor
, -1);
// Temporary owner so the implementation is freed if creating the
// ACE_Reactor wrapper below fails.
277 std::unique_ptr
<ACE_Select_Reactor
> auto_impl (impl_ptr
);
279 ACE_Reactor
*reactor_ptr
;
280 ACE_NEW_RETURN (reactor_ptr
, ACE_Reactor (impl_ptr
, 1), -1);
281 auto_impl
.release (); // ACE_Reactor dtor will take it from here
282 std::unique_ptr
<ACE_Reactor
> auto_reactor (reactor_ptr
);
// Hand ownership to the function-scope <reactor> so the new reactor
// outlives this if-block.
283 reactor
= std::move(auto_reactor
);
// Install the new reactor as the singleton and keep the value returned by
// instance() so it can be passed back to instance() before returning.
284 orig_reactor
= ACE_Reactor::instance (reactor_ptr
);
287 Read_Handler::set_countdown (opt_nchildren
);
289 #ifndef ACE_LACKS_ACCEPT
293 acceptor
.priority (ACE_Event_Handler::HI_PRIORITY
);
294 ACE_INET_Addr server_addr
;
296 // Bind acceptor to any port and then find out what the port was.
297 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&)) == -1
298 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
299 ACE_ERROR_RETURN ((LM_ERROR
,
300 ACE_TEXT ("(%P|%t) %p\n"),
304 ACE_DEBUG ((LM_DEBUG
,
305 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
306 server_addr
.get_port_number ()));
// Address the clients connect to: the port chosen above on
// ACE_DEFAULT_SERVER_HOST.
308 ACE_INET_Addr
connection_addr (server_addr
.get_port_number (),
309 ACE_DEFAULT_SERVER_HOST
);
313 #if defined (ACE_HAS_THREADS)
// Threaded variant: one detached thread per child, each running client()
// with a pointer to the shared connection_addr.
314 for (i
= 0; i
< opt_nchildren
; ++i
)
316 if (ACE_Thread_Manager::instance ()->spawn
317 (ACE_THR_FUNC (client
),
318 (void *) &connection_addr
,
319 THR_NEW_LWP
| THR_DETACHED
) == -1)
320 ACE_ERROR ((LM_ERROR
,
321 ACE_TEXT ("(%P|%t) %p\n%a"),
322 ACE_TEXT ("thread create failed"),
325 #elif !defined (ACE_LACKS_FORK)
// Forked variant: the switch dispatches on the fork() result; the case
// labels are not visible in this excerpt.
326 for (i
= 0; i
< opt_nchildren
; ++i
)
328 switch (ACE_OS::fork ("child"))
331 ACE_ERROR ((LM_ERROR
,
332 ACE_TEXT ("(%P|%t) %p\n%a"),
333 ACE_TEXT ("fork failed"),
338 client (&connection_addr
);
349 ACE_TEXT ("(%P|%t) ")
350 ACE_TEXT ("only one thread may be run ")
351 ACE_TEXT ("in a process on this platform\n")));
352 #endif /* ACE_HAS_THREADS */
// Upper bound on how long the event loop below may run.
354 ACE_Time_Value
tv (opt_max_duration
);
356 ACE_Reactor::instance()->register_handler
357 (&acceptor
, ACE_Event_Handler::READ_MASK
);
358 ACE_Reactor::instance()->run_reactor_event_loop (tv
);
// A non-zero countdown after the loop is reported as the test running out
// of time.
360 if (Read_Handler::get_countdown () != 0)
362 ACE_DEBUG ((LM_DEBUG
,
363 ACE_TEXT ("(%P|%t) running out of time, ")
364 ACE_TEXT ("probably due to failed connections.\n")));
367 ACE_DEBUG ((LM_DEBUG
,
368 ACE_TEXT ("(%P|%t) waiting for the children...\n")));
370 #if defined (ACE_HAS_THREADS)
371 ACE_Thread_Manager::instance ()->wait ();
// NOTE(review): the children are forked under !defined (ACE_LACKS_FORK)
// but reaped under a different guard (!ACE_WIN32 && !VXWORKS) -- confirm
// the two conditions agree on every supported platform.
372 #elif !defined (ACE_WIN32) && !defined (VXWORKS)
373 for (i
= 0; i
< opt_nchildren
; ++i
)
375 pid_t pid
= ACE_OS::wait();
376 ACE_DEBUG ((LM_DEBUG
,
377 ACE_TEXT ("(%P|%t) child %d terminated\n"),
380 #endif /* ACE_HAS_THREADS */
382 #endif // ACE_LACKS_ACCEPT
// Put back the reactor singleton that was replaced above, if any.
384 if (orig_reactor
!= 0)
385 ACE_Reactor::instance (orig_reactor
);