2 //=============================================================================
6 * This is a test of the <ACE_Acceptor> and <ACE_Connector>
7 * classes. The test forks processes or spawns threads (depending
8 * upon the platform) and then executes client and server allowing
9 * them to connect and exchange data. The test also illustrates
10 * how the <ACE_Strategy_Connector> works by showing how you can
11 * cache connections on the client.
13 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
14 * @author Chris Cleeland <cleeland@cs.wustl.edu>
15 * @author and Irfan Pyarali <irfan@cs.wustl.edu>
17 //=============================================================================
20 #include "test_config.h"
21 #include "ace/SOCK_Connector.h"
22 #include "ace/LOCK_SOCK_Acceptor.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Handle_Set.h"
25 #include "ace/Connector.h"
26 #include "ace/Auto_Ptr.h"
27 #include "ace/Get_Opt.h"
28 #include "ace/Process_Mutex.h"
29 #include "ace/Signal.h"
30 #include "Conn_Test.h"
31 #include "ace/Barrier.h"
32 #include "ace/OS_NS_signal.h"
33 #include "ace/OS_NS_sys_select.h"
34 #include "ace/OS_NS_sys_wait.h"
35 #include "ace/OS_NS_unistd.h"
36 #include "ace/os_include/os_netdb.h"
// Payload exchanged by the test: send_data() writes these letters one
// byte at a time and recv_data() asserts each received byte against them.
40 static const char ACE_ALPHABET
[] = "abcdefghijklmnopqrstuvwxyz";
42 // This test doesn't work well using fork() on MacOS X. So we
43 // will force it to use threads instead.
44 #if defined (__APPLE__)
45 # define ACE_LACKS_FORK
46 #endif /* __APPLE__ */
48 // The following works around bugs with some operating systems, which
49 // don't allow multiple threads/processes to call accept() on the same
50 // listen-mode port/socket. Also, note that since timed accept is
51 // implemented using select(), and we use timed accepts with threads,
52 // we need a real lock when using timed accepts even if the OS has
53 // thread-safe accept.
//
// ACCEPTOR_LOCKING is the lock type plugged into ACE_LOCK_SOCK_Acceptor
// by the acceptor typedef that follows. Depending on the configuration
// macros tested here it is ACE_Thread_Mutex, ACE_Null_Mutex or
// ACE_Process_Mutex.
55 #if defined (ACE_LACKS_FORK)
56 # if defined (ACE_HAS_THREADS)
57 # include "ace/Thread_Mutex.h"
58 typedef ACE_Thread_Mutex ACCEPTOR_LOCKING
;
60 # include "ace/Null_Mutex.h"
61 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
62 # endif /* ACE_HAS_THREADS */
64 # if defined (ACE_HAS_THREAD_SAFE_ACCEPT)
65 # include "ace/Null_Mutex.h"
66 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
68 # include "ace/Process_Mutex.h"
69 typedef ACE_Process_Mutex ACCEPTOR_LOCKING
;
// run_main() tests this macro and, when it is defined, unlinks the
// acceptor's process mutex by name before the test ends.
70 # define CLEANUP_PROCESS_MUTEX
71 # endif /* ACE_HAS_THREAD_SAFE_ACCEPT */
72 #endif /* ACE_LACKS_FORK */
// Acceptor used by the servers: an ACE_Oneshot_Acceptor for Svc_Handler
// whose underlying acceptor is ACE_LOCK_SOCK_Acceptor<ACCEPTOR_LOCKING>.
74 typedef ACE_Oneshot_Acceptor
<Svc_Handler
,
75 ACE_LOCK_SOCK_Acceptor
<ACCEPTOR_LOCKING
> >
// Connector for Svc_Handler.
// NOTE(review): presumably the CONNECTOR alias taken by
// timed_blocking_connect() and blocking_connect() -- confirm.
77 typedef ACE_Connector
<Svc_Handler
,
// Strategy-driven connector for Svc_Handler.
// NOTE(review): presumably the STRAT_CONNECTOR alias taken by
// cached_connect() -- confirm.
80 typedef ACE_Strategy_Connector
<Svc_Handler
,
// ACE_NOOP_* creation and concurrency strategies; client() instantiates
// one object of each type.
83 typedef ACE_NOOP_Creation_Strategy
<Svc_Handler
>
84 NULL_CREATION_STRATEGY
;
85 typedef ACE_NOOP_Concurrency_Strategy
<Svc_Handler
>
86 NULL_ACTIVATION_STRATEGY
;
// Connect strategy that caches connections; client() hands an instance
// of it to the strategy connector.
87 typedef ACE_Cached_Connect_Strategy
<Svc_Handler
,
90 CACHED_CONNECT_STRATEGY
;
// Macro spellings of the same template instantiations.
92 #define CACHED_CONNECT_STRATEGY ACE_Cached_Connect_Strategy<Svc_Handler, ACE_SOCK_CONNECTOR, ACE_SYNCH_MUTEX>
93 #define REFCOUNTED_HASH_RECYCLABLE_ADDR ACE_Refcounted_Hash_Recyclable<ACE_INET_Addr>
95 // Default number of clients/servers.
// run_main() may overwrite n_servers, n_clients and n_client_iterations
// from command-line option arguments.
96 #if defined (ACE_HAS_PHARLAP)
97 // PharLap is, by default, resource constrained. Test for something that works
98 // on the default configuration.
99 static int n_servers
= 2;
100 static int n_clients
= 4;
102 static int n_servers
= 5;
103 static int n_clients
= 5;
104 #endif /* ACE_HAS_PHARLAP */
// Upper bound of the connect/send loop in cached_connect().
106 static int n_client_iterations
= 3;
// Constructor; takes an ACE_Thread_Manager pointer that is left unnamed
// in this definition.
108 Svc_Handler::Svc_Handler (ACE_Thread_Manager
*)
// Logs that the handler is being opened (including the peer handle) and
// then switches the peer stream to non-blocking mode. The void* argument
// is unnamed. A failing enable() is reported through ACE_ERROR_RETURN.
113 Svc_Handler::open (void *)
115 ACE_DEBUG ((LM_DEBUG
,
116 ACE_TEXT ("(%P|%t) opening Svc_Handler %@ with handle %d\n"),
118 this->peer ().get_handle ()));
119 // Enable non-blocking I/O.
120 if (this->peer ().enable (ACE_NONBLOCK
) == -1)
121 ACE_ERROR_RETURN ((LM_ERROR
,
122 ACE_TEXT ("(%P|%t) %p\n"),
123 ACE_TEXT ("enable")),
// Logs that this handler, identified by its peer handle, is being
// recycled. The void* argument is unnamed.
129 Svc_Handler::recycle (void *)
131 ACE_DEBUG ((LM_DEBUG
,
132 ACE_TEXT ("(%P|%t) recycling Svc_Handler %@ with handle %d\n"),
134 this->peer ().get_handle ()));
// Writes ACE_ALPHABET to the peer, issuing one send_n() call per letter
// and stopping at the terminating '\0'. A send_n() failure is logged
// with ACE_ERROR.
139 Svc_Handler::send_data (void)
141 // Send data to server.
143 for (const char *c
= ACE_ALPHABET
; *c
!= '\0'; c
++)
144 if (this->peer ().send_n (c
, 1) == -1)
145 ACE_ERROR ((LM_ERROR
,
146 ACE_TEXT ("(%P|%t) %p\n"),
147 ACE_TEXT ("send_n")));
// Receives data from the peer one byte at a time and asserts each byte
// against the expected position in ACE_ALPHABET. The stream is in
// non-blocking mode, so select() is called on the peer handle before
// reading. The select width is 0 on ACE_WIN64 and handle + 1 elsewhere.
// When end of input is logged the stream is closed. A recv() result of
// -1 is logged as "no input available" for EWOULDBLOCK and as an error
// otherwise.
151 Svc_Handler::recv_data (void)
153 ACE_SOCK_Stream
&new_stream
= this->peer ();
155 ACE_Handle_Set handle_set
;
156 handle_set
.set_bit (new_stream
.get_handle ());
// Next letter expected from the peer.
158 const char *t
= ACE_ALPHABET
;
160 // Read data from client (terminate on error).
162 for (ssize_t r_bytes
; ;)
164 // Since we're in non-blocking mode we need to use <select> to
165 // avoid busy waiting.
166 #if defined (ACE_WIN64)
167 int select_width
= 0;
169 int select_width
= int (new_stream
.get_handle ()) + 1;
170 #endif /* ACE_WIN64 */
171 if (ACE_OS::select (select_width
, handle_set
) == -1)
172 ACE_ERROR ((LM_ERROR
,
173 ACE_TEXT ("(%P|%t) %p\n"),
174 ACE_TEXT ("select")));
// Drain whatever is currently readable, one byte per recv() call.
179 while ((r_bytes
= new_stream
.recv (&c
, 1)) > 0)
181 ACE_TEST_ASSERT (*t
== c
);
183 // We need to guard against cached connections, which
184 // will send multiple sequences of letters from 'a' ->
185 // 'z' through the same connection.
194 ACE_DEBUG ((LM_DEBUG
,
195 ACE_TEXT ("(%P|%t) reached end of input, connection closed by client\n")));
197 // Close endpoint handle (but don't close <this> yet
198 // since we're going to recycle it for the next iteration).
200 if (new_stream
.close () == -1)
201 ACE_ERROR ((LM_ERROR
,
202 ACE_TEXT ("(%P|%t) %p\n"),
203 ACE_TEXT ("close")));
206 else if (r_bytes
== -1)
208 if (errno
== EWOULDBLOCK
)
209 ACE_DEBUG ((LM_DEBUG
,
210 ACE_TEXT ("(%P|%t) no input available, going back to reading\n")));
// NOTE(review): the label below reads "recv_n" although the call that
// produced r_bytes is recv().
212 ACE_ERROR ((LM_ERROR
,
213 ACE_TEXT ("(%P|%t) %p\n"),
214 ACE_TEXT ("recv_n")));
// Closes the peer stream when side == 1, logging a failure of that
// close, and then returns whatever handle_close() returns. For any
// other value of side only handle_close() is called.
221 Svc_Handler::close (u_long side
)
223 // Only run this protocol if we're the write-side (i.e., "1").
224 if (side
== 1 && this->peer ().close () == -1)
225 ACE_ERROR ((LM_ERROR
,
226 ACE_TEXT ("(%P|%t) %p\n"),
227 ACE_TEXT ("close_writer")));
228 // Trigger the shutdown.
229 return this->handle_close ();
// Logs that the handler is going idle (including the peer handle) and
// forwards flags to ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>::idle(),
// returning its result.
233 Svc_Handler::idle (u_long flags
)
235 ACE_DEBUG ((LM_DEBUG
,
236 ACE_TEXT ("(%P|%t) idling Svc_Handler %@ with handle %d\n"),
238 this->peer ().get_handle ()));
239 return ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>::idle (flags
);
244 // Information passed to the client so it can communicate with the server.
// client() fills in these pointers with the addresses of its own local
// objects; client_connections() reads them.
247 ACE_INET_Addr
*server_addr_
;
248 // Address of the server to connect with.
250 CONNECTOR
*connector_
;
251 // Connection factory.
253 STRAT_CONNECTOR
*strat_connector_
;
254 // Strategy for connecting.
256 #if defined (ACE_HAS_THREADS)
257 ACE_Barrier
*barrier_
;
258 // Performs barrier synchronization.
259 #endif /* ACE_HAS_THREADS */
262 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
// Allocates a Svc_Handler, connects it through con, sends the alphabet
// with send_data() and closes the handler. A connect or close failure is
// logged with ACE_ERROR.
//
// con          connector used to establish the connection.
// server_addr  address of the server to connect to.
265 timed_blocking_connect (CONNECTOR
&con
,
266 const ACE_INET_Addr
&server_addr
)
// Synch options requesting a timeout of ACE_DEFAULT_TIMEOUT.
268 ACE_Time_Value
tv (ACE_DEFAULT_TIMEOUT
);
269 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
, tv
);
271 Svc_Handler
*svc_handler
;
272 ACE_NEW (svc_handler
,
275 // Perform a timed-blocking connect to the server (this should
276 // connect quickly since we're in the same address space or same host).
278 if (con
.connect (svc_handler
,
281 ACE_ERROR ((LM_ERROR
,
282 ACE_TEXT ("(%P|%t) %p\n"),
283 ACE_TEXT ("connection failed")));
286 // Send the data to the server.
287 svc_handler
->send_data ();
289 // Close the connection completely.
// The argument 1 selects the write-side branch of Svc_Handler::close().
290 if (svc_handler
->close (1) == -1)
291 ACE_ERROR ((LM_ERROR
,
292 ACE_TEXT ("(%P|%t) %p\n"),
293 ACE_TEXT ("close")));
// Same sequence as timed_blocking_connect() but without building any
// timeout options: allocate a Svc_Handler, connect it through con, send
// the alphabet and close the handler. Failures are logged with ACE_ERROR.
//
// con          connector used to establish the connection.
// server_addr  address of the server to connect to.
298 blocking_connect (CONNECTOR
&con
,
299 const ACE_INET_Addr
&server_addr
)
301 Svc_Handler
*svc_handler
;
302 ACE_NEW (svc_handler
,
305 // Perform a blocking connect to the server.
306 if (con
.connect (svc_handler
,
308 ACE_ERROR ((LM_ERROR
,
309 ACE_TEXT ("(%P|%t) %p\n"),
310 ACE_TEXT ("connection failed")));
313 // Send the data to the server.
314 svc_handler
->send_data ();
316 // Close the connection completely.
// The argument 1 selects the write-side branch of Svc_Handler::close().
317 if (svc_handler
->close (1) == -1)
318 ACE_ERROR ((LM_ERROR
,
319 ACE_TEXT ("(%P|%t) %p\n"),
320 ACE_TEXT ("close")));
324 // This function runs the more sophisticated tests involving the
325 // ACE_Cached_Connect_Strategy.
//
// For each of n_client_iterations rounds it connects through con, sends
// the alphabet and marks the handler idle.
//
// con          strategy connector used to establish the connection.
// server_addr  address of the server to connect to.
328 cached_connect (STRAT_CONNECTOR
&con
,
329 const ACE_INET_Addr
&server_addr
)
// Starts out null; no Svc_Handler is allocated in the visible code here,
// unlike in the two plain connect tests.
331 Svc_Handler
*svc_handler
= 0;
333 for (int i
= 0; i
< n_client_iterations
; i
++)
335 // Perform a blocking connect to the server using the Strategy
336 // Connector with a connection caching strategy. Since we are
337 // connecting to the same <server_addr> these calls will return
338 // the same dynamically allocated <Svc_Handler> for each <connect>.
340 if (con
.connect (svc_handler
,
343 ACE_ERROR ((LM_ERROR
,
344 ACE_TEXT ("(%P|%t) %p\n"),
345 ACE_TEXT ("connection failed")));
349 // Send the data to the server.
350 svc_handler
->send_data ();
352 // Svc_Handler is now idle, so mark it as such and let the cache
353 // recycle it in another thread.
354 svc_handler
->idle (1);
356 // Rest for a second to give another thread a chance to reuse the connection.
// Runs the three client tests in order: timed-blocking connect, blocking
// connect, cached connect. arg is cast to a Client_Info* that supplies
// the connectors and the server address. When ACE_HAS_THREADS is defined
// the function waits on the shared barrier after each of the first two
// tests. client() either passes this function to spawn_n() or calls it
// directly.
363 client_connections (void *arg
)
365 Client_Info
*info
= (Client_Info
*) arg
;
367 // Run the timed-blocking test.
368 ACE_DEBUG ((LM_DEBUG
,
369 ACE_TEXT ("(%P|%t) **** starting timed-blocking connect\n")));
370 timed_blocking_connect (*info
->connector_
,
371 *info
->server_addr_
);
373 #if defined (ACE_HAS_THREADS)
374 // Wait for other threads to join us.
375 info
->barrier_
->wait ();
376 #endif /* ACE_HAS_THREADS */
378 // Run the blocking test.
379 ACE_DEBUG ((LM_DEBUG
,
380 ACE_TEXT ("(%P|%t) **** starting blocking connect\n")));
381 blocking_connect (*info
->connector_
,
382 *info
->server_addr_
);
384 #if defined (ACE_HAS_THREADS)
385 // Wait for other threads to join us.
386 info
->barrier_
->wait ();
387 #endif /* ACE_HAS_THREADS */
389 // Run the cached blocking test.
390 ACE_DEBUG ((LM_DEBUG
,
391 ACE_TEXT ("(%P|%t) **** starting cached blocking connect\n")));
392 cached_connect (*info
->strat_connector_
,
393 *info
->server_addr_
);
397 // Execute the client tests.
//
// arg is reinterpreted as an ACE_INET_Addr*. Only its port number is
// used; it is combined with ACE_DEFAULT_SERVER_HOST to form the address
// the clients connect to. The function wires a Client_Info to its local
// connector objects and then runs client_connections(): through a
// thread manager when ACE_HAS_THREADS is defined (the barrier is sized
// for n_clients threads), otherwise with a single direct call.
402 ACE_INET_Addr
*remote_addr
= reinterpret_cast<ACE_INET_Addr
*> (arg
);
403 ACE_INET_Addr
server_addr (remote_addr
->get_port_number (),
404 ACE_DEFAULT_SERVER_HOST
);
407 NULL_CREATION_STRATEGY creation_strategy
;
408 NULL_ACTIVATION_STRATEGY activation_strategy
;
409 // Configure the Strategy Connector with a strategy that caches connections.
411 CACHED_CONNECT_STRATEGY caching_connect_strategy
;
413 STRAT_CONNECTOR
strat_connector (0,
415 &caching_connect_strategy
,
416 &activation_strategy
);
// Publish the local objects to the client threads through Client_Info.
418 info
.server_addr_
= &server_addr
;
419 info
.connector_
= &connector
;
420 info
.strat_connector_
= &strat_connector
;
422 #if defined (ACE_HAS_THREADS)
423 int n_threads
= n_clients
;
424 ACE_Barrier
thread_barrier (n_threads
);
425 info
.barrier_
= &thread_barrier
;
427 ACE_Thread_Manager client_manager
;
429 if (client_manager
.spawn_n
431 (ACE_THR_FUNC
) client_connections
,
434 ACE_ERROR ((LM_ERROR
,
435 ACE_TEXT ("(%P|%t) %p\n%a"),
436 ACE_TEXT ("client thread spawn failed"),
439 // Wait for the threads to exit.
440 client_manager
.wait ();
442 #else /* ACE_HAS_THREADS */
443 client_connections (&info
);
444 #endif /* ACE_HAS_THREADS */
448 // Performs the iterative server activities.
//
// arg is cast to the ACCEPTOR* to accept on. One Svc_Handler is
// allocated up front and passed to every accept() call. After each
// accepted connection the peer's host name and port are logged and
// recv_data() consumes the client's data. When accept() fails, errno
// ETIMEDOUT is logged as a timeout and any other failure is reported
// through ACE_ERROR_RETURN.
453 #if defined (VXWORKS)
454 ACE_DEBUG ((LM_DEBUG
,
455 ACE_TEXT ("(%P|%t) server stack size is %u\n"),
456 ACE_OS::thr_min_stack ()));
459 ACCEPTOR
*acceptor
= (ACCEPTOR
*) arg
;
460 ACE_INET_Addr cli_addr
;
461 ACE_TCHAR peer_host
[MAXHOSTNAMELEN
];
// Synch options requesting a timeout of ACE_DEFAULT_TIMEOUT; they are
// marked unused in the fork() build below.
462 const ACE_Time_Value
tv (ACE_DEFAULT_TIMEOUT
);
463 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
, tv
);
465 Svc_Handler
*svc_handler
;
466 ACE_NEW_RETURN (svc_handler
,
470 // Keep looping until we timeout on <accept> or fail.
474 // Create a new <Svc_Handler> to consume the data.
476 #if defined (ACE_LACKS_FORK)
477 int result
= acceptor
->accept (svc_handler
,
480 #else /* ! ACE_LACKS_FORK */
481 int result
= acceptor
->accept (svc_handler
,
483 ACE_UNUSED_ARG (options
);
484 #endif /* ! ACE_LACKS_FORK */
486 // Timing out is the only way for threads to stop accepting
487 // since we don't have signals.
491 // svc_handler->close (); The ACE_Oneshot_Acceptor closed it.
493 if (errno
== ETIMEDOUT
)
495 ACE_DEBUG ((LM_DEBUG
,
496 ACE_TEXT ("accept timed out\n")));
500 ACE_ERROR_RETURN ((LM_ERROR
,
501 ACE_TEXT ("(%P|%t) %p\n"),
502 ACE_TEXT ("accept failed, shutting down")),
505 // Use this overload rather than the no-argument get_host_name() to
506 // properly adjust to the charset width in use (peer_host is ACE_TCHAR).
507 cli_addr
.get_host_name (peer_host
, MAXHOSTNAMELEN
);
508 ACE_DEBUG ((LM_DEBUG
,
509 ACE_TEXT ("(%P|%t) client %s connected from %d\n"),
511 cli_addr
.get_port_number ()));
513 svc_handler
->recv_data ();
516 ACE_NOTREACHED (return 0);
519 #endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */
521 #if !defined (ACE_LACKS_FORK)
// Signal handler; the signal number is not used. spawn_processes()
// installs it for SIGTERM in each forked server child.
523 handler (int /* signum */)
525 // No printout here, to be safe. Signal handlers must not acquire
526 // locks, etc. It's not even safe to call ACE_OS::exit ()!
// Fork-based driver. Forks n_servers children; each child registers
// handler() for SIGTERM and runs server() on the shared acceptor. The
// parent then runs client(), sends SIGTERM to every child, reaps
// children with waitpid() and finally removes the acceptor's lock.
//
// acceptor     acceptor that the server children accept on.
// server_addr  address handed to client().
// Returns the result of removing the acceptor's lock.
533 spawn_processes (ACCEPTOR
*acceptor
,
534 ACE_INET_Addr
*server_addr
)
// Array of child pids, owned by the auto array pointer below.
536 pid_t
*children_ptr
= 0;
537 ACE_NEW_RETURN (children_ptr
,
540 ACE_Auto_Basic_Array_Ptr
<pid_t
> children (children_ptr
);
543 // Spawn off a number of server processes all of which will listen
544 // on the same port number for clients to connect.
545 for (i
= 0; i
< n_servers
; i
++)
547 pid_t pid
= ACE_OS::fork (ACE_TEXT ("child"));
551 ACE_ERROR ((LM_ERROR
,
552 ACE_TEXT ("(%P|%t) %p\n%a"),
553 ACE_TEXT ("fork failed"),
557 case 0: // In the child.
559 // Register a signal handler to close down the child.
560 ACE_Sig_Action
sa ((ACE_SignalHandler
) handler
, SIGTERM
);
563 server ((void *) acceptor
);
567 default: // In the parent.
573 client ((void *) server_addr
);
575 for (i
= 0; i
< n_servers
; i
++)
576 // Shutdown the servers.
577 if (ACE_OS::kill (children
[i
], SIGTERM
) == -1)
578 ACE_ERROR ((LM_ERROR
,
579 ACE_TEXT ("(%P|%t) %p for %d\n"),
580 ACE_TEXT ("kill"), children
[i
]));
586 child
= ACE_OS::waitpid (0, 0, 0);
588 ACE_DEBUG ((LM_DEBUG
,
589 ACE_TEXT ("(%P|%t) reaping %d\n"),
594 // Remove the lock so we don't have process semaphores lying around.
595 return acceptor
->acceptor ().lock ().remove ();
597 #endif /* ! ACE_LACKS_FORK */
599 #if defined (ACE_LACKS_FORK) && defined (ACE_HAS_THREADS) \
600 && !defined ACE_LACKS_ACCEPT
601 // Spawn threads and run the client and server.
//
// Thread-based driver. Spawns server() threads and one client() thread
// through ACE_Thread_Manager::instance(), then waits for them for at
// most 200 seconds. A spawn failure or an expired wait is logged with
// ACE_ERROR.
//
// acceptor     acceptor given to the server threads.
// server_addr  address handed to the client thread.
605 spawn_threads (ACCEPTOR
*acceptor
,
606 ACE_INET_Addr
*server_addr
)
611 // Assign thread (VxWorks task) names to test that feature.
612 ACE_hthread_t
*server_name
= 0;
613 ACE_NEW_RETURN (server_name
,
614 ACE_hthread_t
[n_servers
],
617 // And test ability to provide stacks.
618 size_t *stack_size
= 0;
619 ACE_NEW_RETURN (stack_size
,
623 ACE_NEW_RETURN (stack
,
// Per-server setup: a 32-character name buffer and a 40000-byte stack
// filled with 0xEE.
628 for (i
= 0; i
< n_servers
; ++i
)
630 ACE_NEW_RETURN (server_name
[i
], ACE_TCHAR
[32], -1);
631 ACE_OS::sprintf (server_name
[i
],
632 ACE_TEXT ("server%u"),
634 stack_size
[i
] = 40000;
635 ACE_NEW_RETURN (stack
[i
], char[stack_size
[i
]], -1);
637 // Initialize the stack for checkStack.
638 ACE_OS::memset (stack
[i
], 0xEE, stack_size
[i
]);
641 ACE_TCHAR
*client_name
= ACE_TEXT ("Conn client");
642 #endif /* ACE_HAS_VXTHREADS */
// Start the server threads.
644 if (ACE_Thread_Manager::instance ()->spawn_n
647 (ACE_THR_FUNC
) server
,
650 , ACE_DEFAULT_THREAD_PRIORITY
655 #if 0 /* Don't support setting of stack, because it doesn't seem to work. */
661 #endif /* ACE_HAS_VXTHREADS */
663 ACE_ERROR ((LM_ERROR
,
664 ACE_TEXT ("(%P|%t) %p\n%a"),
665 ACE_TEXT ("server thread create failed"),
// Start the client thread.
668 if (ACE_Thread_Manager::instance ()->spawn
669 ((ACE_THR_FUNC
) client
,
670 (void *) server_addr
,
675 #endif /* ACE_HAS_VXTHREADS */
677 ACE_ERROR ((LM_ERROR
,
678 ACE_TEXT ("(%P|%t) %p\n%a"),
679 ACE_TEXT ("client thread create failed"),
682 // Wait for the threads to exit.
683 // But, wait for a limited time because sometimes the test hangs on Irix.
// wait() is given the current time plus max_wait rather than max_wait
// itself.
684 ACE_Time_Value
const max_wait (200 /* seconds */);
685 ACE_Time_Value
const wait_time (ACE_OS::gettimeofday () + max_wait
);
686 if (ACE_Thread_Manager::instance ()->wait (&wait_time
) == -1)
689 ACE_ERROR ((LM_ERROR
,
690 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
693 ACE_OS::perror (ACE_TEXT ("wait"));
// Release the per-server name buffers and the bookkeeping arrays.
699 for (i
= 0; i
< n_servers
; ++i
)
701 delete [] server_name
[i
];
704 delete [] server_name
;
706 delete [] stack_size
;
707 #endif /* ACE_HAS_VXTHREADS */
711 #endif /* ACE_LACKS_FORK && ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */
// Test entry point. Parses the "c:i:s:" options, opens the acceptor on
// any port, logs the port it was given, and then runs the test with
// spawn_processes() (fork available) or spawn_threads() (threads only).
// With neither fork nor threads only an error message is produced. When
// CLEANUP_PROCESS_MUTEX is defined the acceptor's process mutex is
// unlinked by name afterwards.
714 run_main (int argc
, ACE_TCHAR
*argv
[])
716 ACE_START_TEST (ACE_TEXT ("Conn_Test"));
719 //FUZZ: disable check_for_lack_ACE_OS
720 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT ("c:i:s:"));
721 for (int c
; (c
= getopt ()) != -1; )
722 //FUZZ: enable check_for_lack_ACE_OS
// Each option argument is converted with atoi and overrides one default.
// NOTE(review): presumably c -> n_clients, i -> n_client_iterations and
// s -> n_servers, matching the order of the option string -- confirm.
726 n_clients
= ACE_OS::atoi (getopt
.opt_arg ());
729 n_client_iterations
= ACE_OS::atoi (getopt
.opt_arg ());
732 n_servers
= ACE_OS::atoi (getopt
.opt_arg ());
736 #ifndef ACE_LACKS_ACCEPT
739 ACE_INET_Addr server_addr
;
741 // Bind acceptor to any port and then find out what the port was.
742 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&)) == -1
743 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
745 ACE_ERROR ((LM_ERROR
,
746 ACE_TEXT ("(%P|%t) %p\n"),
752 ACE_DEBUG ((LM_DEBUG
,
753 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
754 server_addr
.get_port_number ()));
756 # if !defined (ACE_LACKS_FORK)
757 if (spawn_processes (&acceptor
,
759 ACE_ERROR_RETURN ((LM_ERROR
,
760 ACE_TEXT ("(%P|%t) %p\n"),
761 ACE_TEXT ("spawn_processes")),
763 # elif defined (ACE_HAS_THREADS)
764 status
= spawn_threads (&acceptor
, &server_addr
);
765 # else /* ACE_LACKS_FORK && ! ACE_HAS_THREADS */
767 ACE_TEXT ("(%P|%t) ")
768 ACE_TEXT ("only one thread may be run")
769 ACE_TEXT (" in a process on this platform")));
770 # endif /* ACE_LACKS_FORK && ! ACE_HAS_THREADS */
773 # ifdef CLEANUP_PROCESS_MUTEX
// Remove the named process mutex used by the acceptor's lock.
774 ACE_Process_Mutex::unlink (acceptor
.acceptor ().lock ().name ());
777 #endif // ACE_LACKS_ACCEPT