Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / MT_NonBlocking_Connect_Test.cpp
blobef8a018dd66772204992e54eea0ffcd17a22d5a3
//=============================================================================
/**
 *  @file    MT_NonBlocking_Connect_Test.cpp
 *
 *  This test makes non-blocking connects from multiple threads. Things are
 *  complicated by the fact that, after a connection is attempted, the
 *  reactor's handle_events() is called, which may lead to the connection
 *  being completed in another thread. This is similar to what TAO does for
 *  oneway requests with the SYNC_NONE sync scope policy.
 *  The following reactors are tested: Select, TP, WFMO, and Dev Poll
 *  (if enabled).
 *
 *  @author Vladimir Zykov <vladimir.zykov@prismtech.com>
 */
//=============================================================================
19 #include "test_config.h"
20 #include "ace/Reactor.h"
21 #include "ace/Select_Reactor.h"
22 #include "ace/TP_Reactor.h"
23 #include "ace/WFMO_Reactor.h"
24 #include "ace/Dev_Poll_Reactor.h"
25 #include "ace/Svc_Handler.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Get_Opt.h"
28 #include "ace/Task.h"
29 #include "ace/Connector.h"
30 #include "ace/SOCK_Connector.h"
31 #include "ace/Thread_Mutex.h"
33 #if defined (ACE_HAS_THREADS)
// Command-line tunables; parse_args() overwrites them from the option
// named next to each one. A non-zero test_* flag makes run_main() execute
// the test with the corresponding reactor implementation.
static int test_select_reactor   = 1;   // -a
static int test_tp_reactor       = 1;   // -b
static int test_wfmo_reactor     = 1;   // -c
static int test_dev_poll_reactor = 1;   // -d

// Number of threads activated for each Connect_Thread task (-f).
static int number_of_threads = 10;

// Debug level (-z). It is stored by parse_args() but not read by any of
// the code visible in this file.
static int debug = 0;

// Status of the most recent checked operation; this is the value that
// run_main() returns.
static int result = 0;
43 static const ACE_TCHAR* hosts[] = {
44 ACE_TEXT ("www.russiantvguide.com:80"),
45 ACE_TEXT ("news.bbc.co.uk:80"),
46 ACE_TEXT ("www.cnn.com:80"),
47 ACE_TEXT ("www.waca.com.au:80"),
48 ACE_TEXT ("www.uganda.co.ug:80"),
49 ACE_TEXT ("www.cs.wustl.edu:80"),
50 ACE_TEXT ("www.dre.vanderbilt.edu:80"),
51 ACE_TEXT ("www.dhm.gov.np:80"),
52 ACE_TEXT ("www.msn.com:80"),
53 ACE_TEXT ("www.presidencymaldives.gov.mv:80")
56 class Svc_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
58 public:
59 Svc_Handler (void)
61 ACE_TEST_ASSERT (0);
64 Svc_Handler (ACE_Thread_Manager *);
66 int open (void *);
68 int close (u_long flags);
70 bool connected_;
73 Svc_Handler::Svc_Handler (ACE_Thread_Manager *)
74 : connected_ (false)
76 this->reference_counting_policy ().value (
77 ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
80 int
81 Svc_Handler::open (void *)
83 this->connected_ = true;
84 return 0;
87 int
88 Svc_Handler::close (u_long)
90 ACE_DEBUG ((LM_DEBUG,
91 "%t close connection on handle %d.\n",
92 this->get_handle ()));
93 return 0;
96 template<class SVC_HANDLER>
97 class Concurrency_Strategy :
98 public ACE_Concurrency_Strategy<SVC_HANDLER>
100 public:
101 virtual int activate_svc_handler (SVC_HANDLER *svc_handler, void *arg);
104 template<class SVC_HANDLER> int
105 Concurrency_Strategy<SVC_HANDLER>::
106 activate_svc_handler (SVC_HANDLER *svc_handler, void *arg)
108 // Every fourth connection fails.
109 static long count = 0;
110 if (++count % 4 == 0)
112 ACE_DEBUG ((LM_DEBUG,
113 "%t connection on handle %d has artificially failed.\n",
114 svc_handler->get_handle ()));
115 return -1;
117 else
119 return ACE_Concurrency_Strategy<SVC_HANDLER>::activate_svc_handler (
120 svc_handler, arg);
124 typedef ACE_Creation_Strategy<Svc_Handler> CREATION_STRATEGY;
125 typedef ACE_Connect_Strategy<Svc_Handler,
126 ACE_SOCK_CONNECTOR> CONNECT_STRATEGY;
127 typedef Concurrency_Strategy<Svc_Handler> CONCURRENCY_STRATEGY;
128 typedef ACE_Strategy_Connector<Svc_Handler,
129 ACE_SOCK_CONNECTOR> BASE_CONNECTOR;
131 class Connect_Thread : public ACE_Task_Base
133 public:
134 Connect_Thread (ACE_Thread_Manager &thread_manager,
135 ACE_Reactor &reactor,
136 ACE_Thread_Mutex &reactor_lock)
137 : ACE_Task_Base (&thread_manager)
138 , reactor_ (reactor)
139 , reactor_lock_ (reactor_lock)
140 , threads_ (number_of_threads)
142 // Open the connector.
143 this->base_connector_.open (&this->reactor_,
144 &this->cns_, &this->cts_, &this->cys_);
147 int svc (void);
149 private:
150 ACE_Reactor &reactor_;
151 ACE_Thread_Mutex &reactor_lock_;
153 CREATION_STRATEGY cns_;
154 CONNECT_STRATEGY cts_;
155 CONCURRENCY_STRATEGY cys_;
156 BASE_CONNECTOR base_connector_;
158 ACE_Atomic_Op<ACE_Thread_Mutex, long> threads_;
162 Connect_Thread::svc (void)
164 size_t const nr_names = sizeof hosts / sizeof (char *);
165 ACE_INET_Addr *addresses = new ACE_INET_Addr[nr_names];
167 for (size_t i = 0; i < nr_names; ++i)
169 if (addresses[i].set (hosts[i]) != 0)
171 ACE_DEBUG ((LM_INFO,
172 ACE_TEXT ("%p\n"),
173 hosts[i]));
177 ACE_INET_Addr local_addr;
178 ACE_Synch_Options synch_options (ACE_Synch_Options::USE_REACTOR);
180 for (size_t i = 0; i < nr_names; ++i)
182 ACE_INET_Addr &remote_addr = addresses[i];
184 ACE_DEBUG ((LM_DEBUG,
185 "%t connecting to %s...\n",
186 hosts[i]));
188 // Create a new handler. It's what begin_connection() in TAO does.
189 Svc_Handler *svc_handler = 0;
190 result = this->base_connector_.connect (svc_handler,
191 remote_addr,
192 synch_options,
193 local_addr);
194 ACE_Event_Handler_var release_guard (svc_handler);
196 // Complete connection. It's what complete_connection() in TAO does.
197 // Not exactly but for the test it's enough.
198 int run_for_wfmo = 5;
199 while (this->reactor_.work_pending () && run_for_wfmo)
201 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon,
202 this->reactor_lock_, -1);
204 if (this->reactor_.work_pending () && run_for_wfmo)
206 --run_for_wfmo;
208 ACE_DEBUG ((LM_DEBUG,
209 "%t waiting for connection to %s...\n",
210 hosts[i]));
212 this->reactor_.owner (ACE_Thread::self ());
214 this->reactor_.handle_events ();
218 if (svc_handler->connected_)
220 this->reactor_.register_handler (svc_handler,
221 ACE_Event_Handler::READ_MASK);
223 ACE_DEBUG ((LM_DEBUG,
224 "%t connection for %s completed on handle %d.\n",
225 hosts[i],
226 svc_handler->get_handle ()));
227 // We are connected successfully. Close the connection.
228 this->reactor_.remove_handler (svc_handler->get_handle (),
229 ACE_Event_Handler::ALL_EVENTS_MASK |
230 ACE_Event_Handler::DONT_CALL);
232 else
234 // Connection didn't succeed. svc_handler will be deleted when
235 // we go out of the scope of this loop.
236 ACE_DEBUG ((LM_DEBUG,
237 "%t connection for %s did not complete.\n",
238 hosts[i]));
242 if (--this->threads_ == 0)
244 // Close the connector.
245 this->base_connector_.close ();
248 delete[] addresses;
250 return 0;
253 void
254 test (ACE_Reactor_Impl *reactor_impl)
256 ACE_Reactor reactor (reactor_impl, true);
258 ACE_Thread_Mutex reactor_lock;
259 ACE_Thread_Manager thread_manager;
261 ACE_DEBUG ((LM_DEBUG,
262 "Starting %d connection threads...\n",
263 number_of_threads));
265 Connect_Thread connect_thread (thread_manager, reactor, reactor_lock);
266 result = connect_thread.activate (THR_NEW_LWP|THR_JOINABLE,
267 number_of_threads);
268 ACE_TEST_ASSERT (result == 0);
270 // Wait for threads to exit.
271 result = thread_manager.wait ();
272 ACE_TEST_ASSERT (result == 0);
275 static int
276 parse_args (int argc, ACE_TCHAR *argv[])
278 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:b:c:d:f:g:k:lm:n:o:uz:"));
280 int cc;
281 while ((cc = get_opt ()) != -1)
283 switch (cc)
285 case 'a':
286 test_select_reactor = ACE_OS::atoi (get_opt.opt_arg ());
287 break;
288 case 'b':
289 test_tp_reactor = ACE_OS::atoi (get_opt.opt_arg ());
290 break;
291 case 'c':
292 test_wfmo_reactor = ACE_OS::atoi (get_opt.opt_arg ());
293 break;
294 case 'd':
295 test_dev_poll_reactor = ACE_OS::atoi (get_opt.opt_arg ());
296 break;
297 case 'f':
298 number_of_threads = ACE_OS::atoi (get_opt.opt_arg ());
299 break;
300 case 'z':
301 debug = ACE_OS::atoi (get_opt.opt_arg ());
302 break;
303 case 'u':
304 default:
305 ACE_ERROR ((LM_ERROR,
306 ACE_TEXT ("\nusage: %s \n\n")
307 ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
308 ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
309 ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
310 ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
311 ACE_TEXT ("\t[-f number of threads] (defaults to %d)\n")
312 ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
313 ACE_TEXT ("\n"),
314 argv[0],
315 test_select_reactor,
316 test_tp_reactor,
317 test_wfmo_reactor,
318 test_dev_poll_reactor,
319 number_of_threads,
320 debug));
321 return -1;
325 return 0;
329 run_main (int argc, ACE_TCHAR *argv[])
331 ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
333 // Validate options.
334 result = parse_args (argc, argv);
335 if (result != 0)
336 return result;
338 if (test_select_reactor)
340 ACE_DEBUG ((LM_DEBUG,
341 ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
343 test (new ACE_Select_Reactor);
346 if (test_tp_reactor)
348 ACE_DEBUG ((LM_DEBUG,
349 ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
351 test (new ACE_TP_Reactor);
354 #if defined (ACE_HAS_EVENT_POLL)
356 if (test_dev_poll_reactor)
358 ACE_DEBUG ((LM_DEBUG,
359 ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
361 test (new ACE_Dev_Poll_Reactor);
364 #endif
366 #if defined (ACE_WIN32)
368 if (test_wfmo_reactor)
370 ACE_DEBUG ((LM_DEBUG,
371 ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
373 test (new ACE_WFMO_Reactor);
376 #endif /* ACE_WIN32 */
378 ACE_END_TEST;
380 return result;
383 #else /* ACE_HAS_THREADS */
386 run_main (int, ACE_TCHAR *[])
388 ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
390 ACE_ERROR ((LM_INFO,
391 ACE_TEXT ("threads not supported on this platform\n")));
393 ACE_END_TEST;
395 return 0;
398 #endif /* ACE_HAS_THREADS */