Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / tests / MEM_Stream_Test.cpp
blob56e7d629cd64434d5a991da51211dedf4ad9dba1
//=============================================================================
/**
 *  @file    MEM_Stream_Test.cpp
 *
 *  This is a test of the <ACE_MEM_Acceptor> and
 *  <ACE_MEM_Connector> classes.
 *
 *  @author Nanbor Wang <nanbor@cs.wustl.edu>
 */
//=============================================================================
13 #include "test_config.h"
14 #include "ace/OS_NS_stdio.h"
15 #include "ace/OS_NS_string.h"
16 #include "ace/OS_NS_unistd.h"
17 #include "ace/Get_Opt.h"
18 #include "ace/Thread_Manager.h"
19 #include "ace/MEM_Connector.h"
20 #include "ace/MEM_Acceptor.h"
21 #include "ace/Select_Reactor.h"
22 #include "ace/Connector.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Svc_Handler.h"
25 #include "ace/Singleton.h"
26 #include "ace/Atomic_Op.h"
// The real test is only built when threads or process spawning are
// available and ACE_HAS_POSITION_INDEPENDENT_POINTERS is 1; the matching
// #else branch near the end of the file supplies a stub run_main().
28 #if (defined (ACE_HAS_THREADS) || defined (ACE_HAS_PROCESS_SPAWN)) && \
29 (ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1)
// Clients run as threads only when process spawning is unavailable;
// otherwise each client is a separately spawned process.
31 #if !defined (ACE_HAS_PROCESS_SPAWN) && defined (ACE_HAS_THREADS)
32 # define _TEST_USES_THREADS
33 #else
34 # define _TEST_USES_PROCESSES
35 # include "ace/Process.h"
36 # include "ace/Process_Manager.h"
37 #endif
39 #include "MEM_Stream_Test.h" // Defines Echo_Handler
// Number of clients started by test_reactive().
41 #define NUMBER_OF_REACTIVE_CONNECTIONS 3
// Number of clients started by test_concurrent().
42 #if defined (ACE_WIN32) || !defined (_ACE_USE_SV_SEM)
43 # define NUMBER_OF_MT_CONNECTIONS 3
44 #else
45 // We will use SysV Semaphore in this case which is not very scalable
46 // and can only handle one connection.
47 # define NUMBER_OF_MT_CONNECTIONS 1
48 #endif /* ACE_WIN32 || !_ACE_USE_SV_SEM */
// Number of messages each client sends (and expects to be echoed) in
// run_client().
50 #define NUMBER_OF_ITERATIONS 100
52 // If we don't have winsock2 we can't use WFMO_Reactor.
53 #if defined (ACE_WIN32) \
54 && defined (ACE_HAS_WINSOCK2) \
55 && ACE_HAS_WINSOCK2 != 0
56 # define TEST_CAN_USE_WFMO_REACTOR
57 #endif
// When the WFMO reactor is usable, this flag makes create_reactor()
// choose ACE_WFMO_Reactor ahead of the select-based reactor.
59 #if defined (TEST_CAN_USE_WFMO_REACTOR)
60 static const int opt_wfmo_reactor = 1;
61 #endif /* TEST_CAN_USE_WFMO_REACTOR */
// Non-zero: create_reactor() uses an ACE_Select_Reactor when no other
// reactor implementation has been chosen.
63 static int opt_select_reactor = 1;
// Signal strategy of the pass currently running.  Set by test_reactive()
// and test_concurrent(); read by Echo_Handler::handle_close() and
// connect_client().
64 static ACE_MEM_IO::Signal_Strategy client_strategy = ACE_MEM_IO::Reactive;
// Singleton counter: loaded by reset_handler() with the number of
// connections expected in a pass and decremented by
// Echo_Handler::handle_close(); the reactor event loop is ended when it
// reaches zero.
66 using WaitingCounter = ACE_Atomic_Op<ACE_MT_SYNCH::MUTEX, u_short>;
67 using Waiting = ACE_Singleton<WaitingCounter, ACE_MT_SYNCH::RECURSIVE_MUTEX>;
69 // Number of connections that are currently open
// NOTE: in this file the value is only incremented (by the Echo_Handler
// constructor, which uses it as the connection number) and zeroed by
// reset_handler(); it is never decremented when a connection closes.
70 static u_short connection_count = 0;
// Acceptor flavours that create Echo_Handler objects on top of
// ACE_MEM_Acceptor; the tests below use the strategy-based S_ACCEPTOR.
72 using ACCEPTOR = ACE_Acceptor<Echo_Handler, ACE_MEM_Acceptor>;
73 using S_ACCEPTOR = ACE_Strategy_Acceptor<Echo_Handler, ACE_MEM_Acceptor>;
75 void reset_handler (int conn)
77 // Reset the number of connection the test should perform.
78 *Waiting::instance () = conn;
79 connection_count = 0;
82 int
83 Echo_Handler::open (void *)
85 return 0;
88 Echo_Handler::Echo_Handler (ACE_Thread_Manager *thr_mgr)
89 : ACE_Svc_Handler<ACE_MEM_STREAM, ACE_SYNCH> (thr_mgr),
90 connection_ (++connection_count)
92 ACE_OS::snprintf (this->name_, MAXPATHLEN, ACE_TEXT ("Connection %d --> "),
93 this->connection_);
96 int
97 Echo_Handler::handle_input (ACE_HANDLE)
99 ACE_TCHAR buf[MAXPATHLEN];
100 ssize_t len;
102 len = this->peer ().recv (buf, MAXPATHLEN * sizeof (ACE_TCHAR));
104 if (len == -1)
105 ACE_ERROR_RETURN ((LM_ERROR,
106 ACE_TEXT ("Error receiving from MEM_Stream\n")),
107 -1);
108 else if (len == 0) // Connection closed.
110 ACE_DEBUG ((LM_INFO,
111 ACE_TEXT ("Connection %d closed\n"),
112 this->connection_));
113 return -1;
116 ACE_TCHAR return_buf[MAXPATHLEN];
117 ACE_OS::strcpy (return_buf, this->name_);
118 ACE_OS::strcat (return_buf, buf);
119 len = (ACE_OS::strlen (return_buf) + 1) * sizeof (ACE_TCHAR);
121 if (this->peer ().send (return_buf, len) != len)
122 ACE_ERROR_RETURN ((LM_ERROR,
123 ACE_TEXT ("Error sending from MEM_Stream\n")),
124 -1);
126 return 0;
130 Echo_Handler::handle_close (ACE_HANDLE,
131 ACE_Reactor_Mask mask)
133 // Reduce count.
134 (*Waiting::instance ())--;
136 if (client_strategy != ACE_MEM_IO::Reactive)
137 this->reactor ()->remove_handler (this,
138 mask | ACE_Event_Handler::DONT_CALL);
140 // If no connections are open.
141 if (*Waiting::instance () == 0)
142 ACE_Reactor::instance ()->end_reactor_event_loop ();
144 ACE_DEBUG ((LM_DEBUG,
145 ACE_TEXT ("(%t) Echo_Handler %d::handle_close closing down\n"),
146 this->connection_));
148 // Shutdown
149 this->destroy ();
150 return 0;
154 Echo_Handler::svc ()
156 while (this->handle_input (this->get_handle ()) >= 0)
157 continue;
158 return 0;
162 run_client (u_short port,
163 ACE_MEM_IO::Signal_Strategy strategy)
165 int status = 0;
166 ACE_MEM_Addr to_server (port);
167 ACE_MEM_Connector connector;
168 connector.preferred_strategy (strategy);
169 ACE_MEM_Stream stream;
171 // connector.preferred_strategy (ACE_MEM_IO::MT);
173 if (connector.connect (stream, to_server.get_remote_addr ()) == -1)
174 ACE_ERROR_RETURN ((LM_ERROR,
175 ACE_TEXT ("Failed to connect to <%C> %p\n"),
176 to_server.get_host_name (),
177 ACE_TEXT ("connector.connect()")),
178 -1);
180 ACE_TCHAR buf[MAXPATHLEN];
182 for (size_t cntr = 0; cntr < NUMBER_OF_ITERATIONS; cntr ++)
184 ACE_OS::snprintf (buf, MAXPATHLEN,
185 ACE_TEXT ("Iteration ") ACE_SIZE_T_FORMAT_SPECIFIER,
186 cntr);
188 ssize_t slen = (ACE_OS::strlen (buf) + 1) * sizeof (ACE_TCHAR);
190 if (stream.send (buf, slen) < slen)
192 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
193 ACE_TEXT ("In stream.send()")));
194 status = -1;
195 break;
198 if (stream.recv (buf, MAXPATHLEN) == -1)
200 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
201 ACE_TEXT ("stream.recv()")));
202 status = -1;
203 break;
206 ACE_DEBUG ((LM_DEBUG,
207 ACE_TEXT ("run_client(), got echo %s\n"),
208 buf));
211 status = stream.close () == -1 ? -1 : status;
212 return status;
#if defined (_TEST_USES_THREADS)
/// Thread entry point for a client: runs the echo exchange against the
/// port number that @a arg points to, using the strategy of the pass
/// currently in progress.
///
/// @param arg  Pointer to a u_short holding the server port.
/// @return Always 0; the result of run_client() is not propagated.
ACE_THR_FUNC_RETURN
connect_client (void *arg)
{
  const u_short server_port = *static_cast<u_short *> (arg);
  run_client (server_port, client_strategy);
  return 0;
}
#endif
225 void
226 create_reactor ()
228 ACE_Reactor_Impl *impl = 0;
230 #if defined (TEST_CAN_USE_WFMO_REACTOR)
231 if (opt_wfmo_reactor)
232 ACE_NEW (impl,
233 ACE_WFMO_Reactor);
234 #endif /* TEST_CAN_USE_WFMO_REACTOR */
236 if (impl == 0 && opt_select_reactor)
237 ACE_NEW (impl,
238 ACE_Select_Reactor);
240 ACE_Reactor *reactor = 0;
241 ACE_NEW (reactor,
242 ACE_Reactor (impl));
243 ACE_Reactor::instance (reactor);
247 test_reactive (const ACE_TCHAR *prog,
248 ACE_MEM_Addr &server_addr)
250 ACE_DEBUG ((LM_DEBUG, "Testing Reactive MEM_Stream\n\n"));
252 int status = 0;
254 client_strategy = ACE_MEM_IO::Reactive; // Echo_Handler uses this.
256 ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> accept_strategy;
257 ACE_Creation_Strategy<Echo_Handler> create_strategy;
258 ACE_Reactive_Strategy<Echo_Handler> reactive_strategy (ACE_Reactor::instance ());
259 S_ACCEPTOR acceptor;
260 if (acceptor.open (server_addr,
261 ACE_Reactor::instance (),
262 &create_strategy,
263 &accept_strategy,
264 &reactive_strategy) == -1)
265 ACE_ERROR_RETURN ((LM_ERROR,
266 ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
267 acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
269 ACE_MEM_Addr local_addr;
270 if (acceptor.acceptor ().get_local_addr (local_addr) == -1)
271 ACE_ERROR_RETURN ((LM_ERROR,
272 ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
275 u_short sport = local_addr.get_port_number ();
277 #if defined (_TEST_USES_THREADS)
278 if (ACE_Thread_Manager::instance ()->spawn_n (NUMBER_OF_REACTIVE_CONNECTIONS,
279 connect_client,
280 &sport) == -1)
281 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n ()")));
282 #else
283 ACE_Process_Options opts;
284 opts.command_line (ACE_TEXT ("%") ACE_TEXT_PRIs ACE_TEXT (" -p%d -r"), prog, sport);
285 if (ACE_Process_Manager::instance ()->spawn_n (NUMBER_OF_REACTIVE_CONNECTIONS,
286 opts) == -1)
287 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n ()")));
288 #endif /* _TEST_USES_THREADS */
290 ACE_Time_Value tv (60, 0);
291 ACE_Reactor::instance ()->run_reactor_event_loop (tv);
293 if (tv == ACE_Time_Value::zero)
295 ACE_ERROR ((LM_ERROR,
296 ACE_TEXT ("Reactor::run_event_loop timeout\n")));
297 status = 1;
299 else
300 ACE_DEBUG ((LM_DEBUG, "Reactor::run_event_loop finished\n"));
302 #if defined (_TEST_USES_THREADS)
303 if (ACE_Thread_Manager::instance ()->wait () == -1)
304 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
305 #else
306 if (ACE_Process_Manager::instance ()->wait () == -1)
307 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
308 #endif /* _TEST_USES_THREADS */
310 if (acceptor.close () == -1)
312 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
313 ACE_TEXT ("MEM_Acceptor::close\n")));
314 status = 1;
317 ACE_UNUSED_ARG (prog);
318 return status;
322 test_concurrent (const ACE_TCHAR *prog,
323 ACE_MEM_Addr &server_addr)
325 ACE_DEBUG ((LM_DEBUG, "Testing Multithreaded MEM_Stream\n\n"));
327 int status = 0;
328 client_strategy = ACE_MEM_IO::MT; // Echo_Handler uses this.
330 ACE_Accept_Strategy<Echo_Handler, ACE_MEM_ACCEPTOR> accept_strategy;
331 ACE_Creation_Strategy<Echo_Handler> create_strategy;
332 #if defined (ACE_HAS_THREADS)
333 ACE_Thread_Strategy<Echo_Handler> act_strategy;
334 #else
335 ACE_Reactive_Strategy<Echo_Handler> act_strategy (ACE_Reactor::instance ());
336 #endif /* ACE_HAS_THREADS */
337 S_ACCEPTOR acceptor;
339 if (acceptor.open (server_addr,
340 ACE_Reactor::instance (),
341 &create_strategy,
342 &accept_strategy,
343 &act_strategy) == -1)
344 ACE_ERROR_RETURN ((LM_ERROR,
345 ACE_TEXT ("MEM_Acceptor::accept\n")), 1);
347 // Make sure the MEM_Stream created by the underlying MEM_Acceptor
348 // is capable of passing messages of 1MB.
349 acceptor.acceptor ().init_buffer_size (1024 * 1024);
350 acceptor.acceptor ().mmap_prefix (ACE_TEXT ("MEM_Acceptor_"));
351 acceptor.acceptor ().preferred_strategy (ACE_MEM_IO::MT);
353 ACE_MEM_Addr local_addr;
354 if (acceptor.acceptor ().get_local_addr (local_addr) == -1)
355 ACE_ERROR_RETURN ((LM_ERROR,
356 ACE_TEXT ("MEM_Acceptor::get_local_addr\n")),
359 u_short sport = local_addr.get_port_number ();
361 #if defined (_TEST_USES_THREADS)
362 ACE_UNUSED_ARG (prog);
364 if (ACE_Thread_Manager::instance ()->spawn_n (NUMBER_OF_MT_CONNECTIONS,
365 connect_client,
366 &sport) == -1)
367 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n()")));
368 #else
369 ACE_Process_Options opts;
370 opts.command_line (ACE_TEXT ("%") ACE_TEXT_PRIs ACE_TEXT (" -p%d -m"), prog, sport);
371 if (ACE_Process_Manager::instance ()->spawn_n (NUMBER_OF_MT_CONNECTIONS,
372 opts) == -1)
373 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn_n()")));
374 #endif /* _TEST_USES_THREADS */
376 ACE_Time_Value tv (60, 0);
377 ACE_Reactor::instance ()->run_reactor_event_loop (tv);
379 if (tv == ACE_Time_Value::zero)
381 ACE_ERROR ((LM_ERROR,
382 ACE_TEXT ("Reactor::run_event_loop timeout\n")));
383 status = 1;
385 else
386 ACE_DEBUG ((LM_DEBUG, "Reactor::run_event_loop finished\n"));
388 #if defined (_TEST_USES_THREADS)
389 if (ACE_Thread_Manager::instance ()->wait () == -1)
390 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
391 #else
392 if (ACE_Process_Manager::instance ()->wait () == -1)
393 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait ()")));
394 #endif /* _TEST_USES_THREADS */
396 if (acceptor.close () == -1)
398 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
399 ACE_TEXT ("MEM_Acceptor::close")));
400 status = 1;
403 return status;
407 run_main (int argc, ACE_TCHAR *argv[])
409 u_short port = 0;
411 if (argc == 1)
413 // This is the "master" process.
415 ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
416 #ifndef ACE_LACKS_ACCEPT
417 create_reactor ();
418 ACE_MEM_Addr server_addr (port);
420 reset_handler (NUMBER_OF_REACTIVE_CONNECTIONS);
422 test_reactive (argc > 0 ? argv[0] : ACE_TEXT ("MEM_Stream_Test"), server_addr);
424 ACE_Reactor::instance ()->reset_reactor_event_loop ();
426 #if !defined (ACE_WIN32) && defined (_ACE_USE_SV_SEM)
427 ACE_ERROR ((LM_DEBUG,
428 ACE_TEXT ("\n *** Platform only supports non-scalable SysV semaphores ***\n\n")));
429 #endif /* !ACE_WIN32 && _ACE_USE_SV_SEM */
430 reset_handler (NUMBER_OF_MT_CONNECTIONS);
432 test_concurrent (argc > 0 ? argv[0] : ACE_TEXT ("MEM_Stream_Test"), server_addr);
434 #endif // ACE_LACKS_ACCEPT
435 ACE_END_TEST;
436 return 0;
438 else
440 // We end up here if this is a child process spawned for one of
441 // the test passes. command line is: -p <port> -r (reactive) |
442 // -m (multithreaded)
444 ACE_TCHAR lognm[MAXPATHLEN];
445 int mypid (ACE_OS::getpid ());
446 ACE_OS::snprintf (lognm, MAXPATHLEN,
447 ACE_TEXT ("MEM_Stream_Test-%d"), mypid);
448 ACE_START_TEST (lognm);
450 ACE_Get_Opt opts (argc, argv, ACE_TEXT ("p:rm"));
451 int opt, iport, status;
452 ACE_MEM_IO::Signal_Strategy model = ACE_MEM_IO::Reactive;
454 while ((opt = opts()) != -1)
456 switch (opt)
458 case 'p':
459 iport = ACE_OS::atoi (opts.opt_arg ());
460 port = static_cast <u_short> (iport);
461 break;
463 case 'r':
464 model = ACE_MEM_IO::Reactive;
465 break;
467 case 'm':
468 model = ACE_MEM_IO::MT;
469 break;
471 default:
472 ACE_ERROR_RETURN ((LM_ERROR,
473 ACE_TEXT ("Invalid option (-p <port> -r | -m)\n")),
477 status = run_client (port, model);
478 ACE_END_TEST;
479 return status;
// The counter type contains a comma, so it is given a macro name that
// can be passed as a single macro argument below.  The type pair matches
// the Waiting singleton used by this test.
// NOTE(review): presumably this requests an explicit instantiation of the
// singleton template on platforms that need one -- confirm against the
// ACE_SINGLETON_TEMPLATE_INSTANTIATE definition.
483 #define ACE_Atomic_Op_type \
484 ACE_Atomic_Op< ACE_SYNCH_MUTEX, u_short>
485 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton, ACE_Atomic_Op_type, ACE_SYNCH_RECURSIVE_MUTEX);
488 #else
490 run_main (int, ACE_TCHAR *[])
492 ACE_START_TEST (ACE_TEXT ("MEM_Stream_Test"));
494 ACE_ERROR ((LM_INFO,
495 ACE_TEXT ("position independent pointers ")
496 ACE_TEXT ("not supported on this platform\n")));
498 ACE_END_TEST;
499 return 0;
501 #endif /* (ACE_HAS_THREADS || ACE_HAS_PROCESS_SPAWN) && ACE_HAS_POSITION_INDEPENDENT_POINTERS == 1 */