2 //=============================================================================
4 * @file MEM_Stream_Test.cpp
6 * This is a test of the <ACE_MEM_Acceptor> and
7 * <ACE_MEM_Connector> classes.
9 * @author Nanbor Wang <nanbor@cs.wustl.edu>
11 //=============================================================================
13 #include "test_config.h"
14 #include "ace/OS_NS_stdio.h"
15 #include "ace/OS_NS_string.h"
16 #include "ace/OS_NS_unistd.h"
17 #include "ace/Get_Opt.h"
18 #include "ace/Thread_Manager.h"
19 #include "ace/MEM_Connector.h"
20 #include "ace/MEM_Acceptor.h"
21 #include "ace/Select_Reactor.h"
22 #include "ace/Connector.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Svc_Handler.h"
25 #include "ace/Singleton.h"
26 #include "ace/Atomic_Op.h"
28 #if (defined (ACE_HAS_THREADS) || defined (ACE_HAS_PROCESS_SPAWN)) && \
29 (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
31 #if !defined (ACE_HAS_PROCESS_SPAWN) && defined (ACE_HAS_THREADS)
32 # define _TEST_USES_THREADS
34 # define _TEST_USES_PROCESSES
35 # include "ace/Process.h"
36 # include "ace/Process_Manager.h"
39 #include "MEM_Stream_Test.h" // Defines Echo_Handler
41 #define NUMBER_OF_REACTIVE_CONNECTIONS 3
42 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
43 # define NUMBER_OF_MT_CONNECTIONS 3
45 // We will use SysV Semaphore in this case which is not very scalable
46 // and can only handle one connection.
47 # define NUMBER_OF_MT_CONNECTIONS 1
48 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
50 #define NUMBER_OF_ITERATIONS 100
52 // If we don't have winsock2 we can't use WFMO_Reactor.
53 #if defined (ACE_WIN32) \
54 && defined (ACE_HAS_WINSOCK2) \
55 && ACE_HAS_WINSOCK2 != 0
56 # define TEST_CAN_USE_WFMO_REACTOR
59 #if defined (TEST_CAN_USE_WFMO_REACTOR)
60 static const int opt_wfmo_reactor
= 1;
61 #endif /* TEST_CAN_USE_WFMO_REACTOR */
63 static int opt_select_reactor
= 1;
64 static ACE_MEM_IO::Signal_Strategy client_strategy
= ACE_MEM_IO::Reactive
;
66 using WaitingCounter
= ACE_Atomic_Op
<ACE_MT_SYNCH::MUTEX
, u_short
>;
67 using Waiting
= ACE_Singleton
<WaitingCounter
, ACE_MT_SYNCH::RECURSIVE_MUTEX
>;
69 // Number of connections that are currently open
70 static u_short connection_count
= 0;
72 using ACCEPTOR
= ACE_Acceptor
<Echo_Handler
, ACE_MEM_Acceptor
>;
73 using S_ACCEPTOR
= ACE_Strategy_Acceptor
<Echo_Handler
, ACE_MEM_Acceptor
>;
75 void reset_handler (int conn
)
77 // Reset the number of connection the test should perform.
78 *Waiting::instance () = conn
;
83 Echo_Handler::open (void *)
88 Echo_Handler::Echo_Handler (ACE_Thread_Manager
*thr_mgr
)
89 : ACE_Svc_Handler
<ACE_MEM_STREAM
, ACE_SYNCH
> (thr_mgr
),
90 connection_ (++connection_count
)
92 ACE_OS::snprintf (this->name_
, MAXPATHLEN
, ACE_TEXT ("Connection %d --> "),
97 Echo_Handler::handle_input (ACE_HANDLE
)
99 ACE_TCHAR buf
[MAXPATHLEN
];
102 len
= this->peer ().recv (buf
, MAXPATHLEN
* sizeof (ACE_TCHAR
));
105 ACE_ERROR_RETURN ((LM_ERROR
,
106 ACE_TEXT ("Error receiving from MEM_Stream\n")),
108 else if (len
== 0) // Connection closed.
111 ACE_TEXT ("Connection %d closed\n"),
116 ACE_TCHAR return_buf
[MAXPATHLEN
];
117 ACE_OS::strcpy (return_buf
, this->name_
);
118 ACE_OS::strcat (return_buf
, buf
);
119 len
= (ACE_OS::strlen (return_buf
) + 1) * sizeof (ACE_TCHAR
);
121 if (this->peer ().send (return_buf
, len
) != len
)
122 ACE_ERROR_RETURN ((LM_ERROR
,
123 ACE_TEXT ("Error sending from MEM_Stream\n")),
130 Echo_Handler::handle_close (ACE_HANDLE
,
131 ACE_Reactor_Mask mask
)
134 (*Waiting::instance ())--;
136 if (client_strategy
!= ACE_MEM_IO::Reactive
)
137 this->reactor ()->remove_handler (this,
138 mask
| ACE_Event_Handler::DONT_CALL
);
140 // If no connections are open.
141 if (*Waiting::instance () == 0)
142 ACE_Reactor::instance ()->end_reactor_event_loop ();
144 ACE_DEBUG ((LM_DEBUG
,
145 ACE_TEXT ("(%t) Echo_Handler %d::handle_close closing down\n"),
156 while (this->handle_input (this->get_handle ()) >= 0)
162 run_client (u_short port
,
163 ACE_MEM_IO::Signal_Strategy strategy
)
166 ACE_MEM_Addr
to_server (port
);
167 ACE_MEM_Connector connector
;
168 connector
.preferred_strategy (strategy
);
169 ACE_MEM_Stream stream
;
171 // connector.preferred_strategy (ACE_MEM_IO::MT);
173 if (connector
.connect (stream
, to_server
.get_remote_addr ()) == -1)
174 ACE_ERROR_RETURN ((LM_ERROR
,
175 ACE_TEXT ("Failed to connect to <%C> %p\n"),
176 to_server
.get_host_name (),
177 ACE_TEXT ("connector.connect()")),
180 ACE_TCHAR buf
[MAXPATHLEN
];
182 for (size_t cntr
= 0; cntr
< NUMBER_OF_ITERATIONS
; cntr
++)
184 ACE_OS::snprintf (buf
, MAXPATHLEN
,
185 ACE_TEXT ("Iteration ") ACE_SIZE_T_FORMAT_SPECIFIER
,
188 ssize_t slen
= (ACE_OS::strlen (buf
) + 1) * sizeof (ACE_TCHAR
);
190 if (stream
.send (buf
, slen
) < slen
)
192 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
193 ACE_TEXT ("In stream.send()")));
198 if (stream
.recv (buf
, MAXPATHLEN
) == -1)
200 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
201 ACE_TEXT ("stream.recv()")));
206 ACE_DEBUG ((LM_DEBUG
,
207 ACE_TEXT ("run_client(), got echo %s\n"),
211 status
= stream
.close () == -1 ? -1 : status
;
215 #if defined (_TEST_USES_THREADS)
217 connect_client (void *arg
)
219 u_short
*sport
= reinterpret_cast <u_short
*> (arg
);
220 run_client (*sport
, client_strategy
);
228 ACE_Reactor_Impl
*impl
= 0;
230 #if defined (TEST_CAN_USE_WFMO_REACTOR)
231 if (opt_wfmo_reactor
)
234 #endif /* TEST_CAN_USE_WFMO_REACTOR */
236 if (impl
== 0 && opt_select_reactor
)
240 ACE_Reactor
*reactor
= 0;
243 ACE_Reactor::instance (reactor
);
247 test_reactive (const ACE_TCHAR
*prog
,
248 ACE_MEM_Addr
&server_addr
)
250 ACE_DEBUG ((LM_DEBUG
, "Testing Reactive MEM_Stream\n\n"));
254 client_strategy
= ACE_MEM_IO::Reactive
; // Echo_Handler uses this.
256 ACE_Accept_Strategy
<Echo_Handler
, ACE_MEM_ACCEPTOR
> accept_strategy
;
257 ACE_Creation_Strategy
<Echo_Handler
> create_strategy
;
258 ACE_Reactive_Strategy
<Echo_Handler
> reactive_strategy (ACE_Reactor::instance ());
260 if (acceptor
.open (server_addr
,
261 ACE_Reactor::instance (),
264 &reactive_strategy
) == -1)
265 ACE_ERROR_RETURN ((LM_ERROR
,
266 ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
267 acceptor
.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
269 ACE_MEM_Addr local_addr
;
270 if (acceptor
.acceptor ().get_local_addr (local_addr
) == -1)
271 ACE_ERROR_RETURN ((LM_ERROR
,
272 ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
275 u_short sport
= local_addr
.get_port_number ();
277 #if defined (_TEST_USES_THREADS)
278 if (ACE_Thread_Manager::instance ()->spawn_n (NUMBER_OF_REACTIVE_CONNECTIONS
,
281 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n ()")));
283 ACE_Process_Options opts
;
284 opts
.command_line (ACE_TEXT ("%") ACE_TEXT_PRIs
ACE_TEXT (" -p%d -r"), prog
, sport
);
285 if (ACE_Process_Manager::instance ()->spawn_n (NUMBER_OF_REACTIVE_CONNECTIONS
,
287 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n ()")));
288 #endif /* _TEST_USES_THREADS */
290 ACE_Time_Value
tv (60, 0);
291 ACE_Reactor::instance ()->run_reactor_event_loop (tv
);
293 if (tv
== ACE_Time_Value::zero
)
295 ACE_ERROR ((LM_ERROR
,
296 ACE_TEXT ("Reactor::run_event_loop timeout\n")));
300 ACE_DEBUG ((LM_DEBUG
, "Reactor::run_event_loop finished\n"));
302 #if defined (_TEST_USES_THREADS)
303 if (ACE_Thread_Manager::instance ()->wait () == -1)
304 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
306 if (ACE_Process_Manager::instance ()->wait () == -1)
307 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
308 #endif /* _TEST_USES_THREADS */
310 if (acceptor
.close () == -1)
312 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
313 ACE_TEXT ("MEM_Acceptor::close\n")));
317 ACE_UNUSED_ARG (prog
);
322 test_concurrent (const ACE_TCHAR
*prog
,
323 ACE_MEM_Addr
&server_addr
)
325 ACE_DEBUG ((LM_DEBUG
, "Testing Multithreaded MEM_Stream\n\n"));
328 client_strategy
= ACE_MEM_IO::MT
; // Echo_Handler uses this.
330 ACE_Accept_Strategy
<Echo_Handler
, ACE_MEM_ACCEPTOR
> accept_strategy
;
331 ACE_Creation_Strategy
<Echo_Handler
> create_strategy
;
332 #if defined (ACE_HAS_THREADS)
333 ACE_Thread_Strategy
<Echo_Handler
> act_strategy
;
335 ACE_Reactive_Strategy
<Echo_Handler
> act_strategy (ACE_Reactor::instance ());
336 #endif /* ACE_HAS_THREADS */
339 if (acceptor
.open (server_addr
,
340 ACE_Reactor::instance (),
343 &act_strategy
) == -1)
344 ACE_ERROR_RETURN ((LM_ERROR
,
345 ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
347 // Make sure the MEM_Stream created by the underlying MEM_Acceptor
348 // is capable of passing messages of 1MB.
349 acceptor
.acceptor ().init_buffer_size (1024 * 1024);
350 acceptor
.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
351 acceptor
.acceptor ().preferred_strategy (ACE_MEM_IO::MT
);
353 ACE_MEM_Addr local_addr
;
354 if (acceptor
.acceptor ().get_local_addr (local_addr
) == -1)
355 ACE_ERROR_RETURN ((LM_ERROR
,
356 ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
359 u_short sport
= local_addr
.get_port_number ();
361 #if defined (_TEST_USES_THREADS)
362 ACE_UNUSED_ARG (prog
);
364 if (ACE_Thread_Manager::instance ()->spawn_n (NUMBER_OF_MT_CONNECTIONS
,
367 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n()")));
369 ACE_Process_Options opts
;
370 opts
.command_line (ACE_TEXT ("%") ACE_TEXT_PRIs
ACE_TEXT (" -p%d -m"), prog
, sport
);
371 if (ACE_Process_Manager::instance ()->spawn_n (NUMBER_OF_MT_CONNECTIONS
,
373 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n()")));
374 #endif /* _TEST_USES_THREADS */
376 ACE_Time_Value
tv (60, 0);
377 ACE_Reactor::instance ()->run_reactor_event_loop (tv
);
379 if (tv
== ACE_Time_Value::zero
)
381 ACE_ERROR ((LM_ERROR
,
382 ACE_TEXT ("Reactor::run_event_loop timeout\n")));
386 ACE_DEBUG ((LM_DEBUG
, "Reactor::run_event_loop finished\n"));
388 #if defined (_TEST_USES_THREADS)
389 if (ACE_Thread_Manager::instance ()->wait () == -1)
390 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
392 if (ACE_Process_Manager::instance ()->wait () == -1)
393 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
394 #endif /* _TEST_USES_THREADS */
396 if (acceptor
.close () == -1)
398 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
399 ACE_TEXT ("MEM_Acceptor::close")));
407 run_main (int argc
, ACE_TCHAR
*argv
[])
413 // This is the "master" process.
415 ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
416 #ifndef ACE_LACKS_ACCEPT
418 ACE_MEM_Addr
server_addr (port
);
420 reset_handler (NUMBER_OF_REACTIVE_CONNECTIONS
);
422 test_reactive (argc
> 0 ? argv
[0] : ACE_TEXT ("MEM_Stream_Test"), server_addr
);
424 ACE_Reactor::instance ()->reset_reactor_event_loop ();
426 #if !defined (ACE_WIN32) && defined (_ACE_USE_SV_SEM)
427 ACE_ERROR ((LM_DEBUG
,
428 ACE_TEXT ("\n *** Platform only supports non-scalable SysV semaphores ***\n\n")));
429 #endif /* !ACE_WIN32 && _ACE_USE_SV_SEM */
430 reset_handler (NUMBER_OF_MT_CONNECTIONS
);
432 test_concurrent (argc
> 0 ? argv
[0] : ACE_TEXT ("MEM_Stream_Test"), server_addr
);
434 #endif // ACE_LACKS_ACCEPT
440 // We end up here if this is a child process spawned for one of
441 // the test passes. command line is: -p <port> -r (reactive) |
442 // -m (multithreaded)
444 ACE_TCHAR lognm
[MAXPATHLEN
];
445 int mypid (ACE_OS::getpid ());
446 ACE_OS::snprintf (lognm
, MAXPATHLEN
,
447 ACE_TEXT ("MEM_Stream_Test-%d"), mypid
);
448 ACE_START_TEST (lognm
);
450 ACE_Get_Opt
opts (argc
, argv
, ACE_TEXT ("p:rm"));
451 int opt
, iport
, status
;
452 ACE_MEM_IO::Signal_Strategy model
= ACE_MEM_IO::Reactive
;
454 while ((opt
= opts()) != -1)
459 iport
= ACE_OS::atoi (opts
.opt_arg ());
460 port
= static_cast <u_short
> (iport
);
464 model
= ACE_MEM_IO::Reactive
;
468 model
= ACE_MEM_IO::MT
;
472 ACE_ERROR_RETURN ((LM_ERROR
,
473 ACE_TEXT ("Invalid option (-p <port> -r | -m)\n")),
477 status
= run_client (port
, model
);
483 #define ACE_Atomic_Op_type \
484 ACE_Atomic_Op< ACE_SYNCH_MUTEX, u_short>
485 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton
, ACE_Atomic_Op_type
, ACE_SYNCH_RECURSIVE_MUTEX
);
490 run_main (int, ACE_TCHAR
*[])
492 ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
495 ACE_TEXT ("position independent pointers ")
496 ACE_TEXT ("not supported on this platform\n")));
501 #endif /* (ACE_HAS_THREADS || ACE_HAS_PROCESS_SPAWN) && ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */