Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / MEM_Stream_Test.cpp
blob0819d766df4a4b9e47c3f5177b4ed098db0d175e
//=============================================================================
/**
 *  @file    MEM_Stream_Test.cpp
 *
 *  This is a test of the <ACE_MEM_Acceptor> and
 *  <ACE_MEM_Connector> classes.
 *
 *  @author Nanbor Wang <nanbor@cs.wustl.edu>
 */
//=============================================================================
13 #include "test_config.h"
14 #include "ace/OS_NS_stdio.h"
15 #include "ace/OS_NS_string.h"
16 #include "ace/OS_NS_unistd.h"
17 #include "ace/Get_Opt.h"
18 #include "ace/Thread_Manager.h"
19 #include "ace/MEM_Connector.h"
20 #include "ace/MEM_Acceptor.h"
21 #include "ace/Select_Reactor.h"
22 #include "ace/Connector.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Svc_Handler.h"
25 #include "ace/Singleton.h"
26 #include "ace/Atomic_Op.h"
28 #if (defined (ACE_HAS_THREADS) || defined (ACE_HAS_PROCESS_SPAWN)) && \
29 (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
31 #if !defined (ACE_HAS_PROCESS_SPAWN) && defined (ACE_HAS_THREADS)
32 # define _TEST_USES_THREADS
33 #else
34 # define _TEST_USES_PROCESSES
35 # include "ace/Process.h"
36 # include "ace/Process_Manager.h"
37 #endif
39 #include "MEM_Stream_Test.h" // Defines Echo_Handler
41 #define NUMBER_OF_REACTIVE_CONNECTIONS 3
42 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
43 # define NUMBER_OF_MT_CONNECTIONS 3
44 #else
45 // We will use SysV Semaphore in this case which is not very scalable
46 // and can only handle one connection.
47 # define NUMBER_OF_MT_CONNECTIONS 1
48 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
50 #define NUMBER_OF_ITERATIONS 100
52 // If we don't have winsock2 we can't use WFMO_Reactor.
53 #if defined (ACE_WIN32) \
54 && defined (ACE_HAS_WINSOCK2) \
55 && ACE_HAS_WINSOCK2 != 0
56 # define TEST_CAN_USE_WFMO_REACTOR
57 #endif
59 #if defined (TEST_CAN_USE_WFMO_REACTOR)
60 static const int opt_wfmo_reactor = 1;
61 #endif /* TEST_CAN_USE_WFMO_REACTOR */
63 static int opt_select_reactor = 1;
64 static ACE_MEM_IO::Signal_Strategy client_strategy = ACE_MEM_IO::Reactive;
66 typedef ACE_Atomic_Op <ACE_SYNCH_MUTEX, u_short> WaitingCounter;
67 typedef ACE_Singleton <WaitingCounter, ACE_SYNCH_RECURSIVE_MUTEX> Waiting;
69 // Number of connections that are currently open
70 static u_short connection_count = 0;
72 typedef ACE_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR> ACCEPTOR;
73 typedef ACE_Strategy_Acceptor<Echo_Handler, ACE_MEM_ACCEPTOR> S_ACCEPTOR;
75 void reset_handler (int conn)
77 // Reset the number of connection the test should perform.
78 *Waiting::instance () = conn;
79 connection_count = 0;
82 int
83 Echo_Handler::open (void *)
85 return 0;
88 Echo_Handler::Echo_Handler (ACE_Thread_Manager *thr_mgr)
89 : ACE_Svc_Handler<ACE_MEM_STREAM, ACE_SYNCH> (thr_mgr),
90 connection_ (++connection_count)
92 ACE_OS::snprintf (this->name_, MAXPATHLEN, ACE_TEXT ("Connection %d --> "),
93 this->connection_);
96 int
97 Echo_Handler::handle_input (ACE_HANDLE)
99 ACE_TCHAR buf[MAXPATHLEN];
100 ssize_t len;
102 len = this->peer ().recv (buf, MAXPATHLEN * sizeof (ACE_TCHAR));
104 if (len == -1)
105 ACE_ERROR_RETURN ((LM_ERROR,
106 ACE_TEXT ("Error receiving from MEM_Stream\n")),
107 -1);
108 else if (len == 0) // Connection closed.
110 ACE_DEBUG ((LM_INFO,
111 ACE_TEXT ("Connection %d closed\n"),
112 this->connection_));
113 return -1;
116 ACE_TCHAR return_buf[MAXPATHLEN];
117 ACE_OS::strcpy (return_buf, this->name_);
118 ACE_OS::strcat (return_buf, buf);
119 len = (ACE_OS::strlen (return_buf) + 1) * sizeof (ACE_TCHAR);
121 if (this->peer ().send (return_buf, len) != len)
122 ACE_ERROR_RETURN ((LM_ERROR,
123 ACE_TEXT ("Error sending from MEM_Stream\n")),
124 -1);
126 return 0;
130 Echo_Handler::handle_close (ACE_HANDLE,
131 ACE_Reactor_Mask mask)
133 // Reduce count.
134 (*Waiting::instance ())--;
136 if (client_strategy != ACE_MEM_IO::Reactive)
137 this->reactor ()->remove_handler (this,
138 mask | ACE_Event_Handler::DONT_CALL);
140 // If no connections are open.
141 if (*Waiting::instance () == 0)
142 ACE_Reactor::instance ()->end_reactor_event_loop ();
144 ACE_DEBUG ((LM_DEBUG,
145 ACE_TEXT ("(%t) Echo_Handler %d::handle_close closing down\n"),
146 this->connection_));
148 // Shutdown
149 this->destroy ();
150 return 0;
154 Echo_Handler::svc (void)
156 while (this->handle_input (this->get_handle ()) >= 0)
157 continue;
158 return 0;
162 run_client (u_short port,
163 ACE_MEM_IO::Signal_Strategy strategy)
165 int status = 0;
166 ACE_MEM_Addr to_server (port);
167 ACE_MEM_Connector connector;
168 connector.preferred_strategy (strategy);
169 ACE_MEM_Stream stream;
171 // connector.preferred_strategy (ACE_MEM_IO::MT);
173 if (connector.connect (stream, to_server.get_remote_addr ()) == -1)
174 ACE_ERROR_RETURN ((LM_ERROR,
175 ACE_TEXT ("Failed to connect to <%C> %p\n"),
176 to_server.get_host_name (),
177 ACE_TEXT ("connector.connect()")),
178 -1);
180 ACE_TCHAR buf[MAXPATHLEN];
182 for (size_t cntr = 0; cntr < NUMBER_OF_ITERATIONS; cntr ++)
184 ACE_OS::snprintf (buf, MAXPATHLEN,
185 ACE_TEXT ("Iteration ") ACE_SIZE_T_FORMAT_SPECIFIER,
186 cntr);
188 ssize_t slen = (ACE_OS::strlen (buf) + 1) * sizeof (ACE_TCHAR);
190 if (stream.send (buf, slen) < slen)
192 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
193 ACE_TEXT ("In stream.send()")));
194 status = -1;
195 break;
198 if (stream.recv (buf, MAXPATHLEN) == -1)
200 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
201 ACE_TEXT ("stream.recv()")));
202 status = -1;
203 break;
206 ACE_DEBUG ((LM_DEBUG,
207 ACE_TEXT ("run_client(), got echo %s\n"),
208 buf));
211 status = stream.close () == -1 ? -1 : status;
212 return status;
#if defined (_TEST_USES_THREADS)
// Thread entry point for a client.  <arg> points to the u_short port
// number of the acceptor; the signalling strategy is taken from the
// file-level client_strategy.  run_client() logs its own failures, so
// its result is not examined here.
ACE_THR_FUNC_RETURN
connect_client (void *arg)
{
  // A void * converts back to its original object pointer type with
  // static_cast; the port is only read.
  const u_short *sport = static_cast <const u_short *> (arg);
  run_client (*sport, client_strategy);
  return 0;
}
#endif
225 void
226 create_reactor (void)
228 ACE_Reactor_Impl *impl = 0;
230 #if defined (TEST_CAN_USE_WFMO_REACTOR)
231 if (opt_wfmo_reactor)
232 ACE_NEW (impl,
233 ACE_WFMO_Reactor);
234 #endif /* TEST_CAN_USE_WFMO_REACTOR */
236 if (impl == 0 && opt_select_reactor)
237 ACE_NEW (impl,
238 ACE_Select_Reactor);
240 ACE_Reactor *reactor = 0;
241 ACE_NEW (reactor,
242 ACE_Reactor (impl));
243 ACE_Reactor::instance (reactor);
247 test_reactive (const ACE_TCHAR *prog,
248 ACE_MEM_Addr &server_addr)
250 ACE_DEBUG ((LM_DEBUG, "Testing Reactive MEM_Stream\n\n"));
252 int status = 0;
254 client_strategy = ACE_MEM_IO::Reactive; // Echo_Handler uses this.
256 ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> accept_strategy;
257 ACE_Creation_Strategy<Echo_Handler> create_strategy;
258 ACE_Reactive_Strategy<Echo_Handler> reactive_strategy (ACE_Reactor::instance ());
259 S_ACCEPTOR acceptor;
260 if (acceptor.open (server_addr,
261 ACE_Reactor::instance (),
262 &create_strategy,
263 &accept_strategy,
264 &reactive_strategy) == -1)
265 ACE_ERROR_RETURN ((LM_ERROR,
266 ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
267 acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
269 ACE_MEM_Addr local_addr;
270 if (acceptor.acceptor ().get_local_addr (local_addr) == -1)
271 ACE_ERROR_RETURN ((LM_ERROR,
272 ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
275 u_short sport = local_addr.get_port_number ();
277 #if defined (_TEST_USES_THREADS)
278 if (ACE_Thread_Manager::instance ()->spawn_n (NUMBER_OF_REACTIVE_CONNECTIONS,
279 connect_client,
280 &sport) == -1)
281 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n ()")));
282 #else
283 ACE_Process_Options opts;
284 # if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
285 const ACE_TCHAR *cmdline_fmt = ACE_TEXT ("%s -p%d -r");
286 # else
287 const ACE_TCHAR *cmdline_fmt = ACE_TEXT ("%ls -p%d -r");
288 # endif /* ACE_WIN32 || !ACE_USES_WCHAR */
289 opts.command_line (cmdline_fmt, prog, sport);
290 if (ACE_Process_Manager::instance ()->spawn_n (NUMBER_OF_REACTIVE_CONNECTIONS,
291 opts) == -1)
292 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n ()")));
293 #endif /* _TEST_USES_THREADS */
295 ACE_Time_Value tv (60, 0);
296 ACE_Reactor::instance ()->run_reactor_event_loop (tv);
298 if (tv == ACE_Time_Value::zero)
300 ACE_ERROR ((LM_ERROR,
301 ACE_TEXT ("Reactor::run_event_loop timeout\n")));
302 status = 1;
304 else
305 ACE_DEBUG ((LM_DEBUG, "Reactor::run_event_loop finished\n"));
307 #if defined (_TEST_USES_THREADS)
308 if (ACE_Thread_Manager::instance ()->wait () == -1)
309 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
310 #else
311 if (ACE_Process_Manager::instance ()->wait () == -1)
312 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
313 #endif /* _TEST_USES_THREADS */
315 if (acceptor.close () == -1)
317 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
318 ACE_TEXT ("MEM_Acceptor::close\n")));
319 status = 1;
322 ACE_UNUSED_ARG (prog);
323 return status;
327 test_concurrent (const ACE_TCHAR *prog,
328 ACE_MEM_Addr &server_addr)
330 ACE_DEBUG ((LM_DEBUG, "Testing Multithreaded MEM_Stream\n\n"));
332 int status = 0;
333 client_strategy = ACE_MEM_IO::MT; // Echo_Handler uses this.
335 ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> accept_strategy;
336 ACE_Creation_Strategy<Echo_Handler> create_strategy;
337 #if defined (ACE_HAS_THREADS)
338 ACE_Thread_Strategy<Echo_Handler> act_strategy;
339 #else
340 ACE_Reactive_Strategy<Echo_Handler> act_strategy (ACE_Reactor::instance ());
341 #endif /* ACE_HAS_THREADS */
342 S_ACCEPTOR acceptor;
344 if (acceptor.open (server_addr,
345 ACE_Reactor::instance (),
346 &create_strategy,
347 &accept_strategy,
348 &act_strategy) == -1)
349 ACE_ERROR_RETURN ((LM_ERROR,
350 ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
352 // Make sure the MEM_Stream created by the underlying MEM_Acceptor
353 // is capable of passing messages of 1MB.
354 acceptor.acceptor ().init_buffer_size (1024 * 1024);
355 acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
356 acceptor.acceptor ().preferred_strategy (ACE_MEM_IO::MT);
358 ACE_MEM_Addr local_addr;
359 if (acceptor.acceptor ().get_local_addr (local_addr) == -1)
360 ACE_ERROR_RETURN ((LM_ERROR,
361 ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
364 u_short sport = local_addr.get_port_number ();
366 #if defined (_TEST_USES_THREADS)
367 ACE_UNUSED_ARG (prog);
369 if (ACE_Thread_Manager::instance ()->spawn_n (NUMBER_OF_MT_CONNECTIONS,
370 connect_client,
371 &sport) == -1)
372 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n()")));
373 #else
374 ACE_Process_Options opts;
375 # if defined (ACE_WIN32) || !defined (ACE_USES_WCHAR)
376 const ACE_TCHAR *cmdline_fmt = ACE_TEXT ("%s -p%d -m");
377 # else
378 const ACE_TCHAR *cmdline_fmt = ACE_TEXT ("%ls -p%d -m");
379 # endif /* ACE_WIN32 || !ACE_USES_WCHAR */
380 opts.command_line (cmdline_fmt, prog, sport);
381 if (ACE_Process_Manager::instance ()->spawn_n (NUMBER_OF_MT_CONNECTIONS,
382 opts) == -1)
383 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n()")));
384 #endif /* _TEST_USES_THREADS */
386 ACE_Time_Value tv (60, 0);
387 ACE_Reactor::instance ()->run_reactor_event_loop (tv);
389 if (tv == ACE_Time_Value::zero)
391 ACE_ERROR ((LM_ERROR,
392 ACE_TEXT ("Reactor::run_event_loop timeout\n")));
393 status = 1;
395 else
396 ACE_DEBUG ((LM_DEBUG, "Reactor::run_event_loop finished\n"));
398 #if defined (_TEST_USES_THREADS)
399 if (ACE_Thread_Manager::instance ()->wait () == -1)
400 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
401 #else
402 if (ACE_Process_Manager::instance ()->wait () == -1)
403 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
404 #endif /* _TEST_USES_THREADS */
406 if (acceptor.close () == -1)
408 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
409 ACE_TEXT ("MEM_Acceptor::close")));
410 status = 1;
413 return status;
417 run_main (int argc, ACE_TCHAR *argv[])
419 u_short port = 0;
421 if (argc == 1)
423 // This is the "master" process.
425 ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
426 #ifndef ACE_LACKS_ACCEPT
427 create_reactor ();
428 ACE_MEM_Addr server_addr (port);
430 reset_handler (NUMBER_OF_REACTIVE_CONNECTIONS);
432 test_reactive (argc > 0 ? argv[0] : ACE_TEXT ("MEM_Stream_Test"), server_addr);
434 ACE_Reactor::instance ()->reset_reactor_event_loop ();
436 #if !defined (ACE_WIN32) && defined (_ACE_USE_SV_SEM)
437 ACE_ERROR ((LM_DEBUG,
438 ACE_TEXT ("\n *** Platform only supports non-scalable SysV semaphores ***\n\n")));
439 #endif /* !ACE_WIN32 && _ACE_USE_SV_SEM */
440 reset_handler (NUMBER_OF_MT_CONNECTIONS);
442 test_concurrent (argc > 0 ? argv[0] : ACE_TEXT ("MEM_Stream_Test"), server_addr);
444 #endif // ACE_LACKS_ACCEPT
445 ACE_END_TEST;
446 return 0;
448 else
450 // We end up here if this is a child process spawned for one of
451 // the test passes. command line is: -p <port> -r (reactive) |
452 // -m (multithreaded)
454 ACE_TCHAR lognm[MAXPATHLEN];
455 int mypid (ACE_OS::getpid ());
456 ACE_OS::snprintf (lognm, MAXPATHLEN,
457 ACE_TEXT ("MEM_Stream_Test-%d"), mypid);
458 ACE_START_TEST (lognm);
460 ACE_Get_Opt opts (argc, argv, ACE_TEXT ("p:rm"));
461 int opt, iport, status;
462 ACE_MEM_IO::Signal_Strategy model = ACE_MEM_IO::Reactive;
464 while ((opt = opts()) != -1)
466 switch (opt)
468 case 'p':
469 iport = ACE_OS::atoi (opts.opt_arg ());
470 port = static_cast <u_short> (iport);
471 break;
473 case 'r':
474 model = ACE_MEM_IO::Reactive;
475 break;
477 case 'm':
478 model = ACE_MEM_IO::MT;
479 break;
481 default:
482 ACE_ERROR_RETURN ((LM_ERROR,
483 ACE_TEXT ("Invalid option (-p <port> -r | -m)\n")),
487 status = run_client (port, model);
488 ACE_END_TEST;
489 return status;
493 #define ACE_Atomic_Op_type \
494 ACE_Atomic_Op< ACE_SYNCH_MUTEX, u_short>
495 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton, ACE_Atomic_Op_type, ACE_SYNCH_RECURSIVE_MUTEX);
498 #else
500 run_main (int, ACE_TCHAR *[])
502 ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
504 ACE_ERROR ((LM_INFO,
505 ACE_TEXT ("position independent pointers ")
506 ACE_TEXT ("not supported on this platform\n")));
508 ACE_END_TEST;
509 return 0;
511 #endif /* (ACE_HAS_THREADS || ACE_HAS_PROCESS_SPAWN) && ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */