2 //=============================================================================
4 * @file MT_NonBlocking_Connect_Test.cpp
6 * This test makes non-blocking connects from multiple threads. Things are
7 * complicated by the fact that after the connection is attempted reactor's
8 * handle_events() is called and this leads to possible connection
9 * completion in the other thread. This is similar to what TAO does for
10 * oneway with SYNC_NONE sync scope policy.
11 * The following reactors are tested: Select, TP, WFMO, and Dev Poll
14 * @author Vladimir Zykov <vladimir.zykov@prismtech.com>
16 //=============================================================================
19 #include "test_config.h"
20 #include "ace/Reactor.h"
21 #include "ace/Select_Reactor.h"
22 #include "ace/TP_Reactor.h"
23 #include "ace/WFMO_Reactor.h"
24 #include "ace/Dev_Poll_Reactor.h"
25 #include "ace/Svc_Handler.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Get_Opt.h"
29 #include "ace/Connector.h"
30 #include "ace/SOCK_Connector.h"
31 #include "ace/Thread_Mutex.h"
33 #if defined (ACE_HAS_THREADS)
35 static int test_select_reactor
= 1;
36 static int test_tp_reactor
= 1;
37 static int test_wfmo_reactor
= 1;
38 static int test_dev_poll_reactor
= 1;
39 static int number_of_threads
= 10;
41 static int result
= 0;
43 static const ACE_TCHAR
* hosts
[] = {
44 ACE_TEXT ("www.russiantvguide.com:80"),
45 ACE_TEXT ("news.bbc.co.uk:80"),
46 ACE_TEXT ("www.cnn.com:80"),
47 ACE_TEXT ("www.waca.com.au:80"),
48 ACE_TEXT ("www.uganda.co.ug:80"),
49 ACE_TEXT ("www.cs.wustl.edu:80"),
50 ACE_TEXT ("www.dre.vanderbilt.edu:80"),
51 ACE_TEXT ("www.dhm.gov.np:80"),
52 ACE_TEXT ("www.msn.com:80"),
53 ACE_TEXT ("www.presidencymaldives.gov.mv:80")
56 class Svc_Handler
: public ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>
64 Svc_Handler (ACE_Thread_Manager
*);
68 int close (u_long flags
);
73 Svc_Handler::Svc_Handler (ACE_Thread_Manager
*)
76 this->reference_counting_policy ().value (
77 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
);
81 Svc_Handler::open (void *)
83 this->connected_
= true;
88 Svc_Handler::close (u_long
)
91 "%t close connection on handle %d.\n",
92 this->get_handle ()));
96 template<class SVC_HANDLER
>
97 class Concurrency_Strategy
:
98 public ACE_Concurrency_Strategy
<SVC_HANDLER
>
101 virtual int activate_svc_handler (SVC_HANDLER
*svc_handler
, void *arg
);
104 template<class SVC_HANDLER
> int
105 Concurrency_Strategy
<SVC_HANDLER
>::
106 activate_svc_handler (SVC_HANDLER
*svc_handler
, void *arg
)
108 // Every fourth connection fails.
109 static long count
= 0;
110 if (++count
% 4 == 0)
112 ACE_DEBUG ((LM_DEBUG
,
113 "%t connection on handle %d has artificially failed.\n",
114 svc_handler
->get_handle ()));
119 return ACE_Concurrency_Strategy
<SVC_HANDLER
>::activate_svc_handler (
124 typedef ACE_Creation_Strategy
<Svc_Handler
> CREATION_STRATEGY
;
125 typedef ACE_Connect_Strategy
<Svc_Handler
,
126 ACE_SOCK_CONNECTOR
> CONNECT_STRATEGY
;
127 typedef Concurrency_Strategy
<Svc_Handler
> CONCURRENCY_STRATEGY
;
128 typedef ACE_Strategy_Connector
<Svc_Handler
,
129 ACE_SOCK_CONNECTOR
> BASE_CONNECTOR
;
131 class Connect_Thread
: public ACE_Task_Base
134 Connect_Thread (ACE_Thread_Manager
&thread_manager
,
135 ACE_Reactor
&reactor
,
136 ACE_Thread_Mutex
&reactor_lock
)
137 : ACE_Task_Base (&thread_manager
)
139 , reactor_lock_ (reactor_lock
)
140 , threads_ (number_of_threads
)
142 // Open the connector.
143 this->base_connector_
.open (&this->reactor_
,
144 &this->cns_
, &this->cts_
, &this->cys_
);
150 ACE_Reactor
&reactor_
;
151 ACE_Thread_Mutex
&reactor_lock_
;
153 CREATION_STRATEGY cns_
;
154 CONNECT_STRATEGY cts_
;
155 CONCURRENCY_STRATEGY cys_
;
156 BASE_CONNECTOR base_connector_
;
158 ACE_Atomic_Op
<ACE_Thread_Mutex
, long> threads_
;
162 Connect_Thread::svc (void)
164 size_t const nr_names
= sizeof hosts
/ sizeof (char *);
165 ACE_INET_Addr
*addresses
= new ACE_INET_Addr
[nr_names
];
167 for (size_t i
= 0; i
< nr_names
; ++i
)
169 if (addresses
[i
].set (hosts
[i
]) != 0)
177 ACE_INET_Addr local_addr
;
178 ACE_Synch_Options
synch_options (ACE_Synch_Options::USE_REACTOR
);
180 for (size_t i
= 0; i
< nr_names
; ++i
)
182 ACE_INET_Addr
&remote_addr
= addresses
[i
];
184 ACE_DEBUG ((LM_DEBUG
,
185 "%t connecting to %s...\n",
188 // Create a new handler. It's what begin_connection() in TAO does.
189 Svc_Handler
*svc_handler
= 0;
190 result
= this->base_connector_
.connect (svc_handler
,
194 ACE_Event_Handler_var
release_guard (svc_handler
);
196 // Complete connection. It's what complete_connection() in TAO does.
197 // Not exactly but for the test it's enough.
198 int run_for_wfmo
= 5;
199 while (this->reactor_
.work_pending () && run_for_wfmo
)
201 ACE_GUARD_RETURN (ACE_Thread_Mutex
, ace_mon
,
202 this->reactor_lock_
, -1);
204 if (this->reactor_
.work_pending () && run_for_wfmo
)
208 ACE_DEBUG ((LM_DEBUG
,
209 "%t waiting for connection to %s...\n",
212 this->reactor_
.owner (ACE_Thread::self ());
214 this->reactor_
.handle_events ();
218 if (svc_handler
->connected_
)
220 this->reactor_
.register_handler (svc_handler
,
221 ACE_Event_Handler::READ_MASK
);
223 ACE_DEBUG ((LM_DEBUG
,
224 "%t connection for %s completed on handle %d.\n",
226 svc_handler
->get_handle ()));
227 // We are connected successfully. Close the connection.
228 this->reactor_
.remove_handler (svc_handler
->get_handle (),
229 ACE_Event_Handler::ALL_EVENTS_MASK
|
230 ACE_Event_Handler::DONT_CALL
);
234 // Connection didn't succeed. svc_handler will be deleted when
235 // we go out of the scope of this loop.
236 ACE_DEBUG ((LM_DEBUG
,
237 "%t connection for %s did not complete.\n",
242 if (--this->threads_
== 0)
244 // Close the connector.
245 this->base_connector_
.close ();
254 test (ACE_Reactor_Impl
*reactor_impl
)
256 ACE_Reactor
reactor (reactor_impl
, true);
258 ACE_Thread_Mutex reactor_lock
;
259 ACE_Thread_Manager thread_manager
;
261 ACE_DEBUG ((LM_DEBUG
,
262 "Starting %d connection threads...\n",
265 Connect_Thread
connect_thread (thread_manager
, reactor
, reactor_lock
);
266 result
= connect_thread
.activate (THR_NEW_LWP
|THR_JOINABLE
,
268 ACE_TEST_ASSERT (result
== 0);
270 // Wait for threads to exit.
271 result
= thread_manager
.wait ();
272 ACE_TEST_ASSERT (result
== 0);
276 parse_args (int argc
, ACE_TCHAR
*argv
[])
278 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("a:b:c:d:f:g:k:lm:n:o:uz:"));
281 while ((cc
= get_opt ()) != -1)
286 test_select_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
289 test_tp_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
292 test_wfmo_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
295 test_dev_poll_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
298 number_of_threads
= ACE_OS::atoi (get_opt
.opt_arg ());
301 debug
= ACE_OS::atoi (get_opt
.opt_arg ());
305 ACE_ERROR ((LM_ERROR
,
306 ACE_TEXT ("\nusage: %s \n\n")
307 ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
308 ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
309 ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
310 ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
311 ACE_TEXT ("\t[-f number of threads] (defaults to %d)\n")
312 ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
318 test_dev_poll_reactor
,
329 run_main (int argc
, ACE_TCHAR
*argv
[])
331 ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
334 result
= parse_args (argc
, argv
);
338 if (test_select_reactor
)
340 ACE_DEBUG ((LM_DEBUG
,
341 ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
343 test (new ACE_Select_Reactor
);
348 ACE_DEBUG ((LM_DEBUG
,
349 ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
351 test (new ACE_TP_Reactor
);
354 #if defined (ACE_HAS_EVENT_POLL)
356 if (test_dev_poll_reactor
)
358 ACE_DEBUG ((LM_DEBUG
,
359 ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
361 test (new ACE_Dev_Poll_Reactor
);
366 #if defined (ACE_WIN32)
368 if (test_wfmo_reactor
)
370 ACE_DEBUG ((LM_DEBUG
,
371 ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
373 test (new ACE_WFMO_Reactor
);
376 #endif /* ACE_WIN32 */
383 #else /* ACE_HAS_THREADS */
386 run_main (int, ACE_TCHAR
*[])
388 ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
391 ACE_TEXT ("threads not supported on this platform\n")));
398 #endif /* ACE_HAS_THREADS */