Changes to attempt to silence bcc64x
[ACE_TAO.git] / ACE / tests / MT_NonBlocking_Connect_Test.cpp
blob93a0a8706c59eae326dae698752c439ff5a279bf
//=============================================================================
/**
 *  @file    MT_NonBlocking_Connect_Test.cpp
 *
 *  This test makes non-blocking connects from multiple threads. Things are
 *  complicated by the fact that, after a connection is attempted, the
 *  reactor's handle_events() is called, which can lead to the connection
 *  being completed in another thread. This is similar to what TAO does for
 *  oneway calls with the SYNC_NONE sync scope policy.
 *  The following reactors are tested: Select, TP, WFMO, and Dev Poll
 *  (if enabled).
 *
 *  @author Vladimir Zykov <vladimir.zykov@prismtech.com>
 */
//=============================================================================
19 #include "test_config.h"
20 #include "ace/Reactor.h"
21 #include "ace/Select_Reactor.h"
22 #include "ace/TP_Reactor.h"
23 #include "ace/WFMO_Reactor.h"
24 #include "ace/Dev_Poll_Reactor.h"
25 #include "ace/Svc_Handler.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Get_Opt.h"
28 #include "ace/Task.h"
29 #include "ace/Connector.h"
30 #include "ace/SOCK_Connector.h"
31 #include "ace/Thread_Mutex.h"
33 #if defined (ACE_HAS_THREADS)
// Per-reactor switches; a non-zero value enables that reactor's test run.
// Each can be overridden on the command line (see parse_args()).
static int test_select_reactor = 1;    // -a
static int test_tp_reactor = 1;        // -b
static int test_wfmo_reactor = 1;      // -c
static int test_dev_poll_reactor = 1;  // -d

// Number of threads activated on the Connect_Thread task (-f).
static int number_of_threads = 10;

// Debug level (-z). Only stored and echoed in the usage text by the code
// visible in this file.
static int debug = 0;

// Status of the most recent checked step; returned from run_main().
static int result = 0;
43 static const ACE_TCHAR* hosts[] = {
44 ACE_TEXT ("www.russiantvguide.com:80"),
45 ACE_TEXT ("news.bbc.co.uk:80"),
46 ACE_TEXT ("www.cnn.com:80"),
47 ACE_TEXT ("www.waca.com.au:80"),
48 ACE_TEXT ("www.uganda.co.ug:80"),
49 ACE_TEXT ("www.cs.wustl.edu:80"),
50 ACE_TEXT ("www.dre.vanderbilt.edu:80"),
51 ACE_TEXT ("www.dhm.gov.np:80"),
52 ACE_TEXT ("www.msn.com:80"),
53 ACE_TEXT ("www.presidencymaldives.gov.mv:80")
56 class Svc_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
58 public:
59 Svc_Handler ()
61 ACE_TEST_ASSERT (0);
64 Svc_Handler (ACE_Thread_Manager *);
66 int open (void *) override;
68 int close (u_long flags) override;
70 bool connected_;
73 Svc_Handler::Svc_Handler (ACE_Thread_Manager *)
74 : connected_ (false)
76 this->reference_counting_policy ().value (
77 ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
80 int
81 Svc_Handler::open (void *)
83 this->connected_ = true;
84 return 0;
87 int
88 Svc_Handler::close (u_long)
90 ACE_DEBUG ((LM_DEBUG,
91 "%t close connection on handle %d.\n",
92 this->get_handle ()));
93 return 0;
96 template<class SVC_HANDLER>
97 class Concurrency_Strategy :
98 public ACE_Concurrency_Strategy<SVC_HANDLER>
100 public:
101 int activate_svc_handler (SVC_HANDLER *svc_handler, void *arg) override;
104 template<class SVC_HANDLER> int
105 Concurrency_Strategy<SVC_HANDLER>::
106 activate_svc_handler (SVC_HANDLER *svc_handler, void *arg)
108 // Every fourth connection fails.
109 static long count = 0;
110 if (++count % 4 == 0)
112 ACE_DEBUG ((LM_DEBUG,
113 "%t connection on handle %d has artificially failed.\n",
114 svc_handler->get_handle ()));
115 return -1;
117 else
119 return ACE_Concurrency_Strategy<SVC_HANDLER>::activate_svc_handler (
120 svc_handler, arg);
124 using CREATION_STRATEGY = ACE_Creation_Strategy<Svc_Handler>;
125 using CONNECT_STRATEGY = ACE_Connect_Strategy<Svc_Handler, ACE_SOCK_Connector>;
126 using CONCURRENCY_STRATEGY = Concurrency_Strategy<Svc_Handler>;
127 using BASE_CONNECTOR = ACE_Strategy_Connector<Svc_Handler, ACE_SOCK_Connector>;
129 class Connect_Thread : public ACE_Task_Base
131 public:
132 Connect_Thread (ACE_Thread_Manager &thread_manager,
133 ACE_Reactor &reactor,
134 ACE_Thread_Mutex &reactor_lock)
135 : ACE_Task_Base (&thread_manager)
136 , reactor_ (reactor)
137 , reactor_lock_ (reactor_lock)
138 , threads_ (number_of_threads)
140 // Open the connector.
141 this->base_connector_.open (&this->reactor_,
142 &this->cns_, &this->cts_, &this->cys_);
145 int svc () override;
147 private:
148 ACE_Reactor &reactor_;
149 ACE_Thread_Mutex &reactor_lock_;
151 CREATION_STRATEGY cns_;
152 CONNECT_STRATEGY cts_;
153 CONCURRENCY_STRATEGY cys_;
154 BASE_CONNECTOR base_connector_;
156 ACE_Atomic_Op<ACE_Thread_Mutex, long> threads_;
160 Connect_Thread::svc ()
162 size_t const nr_names = sizeof hosts / sizeof (char *);
163 ACE_INET_Addr *addresses = new ACE_INET_Addr[nr_names];
165 for (size_t i = 0; i < nr_names; ++i)
167 if (addresses[i].set (hosts[i]) != 0)
169 ACE_DEBUG ((LM_INFO,
170 ACE_TEXT ("%p\n"),
171 hosts[i]));
175 ACE_INET_Addr local_addr;
176 ACE_Synch_Options synch_options (ACE_Synch_Options::USE_REACTOR);
178 for (size_t i = 0; i < nr_names; ++i)
180 ACE_INET_Addr &remote_addr = addresses[i];
182 ACE_DEBUG ((LM_DEBUG,
183 "%t connecting to %s...\n",
184 hosts[i]));
186 // Create a new handler. It's what begin_connection() in TAO does.
187 Svc_Handler *svc_handler = 0;
188 result = this->base_connector_.connect (svc_handler,
189 remote_addr,
190 synch_options,
191 local_addr);
192 ACE_Event_Handler_var release_guard (svc_handler);
194 // Complete connection. It's what complete_connection() in TAO does.
195 // Not exactly but for the test it's enough.
196 int run_for_wfmo = 5;
197 while (this->reactor_.work_pending () && run_for_wfmo)
199 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon,
200 this->reactor_lock_, -1);
202 if (this->reactor_.work_pending () && run_for_wfmo)
204 --run_for_wfmo;
206 ACE_DEBUG ((LM_DEBUG,
207 "%t waiting for connection to %s...\n",
208 hosts[i]));
210 this->reactor_.owner (ACE_Thread::self ());
212 this->reactor_.handle_events ();
216 if (svc_handler->connected_)
218 this->reactor_.register_handler (svc_handler,
219 ACE_Event_Handler::READ_MASK);
221 ACE_DEBUG ((LM_DEBUG,
222 "%t connection for %s completed on handle %d.\n",
223 hosts[i],
224 svc_handler->get_handle ()));
225 // We are connected successfully. Close the connection.
226 this->reactor_.remove_handler (svc_handler->get_handle (),
227 ACE_Event_Handler::ALL_EVENTS_MASK |
228 ACE_Event_Handler::DONT_CALL);
230 else
232 // Connection didn't succeed. svc_handler will be deleted when
233 // we go out of the scope of this loop.
234 ACE_DEBUG ((LM_DEBUG,
235 "%t connection for %s did not complete.\n",
236 hosts[i]));
240 if (--this->threads_ == 0)
242 // Close the connector.
243 this->base_connector_.close ();
246 delete[] addresses;
248 return 0;
251 void
252 test (ACE_Reactor_Impl *reactor_impl)
254 ACE_Reactor reactor (reactor_impl, true);
256 ACE_Thread_Mutex reactor_lock;
257 ACE_Thread_Manager thread_manager;
259 ACE_DEBUG ((LM_DEBUG,
260 "Starting %d connection threads...\n",
261 number_of_threads));
263 Connect_Thread connect_thread (thread_manager, reactor, reactor_lock);
264 result = connect_thread.activate (THR_NEW_LWP|THR_JOINABLE,
265 number_of_threads);
266 ACE_TEST_ASSERT (result == 0);
268 // Wait for threads to exit.
269 result = thread_manager.wait ();
270 ACE_TEST_ASSERT (result == 0);
273 static int
274 parse_args (int argc, ACE_TCHAR *argv[])
276 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:b:c:d:f:g:k:lm:n:o:uz:"));
278 int cc;
279 while ((cc = get_opt ()) != -1)
281 switch (cc)
283 case 'a':
284 test_select_reactor = ACE_OS::atoi (get_opt.opt_arg ());
285 break;
286 case 'b':
287 test_tp_reactor = ACE_OS::atoi (get_opt.opt_arg ());
288 break;
289 case 'c':
290 test_wfmo_reactor = ACE_OS::atoi (get_opt.opt_arg ());
291 break;
292 case 'd':
293 test_dev_poll_reactor = ACE_OS::atoi (get_opt.opt_arg ());
294 break;
295 case 'f':
296 number_of_threads = ACE_OS::atoi (get_opt.opt_arg ());
297 break;
298 case 'z':
299 debug = ACE_OS::atoi (get_opt.opt_arg ());
300 break;
301 case 'u':
302 default:
303 ACE_ERROR ((LM_ERROR,
304 ACE_TEXT ("\nusage: %s \n\n")
305 ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
306 ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
307 ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
308 ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
309 ACE_TEXT ("\t[-f number of threads] (defaults to %d)\n")
310 ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
311 ACE_TEXT ("\n"),
312 argv[0],
313 test_select_reactor,
314 test_tp_reactor,
315 test_wfmo_reactor,
316 test_dev_poll_reactor,
317 number_of_threads,
318 debug));
319 return -1;
323 return 0;
327 run_main (int argc, ACE_TCHAR *argv[])
329 ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
331 // Validate options.
332 result = parse_args (argc, argv);
333 if (result != 0)
334 return result;
336 if (test_select_reactor)
338 ACE_DEBUG ((LM_DEBUG,
339 ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
341 test (new ACE_Select_Reactor);
344 if (test_tp_reactor)
346 ACE_DEBUG ((LM_DEBUG,
347 ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
349 test (new ACE_TP_Reactor);
352 #if defined (ACE_HAS_EVENT_POLL)
354 if (test_dev_poll_reactor)
356 ACE_DEBUG ((LM_DEBUG,
357 ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
359 test (new ACE_Dev_Poll_Reactor);
362 #endif
364 #if defined (ACE_WIN32)
366 if (test_wfmo_reactor)
368 ACE_DEBUG ((LM_DEBUG,
369 ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
371 test (new ACE_WFMO_Reactor);
374 #endif /* ACE_WIN32 */
376 ACE_END_TEST;
378 return result;
381 #else /* ACE_HAS_THREADS */
384 run_main (int, ACE_TCHAR *[])
386 ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
388 ACE_ERROR ((LM_INFO,
389 ACE_TEXT ("threads not supported on this platform\n")));
391 ACE_END_TEST;
393 return 0;
396 #endif /* ACE_HAS_THREADS */