2 //=============================================================================
4 * @file Priority_Reactor_Test.cpp
6 * This is a test of the <ACE_Priority_Reactor>. The test forks
7 * two processes (for a total of three processes) which connect to
8 * the main process. The clients send data to a connector;
9 * interestingly enough, the acceptor will give more priority to
10 * the second connection, which should always run before the first one.
13 * The test itself is interesting, it shows how to write very
14 * simple <ACE_Svc_Handler>, <ACE_Connectors> and <ACE_Acceptors>.
16 * @author Carlos O'Ryan <coryan@cs.wustl.edu>
18 //=============================================================================
21 #include "test_config.h"
22 #include "ace/Get_Opt.h"
23 #include "ace/SOCK_Connector.h"
24 #include "ace/SOCK_Acceptor.h"
25 #include "ace/Acceptor.h"
26 #include "ace/Handle_Set.h"
27 #include "ace/Connector.h"
28 #include "ace/Auto_Ptr.h"
29 #include "ace/Priority_Reactor.h"
30 #include "Priority_Reactor_Test.h"
31 #include "ace/OS_NS_sys_wait.h"
32 #include "ace/OS_NS_unistd.h"
// Payload written by each client on every loop iteration (see
// Write_Handler::svc, which sends it without the terminating NUL).
36 static const char ACE_ALPHABET
[] = "abcdefghijklmnopqrstuvwxyz";
38 // The number of children to run; it can be changed using the -c option.
40 static int opt_nchildren
= 10;
42 // The number of loops per child; it can be changed using the -l option.
44 static int opt_nloops
= 200;
46 // If not set, use the normal reactor; it can be cleared using the -d option.
48 static int opt_priority_reactor
= 1;
50 // Maximum time to wait for the test termination (-t).  The value is passed
// as the sole constructor argument of the ACE_Time_Value handed to
// run_reactor_event_loop() -- presumably seconds; confirm against ACE docs.
51 static int opt_max_duration
= 60;
53 // Maximum number of retries to connect; it can be changed using the -m option.
55 static int max_retries
= 5;
// Connector and acceptor instantiations that pair the test's handlers with
// the SOCK connector/acceptor types.
// NOTE(review): the typedef names themselves are not visible in this extract.
57 typedef ACE_Connector
<Write_Handler
, ACE_SOCK_CONNECTOR
>
59 typedef ACE_Acceptor
<Read_Handler
, ACE_SOCK_ACCEPTOR
>
// Class-wide counters: waiting_ is written by set_countdown() and read by
// get_countdown(); started_ feeds the priority computed in Read_Handler::open().
62 int Read_Handler::waiting_
= 0;
63 int Read_Handler::started_
= 0;
// Store @a nchildren in the class-wide waiting_ counter.  run_main() calls
// this with opt_nchildren before starting the event loop.
66 Read_Handler::set_countdown (int nchildren
)
68 Read_Handler::waiting_
= nchildren
;
// Return the current value of the class-wide waiting_ counter.  run_main()
// treats a non-zero value after the event loop as "ran out of time".
72 Read_Handler::get_countdown (void)
74 return Read_Handler::waiting_
;
// Activation hook for a newly created Read_Handler: switches the peer to
// non-blocking mode, registers the handler with its reactor for READ_MASK
// events, then assigns the handler a priority derived from started_.
// NOTE(review): several lines of this definition (the error-return values,
// the update of started_, the final return) are not visible in this extract.
78 Read_Handler::open (void *)
// handle_input() relies on non-blocking reads (it checks for EWOULDBLOCK).
80 if (this->peer ().enable (ACE_NONBLOCK
) == -1)
81 ACE_ERROR_RETURN ((LM_ERROR
,
82 ACE_TEXT ("(%P|%t) Read_Handler::open, ")
83 ACE_TEXT ("cannot set non blocking mode")),
86 if (reactor ()->register_handler (this, READ_MASK
) == -1)
87 ACE_ERROR_RETURN ((LM_ERROR
,
88 ACE_TEXT ("(%P|%t) Read_Handler::open, ")
89 ACE_TEXT ("cannot register handler")),
92 // A number larger than the actual number of priorities, so some
93 // clients are misbehaved, hence punished.
94 const int max_priority
= 15;
// Precedence: this is LO_PRIORITY + (started_ % max_priority), i.e. an
// offset in the range [0, max_priority) added to the lowest priority.
96 this->priority (ACE_Event_Handler::LO_PRIORITY
+ started_
% max_priority
);
100 ACE_TEXT ("(%P|%t) created svc_handler for handle %d ")
101 ACE_TEXT ("with priority %d\n"),
// Reactor callback invoked when the peer is readable: performs one recv()
// into a local buffer and, on the visible paths, logs and shuts the reactor
// event loop down once the last handler has closed.
// NOTE(review): the buffer declaration, the branch conditions and the return
// statements are not visible in this extract; the notes below are limited to
// what the remaining lines show.
108 Read_Handler::handle_input (ACE_HANDLE h
)
110 // ACE_DEBUG((LM_DEBUG,
111 // "(%P|%t) Read_Handler::handle_input (%d)\n", h));
// Single read sized to the whole buffer; the peer was put in non-blocking
// mode by open().
116 ssize_t result
= this->peer ().recv (buf
, sizeof (buf
));
// A negative result with EWOULDBLOCK is singled out from other failures.
// NOTE(review): the body of this branch is not visible -- presumably it
// returns without closing the handler; confirm.
120 if (result
< 0 && errno
== EWOULDBLOCK
)
124 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) %p\n"),
125 ACE_TEXT ("Read_Handler::handle_input")));
// Ends the singleton reactor's event loop, which lets run_main() proceed
// past run_reactor_event_loop().  NOTE(review): the guarding condition is
// not visible -- presumably the waiting_ countdown reaching zero; confirm.
130 ACE_DEBUG ((LM_DEBUG
,
131 ACE_TEXT ("Last svc_handler closed, shutting down\n")));
132 ACE_Reactor::instance()->end_reactor_event_loop();
135 ACE_DEBUG ((LM_DEBUG
,
136 ACE_TEXT ("(%P|%t) Read_Handler::handle_input closing down\n")));
140 // ACE_DEBUG((LM_DEBUG,
141 // "(%P|%t) read %d bytes from handle %d, priority %d\n",
142 // result, h, priority ()));
// Activation hook for a Write_Handler.
// NOTE(review): only the signature is visible in this extract; the body is
// missing, so its behavior cannot be documented from here.
147 Write_Handler::open (void *)
// Client-side work loop: sends the alphabet payload opt_nloops times over
// the peer stream, sleeping for a short fixed interval after every send.
// NOTE(review): the return type and return statement are not visible in
// this extract.
153 Write_Handler::svc (void)
155 // Send several short messages, pausing between each message.
156 // The number of messages can be controlled from the command line (-l).
// Same (0, 1000) value that client() describes as one millisecond.
157 ACE_Time_Value
pause (0, 1000);
159 for (int i
= 0; i
< opt_nloops
; ++i
)
// sizeof - 1 drops the string's terminating NUL from the payload.  A failed
// send is only logged; the loop carries on with the next iteration.
161 if (this->peer ().send_n (ACE_ALPHABET
,
162 sizeof (ACE_ALPHABET
) - 1) == -1)
163 ACE_ERROR ((LM_ERROR
,
164 ACE_TEXT ("(%P|%t) %p\n"),
165 ACE_TEXT ("send_n")));
166 ACE_OS::sleep (pause
);
// The client entry point is only compiled when the platform can run it in a
// separate thread or a forked process.
172 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
174 // Execute the client tests.
// Connects a Write_Handler to the server address passed through the opaque
// argument, retrying with a doubling timeout, and reports failure if every
// attempt is exhausted.
// NOTE(review): the function signature, the connector declaration, the
// remaining connect() arguments and the calls that run and close the handler
// are not visible in this extract.
// The argument is the address of run_main()'s connection_addr, passed either
// through the thread spawn or directly in the forked child.
178 ACE_INET_Addr
*connection_addr
=
179 reinterpret_cast<ACE_INET_Addr
*> (arg
);
180 ACE_DEBUG ((LM_DEBUG
,
181 ACE_TEXT ("(%P|%t) running client\n")));
// Filled in by connector.connect().
184 Write_Handler
*writer
= 0;
186 // Do exponential backoff connections
187 ACE_Synch_Options options
= ACE_Synch_Options::synch
;
189 // Start with one msec timeouts.
190 ACE_Time_Value
msec (0, 1000);
191 options
.timeout (msec
);
193 // Try up to <max_retries> to connect to the server.
194 for (int i
= 0; i
< max_retries
; i
++)
196 if (connector
.connect (writer
,
200 // Double the timeout...
201 ACE_Time_Value tmp
= options
.timeout ();
202 tmp
+= options
.timeout ();
203 options
.timeout (tmp
);
205 ACE_DEBUG ((LM_DEBUG
,
206 ACE_TEXT ("(%P|%t) still trying to connect\n")));
210 // Let the new Svc_Handler do its job...
213 // then close the connection and release the Svc_Handler.
216 ACE_DEBUG ((LM_DEBUG
,
217 ACE_TEXT ("(%P|%t) finishing client\n")));
// Reached when no attempt succeeded within max_retries.
222 ACE_ERROR ((LM_ERROR
,
223 ACE_TEXT ("(%P|%t) failed to connect after %d retries\n"),
// Test entry point.  Parses the command-line options, optionally installs an
// ACE_Priority_Reactor-backed ACE_Reactor as the process-wide singleton,
// opens an acceptor on an ephemeral port, starts opt_nchildren clients
// (threads or forked processes, depending on the platform), runs the reactor
// event loop for at most opt_max_duration, waits for the children and
// finally restores the original reactor singleton.
// NOTE(review): the return type, braces, switch/case labels, the acceptor
// declaration and the closing ACE_END_TEST/return are not visible in this
// extract.
231 run_main (int argc
, ACE_TCHAR
*argv
[])
233 ACE_START_TEST (ACE_TEXT ("Priority_Reactor_Test"));
235 //FUZZ: disable check_for_lack_ACE_OS
// Option string: -d takes no argument; -c, -l, -m and -t each take one.
236 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT ("dc:l:m:t:"));
238 for (int c
; (c
= getopt ()) != -1; )
241 //FUZZ: enable check_for_lack_ACE_OS
// -d: fall back to the default reactor.
243 opt_priority_reactor
= 0;
// -c: number of children.
246 opt_nchildren
= ACE_OS::atoi (getopt
.opt_arg ());
// -l: loops per child.
249 opt_nloops
= ACE_OS::atoi (getopt
.opt_arg ());
// -m: connection retries.
252 max_retries
= ACE_OS::atoi (getopt
.opt_arg ());
// -t: maximum test duration.
255 opt_max_duration
= ACE_OS::atoi (getopt
.opt_arg ());
// Any other option: print the usage text and bail out.
259 ACE_ERROR_RETURN ((LM_ERROR
,
260 ACE_TEXT ("Usage: Priority_Reactor_Test ")
261 ACE_TEXT (" [-d] (disable priority reactor)\n")
262 ACE_TEXT (" [-c nchildren] (number of threads/processes)\n")
263 ACE_TEXT (" [-l loops] (number of loops per child)\n")
264 ACE_TEXT (" [-m maxretries] (attempts to connect)\n")
265 ACE_TEXT (" [-t max_time] (limits test duration)\n")),
267 ACE_NOTREACHED (break);
270 // Manage Reactor memory automagically.
271 // Note: If opt_priority_reactor is false, the default ACE_Reactor is used
272 // and we don't need to set one up.
// orig_reactor stays null unless a replacement singleton is installed below;
// the restore step at the end of the function keys off that.
273 ACE_Reactor
*orig_reactor
= 0;
274 auto_ptr
<ACE_Reactor
> reactor
;
276 if (opt_priority_reactor
)
// The implementation is held by an auto_ptr until the ACE_Reactor wrapper
// has been successfully allocated, so an allocation failure in the second
// ACE_NEW_RETURN does not leak it.
278 ACE_Select_Reactor
*impl_ptr
;
279 ACE_NEW_RETURN (impl_ptr
, ACE_Priority_Reactor
, -1);
280 auto_ptr
<ACE_Select_Reactor
> auto_impl (impl_ptr
);
282 ACE_Reactor
*reactor_ptr
;
283 ACE_NEW_RETURN (reactor_ptr
, ACE_Reactor (impl_ptr
, 1), -1);
284 auto_impl
.release (); // ACE_Reactor dtor will take it from here
// Hand the wrapper to the function-scope holder so it outlives this branch.
285 auto_ptr
<ACE_Reactor
> auto_reactor (reactor_ptr
);
286 reactor
= auto_reactor
;
// Install the new reactor as the singleton and remember what was there.
287 orig_reactor
= ACE_Reactor::instance (reactor_ptr
);
// The countdown starts at the number of children that are about to connect.
290 Read_Handler::set_countdown (opt_nchildren
);
292 #ifndef ACE_LACKS_ACCEPT
// The acceptor gets HI_PRIORITY, above anything Read_Handler::open() assigns
// starting from LO_PRIORITY.
296 acceptor
.priority (ACE_Event_Handler::HI_PRIORITY
);
297 ACE_INET_Addr server_addr
;
299 // Bind acceptor to any port and then find out what the port was.
300 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&)) == -1
301 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
302 ACE_ERROR_RETURN ((LM_ERROR
,
303 ACE_TEXT ("(%P|%t) %p\n"),
307 ACE_DEBUG ((LM_DEBUG
,
308 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
309 server_addr
.get_port_number ()));
// Address the clients connect to: the discovered port on the default
// server host.
311 ACE_INET_Addr
connection_addr (server_addr
.get_port_number (),
312 ACE_DEFAULT_SERVER_HOST
);
// Start the clients: one detached thread each when threads are available,
// otherwise one forked process each.
316 #if defined (ACE_HAS_THREADS)
317 for (i
= 0; i
< opt_nchildren
; ++i
)
319 if (ACE_Thread_Manager::instance ()->spawn
320 (ACE_THR_FUNC (client
),
321 (void *) &connection_addr
,
322 THR_NEW_LWP
| THR_DETACHED
) == -1)
323 ACE_ERROR ((LM_ERROR
,
324 ACE_TEXT ("(%P|%t) %p\n%a"),
325 ACE_TEXT ("thread create failed"),
328 #elif !defined (ACE_LACKS_FORK)
329 for (i
= 0; i
< opt_nchildren
; ++i
)
331 switch (ACE_OS::fork ("child"))
334 ACE_ERROR ((LM_ERROR
,
335 ACE_TEXT ("(%P|%t) %p\n%a"),
336 ACE_TEXT ("fork failed"),
// Child process: run the client directly.
341 client (&connection_addr
);
352 ACE_TEXT ("(%P|%t) ")
353 ACE_TEXT ("only one thread may be run ")
354 ACE_TEXT ("in a process on this platform\n")));
355 #endif /* ACE_HAS_THREADS */
// Upper bound on how long the event loop may run.
357 ACE_Time_Value
tv (opt_max_duration
);
359 ACE_Reactor::instance()->register_handler
360 (&acceptor
, ACE_Event_Handler::READ_MASK
);
// Returns when a Read_Handler ends the loop or when tv expires.
361 ACE_Reactor::instance()->run_reactor_event_loop (tv
);
// A non-zero countdown means the loop stopped before all handlers closed.
363 if (Read_Handler::get_countdown () != 0)
365 ACE_DEBUG ((LM_DEBUG
,
366 ACE_TEXT ("(%P|%t) running out of time, ")
367 ACE_TEXT ("probably due to failed connections.\n")));
370 ACE_DEBUG ((LM_DEBUG
,
371 ACE_TEXT ("(%P|%t) waiting for the children...\n")));
// NOTE(review): the process branch here is guarded by ACE_WIN32/VXWORKS,
// whereas the spawn side above is guarded by ACE_LACKS_FORK -- confirm the
// two guards are meant to differ.
373 #if defined (ACE_HAS_THREADS)
374 ACE_Thread_Manager::instance ()->wait ();
375 #elif !defined (ACE_WIN32) && !defined (VXWORKS)
376 for (i
= 0; i
< opt_nchildren
; ++i
)
378 pid_t pid
= ACE_OS::wait();
379 ACE_DEBUG ((LM_DEBUG
,
380 ACE_TEXT ("(%P|%t) child %d terminated\n"),
383 #endif /* ACE_HAS_THREADS */
385 #endif // ACE_LACKS_ACCEPT
// Put the original singleton back before the locally owned reactor is
// destroyed at scope exit.
387 if (orig_reactor
!= 0)
388 ACE_Reactor::instance (orig_reactor
);