2 //=============================================================================
6 * This is a test of the <ACE_Acceptor> and <ACE_Connector>
7 * classes. The test forks processes or spawns threads (depending
8 * upon the platform) and then executes client and server allowing
9 * them to connect and exchange data. The test also illustrates
10 * how the <ACE_Strategy_Connector> works by showing how you can
11 * cache connections on the client.
13 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
14 * @author Chris Cleeland <cleeland@cs.wustl.edu>
15 * @author and Irfan Pyarali <irfan@cs.wustl.edu>
17 //=============================================================================
20 #include "test_config.h"
21 #include "ace/SOCK_Connector.h"
22 #include "ace/LOCK_SOCK_Acceptor.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Handle_Set.h"
25 #include "ace/Connector.h"
26 #include "ace/Auto_Ptr.h"
27 #include "ace/Get_Opt.h"
28 #include "ace/Process_Mutex.h"
29 #include "ace/Signal.h"
30 #include "Conn_Test.h"
31 #include "ace/Barrier.h"
32 #include "ace/OS_NS_signal.h"
33 #include "ace/OS_NS_sys_select.h"
34 #include "ace/OS_NS_sys_wait.h"
35 #include "ace/OS_NS_unistd.h"
36 #include "ace/os_include/os_netdb.h"
40 static const char ACE_ALPHABET
[] = "abcdefghijklmnopqrstuvwxyz";
42 // This test doesn't work well using fork() on MacOS X. So we
43 // will force it to use threads instead.
44 #if defined (__APPLE__)
45 # define ACE_LACKS_FORK
46 #endif /* __APPLE__ */
48 // The following works around bugs with some operating systems, which
49 // don't allow multiple threads/process to call accept() on the same
50 // listen-mode port/socket. Also, note that since timed accept is
51 // implemented using select(), and we use timed accepts with threads,
52 // we need a real lock when using timed accepts even if the OS has
53 // thread-safe accept.
55 #if defined (ACE_LACKS_FORK)
56 # if defined (ACE_HAS_THREADS)
57 # include "ace/Thread_Mutex.h"
58 typedef ACE_Thread_Mutex ACCEPTOR_LOCKING
;
60 # include "ace/Null_Mutex.h"
61 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
62 # endif /* ACE_HAS_THREADS */
64 # if defined (ACE_HAS_THREAD_SAFE_ACCEPT)
65 # include "ace/Null_Mutex.h"
66 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
68 # include "ace/Process_Mutex.h"
69 using ACCEPTOR_LOCKING
= ACE_Process_Mutex
;
70 # define CLEANUP_PROCESS_MUTEX
71 # endif /* ACE_HAS_THREAD_SAFE_ACCEPT */
72 #endif /* ACE_LACKS_FORK */
74 using ACCEPTOR
= ACE_Oneshot_Acceptor
<Svc_Handler
, ACE_LOCK_SOCK_Acceptor
<ACCEPTOR_LOCKING
>>;
75 using CONNECTOR
= ACE_Connector
<Svc_Handler
, ACE_SOCK_Connector
>;
76 using STRAT_CONNECTOR
= ACE_Strategy_Connector
<Svc_Handler
, ACE_SOCK_Connector
>;
77 using NULL_CREATION_STRATEGY
= ACE_NOOP_Creation_Strategy
<Svc_Handler
>;
78 using NULL_ACTIVATION_STRATEGY
= ACE_NOOP_Concurrency_Strategy
<Svc_Handler
>;
79 using CACHED_CONNECT_STRATEGY
= ACE_Cached_Connect_Strategy
<Svc_Handler
, ACE_SOCK_Connector
, ACE_MT_SYNCH::MUTEX
>;
81 #define CACHED_CONNECT_STRATEGY ACE_Cached_Connect_Strategy<Svc_Handler, ACE_SOCK_CONNECTOR, ACE_SYNCH_MUTEX>
82 #define REFCOUNTED_HASH_RECYCLABLE_ADDR ACE_Refcounted_Hash_Recyclable<ACE_INET_Addr>
84 // Default number of clients/servers.
85 #if defined (ACE_HAS_PHARLAP)
86 // PharLap is, by default, resource contrained. Test for something that works
87 // on the default configuration.
88 static int n_servers
= 2;
89 static int n_clients
= 4;
91 static int n_servers
= 5;
92 static int n_clients
= 5;
93 #endif /* ACE_HAS_PHARLAP */
95 static int n_client_iterations
= 3;
97 Svc_Handler::Svc_Handler (ACE_Thread_Manager
*)
102 Svc_Handler::open (void *)
104 ACE_DEBUG ((LM_DEBUG
,
105 ACE_TEXT ("(%P|%t) opening Svc_Handler %@ with handle %d\n"),
107 this->peer ().get_handle ()));
108 // Enable non-blocking I/O.
109 if (this->peer ().enable (ACE_NONBLOCK
) == -1)
110 ACE_ERROR_RETURN ((LM_ERROR
,
111 ACE_TEXT ("(%P|%t) %p\n"),
112 ACE_TEXT ("enable")),
118 Svc_Handler::recycle (void *)
120 ACE_DEBUG ((LM_DEBUG
,
121 ACE_TEXT ("(%P|%t) recycling Svc_Handler %@ with handle %d\n"),
123 this->peer ().get_handle ()));
128 Svc_Handler::send_data ()
130 // Send data to server.
132 for (const char *c
= ACE_ALPHABET
; *c
!= '\0'; c
++)
133 if (this->peer ().send_n (c
, 1) == -1)
134 ACE_ERROR ((LM_ERROR
,
135 ACE_TEXT ("(%P|%t) %p\n"),
136 ACE_TEXT ("send_n")));
140 Svc_Handler::recv_data ()
142 ACE_SOCK_Stream
&new_stream
= this->peer ();
144 ACE_Handle_Set handle_set
;
145 handle_set
.set_bit (new_stream
.get_handle ());
147 const char *t
= ACE_ALPHABET
;
149 // Read data from client (terminate on error).
151 for (ssize_t r_bytes
; ;)
153 // Since we're in non-blocking mode we need to use <select> to
154 // avoid busy waiting.
155 #if defined (ACE_WIN64)
156 int select_width
= 0;
158 int select_width
= int (new_stream
.get_handle ()) + 1;
159 #endif /* ACE_WIN64 */
160 if (ACE_OS::select (select_width
, handle_set
) == -1)
161 ACE_ERROR ((LM_ERROR
,
162 ACE_TEXT ("(%P|%t) %p\n"),
163 ACE_TEXT ("select")));
168 while ((r_bytes
= new_stream
.recv (&c
, 1)) > 0)
170 ACE_TEST_ASSERT (*t
== c
);
172 // We need to guard against cached connections, which
173 // will send multiple sequences of letters from 'a' ->
174 // 'z' through the same connection.
183 ACE_DEBUG ((LM_DEBUG
,
184 ACE_TEXT ("(%P|%t) reached end of input, connection closed by client\n")));
186 // Close endpoint handle (but don't close <this> yet
187 // since we're going to recycle it for the next
189 if (new_stream
.close () == -1)
190 ACE_ERROR ((LM_ERROR
,
191 ACE_TEXT ("(%P|%t) %p\n"),
192 ACE_TEXT ("close")));
195 else if (r_bytes
== -1)
197 if (errno
== EWOULDBLOCK
)
198 ACE_DEBUG ((LM_DEBUG
,
199 ACE_TEXT ("(%P|%t) no input available, going back to reading\n")));
201 ACE_ERROR ((LM_ERROR
,
202 ACE_TEXT ("(%P|%t) %p\n"),
203 ACE_TEXT ("recv_n")));
210 Svc_Handler::close (u_long side
)
212 // Only run this protocol if we're the write-side (i.e., "1").
213 if (side
== 1 && this->peer ().close () == -1)
214 ACE_ERROR ((LM_ERROR
,
215 ACE_TEXT ("(%P|%t) %p\n"),
216 ACE_TEXT ("close_writer")));
217 // Trigger the shutdown.
218 return this->handle_close ();
222 Svc_Handler::idle (u_long flags
)
224 ACE_DEBUG ((LM_DEBUG
,
225 ACE_TEXT ("(%P|%t) idling Svc_Handler %@ with handle %d\n"),
227 this->peer ().get_handle ()));
228 return ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>::idle (flags
);
233 // Information passed to the client so it can communicate with the
236 ACE_INET_Addr
*server_addr_
;
237 // Address of the server to connect with.
239 CONNECTOR
*connector_
;
240 // Connection factory.
242 STRAT_CONNECTOR
*strat_connector_
;
243 // Strategy for connecting.
245 #if defined (ACE_HAS_THREADS)
246 ACE_Barrier
*barrier_
;
247 // Performs barrier synchronization.
248 #endif /* ACE_HAS_THREADS */
251 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
254 timed_blocking_connect (CONNECTOR
&con
,
255 const ACE_INET_Addr
&server_addr
)
257 ACE_Time_Value
tv (ACE_DEFAULT_TIMEOUT
);
258 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
, tv
);
260 Svc_Handler
*svc_handler
;
261 ACE_NEW (svc_handler
,
264 // Perform a timed-blocking connect to the server (this should
265 // connect quickly since we're in the same address space or same
267 if (con
.connect (svc_handler
,
270 ACE_ERROR ((LM_ERROR
,
271 ACE_TEXT ("(%P|%t) %p\n"),
272 ACE_TEXT ("connection failed")));
275 // Send the data to the server.
276 svc_handler
->send_data ();
278 // Close the connection completely.
279 if (svc_handler
->close (1) == -1)
280 ACE_ERROR ((LM_ERROR
,
281 ACE_TEXT ("(%P|%t) %p\n"),
282 ACE_TEXT ("close")));
287 blocking_connect (CONNECTOR
&con
,
288 const ACE_INET_Addr
&server_addr
)
290 Svc_Handler
*svc_handler
;
291 ACE_NEW (svc_handler
,
294 // Perform a blocking connect to the server.
295 if (con
.connect (svc_handler
,
297 ACE_ERROR ((LM_ERROR
,
298 ACE_TEXT ("(%P|%t) %p\n"),
299 ACE_TEXT ("connection failed")));
302 // Send the data to the server.
303 svc_handler
->send_data ();
305 // Close the connection completely.
306 if (svc_handler
->close (1) == -1)
307 ACE_ERROR ((LM_ERROR
,
308 ACE_TEXT ("(%P|%t) %p\n"),
309 ACE_TEXT ("close")));
313 // This function runs the more sophisticated tests involving the
314 // Caching_Connect_Strategy.
317 cached_connect (STRAT_CONNECTOR
&con
,
318 const ACE_INET_Addr
&server_addr
)
320 Svc_Handler
*svc_handler
= 0;
322 for (int i
= 0; i
< n_client_iterations
; i
++)
324 // Perform a blocking connect to the server using the Strategy
325 // Connector with a connection caching strategy. Since we are
326 // connecting to the same <server_addr> these calls will return
327 // the same dynamically allocated <Svc_Handler> for each
329 if (con
.connect (svc_handler
,
332 ACE_ERROR ((LM_ERROR
,
333 ACE_TEXT ("(%P|%t) %p\n"),
334 ACE_TEXT ("connection failed")));
338 // Send the data to the server.
339 svc_handler
->send_data ();
341 // Svc_Handler is now idle, so mark it as such and let the cache
342 // recycle it in another thread.
343 svc_handler
->idle (1);
345 // Rest for a second to give another thread a chance to reuse the
352 client_connections (void *arg
)
354 Client_Info
*info
= (Client_Info
*) arg
;
356 // Run the timed-blocking test.
357 ACE_DEBUG ((LM_DEBUG
,
358 ACE_TEXT ("(%P|%t) **** starting timed-blocking connect\n")));
359 timed_blocking_connect (*info
->connector_
,
360 *info
->server_addr_
);
362 #if defined (ACE_HAS_THREADS)
363 // Wait for other threads to join us.
364 info
->barrier_
->wait ();
365 #endif /* ACE_HAS_THREADS */
367 // Run the blocking test.
368 ACE_DEBUG ((LM_DEBUG
,
369 ACE_TEXT ("(%P|%t) **** starting blocking connect\n")));
370 blocking_connect (*info
->connector_
,
371 *info
->server_addr_
);
373 #if defined (ACE_HAS_THREADS)
374 // Wait for other threads to join us.
375 info
->barrier_
->wait ();
376 #endif /* ACE_HAS_THREADS */
378 // Run the cached blocking test.
379 ACE_DEBUG ((LM_DEBUG
,
380 ACE_TEXT ("(%P|%t) **** starting cached blocking connect\n")));
381 cached_connect (*info
->strat_connector_
,
382 *info
->server_addr_
);
386 // Execute the client tests.
391 ACE_INET_Addr
*remote_addr
= reinterpret_cast<ACE_INET_Addr
*> (arg
);
392 ACE_INET_Addr
server_addr (remote_addr
->get_port_number (),
393 ACE_DEFAULT_SERVER_HOST
);
396 NULL_CREATION_STRATEGY creation_strategy
;
397 NULL_ACTIVATION_STRATEGY activation_strategy
;
398 // Configure the Strategy Connector with a strategy that caches
400 CACHED_CONNECT_STRATEGY caching_connect_strategy
;
402 STRAT_CONNECTOR
strat_connector (0,
404 &caching_connect_strategy
,
405 &activation_strategy
);
407 info
.server_addr_
= &server_addr
;
408 info
.connector_
= &connector
;
409 info
.strat_connector_
= &strat_connector
;
411 #if defined (ACE_HAS_THREADS)
412 int n_threads
= n_clients
;
413 ACE_Barrier
thread_barrier (n_threads
);
414 info
.barrier_
= &thread_barrier
;
416 ACE_Thread_Manager client_manager
;
418 if (client_manager
.spawn_n
420 (ACE_THR_FUNC
) client_connections
,
423 ACE_ERROR ((LM_ERROR
,
424 ACE_TEXT ("(%P|%t) %p\n%a"),
425 ACE_TEXT ("client thread spawn failed"),
428 // Wait for the threads to exit.
429 client_manager
.wait ();
431 #else /* ACE_HAS_THREADS */
432 client_connections (&info
);
433 #endif /* ACE_HAS_THREADS */
437 // Performs the iterative server activities.
442 #if defined (VXWORKS)
443 ACE_DEBUG ((LM_DEBUG
,
444 ACE_TEXT ("(%P|%t) server stack size is %u\n"),
445 ACE_OS::thr_min_stack ()));
448 ACCEPTOR
*acceptor
= (ACCEPTOR
*) arg
;
449 ACE_INET_Addr cli_addr
;
450 ACE_TCHAR peer_host
[MAXHOSTNAMELEN
];
451 const ACE_Time_Value
tv (ACE_DEFAULT_TIMEOUT
);
452 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
, tv
);
454 Svc_Handler
*svc_handler
;
455 ACE_NEW_RETURN (svc_handler
,
459 // Keep looping until we timeout on <accept> or fail.
463 // Create a new <Svc_Handler> to consume the data.
465 #if defined (ACE_LACKS_FORK)
466 int result
= acceptor
->accept (svc_handler
,
469 #else /* ! ACE_LACKS_FORK */
470 int result
= acceptor
->accept (svc_handler
,
472 ACE_UNUSED_ARG (options
);
473 #endif /* ! ACE_LACKS_FORK */
475 // Timing out is the only way for threads to stop accepting
476 // since we don't have signals.
480 // svc_handler->close (); The ACE_Onsehot_Acceptor closed it.
482 if (errno
== ETIMEDOUT
)
484 ACE_DEBUG ((LM_DEBUG
,
485 ACE_TEXT ("accept timed out\n")));
489 ACE_ERROR_RETURN ((LM_ERROR
,
490 ACE_TEXT ("(%P|%t) %p\n"),
491 ACE_TEXT ("accept failed, shutting down")),
494 // Use this rather than get_host_name() to properly adjust to the
495 // charset width in use.
496 cli_addr
.get_host_name (peer_host
, MAXHOSTNAMELEN
);
497 ACE_DEBUG ((LM_DEBUG
,
498 ACE_TEXT ("(%P|%t) client %s connected from %d\n"),
500 cli_addr
.get_port_number ()));
502 svc_handler
->recv_data ();
505 ACE_NOTREACHED (return 0);
508 #endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */
510 #if !defined (ACE_LACKS_FORK)
512 handler (int /* signum */)
514 // No printout here, to be safe. Signal handlers must not acquire
515 // locks, etc. It's not even safe to call ACE_OS::exit ()!
522 spawn_processes (ACCEPTOR
*acceptor
,
523 ACE_INET_Addr
*server_addr
)
525 pid_t
*children_ptr
= 0;
526 ACE_NEW_RETURN (children_ptr
,
529 ACE_Auto_Basic_Array_Ptr
<pid_t
> children (children_ptr
);
532 // Spawn off a number of server processes all of which will listen
533 // on the same port number for clients to connect.
534 for (i
= 0; i
< n_servers
; i
++)
536 pid_t pid
= ACE_OS::fork (ACE_TEXT ("child"));
540 ACE_ERROR ((LM_ERROR
,
541 ACE_TEXT ("(%P|%t) %p\n%a"),
542 ACE_TEXT ("fork failed"),
546 case 0: // In the child.
548 // Register a signal handler to close down the child.
549 ACE_Sig_Action
sa ((ACE_SignalHandler
) handler
, SIGTERM
);
552 server ((void *) acceptor
);
556 default: // In the parent.
562 client ((void *) server_addr
);
564 for (i
= 0; i
< n_servers
; i
++)
565 // Shutdown the servers.
566 if (ACE_OS::kill (children
[i
], SIGTERM
) == -1)
567 ACE_ERROR ((LM_ERROR
,
568 ACE_TEXT ("(%P|%t) %p for %d\n"),
569 ACE_TEXT ("kill"), children
[i
]));
575 child
= ACE_OS::waitpid (0, 0, 0);
577 ACE_DEBUG ((LM_DEBUG
,
578 ACE_TEXT ("(%P|%t) reaping %d\n"),
583 // Remove the lock so we don't have process semaphores lying around.
584 return acceptor
->acceptor ().lock ().remove ();
586 #endif /* ! ACE_LACKS_FORK */
588 #if defined (ACE_LACKS_FORK) && defined (ACE_HAS_THREADS) \
589 && !defined ACE_LACKS_ACCEPT
590 // Spawn threads and run the client and server.
594 spawn_threads (ACCEPTOR
*acceptor
,
595 ACE_INET_Addr
*server_addr
)
600 // Assign thread (VxWorks task) names to test that feature.
601 ACE_hthread_t
*server_name
= 0;
602 ACE_NEW_RETURN (server_name
,
603 ACE_hthread_t
[n_servers
],
606 // And test ability to provide stacks.
607 size_t *stack_size
= 0;
608 ACE_NEW_RETURN (stack_size
,
612 ACE_NEW_RETURN (stack
,
617 for (i
= 0; i
< n_servers
; ++i
)
619 ACE_NEW_RETURN (server_name
[i
], ACE_TCHAR
[32], -1);
620 ACE_OS::sprintf (server_name
[i
],
621 ACE_TEXT ("server%u"),
623 stack_size
[i
] = 40000;
624 ACE_NEW_RETURN (stack
[i
], char[stack_size
[i
]], -1);
626 // Initialize the stack for checkStack.
627 ACE_OS::memset (stack
[i
], 0xEE, stack_size
[i
]);
630 ACE_TCHAR
*client_name
= ACE_TEXT ("Conn client");
631 #endif /* ACE_HAS_VXTHREADS */
633 if (ACE_Thread_Manager::instance ()->spawn_n
636 (ACE_THR_FUNC
) server
,
639 , ACE_DEFAULT_THREAD_PRIORITY
644 #if 0 /* Don't support setting of stack, because it doesn't seem to work. */
650 #endif /* ACE_HAS_VXTHREADS */
652 ACE_ERROR ((LM_ERROR
,
653 ACE_TEXT ("(%P|%t) %p\n%a"),
654 ACE_TEXT ("server thread create failed"),
657 if (ACE_Thread_Manager::instance ()->spawn
658 ((ACE_THR_FUNC
) client
,
659 (void *) server_addr
,
664 #endif /* ACE_HAS_VXTHREADS */
666 ACE_ERROR ((LM_ERROR
,
667 ACE_TEXT ("(%P|%t) %p\n%a"),
668 ACE_TEXT ("client thread create failed"),
671 // Wait for the threads to exit.
672 // But, wait for a limited time because sometimes the test hangs on Irix.
673 ACE_Time_Value
const max_wait (200 /* seconds */);
674 ACE_Time_Value
const wait_time (ACE_OS::gettimeofday () + max_wait
);
675 if (ACE_Thread_Manager::instance ()->wait (&wait_time
) == -1)
678 ACE_ERROR ((LM_ERROR
,
679 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
682 ACE_OS::perror (ACE_TEXT ("wait"));
688 for (i
= 0; i
< n_servers
; ++i
)
690 delete [] server_name
[i
];
693 delete [] server_name
;
695 delete [] stack_size
;
696 #endif /* ACE_HAS_VXTHREADS */
700 #endif /* ! ACE_LACKS_FORK && ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */
703 run_main (int argc
, ACE_TCHAR
*argv
[])
705 ACE_START_TEST (ACE_TEXT ("Conn_Test"));
708 //FUZZ: disable check_for_lack_ACE_OS
709 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT ("c:i:s:"));
710 for (int c
; (c
= getopt ()) != -1; )
711 //FUZZ: enable check_for_lack_ACE_OS
715 n_clients
= ACE_OS::atoi (getopt
.opt_arg ());
718 n_client_iterations
= ACE_OS::atoi (getopt
.opt_arg ());
721 n_servers
= ACE_OS::atoi (getopt
.opt_arg ());
725 #ifndef ACE_LACKS_ACCEPT
728 ACE_INET_Addr server_addr
;
730 // Bind acceptor to any port and then find out what the port was.
731 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&)) == -1
732 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
734 ACE_ERROR ((LM_ERROR
,
735 ACE_TEXT ("(%P|%t) %p\n"),
741 ACE_DEBUG ((LM_DEBUG
,
742 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
743 server_addr
.get_port_number ()));
745 # if !defined (ACE_LACKS_FORK)
746 if (spawn_processes (&acceptor
,
748 ACE_ERROR_RETURN ((LM_ERROR
,
749 ACE_TEXT ("(%P|%t) %p\n"),
750 ACE_TEXT ("spawn_processes")),
752 # elif defined (ACE_HAS_THREADS)
753 status
= spawn_threads (&acceptor
, &server_addr
);
754 # else /* ACE_LACKS_FORK && ! ACE_HAS_THREADS */
756 ACE_TEXT ("(%P|%t) ")
757 ACE_TEXT ("only one thread may be run")
758 ACE_TEXT (" in a process on this platform")));
759 # endif /* ACE_LACKS_FORK && ! ACE_HAS_THREADS */
762 # ifdef CLEANUP_PROCESS_MUTEX
763 ACE_Process_Mutex::unlink (acceptor
.acceptor ().lock ().name ());
766 #endif // ACE_LACKS_ACCEPT