2 //=============================================================================
4 * @file MEM_Stream_Test.cpp
6 * This is a test of the <ACE_MEM_Acceptor> and
7 * <ACE_MEM_Connector> classes.
9 * @author Nanbor Wang <nanbor@cs.wustl.edu>
11 //=============================================================================
13 #include "test_config.h"
14 #include "ace/OS_NS_stdio.h"
15 #include "ace/OS_NS_string.h"
16 #include "ace/OS_NS_unistd.h"
17 #include "ace/Get_Opt.h"
18 #include "ace/Thread_Manager.h"
19 #include "ace/MEM_Connector.h"
20 #include "ace/MEM_Acceptor.h"
21 #include "ace/Select_Reactor.h"
22 #include "ace/Connector.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Svc_Handler.h"
25 #include "ace/Singleton.h"
26 #include "ace/Atomic_Op.h"
// The test proper is compiled only when threads or process spawning are
// available and ACE_HAS_POSITION_INDEPENDENT_POINTERS is 1.
28 #if (defined (ACE_HAS_THREADS) || defined (ACE_HAS_PROCESS_SPAWN)) && \
29 (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
// Select how clients are launched.  Threads are chosen only when process
// spawning is unavailable but threads are.
31 #if !defined (ACE_HAS_PROCESS_SPAWN) && defined (ACE_HAS_THREADS)
32 # define _TEST_USES_THREADS
// NOTE(review): the #else/#endif that presumably separate the two modes
// are not present in this listing -- confirm against the full file.
34 # define _TEST_USES_PROCESSES
35 # include "ace/Process.h"
36 # include "ace/Process_Manager.h"
39 #include "MEM_Stream_Test.h" // Defines Echo_Handler
// Number of clients launched by test_reactive(); also the value passed to
// reset_handler() before that pass.
41 #define NUMBER_OF_REACTIVE_CONNECTIONS 3
// Number of clients launched by test_concurrent(): 3 on ACE_WIN32 or when
// _ACE_USE_SV_SEM is not defined, otherwise 1.
42 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
43 # define NUMBER_OF_MT_CONNECTIONS 3
45 // We will use SysV Semaphore in this case which is not very scalable
46 // and can only handle one connection.
47 # define NUMBER_OF_MT_CONNECTIONS 1
48 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
// Number of send/receive round trips each client performs in run_client().
50 #define NUMBER_OF_ITERATIONS 100
52 // If we don't have winsock2 we can't use WFMO_Reactor.
53 #if defined (ACE_WIN32) \
54 && defined (ACE_HAS_WINSOCK2) \
55 && ACE_HAS_WINSOCK2 != 0
56 # define TEST_CAN_USE_WFMO_REACTOR
// Reactor selection flags consulted by create_reactor().
59 #if defined (TEST_CAN_USE_WFMO_REACTOR)
60 static const int opt_wfmo_reactor
= 1;
61 #endif /* TEST_CAN_USE_WFMO_REACTOR */
63 static int opt_select_reactor
= 1;
// Signalling strategy of the pass currently running.  It is set by
// test_reactive() / test_concurrent() and read by
// Echo_Handler::handle_close() and connect_client().
64 static ACE_MEM_IO::Signal_Strategy client_strategy
= ACE_MEM_IO::Reactive
;
// Mutex-protected u_short counter, exposed as a singleton.  reset_handler()
// assigns it and Echo_Handler::handle_close() decrements it.
66 typedef ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, u_short
> WaitingCounter
;
67 typedef ACE_Singleton
<WaitingCounter
, ACE_SYNCH_RECURSIVE_MUTEX
> Waiting
;
69 // Number of connections that are currently open
// NOTE(review): in the visible code this is only ever pre-incremented (in
// the Echo_Handler constructor) to number each connection; no decrement is
// visible in this listing.
70 static u_short connection_count
= 0;
// Acceptor types that create an Echo_Handler per accepted MEM connection.
72 typedef ACE_Acceptor
<Echo_Handler
, ACE_MEM_ACCEPTOR
> ACCEPTOR
;
73 typedef ACE_Strategy_Acceptor
<Echo_Handler
, ACE_MEM_ACCEPTOR
> S_ACCEPTOR
;
// Store @a conn in the Waiting singleton counter.  run_main() calls this
// before each test pass with the number of clients that pass will launch;
// Echo_Handler::handle_close() counts it back down.
75 void reset_handler (int conn
)
77 // Reset the number of connections the test should perform.
78 *Waiting::instance () = conn
;
// Echo_Handler::open: takes an unnamed (unused) void* argument.
// NOTE(review): only the signature line is present in this listing; the
// return type and body are not visible here.
83 Echo_Handler::open (void *)
// Constructor.  Forwards @a thr_mgr to the ACE_Svc_Handler base class.
88 Echo_Handler::Echo_Handler (ACE_Thread_Manager
*thr_mgr
)
89 : ACE_Svc_Handler
<ACE_MEM_STREAM
, ACE_SYNCH
> (thr_mgr
),
// Number this handler by pre-incrementing the file-scope connection_count.
90 connection_ (++connection_count
)
// Format the "Connection <n> --> " prefix into name_; handle_input()
// prepends it to every echoed message.
// NOTE(review): the trailing argument line of this call is not present in
// this listing.
92 ACE_OS::snprintf (this->name_
, MAXPATHLEN
, ACE_TEXT ("Connection %d --> "),
// Receive one message from the peer and send it back prefixed with name_.
// The handle argument is unnamed and unused.
// NOTE(review): the return statements and some condition lines are not
// present in this listing.
97 Echo_Handler::handle_input (ACE_HANDLE
)
99 ACE_TCHAR buf
[MAXPATHLEN
];
// Read up to the full byte size of buf.
102 len
= this->peer ().recv (buf
, MAXPATHLEN
* sizeof (ACE_TCHAR
));
105 ACE_ERROR_RETURN ((LM_ERROR
,
106 ACE_TEXT ("Error receiving from MEM_Stream\n")),
108 else if (len
== 0) // Connection closed.
111 ACE_TEXT ("Connection %d closed\n"),
// Build the reply: name_ followed by the received text.
// NOTE(review): strcpy/strcat are unbounded and return_buf is the same size
// as buf, so name_ plus a near-full message could overflow it.  The code
// also relies on the received data being NUL-terminated -- confirm that
// senders always include the terminator.
116 ACE_TCHAR return_buf
[MAXPATHLEN
];
117 ACE_OS::strcpy (return_buf
, this->name_
);
118 ACE_OS::strcat (return_buf
, buf
);
// Byte length of the reply, including the terminating NUL.
119 len
= (ACE_OS::strlen (return_buf
) + 1) * sizeof (ACE_TCHAR
);
// Anything other than a complete send is reported as an error.
121 if (this->peer ().send (return_buf
, len
) != len
)
122 ACE_ERROR_RETURN ((LM_ERROR
,
123 ACE_TEXT ("Error sending from MEM_Stream\n")),
// Called when this connection is shut down.  Counts the connection off and
// stops the singleton reactor's event loop once the Waiting counter is 0.
// The handle argument is unnamed and unused.
// NOTE(review): the remaining lines of this function are not present in
// this listing.
130 Echo_Handler::handle_close (ACE_HANDLE
,
131 ACE_Reactor_Mask mask
)
// One fewer connection outstanding.
134 (*Waiting::instance ())--;
// When the current pass is not Reactive, explicitly remove this handler
// from its reactor; DONT_CALL is OR-ed into the mask for that removal.
136 if (client_strategy
!= ACE_MEM_IO::Reactive
)
137 this->reactor ()->remove_handler (this,
138 mask
| ACE_Event_Handler::DONT_CALL
);
140 // If no connections are open.
141 if (*Waiting::instance () == 0)
142 ACE_Reactor::instance ()->end_reactor_event_loop ();
144 ACE_DEBUG ((LM_DEBUG
,
145 ACE_TEXT ("(%t) Echo_Handler %d::handle_close closing down\n"),
// Service loop: keeps calling handle_input() on this handler's own handle
// for as long as it returns a non-negative value.
// NOTE(review): the loop body and whatever follows the loop are not present
// in this listing.
154 Echo_Handler::svc (void)
156 while (this->handle_input (this->get_handle ()) >= 0)
// Client side of the test.  Connects to @a port using the signalling
// @a strategy, then performs NUMBER_OF_ITERATIONS send/receive round trips
// and closes the stream.
// NOTE(review): the return type, the declaration of `status` and the return
// statements are not present in this listing.
162 run_client (u_short port
,
163 ACE_MEM_IO::Signal_Strategy strategy
)
166 ACE_MEM_Addr
to_server (port
);
167 ACE_MEM_Connector connector
;
168 connector
.preferred_strategy (strategy
);
169 ACE_MEM_Stream stream
;
171 // connector.preferred_strategy (ACE_MEM_IO::MT);
173 if (connector
.connect (stream
, to_server
.get_remote_addr ()) == -1)
174 ACE_ERROR_RETURN ((LM_ERROR
,
175 ACE_TEXT ("Failed to connect to <%C> %p\n"),
176 to_server
.get_host_name (),
177 ACE_TEXT ("connector.connect()")),
180 ACE_TCHAR buf
[MAXPATHLEN
];
182 for (size_t cntr
= 0; cntr
< NUMBER_OF_ITERATIONS
; cntr
++)
// Format the message text "Iteration <n>".
184 ACE_OS::snprintf (buf
, MAXPATHLEN
,
185 ACE_TEXT ("Iteration ") ACE_SIZE_T_FORMAT_SPECIFIER
,
// Byte length to send, including the terminating NUL.
188 ssize_t slen
= (ACE_OS::strlen (buf
) + 1) * sizeof (ACE_TCHAR
);
// A short send is treated as a failure.
190 if (stream
.send (buf
, slen
) < slen
)
192 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
193 ACE_TEXT ("In stream.send()")));
// Read the echo back into the same buffer.
// NOTE(review): the size passed here is MAXPATHLEN, whereas the send above
// and Echo_Handler::handle_input() use multiples of sizeof (ACE_TCHAR) --
// confirm this is intended for wide-character builds.
198 if (stream
.recv (buf
, MAXPATHLEN
) == -1)
200 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
201 ACE_TEXT ("stream.recv()")));
206 ACE_DEBUG ((LM_DEBUG
,
207 ACE_TEXT ("run_client(), got echo %s\n"),
// A failed close overrides status with -1; otherwise status is unchanged.
211 status
= stream
.close () == -1 ? -1 : status
;
// Entry point for client threads; compiled only when _TEST_USES_THREADS is
// defined.  @a arg is cast to u_short* and dereferenced as the server port.
// The client is run with the file-scope client_strategy.
// NOTE(review): the return type, return statement and closing #endif are
// not present in this listing.
215 #if defined (_TEST_USES_THREADS)
217 connect_client (void *arg
)
219 u_short
*sport
= reinterpret_cast <u_short
*> (arg
);
220 run_client (*sport
, client_strategy
);
// Choose a reactor implementation and install a reactor as the
// ACE_Reactor singleton.
// NOTE(review): the lines that allocate `impl` and `reactor` are not
// present in this listing; only the selection conditions and the final
// installation are visible.
226 create_reactor (void)
228 ACE_Reactor_Impl
*impl
= 0;
// The opt_wfmo_reactor check is compiled only when
// TEST_CAN_USE_WFMO_REACTOR is defined.
230 #if defined (TEST_CAN_USE_WFMO_REACTOR)
231 if (opt_wfmo_reactor
)
234 #endif /* TEST_CAN_USE_WFMO_REACTOR */
// Taken when no implementation has been chosen yet and opt_select_reactor
// is set.
236 if (impl
== 0 && opt_select_reactor
)
240 ACE_Reactor
*reactor
= 0;
// Make `reactor` the process-wide ACE_Reactor singleton.
243 ACE_Reactor::instance (reactor
);
// Reactive pass of the test.  Opens an acceptor on @a server_addr with the
// singleton reactor, launches NUMBER_OF_REACTIVE_CONNECTIONS clients, runs
// the reactor event loop for at most 60 seconds, waits for the clients and
// closes the acceptor.  @a prog is the program name put on the command line
// of spawned child processes.
// NOTE(review): the return type, the declaration of `acceptor`, the
// #else lines and several argument/return lines are not present in this
// listing.
247 test_reactive (const ACE_TCHAR
*prog
,
248 ACE_MEM_Addr
&server_addr
)
250 ACE_DEBUG ((LM_DEBUG
, "Testing Reactive MEM_Stream\n\n"));
254 client_strategy
= ACE_MEM_IO::Reactive
; // Echo_Handler uses this.
256 ACE_Accept_Strategy
<Echo_Handler
, ACE_MEM_ACCEPTOR
> accept_strategy
;
257 ACE_Creation_Strategy
<Echo_Handler
> create_strategy
;
258 ACE_Reactive_Strategy
<Echo_Handler
> reactive_strategy (ACE_Reactor::instance ());
// NOTE(review): the message below says "accept" although the failing call
// is open() -- consider rewording it when the full file is at hand.
260 if (acceptor
.open (server_addr
,
261 ACE_Reactor::instance (),
264 &reactive_strategy
) == -1)
265 ACE_ERROR_RETURN ((LM_ERROR
,
266 ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
267 acceptor
.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
// Fetch the address actually bound so clients can be given its port.
269 ACE_MEM_Addr local_addr
;
270 if (acceptor
.acceptor ().get_local_addr (local_addr
) == -1)
271 ACE_ERROR_RETURN ((LM_ERROR
,
272 ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
275 u_short sport
= local_addr
.get_port_number ();
// Launch the clients: threads when _TEST_USES_THREADS is defined, child
// processes otherwise (the #else line is presumably among the lines not
// present in this listing).
277 #if defined (_TEST_USES_THREADS)
278 if (ACE_Thread_Manager::instance ()->spawn_n (NUMBER_OF_REACTIVE_CONNECTIONS
,
281 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n ()")));
283 ACE_Process_Options opts
;
// Child command line: "<prog> -p<port> -r" (-r selects the reactive client
// in run_main's option parsing).
284 # if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
285 const ACE_TCHAR
*cmdline_fmt
= ACE_TEXT ("%s -p%d -r");
287 const ACE_TCHAR
*cmdline_fmt
= ACE_TEXT ("%ls -p%d -r");
288 # endif /* ACE_WIN32 || !ACE_USES_WCHAR */
289 opts
.command_line (cmdline_fmt
, prog
, sport
);
290 if (ACE_Process_Manager::instance ()->spawn_n (NUMBER_OF_REACTIVE_CONNECTIONS
,
292 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n ()")));
293 #endif /* _TEST_USES_THREADS */
// Run the event loop with a 60 second budget.  The check below treats a
// remaining time of zero as a timeout.
295 ACE_Time_Value
tv (60, 0);
296 ACE_Reactor::instance ()->run_reactor_event_loop (tv
);
298 if (tv
== ACE_Time_Value::zero
)
300 ACE_ERROR ((LM_ERROR
,
301 ACE_TEXT ("Reactor::run_event_loop timeout\n")));
305 ACE_DEBUG ((LM_DEBUG
, "Reactor::run_event_loop finished\n"));
// Wait for all client threads or child processes to finish.
307 #if defined (_TEST_USES_THREADS)
308 if (ACE_Thread_Manager::instance ()->wait () == -1)
309 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
311 if (ACE_Process_Manager::instance ()->wait () == -1)
312 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
313 #endif /* _TEST_USES_THREADS */
// NOTE(review): the %p argument below carries a trailing "\n", unlike the
// matching message in test_concurrent() -- confirm which form is intended.
315 if (acceptor
.close () == -1)
317 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
318 ACE_TEXT ("MEM_Acceptor::close\n")));
322 ACE_UNUSED_ARG (prog
);
// Multithreaded (MT) pass of the test.  Same overall flow as
// test_reactive(), but with client_strategy set to ACE_MEM_IO::MT, a 1MB
// acceptor buffer and NUMBER_OF_MT_CONNECTIONS clients.  @a prog is the
// program name put on the command line of spawned child processes.
// NOTE(review): the return type, the declaration of `acceptor`, the
// #else lines and several argument/return lines are not present in this
// listing.
327 test_concurrent (const ACE_TCHAR
*prog
,
328 ACE_MEM_Addr
&server_addr
)
330 ACE_DEBUG ((LM_DEBUG
, "Testing Multithreaded MEM_Stream\n\n"));
333 client_strategy
= ACE_MEM_IO::MT
; // Echo_Handler uses this.
335 ACE_Accept_Strategy
<Echo_Handler
, ACE_MEM_ACCEPTOR
> accept_strategy
;
336 ACE_Creation_Strategy
<Echo_Handler
> create_strategy
;
// Activation strategy: ACE_Thread_Strategy when ACE_HAS_THREADS is defined;
// the reactive declaration below is presumably the #else branch (that line
// is not present in this listing).
337 #if defined (ACE_HAS_THREADS)
338 ACE_Thread_Strategy
<Echo_Handler
> act_strategy
;
340 ACE_Reactive_Strategy
<Echo_Handler
> act_strategy (ACE_Reactor::instance ());
341 #endif /* ACE_HAS_THREADS */
// NOTE(review): the message below says "accept" although the failing call
// is open() -- consider rewording it when the full file is at hand.
344 if (acceptor
.open (server_addr
,
345 ACE_Reactor::instance (),
348 &act_strategy
) == -1)
349 ACE_ERROR_RETURN ((LM_ERROR
,
350 ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
352 // Make sure the MEM_Stream created by the underlying MEM_Acceptor
353 // is capable of passing messages of 1MB.
354 acceptor
.acceptor ().init_buffer_size (1024 * 1024);
355 acceptor
.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
356 acceptor
.acceptor ().preferred_strategy (ACE_MEM_IO::MT
);
// Fetch the address actually bound so clients can be given its port.
358 ACE_MEM_Addr local_addr
;
359 if (acceptor
.acceptor ().get_local_addr (local_addr
) == -1)
360 ACE_ERROR_RETURN ((LM_ERROR
,
361 ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
364 u_short sport
= local_addr
.get_port_number ();
// Launch the clients: threads when _TEST_USES_THREADS is defined (prog is
// then unused), child processes otherwise.
366 #if defined (_TEST_USES_THREADS)
367 ACE_UNUSED_ARG (prog
);
369 if (ACE_Thread_Manager::instance ()->spawn_n (NUMBER_OF_MT_CONNECTIONS
,
372 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n()")));
374 ACE_Process_Options opts
;
// Child command line: "<prog> -p<port> -m" (-m selects the MT client in
// run_main's option parsing).
375 # if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
376 const ACE_TCHAR
*cmdline_fmt
= ACE_TEXT ("%s -p%d -m");
378 const ACE_TCHAR
*cmdline_fmt
= ACE_TEXT ("%ls -p%d -m");
379 # endif /* ACE_WIN32 || !ACE_USES_WCHAR */
380 opts
.command_line (cmdline_fmt
, prog
, sport
);
381 if (ACE_Process_Manager::instance ()->spawn_n (NUMBER_OF_MT_CONNECTIONS
,
383 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n()")));
384 #endif /* _TEST_USES_THREADS */
// Run the event loop with a 60 second budget.  The check below treats a
// remaining time of zero as a timeout.
386 ACE_Time_Value
tv (60, 0);
387 ACE_Reactor::instance ()->run_reactor_event_loop (tv
);
389 if (tv
== ACE_Time_Value::zero
)
391 ACE_ERROR ((LM_ERROR
,
392 ACE_TEXT ("Reactor::run_event_loop timeout\n")));
396 ACE_DEBUG ((LM_DEBUG
, "Reactor::run_event_loop finished\n"));
// Wait for all client threads or child processes to finish.
398 #if defined (_TEST_USES_THREADS)
399 if (ACE_Thread_Manager::instance ()->wait () == -1)
400 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
402 if (ACE_Process_Manager::instance ()->wait () == -1)
403 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
404 #endif /* _TEST_USES_THREADS */
406 if (acceptor
.close () == -1)
408 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
409 ACE_TEXT ("MEM_Acceptor::close")));
// Test entry point.  The visible code has two roles: a "master" part that
// runs the reactive and then the concurrent pass, and a child-process part
// that parses "-p <port> -r | -m" and runs a single client.
// NOTE(review): the condition that chooses between the two roles, the
// declaration of `port`, the switch/case labels and the return statements
// are not present in this listing.
417 run_main (int argc
, ACE_TCHAR
*argv
[])
423 // This is the "master" process.
425 ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
// Both passes are compiled out when ACE_LACKS_ACCEPT is defined.
426 #ifndef ACE_LACKS_ACCEPT
428 ACE_MEM_Addr
server_addr (port
);
// Reactive pass: prime the Waiting counter, then run the test.  The program
// name falls back to "MEM_Stream_Test" when argc is not positive.
430 reset_handler (NUMBER_OF_REACTIVE_CONNECTIONS
);
432 test_reactive (argc
> 0 ? argv
[0] : ACE_TEXT ("MEM_Stream_Test"), server_addr
);
// Re-arm the singleton reactor's event loop before the second pass.
434 ACE_Reactor::instance ()->reset_reactor_event_loop ();
436 #if !defined (ACE_WIN32) && defined (_ACE_USE_SV_SEM)
437 ACE_ERROR ((LM_DEBUG
,
438 ACE_TEXT ("\n *** Platform only supports non-scalable SysV semaphores ***\n\n")));
439 #endif /* !ACE_WIN32 && _ACE_USE_SV_SEM */
// Concurrent (MT) pass.
440 reset_handler (NUMBER_OF_MT_CONNECTIONS
);
442 test_concurrent (argc
> 0 ? argv
[0] : ACE_TEXT ("MEM_Stream_Test"), server_addr
);
444 #endif // ACE_LACKS_ACCEPT
450 // We end up here if this is a child process spawned for one of
451 // the test passes. command line is: -p <port> -r (reactive) |
452 // -m (multithreaded)
// Each child logs under a name that includes its process id.
454 ACE_TCHAR lognm
[MAXPATHLEN
];
455 int mypid (ACE_OS::getpid ());
456 ACE_OS::snprintf (lognm
, MAXPATHLEN
,
457 ACE_TEXT ("MEM_Stream_Test-%d"), mypid
);
458 ACE_START_TEST (lognm
);
// Option string "p:rm": -p takes an argument (the port); -r and -m are
// flags.
460 ACE_Get_Opt
opts (argc
, argv
, ACE_TEXT ("p:rm"));
461 int opt
, iport
, status
;
// Reactive is the default client model.
462 ACE_MEM_IO::Signal_Strategy model
= ACE_MEM_IO::Reactive
;
464 while ((opt
= opts()) != -1)
// Port number: parsed as int, then narrowed to u_short.
469 iport
= ACE_OS::atoi (opts
.opt_arg ());
470 port
= static_cast <u_short
> (iport
);
474 model
= ACE_MEM_IO::Reactive
;
478 model
= ACE_MEM_IO::MT
;
482 ACE_ERROR_RETURN ((LM_ERROR
,
483 ACE_TEXT ("Invalid option (-p <port> -r | -m)\n")),
// Run the single client for this child process.
487 status
= run_client (port
, model
);
// Explicit singleton instantiation for the counter type behind `Waiting`.
// The type is wrapped in a macro, presumably so that the comma in its
// template argument list is not taken as a macro argument separator.
493 #define ACE_Atomic_Op_type \
494 ACE_Atomic_Op< ACE_SYNCH_MUTEX, u_short>
495 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton
, ACE_Atomic_Op_type
, ACE_SYNCH_RECURSIVE_MUTEX
);
// Second run_main definition; both parameters are unnamed and unused.  It
// starts the test log and reports that position independent pointers are
// not supported on this platform.
// NOTE(review): presumably this is the #else branch of the feature check at
// the top of the file; the #else line, the start of the logging call and
// the return statement are not present in this listing.
500 run_main (int, ACE_TCHAR
*[])
502 ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
505 ACE_TEXT ("position independent pointers ")
506 ACE_TEXT ("not supported on this platform\n")));
511 #endif /* (ACE_HAS_THREADS || ACE_HAS_PROCESS_SPAWN) && ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */