2 //=============================================================================
6 * This is a test of the <ACE_Acceptor> and <ACE_Connector>
7 * classes. The test forks processes or spawns threads (depending
8 * upon the platform) and then executes client and server allowing
9 * them to connect and exchange data. The test also illustrates
10 * how the <ACE_Strategy_Connector> works by showing how you can
11 * cache connections on the client.
13 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
14 * @author Chris Cleeland <cleeland@cs.wustl.edu>
15 * @author Irfan Pyarali <irfan@cs.wustl.edu>
17 //=============================================================================
20 #include "test_config.h"
21 #include "ace/SOCK_Connector.h"
22 #include "ace/LOCK_SOCK_Acceptor.h"
23 #include "ace/Acceptor.h"
24 #include "ace/Handle_Set.h"
25 #include "ace/Connector.h"
27 #include "ace/Get_Opt.h"
28 #include "ace/Process_Mutex.h"
29 #include "ace/Signal.h"
30 #include "Conn_Test.h"
31 #include "ace/Barrier.h"
32 #include "ace/OS_NS_signal.h"
33 #include "ace/OS_NS_sys_select.h"
34 #include "ace/OS_NS_sys_wait.h"
35 #include "ace/OS_NS_unistd.h"
36 #include "ace/os_include/os_netdb.h"
// Payload exchanged by the test: Svc_Handler::send_data() transmits these
// characters one byte at a time, and Svc_Handler::recv_data() asserts each
// received byte against this same sequence.
39 static const char ACE_ALPHABET
[] = "abcdefghijklmnopqrstuvwxyz";
41 // This test doesn't work well using fork() on MacOS X. So we
42 // will force it to use threads instead.
43 #if defined (__APPLE__)
44 # define ACE_LACKS_FORK
45 #endif /* __APPLE__ */
47 // The following works around bugs with some operating systems, which
48 // don't allow multiple threads/processes to call accept() on the same
49 // listen-mode port/socket. Also, note that since timed accept is
50 // implemented using select(), and we use timed accepts with threads,
51 // we need a real lock when using timed accepts even if the OS has
52 // thread-safe accept.
54 #if defined (ACE_LACKS_FORK)
55 # if defined (ACE_HAS_THREADS)
56 # include "ace/Thread_Mutex.h"
57 typedef ACE_Thread_Mutex ACCEPTOR_LOCKING
;
59 # include "ace/Null_Mutex.h"
60 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
61 # endif /* ACE_HAS_THREADS */
63 # if defined (ACE_HAS_THREAD_SAFE_ACCEPT)
64 # include "ace/Null_Mutex.h"
65 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
67 # include "ace/Process_Mutex.h"
68 using ACCEPTOR_LOCKING
= ACE_Process_Mutex
;
69 # define CLEANUP_PROCESS_MUTEX
70 # endif /* ACE_HAS_THREAD_SAFE_ACCEPT */
71 #endif /* ACE_LACKS_FORK */
// Acceptor type handed to the server routine.  Its socket acceptor is
// parameterized with the ACCEPTOR_LOCKING type selected by the
// preprocessor block above.
73 using ACCEPTOR
= ACE_Oneshot_Acceptor
<Svc_Handler
, ACE_LOCK_SOCK_Acceptor
<ACCEPTOR_LOCKING
>>;
// Connector type taken by timed_blocking_connect() and blocking_connect().
74 using CONNECTOR
= ACE_Connector
<Svc_Handler
, ACE_SOCK_Connector
>;
// Connector type taken by cached_connect().
75 using STRAT_CONNECTOR
= ACE_Strategy_Connector
<Svc_Handler
, ACE_SOCK_Connector
>;
// Creation and concurrency strategy types instantiated by the client
// setup code.
76 using NULL_CREATION_STRATEGY
= ACE_NOOP_Creation_Strategy
<Svc_Handler
>;
77 using NULL_ACTIVATION_STRATEGY
= ACE_NOOP_Concurrency_Strategy
<Svc_Handler
>;
// Connect strategy type instantiated by the client setup code and handed
// to the STRAT_CONNECTOR.
78 using CACHED_CONNECT_STRATEGY
= ACE_Cached_Connect_Strategy
<Svc_Handler
, ACE_SOCK_Connector
, ACE_MT_SYNCH::MUTEX
>;
// NOTE(review): this macro reuses the name of the alias declared just
// above, so every later use of CACHED_CONNECT_STRATEGY expands to the
// macro text (ACE_SOCK_CONNECTOR / ACE_SYNCH_MUTEX) instead of naming the
// alias (ACE_SOCK_Connector / ACE_MT_SYNCH::MUTEX).  Confirm that both
// spell the same type on every supported configuration, or keep only one.
80 #define CACHED_CONNECT_STRATEGY ACE_Cached_Connect_Strategy<Svc_Handler, ACE_SOCK_CONNECTOR, ACE_SYNCH_MUTEX>
// NOTE(review): no use of this macro is visible in this excerpt.
81 #define REFCOUNTED_HASH_RECYCLABLE_ADDR ACE_Refcounted_Hash_Recyclable<ACE_INET_Addr>
83 // Default number of clients/servers.
// All three values may be overwritten in run_main() from the parsed
// command-line options (option string "c:i:s:").
// n_servers bounds the loops that start the server processes/threads.
84 static int n_servers
= 5;
// n_clients is the number of client threads when ACE_HAS_THREADS is
// defined.
85 static int n_clients
= 5;
// n_client_iterations is the number of connect/send/idle rounds performed
// by cached_connect().
86 static int n_client_iterations
= 3;
88 Svc_Handler::Svc_Handler (ACE_Thread_Manager
*)
// Logs the handler together with its peer handle and then switches the
// peer stream to non-blocking mode.  A failing enable() is reported via
// ACE_ERROR_RETURN; the returned value is not visible in this excerpt.
93 Svc_Handler::open (void *)
96 ACE_TEXT ("(%P|%t) opening Svc_Handler %@ with handle %d\n"),
98 this->peer ().get_handle ()));
99 // Enable non-blocking I/O.
100 if (this->peer ().enable (ACE_NONBLOCK
) == -1)
101 ACE_ERROR_RETURN ((LM_ERROR
,
102 ACE_TEXT ("(%P|%t) %p\n"),
103 ACE_TEXT ("enable")),
// Emits a debug trace containing the peer handle when the handler is
// recycled.  No other action is visible in this excerpt.
109 Svc_Handler::recycle (void *)
111 ACE_DEBUG ((LM_DEBUG
,
112 ACE_TEXT ("(%P|%t) recycling Svc_Handler %@ with handle %d\n"),
114 this->peer ().get_handle ()));
// Transmits ACE_ALPHABET to the peer, issuing one send_n() call of length
// 1 per character up to (not including) the terminating '\0'.  A failed
// send_n() is logged with ACE_ERROR; no early exit from the loop is
// visible in this excerpt.
119 Svc_Handler::send_data ()
121 // Send data to server.
123 for (const char *c
= ACE_ALPHABET
; *c
!= '\0'; c
++)
124 if (this->peer ().send_n (c
, 1) == -1)
125 ACE_ERROR ((LM_ERROR
,
126 ACE_TEXT ("(%P|%t) %p\n"),
127 ACE_TEXT ("send_n")));
// Receives the test payload from the peer one byte at a time and asserts
// every byte against the expected position in ACE_ALPHABET.
//
// The loop waits in select() on the peer handle before each burst of
// recv() calls.  After the burst, the end-of-input path logs, then closes
// the stream; a result of -1 is logged either as "no input available"
// (errno == EWOULDBLOCK) or as an error.
131 Svc_Handler::recv_data ()
133 ACE_SOCK_Stream
&new_stream
= this->peer ();
// Descriptor set containing only the peer handle, used for select() below.
135 ACE_Handle_Set handle_set
;
136 handle_set
.set_bit (new_stream
.get_handle ());
// Cursor into the expected payload.
138 const char *t
= ACE_ALPHABET
;
140 // Read data from client (terminate on error).
142 for (ssize_t r_bytes
; ;)
144 // Since we're in non-blocking mode we need to use <select> to
145 // avoid busy waiting.
// Width argument for select(): 0 under ACE_WIN64, otherwise the peer
// handle value plus one.
146 #if defined (ACE_WIN64)
147 int select_width
= 0;
149 int select_width
= int (new_stream
.get_handle ()) + 1;
150 #endif /* ACE_WIN64 */
151 if (ACE_OS::select (select_width
, handle_set
) == -1)
152 ACE_ERROR ((LM_ERROR
,
153 ACE_TEXT ("(%P|%t) %p\n"),
154 ACE_TEXT ("select")));
// Drain whatever is currently readable, one byte per recv() call.
159 while ((r_bytes
= new_stream
.recv (&c
, 1)) > 0)
161 ACE_TEST_ASSERT (*t
== c
);
163 // We need to guard against cached connections, which
164 // will send multiple sequences of letters from 'a' ->
165 // 'z' through the same connection.
174 ACE_DEBUG ((LM_DEBUG
,
175 ACE_TEXT ("(%P|%t) reached end of input, connection closed by client\n")));
177 // Close endpoint handle (but don't close <this> yet
178 // since we're going to recycle it for the next
180 if (new_stream
.close () == -1)
181 ACE_ERROR ((LM_ERROR
,
182 ACE_TEXT ("(%P|%t) %p\n"),
183 ACE_TEXT ("close")));
186 else if (r_bytes
== -1)
188 if (errno
== EWOULDBLOCK
)
189 ACE_DEBUG ((LM_DEBUG
,
190 ACE_TEXT ("(%P|%t) no input available, going back to reading\n")));
// NOTE(review): the label below says "recv_n" although the visible call
// is recv() -- confirm the intended label.
192 ACE_ERROR ((LM_ERROR
,
193 ACE_TEXT ("(%P|%t) %p\n"),
194 ACE_TEXT ("recv_n")));
// Shuts the handler down.
//
// side: when equal to 1 the peer stream is closed first, and a failure of
//       that close is logged under the label "close_writer".
// Returns the result of handle_close(), which is invoked for any value of
// side.
201 Svc_Handler::close (u_long side
)
203 // Only run this protocol if we're the write-side (i.e., "1").
204 if (side
== 1 && this->peer ().close () == -1)
205 ACE_ERROR ((LM_ERROR
,
206 ACE_TEXT ("(%P|%t) %p\n"),
207 ACE_TEXT ("close_writer")));
208 // Trigger the shutdown.
209 return this->handle_close ();
// Logs the handler together with its peer handle, then delegates to
// ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>::idle() with the same
// flags and returns that call's result.
213 Svc_Handler::idle (u_long flags
)
215 ACE_DEBUG ((LM_DEBUG
,
216 ACE_TEXT ("(%P|%t) idling Svc_Handler %@ with handle %d\n"),
218 this->peer ().get_handle ()));
219 return ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>::idle (flags
);
224 // Information passed to the client so it can communicate with the
227 ACE_INET_Addr
*server_addr_
;
228 // Address of the server to connect with.
230 CONNECTOR
*connector_
;
231 // Connection factory.
233 STRAT_CONNECTOR
*strat_connector_
;
234 // Strategy for connecting.
236 #if defined (ACE_HAS_THREADS)
237 ACE_Barrier
*barrier_
;
238 // Performs barrier synchronization.
239 #endif /* ACE_HAS_THREADS */
242 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
// Client test step: allocates a Svc_Handler, connects it through con,
// sends the test payload with send_data() and closes the handler with
// close (1).  Connect and close failures are logged with ACE_ERROR.
//
// An ACE_Synch_Options object is built with USE_TIMEOUT and a timeout of
// ACE_DEFAULT_TIMEOUT.  The full argument list of the connect() call is
// not visible in this excerpt; presumably it receives server_addr and
// these options -- confirm against the complete file.
245 timed_blocking_connect (CONNECTOR
&con
,
246 const ACE_INET_Addr
&server_addr
)
248 ACE_Time_Value
tv (ACE_DEFAULT_TIMEOUT
);
249 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
, tv
);
251 Svc_Handler
*svc_handler
;
252 ACE_NEW (svc_handler
,
255 // Perform a timed-blocking connect to the server (this should
256 // connect quickly since we're in the same address space or same
258 if (con
.connect (svc_handler
,
261 ACE_ERROR ((LM_ERROR
,
262 ACE_TEXT ("(%P|%t) %p\n"),
263 ACE_TEXT ("connection failed")));
266 // Send the data to the server.
267 svc_handler
->send_data ();
269 // Close the connection completely.
270 if (svc_handler
->close (1) == -1)
271 ACE_ERROR ((LM_ERROR
,
272 ACE_TEXT ("(%P|%t) %p\n"),
273 ACE_TEXT ("close")));
// Client test step without a timeout object: allocates a Svc_Handler,
// connects it through con, sends the test payload with send_data() and
// closes the handler with close (1).  Connect and close failures are
// logged with ACE_ERROR.  The remaining connect() arguments are not
// visible in this excerpt (presumably server_addr -- confirm).
278 blocking_connect (CONNECTOR
&con
,
279 const ACE_INET_Addr
&server_addr
)
281 Svc_Handler
*svc_handler
;
282 ACE_NEW (svc_handler
,
285 // Perform a blocking connect to the server.
286 if (con
.connect (svc_handler
,
288 ACE_ERROR ((LM_ERROR
,
289 ACE_TEXT ("(%P|%t) %p\n"),
290 ACE_TEXT ("connection failed")));
293 // Send the data to the server.
294 svc_handler
->send_data ();
296 // Close the connection completely.
297 if (svc_handler
->close (1) == -1)
298 ACE_ERROR ((LM_ERROR
,
299 ACE_TEXT ("(%P|%t) %p\n"),
300 ACE_TEXT ("close")));
304 // This function runs the more sophisticated tests involving the
305 // <ACE_Cached_Connect_Strategy>.
// Client test step for the strategy connector.  Repeats
// n_client_iterations times: connect through con (svc_handler starts out
// as a null pointer and is filled in by the connector), send the test
// payload, then mark the handler idle with idle (1) rather than closing
// it.  A connect failure is logged with ACE_ERROR.
308 cached_connect (STRAT_CONNECTOR
&con
,
309 const ACE_INET_Addr
&server_addr
)
311 Svc_Handler
*svc_handler
= 0;
313 for (int i
= 0; i
< n_client_iterations
; i
++)
315 // Perform a blocking connect to the server using the Strategy
316 // Connector with a connection caching strategy. Since we are
317 // connecting to the same <server_addr> these calls will return
318 // the same dynamically allocated <Svc_Handler> for each
320 if (con
.connect (svc_handler
,
323 ACE_ERROR ((LM_ERROR
,
324 ACE_TEXT ("(%P|%t) %p\n"),
325 ACE_TEXT ("connection failed")));
329 // Send the data to the server.
330 svc_handler
->send_data ();
332 // Svc_Handler is now idle, so mark it as such and let the cache
333 // recycle it in another thread.
334 svc_handler
->idle (1);
336 // Rest for a second to give another thread a chance to reuse the
// Runs the three client tests in a fixed order: timed-blocking connect,
// blocking connect, then cached connect.
//
// arg: pointer to a Client_Info that supplies the connector, the strategy
//      connector, the server address and (with ACE_HAS_THREADS) the
//      barrier.
//
// When ACE_HAS_THREADS is defined, the barrier is waited on after the
// first and after the second test.
343 client_connections (void *arg
)
345 Client_Info
*info
= (Client_Info
*) arg
;
347 // Run the timed-blocking test.
348 ACE_DEBUG ((LM_DEBUG
,
349 ACE_TEXT ("(%P|%t) **** starting timed-blocking connect\n")));
350 timed_blocking_connect (*info
->connector_
,
351 *info
->server_addr_
);
353 #if defined (ACE_HAS_THREADS)
354 // Wait for other threads to join us.
355 info
->barrier_
->wait ();
356 #endif /* ACE_HAS_THREADS */
358 // Run the blocking test.
359 ACE_DEBUG ((LM_DEBUG
,
360 ACE_TEXT ("(%P|%t) **** starting blocking connect\n")));
361 blocking_connect (*info
->connector_
,
362 *info
->server_addr_
);
364 #if defined (ACE_HAS_THREADS)
365 // Wait for other threads to join us.
366 info
->barrier_
->wait ();
367 #endif /* ACE_HAS_THREADS */
369 // Run the cached blocking test.
370 ACE_DEBUG ((LM_DEBUG
,
371 ACE_TEXT ("(%P|%t) **** starting cached blocking connect\n")));
372 cached_connect (*info
->strat_connector_
,
373 *info
->server_addr_
);
377 // Execute the client tests.
382 ACE_INET_Addr
*remote_addr
= reinterpret_cast<ACE_INET_Addr
*> (arg
);
383 ACE_INET_Addr
server_addr (remote_addr
->get_port_number (),
384 ACE_DEFAULT_SERVER_HOST
);
387 NULL_CREATION_STRATEGY creation_strategy
;
388 NULL_ACTIVATION_STRATEGY activation_strategy
;
389 // Configure the Strategy Connector with a strategy that caches
391 CACHED_CONNECT_STRATEGY caching_connect_strategy
;
393 STRAT_CONNECTOR
strat_connector (0,
395 &caching_connect_strategy
,
396 &activation_strategy
);
398 info
.server_addr_
= &server_addr
;
399 info
.connector_
= &connector
;
400 info
.strat_connector_
= &strat_connector
;
402 #if defined (ACE_HAS_THREADS)
403 int n_threads
= n_clients
;
404 ACE_Barrier
thread_barrier (n_threads
);
405 info
.barrier_
= &thread_barrier
;
407 ACE_Thread_Manager client_manager
;
409 if (client_manager
.spawn_n
411 (ACE_THR_FUNC
) client_connections
,
414 ACE_ERROR ((LM_ERROR
,
415 ACE_TEXT ("(%P|%t) %p\n%a"),
416 ACE_TEXT ("client thread spawn failed"),
419 // Wait for the threads to exit.
420 client_manager
.wait ();
422 #else /* ACE_HAS_THREADS */
423 client_connections (&info
);
424 #endif /* ACE_HAS_THREADS */
428 // Performs the iterative server activities.
433 #if defined (VXWORKS)
434 ACE_DEBUG ((LM_DEBUG
,
435 ACE_TEXT ("(%P|%t) server stack size is %u\n"),
436 ACE_OS::thr_min_stack ()));
439 ACCEPTOR
*acceptor
= (ACCEPTOR
*) arg
;
440 ACE_INET_Addr cli_addr
;
441 ACE_TCHAR peer_host
[MAXHOSTNAMELEN
];
442 const ACE_Time_Value
tv (ACE_DEFAULT_TIMEOUT
);
443 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
, tv
);
445 Svc_Handler
*svc_handler
;
446 ACE_NEW_RETURN (svc_handler
,
450 // Keep looping until we timeout on <accept> or fail.
454 // Create a new <Svc_Handler> to consume the data.
456 #if defined (ACE_LACKS_FORK)
457 int result
= acceptor
->accept (svc_handler
,
460 #else /* ! ACE_LACKS_FORK */
461 int result
= acceptor
->accept (svc_handler
,
463 ACE_UNUSED_ARG (options
);
464 #endif /* ! ACE_LACKS_FORK */
466 // Timing out is the only way for threads to stop accepting
467 // since we don't have signals.
471 // svc_handler->close (); The ACE_Oneshot_Acceptor closed it.
473 if (errno
== ETIMEDOUT
)
475 ACE_DEBUG ((LM_DEBUG
,
476 ACE_TEXT ("accept timed out\n")));
480 ACE_ERROR_RETURN ((LM_ERROR
,
481 ACE_TEXT ("(%P|%t) %p\n"),
482 ACE_TEXT ("accept failed, shutting down")),
485 // Use this rather than get_host_name() to properly adjust to the
486 // charset width in use.
487 cli_addr
.get_host_name (peer_host
, MAXHOSTNAMELEN
);
488 ACE_DEBUG ((LM_DEBUG
,
489 ACE_TEXT ("(%P|%t) client %s connected from %d\n"),
491 cli_addr
.get_port_number ()));
493 svc_handler
->recv_data ();
496 ACE_NOTREACHED (return 0);
499 #endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */
501 #if !defined (ACE_LACKS_FORK)
503 handler (int /* signum */)
505 // No printout here, to be safe. Signal handlers must not acquire
506 // locks, etc. It's not even safe to call ACE_OS::exit ()!
513 spawn_processes (ACCEPTOR
*acceptor
,
514 ACE_INET_Addr
*server_addr
)
516 pid_t
*children_ptr
= 0;
517 ACE_NEW_RETURN (children_ptr
,
520 std::unique_ptr
<pid_t
[]> children (children_ptr
);
523 // Spawn off a number of server processes all of which will listen
524 // on the same port number for clients to connect.
525 for (i
= 0; i
< n_servers
; i
++)
527 pid_t pid
= ACE_OS::fork (ACE_TEXT ("child"));
531 ACE_ERROR ((LM_ERROR
,
532 ACE_TEXT ("(%P|%t) %p\n%a"),
533 ACE_TEXT ("fork failed"),
537 case 0: // In the child.
539 // Register a signal handler to close down the child.
540 ACE_Sig_Action
sa ((ACE_SignalHandler
) handler
, SIGTERM
);
543 server ((void *) acceptor
);
547 default: // In the parent.
553 client ((void *) server_addr
);
555 for (i
= 0; i
< n_servers
; i
++)
556 // Shutdown the servers.
557 if (ACE_OS::kill (children
[i
], SIGTERM
) == -1)
558 ACE_ERROR ((LM_ERROR
,
559 ACE_TEXT ("(%P|%t) %p for %d\n"),
560 ACE_TEXT ("kill"), children
[i
]));
566 child
= ACE_OS::waitpid (0, 0, 0);
568 ACE_DEBUG ((LM_DEBUG
,
569 ACE_TEXT ("(%P|%t) reaping %d\n"),
574 // Remove the lock so we don't have process semaphores lying around.
575 return acceptor
->acceptor ().lock ().remove ();
577 #endif /* ! ACE_LACKS_FORK */
579 #if defined (ACE_LACKS_FORK) && defined (ACE_HAS_THREADS) \
580 && !defined ACE_LACKS_ACCEPT
581 // Spawn threads and run the client and server.
585 spawn_threads (ACCEPTOR
*acceptor
,
586 ACE_INET_Addr
*server_addr
)
591 // Assign thread (VxWorks task) names to test that feature.
592 ACE_hthread_t
*server_name
= 0;
593 ACE_NEW_RETURN (server_name
,
594 ACE_hthread_t
[n_servers
],
597 // And test ability to provide stacks.
598 size_t *stack_size
= 0;
599 ACE_NEW_RETURN (stack_size
,
603 ACE_NEW_RETURN (stack
,
608 for (i
= 0; i
< n_servers
; ++i
)
610 ACE_NEW_RETURN (server_name
[i
], ACE_TCHAR
[32], -1);
611 ACE_OS::sprintf (server_name
[i
],
612 ACE_TEXT ("server%u"),
614 stack_size
[i
] = 40000;
615 ACE_NEW_RETURN (stack
[i
], char[stack_size
[i
]], -1);
617 // Initialize the stack for checkStack.
618 ACE_OS::memset (stack
[i
], 0xEE, stack_size
[i
]);
621 ACE_TCHAR
*client_name
= ACE_TEXT ("Conn client");
622 #endif /* ACE_HAS_VXTHREADS */
624 if (ACE_Thread_Manager::instance ()->spawn_n
627 (ACE_THR_FUNC
) server
,
630 , ACE_DEFAULT_THREAD_PRIORITY
635 #if 0 /* Don't support setting of stack, because it doesn't seem to work. */
641 #endif /* ACE_HAS_VXTHREADS */
643 ACE_ERROR ((LM_ERROR
,
644 ACE_TEXT ("(%P|%t) %p\n%a"),
645 ACE_TEXT ("server thread create failed"),
648 if (ACE_Thread_Manager::instance ()->spawn
649 ((ACE_THR_FUNC
) client
,
650 (void *) server_addr
,
655 #endif /* ACE_HAS_VXTHREADS */
657 ACE_ERROR ((LM_ERROR
,
658 ACE_TEXT ("(%P|%t) %p\n%a"),
659 ACE_TEXT ("client thread create failed"),
662 // Wait for the threads to exit.
663 // But, wait for a limited time because sometimes the test hangs on Irix.
664 ACE_Time_Value
const max_wait (200 /* seconds */);
665 ACE_Time_Value
const wait_time (ACE_OS::gettimeofday () + max_wait
);
666 if (ACE_Thread_Manager::instance ()->wait (&wait_time
) == -1)
669 ACE_ERROR ((LM_ERROR
,
670 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
673 ACE_OS::perror (ACE_TEXT ("wait"));
679 for (i
= 0; i
< n_servers
; ++i
)
681 delete [] server_name
[i
];
684 delete [] server_name
;
686 delete [] stack_size
;
687 #endif /* ACE_HAS_VXTHREADS */
691 #endif /* ACE_LACKS_FORK && ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */
694 run_main (int argc
, ACE_TCHAR
*argv
[])
696 ACE_START_TEST (ACE_TEXT ("Conn_Test"));
699 //FUZZ: disable check_for_lack_ACE_OS
700 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT ("c:i:s:"));
701 for (int c
; (c
= getopt ()) != -1; )
702 //FUZZ: enable check_for_lack_ACE_OS
706 n_clients
= ACE_OS::atoi (getopt
.opt_arg ());
709 n_client_iterations
= ACE_OS::atoi (getopt
.opt_arg ());
712 n_servers
= ACE_OS::atoi (getopt
.opt_arg ());
716 #ifndef ACE_LACKS_ACCEPT
719 ACE_INET_Addr server_addr
;
721 // Bind acceptor to any port and then find out what the port was.
722 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&)) == -1
723 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
725 ACE_ERROR ((LM_ERROR
,
726 ACE_TEXT ("(%P|%t) %p\n"),
732 ACE_DEBUG ((LM_DEBUG
,
733 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
734 server_addr
.get_port_number ()));
736 # if !defined (ACE_LACKS_FORK)
737 if (spawn_processes (&acceptor
,
739 ACE_ERROR_RETURN ((LM_ERROR
,
740 ACE_TEXT ("(%P|%t) %p\n"),
741 ACE_TEXT ("spawn_processes")),
743 # elif defined (ACE_HAS_THREADS)
744 status
= spawn_threads (&acceptor
, &server_addr
);
745 # else /* ACE_LACKS_FORK && ! ACE_HAS_THREADS */
747 ACE_TEXT ("(%P|%t) ")
748 ACE_TEXT ("only one thread may be run")
749 ACE_TEXT (" in a process on this platform")));
750 # endif /* ACE_LACKS_FORK && ! ACE_HAS_THREADS */
753 # ifdef CLEANUP_PROCESS_MUTEX
754 ACE_Process_Mutex::unlink (acceptor
.acceptor ().lock ().name ());
757 #endif // ACE_LACKS_ACCEPT