2 //=============================================================================
4 * @file MT_NonBlocking_Connect_Test.cpp
6 * This test makes non-blocking connects from multiple threads. Things are
7 * complicated by the fact that, after a connection is attempted, the reactor's
8 * handle_events() is called, which can lead to the connection being
9 * completed in another thread. This is similar to what TAO does for
10 * oneway with SYNC_NONE sync scope policy.
11 * The following reactors are tested: Select, TP, WFMO, and Dev Poll
14 * @author Vladimir Zykov <vladimir.zykov@prismtech.com>
16 //=============================================================================
19 #include "test_config.h"
20 #include "ace/Reactor.h"
21 #include "ace/Select_Reactor.h"
22 #include "ace/TP_Reactor.h"
23 #include "ace/WFMO_Reactor.h"
24 #include "ace/Dev_Poll_Reactor.h"
25 #include "ace/Svc_Handler.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Get_Opt.h"
29 #include "ace/Connector.h"
30 #include "ace/SOCK_Connector.h"
31 #include "ace/Thread_Mutex.h"
33 #if defined (ACE_HAS_THREADS)
// Run-time switches for the test. parse_args() overwrites them with values
// taken from the command line.
// Non-zero enables the Select reactor pass in run_main() (usage text: -a).
35 static int test_select_reactor
= 1;
// TP reactor switch (usage text: -b). The check that consumes it in
// run_main() is not visible in this excerpt.
36 static int test_tp_reactor
= 1;
// Non-zero enables the WFMO reactor pass (usage text: -c); run_main() only
// tests it inside the ACE_WIN32 section.
37 static int test_wfmo_reactor
= 1;
// Non-zero enables the Dev Poll reactor pass (usage text: -d); run_main()
// only tests it inside the ACE_HAS_EVENT_POLL section.
38 static int test_dev_poll_reactor
= 1;
// Initial value of Connect_Thread::threads_ (usage text: -f).
39 static int number_of_threads
= 10;
// Shared status slot: Connect_Thread::svc(), test() and run_main() each
// store the return value of their most recent call here.
// NOTE(review): svc() assigns to it before taking reactor_lock_, and the
// file header says the test runs several threads -- confirm that the
// unsynchronised write is acceptable.
41 static int result
= 0;
// Remote endpoints in host:port form. Connect_Thread::svc() converts each
// entry to an ACE_INET_Addr with set() and then connects to them in order.
// NOTE(review): these are public Internet names, so the outcome presumably
// depends on DNS and outbound connectivity -- confirm for isolated hosts.
// NOTE(review): the end of the initializer list is not visible in this
// excerpt.
43 static const ACE_TCHAR
* hosts
[] = {
44 ACE_TEXT ("www.russiantvguide.com:80"),
45 ACE_TEXT ("news.bbc.co.uk:80"),
46 ACE_TEXT ("www.cnn.com:80"),
47 ACE_TEXT ("www.waca.com.au:80"),
48 ACE_TEXT ("www.uganda.co.ug:80"),
49 ACE_TEXT ("www.cs.wustl.edu:80"),
50 ACE_TEXT ("www.dre.vanderbilt.edu:80"),
51 ACE_TEXT ("www.dhm.gov.np:80"),
52 ACE_TEXT ("www.msn.com:80"),
53 ACE_TEXT ("www.presidencymaldives.gov.mv:80")
// Handler for one outgoing connection, derived from
// ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>.
// NOTE(review): the braces, the access specifier and the connected_ member
// used by open() and Connect_Thread::svc() are not visible in this excerpt.
56 class Svc_Handler
: public ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>
// Constructor taking a thread manager pointer; the out-of-line definition
// leaves that parameter unnamed.
64 Svc_Handler (ACE_Thread_Manager
*);
// Override that marks the handler as connected (see the definition).
66 int open (void *) override
;
// Override that logs the handle being closed (see the definition).
68 int close (u_long flags
) override
;
// Constructs the handler and switches its reference counting policy to
// ENABLED. The thread manager argument is unnamed and not used by the
// visible code.
// NOTE(review): any member initializer list and the braces are not visible
// in this excerpt.
73 Svc_Handler::Svc_Handler (ACE_Thread_Manager
*)
76 this->reference_counting_policy ().value (
77 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
);
// Records that the connection is up by setting connected_ to true;
// Connect_Thread::svc() reads this flag to decide which outcome to log.
// NOTE(review): presumably called by the connector once the connection has
// been established -- confirm. The return statement is not visible here.
81 Svc_Handler::open (void *)
83 this->connected_
= true;
// Logs that the connection on this handler's handle is being closed. The
// flags argument is unnamed and not used by the visible code.
// NOTE(review): the opening of the logging call and the return statement are
// not visible in this excerpt.
88 Svc_Handler::close (u_long
)
91 "%t close connection on handle %d.\n",
92 this->get_handle ()));
// Concurrency strategy that wraps ACE_Concurrency_Strategy<SVC_HANDLER> and
// overrides activate_svc_handler() so that some activations are made to fail
// on purpose (see the definition).
// NOTE(review): the braces and access specifier are not visible in this
// excerpt.
96 template<class SVC_HANDLER
>
97 class Concurrency_Strategy
:
98 public ACE_Concurrency_Strategy
<SVC_HANDLER
>
// Activation hook; every fourth call is reported as an artificial failure.
101 int activate_svc_handler (SVC_HANDLER
*svc_handler
, void *arg
) override
;
// Activation hook with fault injection: a function-local counter is bumped
// on each call and every fourth call logs an artificial failure; the other
// path forwards to the base class implementation and returns its result.
// NOTE(review): the failure return value and the arguments forwarded to the
// base class are not visible in this excerpt.
104 template<class SVC_HANDLER
> int
105 Concurrency_Strategy
<SVC_HANDLER
>::
106 activate_svc_handler (SVC_HANDLER
*svc_handler
, void *arg
)
108 // Every fourth connection fails.
// The counter is static, so it is shared by every call made through this
// template instantiation.
// NOTE(review): it is incremented without any visible synchronisation --
// confirm that all callers are serialised.
109 static long count
= 0;
110 if (++count
% 4 == 0)
112 ACE_DEBUG ((LM_DEBUG
,
113 "%t connection on handle %d has artificially failed.\n",
114 svc_handler
->get_handle ()));
// Normal path: defer to the stock ACE strategy.
119 return ACE_Concurrency_Strategy
<SVC_HANDLER
>::activate_svc_handler (
// Short names for the strategy and connector types that Connect_Thread
// holds as members.
// Type of Connect_Thread::cns_.
124 using CREATION_STRATEGY
= ACE_Creation_Strategy
<Svc_Handler
>;
// Type of Connect_Thread::cts_.
125 using CONNECT_STRATEGY
= ACE_Connect_Strategy
<Svc_Handler
, ACE_SOCK_Connector
>;
// Type of Connect_Thread::cys_; this is the fault-injecting strategy defined
// in this file.
126 using CONCURRENCY_STRATEGY
= Concurrency_Strategy
<Svc_Handler
>;
// Type of Connect_Thread::base_connector_, which is opened with the three
// strategies above.
127 using BASE_CONNECTOR
= ACE_Strategy_Connector
<Svc_Handler
, ACE_SOCK_Connector
>;
// Task whose svc() performs the connect attempts. It keeps references to a
// reactor and a mutex supplied by the caller and owns the strategy objects
// and the strategy connector built from them.
// NOTE(review): the braces, access specifiers, the svc() declaration and the
// initializer for reactor_ are not visible in this excerpt.
129 class Connect_Thread
: public ACE_Task_Base
// Stores the caller's lock, hands the thread manager to ACE_Task_Base and
// seeds threads_ with number_of_threads; the body then opens the connector.
132 Connect_Thread (ACE_Thread_Manager
&thread_manager
,
133 ACE_Reactor
&reactor
,
134 ACE_Thread_Mutex
&reactor_lock
)
135 : ACE_Task_Base (&thread_manager
)
137 , reactor_lock_ (reactor_lock
)
138 , threads_ (number_of_threads
)
140 // Open the connector.
// The connector is given the reactor plus the creation, connect and
// concurrency strategy members declared below.
141 this->base_connector_
.open (&this->reactor_
,
142 &this->cns_
, &this->cts_
, &this->cys_
);
// Reactor that svc() drives with work_pending() and handle_events().
148 ACE_Reactor
&reactor_
;
// Mutex that svc() takes around its handle_events() step.
149 ACE_Thread_Mutex
&reactor_lock_
;
// Strategy objects; they are declared ahead of base_connector_, which is
// opened with their addresses in the constructor body.
151 CREATION_STRATEGY cns_
;
152 CONNECT_STRATEGY cts_
;
153 CONCURRENCY_STRATEGY cys_
;
// Connector used by svc() for every connect attempt.
154 BASE_CONNECTOR base_connector_
;
// Countdown decremented at the end of svc(); the call that brings it to zero
// closes the connector.
156 ACE_Atomic_Op
<ACE_Thread_Mutex
, long> threads_
;
// Thread entry point. Converts every entry of hosts into an address, then
// for each address starts a connect through base_connector_, runs the
// reactor while work_pending() is true and run_for_wfmo is non-zero, and
// logs whether the handler ended up connected. The call that brings threads_
// to zero closes the connector.
// NOTE(review): the return type, braces and several argument lines are not
// visible in this excerpt; the comments below cover only the visible code.
160 Connect_Thread::svc ()
// Number of entries in hosts.
// NOTE(review): the divisor is sizeof (char *) although the elements are
// const ACE_TCHAR *; this presumably relies on both pointer types having the
// same size -- confirm.
162 size_t const nr_names
= sizeof hosts
/ sizeof (char *);
// One address object per host.
// NOTE(review): no matching delete[] is visible in this excerpt -- confirm
// the array is released on every exit path, including the early return that
// ACE_GUARD_RETURN below is given -1 for.
163 ACE_INET_Addr
*addresses
= new ACE_INET_Addr
[nr_names
];
// First pass: parse each host:port string into its address slot. What
// happens when set() reports a non-zero result is not visible here.
165 for (size_t i
= 0; i
< nr_names
; ++i
)
167 if (addresses
[i
].set (hosts
[i
]) != 0)
// Default-constructed local address.
175 ACE_INET_Addr local_addr
;
// NOTE(review): USE_REACTOR presumably makes connect() non-blocking and
// reactor-driven, as the file header describes -- confirm.
176 ACE_Synch_Options
synch_options (ACE_Synch_Options::USE_REACTOR
);
// Second pass: one connect attempt per address.
178 for (size_t i
= 0; i
< nr_names
; ++i
)
180 ACE_INET_Addr
&remote_addr
= addresses
[i
];
182 ACE_DEBUG ((LM_DEBUG
,
183 "%t connecting to %s...\n",
186 // Create a new handler. It's what begin_connection() in TAO does.
// svc_handler starts out null and is passed to connect(), whose status is
// stored in the file-level result variable.
187 Svc_Handler
*svc_handler
= 0;
188 result
= this->base_connector_
.connect (svc_handler
,
// Wrap the handler in an ACE_Event_Handler_var; the comment on the failure
// branch below states that the handler is deleted when this scope is left.
192 ACE_Event_Handler_var
release_guard (svc_handler
);
194 // Complete connection. It's what complete_connection() in TAO does.
195 // Not exactly but for the test it's enough.
// Bound on the number of reactor iterations for this connection.
// NOTE(review): the statement that decrements it is not visible here.
196 int run_for_wfmo
= 5;
197 while (this->reactor_
.work_pending () && run_for_wfmo
)
// Hold reactor_lock_ for the rest of this iteration through the guard object
// ace_mon.
// NOTE(review): -1 is presumably what the macro returns if the lock cannot
// be acquired -- confirm.
199 ACE_GUARD_RETURN (ACE_Thread_Mutex
, ace_mon
,
200 this->reactor_lock_
, -1);
// Re-test the loop condition now that the lock is held.
202 if (this->reactor_
.work_pending () && run_for_wfmo
)
206 ACE_DEBUG ((LM_DEBUG
,
207 "%t waiting for connection to %s...\n",
// Make the calling thread the reactor owner, then run one round of event
// handling.
210 this->reactor_
.owner (ACE_Thread::self ());
212 this->reactor_
.handle_events ();
// connected_ is set by Svc_Handler::open().
// NOTE(review): svc_handler is dereferenced without a null check; confirm
// connect() cannot leave it null.
216 if (svc_handler
->connected_
)
// Success path: register the handler for read events, log, then remove it
// again straight away.
218 this->reactor_
.register_handler (svc_handler
,
219 ACE_Event_Handler::READ_MASK
);
221 ACE_DEBUG ((LM_DEBUG
,
222 "%t connection for %s completed on handle %d.\n",
224 svc_handler
->get_handle ()));
225 // We are connected successfully. Close the connection.
// NOTE(review): DONT_CALL presumably suppresses the handler's close callback
// during removal -- confirm.
226 this->reactor_
.remove_handler (svc_handler
->get_handle (),
227 ACE_Event_Handler::ALL_EVENTS_MASK
|
228 ACE_Event_Handler::DONT_CALL
);
232 // Connection didn't succeed. svc_handler will be deleted when
233 // we go out of the scope of this loop.
234 ACE_DEBUG ((LM_DEBUG
,
235 "%t connection for %s did not complete.\n",
// threads_ was seeded with number_of_threads; the call that decrements it to
// zero shuts the connector down.
240 if (--this->threads_
== 0)
242 // Close the connector.
243 this->base_connector_
.close ();
// Runs one pass of the test on the given reactor implementation: wraps it in
// an ACE_Reactor, creates the lock and thread manager, activates a
// Connect_Thread task and waits for its threads to finish, asserting that
// both activate() and wait() return 0.
// NOTE(review): the return type, braces and the trailing arguments of two
// calls are not visible in this excerpt.
252 test (ACE_Reactor_Impl
*reactor_impl
)
// NOTE(review): the second argument presumably makes the reactor take
// ownership of reactor_impl, which run_main() allocates with new -- confirm
// against the ACE_Reactor constructor.
254 ACE_Reactor
reactor (reactor_impl
, true);
// Lock and thread manager handed to Connect_Thread by reference.
256 ACE_Thread_Mutex reactor_lock
;
257 ACE_Thread_Manager thread_manager
;
259 ACE_DEBUG ((LM_DEBUG
,
260 "Starting %d connection threads...\n",
// Build the task and start its threads as joinable threads.
263 Connect_Thread
connect_thread (thread_manager
, reactor
, reactor_lock
);
264 result
= connect_thread
.activate (THR_NEW_LWP
|THR_JOINABLE
,
266 ACE_TEST_ASSERT (result
== 0);
268 // Wait for threads to exit.
269 result
= thread_manager
.wait ();
270 ACE_TEST_ASSERT (result
== 0);
// Reads the command line with ACE_Get_Opt and stores each numeric option
// argument, converted with ACE_OS::atoi, into the matching file-level
// setting. One branch prints a usage message through ACE_ERROR.
// NOTE(review): the return type, the declaration of cc, the switch and its
// case labels, and the return statements are not visible in this excerpt, so
// which letter feeds which variable can only be inferred from the usage text.
274 parse_args (int argc
, ACE_TCHAR
*argv
[])
// NOTE(review): the option string accepts more letters (g, k, l, m, n, o, u)
// than the usage text below documents -- confirm whether they are handled.
276 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("a:b:c:d:f:g:k:lm:n:o:uz:"));
// Fetch options until get_opt() reports -1.
279 while ((cc
= get_opt ()) != -1)
// Reactor selection flags.
284 test_select_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
287 test_tp_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
290 test_wfmo_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
293 test_dev_poll_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
// Thread count.
296 number_of_threads
= ACE_OS::atoi (get_opt
.opt_arg ());
// Debug setting.
// NOTE(review): the declaration of debug is not visible in this excerpt.
299 debug
= ACE_OS::atoi (get_opt
.opt_arg ());
// Usage message listing each documented option with its current default.
303 ACE_ERROR ((LM_ERROR
,
304 ACE_TEXT ("\nusage: %s \n\n")
305 ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
306 ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
307 ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
308 ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
309 ACE_TEXT ("\t[-f number of threads] (defaults to %d)\n")
310 ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
316 test_dev_poll_reactor
,
// Test entry point for builds with thread support: starts the ACE test log,
// parses the command line, then calls test() with a freshly allocated
// reactor implementation for each reactor kind that is enabled.
// NOTE(review): the return type, braces, the handling of the parse_args()
// result, the check guarding the TP pass, the end of the ACE_HAS_EVENT_POLL
// section and the end-of-test steps are not visible in this excerpt.
327 run_main (int argc
, ACE_TCHAR
*argv
[])
329 ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
// Store the parse status in the file-level result variable.
332 result
= parse_args (argc
, argv
);
// Select reactor pass.
336 if (test_select_reactor
)
338 ACE_DEBUG ((LM_DEBUG
,
339 ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
341 test (new ACE_Select_Reactor
);
// TP reactor pass.
346 ACE_DEBUG ((LM_DEBUG
,
347 ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
349 test (new ACE_TP_Reactor
);
// Dev Poll reactor pass, compiled only when ACE_HAS_EVENT_POLL is defined.
352 #if defined (ACE_HAS_EVENT_POLL)
354 if (test_dev_poll_reactor
)
356 ACE_DEBUG ((LM_DEBUG
,
357 ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
359 test (new ACE_Dev_Poll_Reactor
);
// WFMO reactor pass, compiled only when ACE_WIN32 is defined.
364 #if defined (ACE_WIN32)
366 if (test_wfmo_reactor
)
368 ACE_DEBUG ((LM_DEBUG
,
369 ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
371 test (new ACE_WFMO_Reactor
);
374 #endif /* ACE_WIN32 */
381 #else /* ACE_HAS_THREADS */
// Fallback entry point compiled when ACE_HAS_THREADS is not defined: starts
// the ACE test log and reports that threads are not supported.
// NOTE(review): the return type, the opening of the logging call and the
// end-of-test steps are not visible in this excerpt.
384 run_main (int, ACE_TCHAR
*[])
386 ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
389 ACE_TEXT ("threads not supported on this platform\n")));
396 #endif /* ACE_HAS_THREADS */