Merge pull request #2301 from sonndinh/remove-dup-reactor-functions
[ACE_TAO.git] / TAO / tests / Oneway_Timeouts / client.cpp
blob69ef4e594598449989c42e4535f8b18496ff06c5
1 #include "TestS.h"
2 #include "tao/Strategies/advanced_resource.h"
3 #include "tao/Messaging/Messaging.h"
4 #include "tao/AnyTypeCode/TAOA.h"
5 #include "tao/AnyTypeCode/Any.h"
7 #include "ace/streams.h"
8 #include "ace/High_Res_Timer.h"
9 #include "ace/Arg_Shifter.h"
11 #include <cmath>
13 using namespace CORBA;
14 using namespace PortableServer;
16 namespace
18 const ACE_TCHAR *non_existent_ior = ACE_TEXT("corbaloc:iiop:1.2@63.246.9.65:12345/test");
19 const int TIME_THRESHOLD = 100; //ms
21 int request_timeout = 0;
22 Messaging::SyncScope sync_scope;
23 bool use_buf_constraints = false;
24 bool use_sync_scope = false;
25 int bc_mode = TAO::BUFFER_FLUSH;
26 int bc_count = 0;
27 int bc_bytes = 0;
28 int bc_timeout = 0;
29 int num_requests = 10;
30 int request_interval = 50;
31 int connect_timeout = 0;
32 int run_orb_delay = 0;
33 int run_orb_time = 0;
34 bool force_timeout = false;
35 // This will force a blocking connection before starting the test
36 // by sending the num_requests as a twoway.
37 bool force_connect = false;
38 bool use_sleep = false;
39 unsigned int max_request_time = 0;
40 bool use_twoway = false;
41 bool retry_transients = false;
42 bool retry_timeouts = false;
43 bool make_request_queued = false;
44 const ACE_TCHAR *server_ior = ACE_TEXT ("file://test.ior");
46 void print_usage (const ACE_TCHAR *argv0)
48 ACE_ERROR ((LM_ERROR,
49 "%s [-request_timeout ms=0] "
50 "[-connect_timeout ms=0] "
51 "[-request_interval ms=100] "
52 "[-run_orb_delay ms=0] "
53 "[-run_orb_time ms=0] "
54 "[-max_request_time ms=0] "
55 "[-num_requests n=10] "
56 "[-use_twoway] "
57 "[-retry_transients] "
58 "[-retry_timeouts] "
59 "[-use_sleep] "
60 "[-force_timeout] "
61 "[-force_connect] "
62 "[-buffer_count n=0]"
63 "[-buffer_bytes n=0] "
64 "[-buffer_timeout ms=0] "
65 "[-sync delayed|none] "
66 "[-make_request_queued] "
67 "[-server_ior <ior>]\n",
68 argv0));
71 bool parse_command_line (int ac, ACE_TCHAR *av[])
73 ACE_Arg_Shifter args (ac, av);
74 args.consume_arg ();
76 while (args.is_anything_left ())
78 if (args.cur_arg_strncasecmp (ACE_TEXT("-request_timeout")) == 0)
80 args.consume_arg ();
81 request_timeout = ACE_OS::atoi (args.get_current ());
82 args.consume_arg ();
84 else if (args.cur_arg_strncasecmp (ACE_TEXT("-connect_timeout")) == 0)
86 args.consume_arg ();
87 connect_timeout = ACE_OS::atoi (args.get_current ());
88 args.consume_arg ();
90 else if (args.cur_arg_strncasecmp (ACE_TEXT("-request_interval")) == 0)
92 args.consume_arg ();
93 request_interval = ACE_OS::atoi (args.get_current ());
94 args.consume_arg ();
96 else if (args.cur_arg_strncasecmp (ACE_TEXT("-run_orb_delay")) == 0)
98 args.consume_arg ();
99 run_orb_delay = ACE_OS::atoi (args.get_current ());
100 args.consume_arg ();
102 else if (args.cur_arg_strncasecmp (ACE_TEXT("-run_orb_time")) == 0)
104 args.consume_arg ();
105 run_orb_time = ACE_OS::atoi(args.get_current ());
106 args.consume_arg ();
108 else if (args.cur_arg_strncasecmp (ACE_TEXT("-max_request_time")) == 0)
110 args.consume_arg ();
111 max_request_time = ACE_OS::atoi (args.get_current ());
112 args.consume_arg ();
114 else if (args.cur_arg_strncasecmp (ACE_TEXT("-num_requests")) == 0)
116 args.consume_arg ();
117 num_requests = ACE_OS::atoi (args.get_current ());
118 args.consume_arg ();
120 else if (args.cur_arg_strncasecmp (ACE_TEXT("-use_twoway")) == 0)
122 use_twoway = true;
123 args.consume_arg ();
125 else if (args.cur_arg_strncasecmp (ACE_TEXT("-retry_transients")) == 0)
127 retry_transients = true;
128 args.consume_arg ();
130 else if (args.cur_arg_strncasecmp (ACE_TEXT("-retry_timeouts")) == 0)
132 retry_timeouts = true;
133 args.consume_arg ();
135 else if (args.cur_arg_strncasecmp (ACE_TEXT("-use_sleep")) == 0)
137 use_sleep = true;
138 args.consume_arg ();
140 else if (args.cur_arg_strncasecmp (ACE_TEXT("-force_timeout")) == 0)
142 force_timeout = true;
143 args.consume_arg ();
145 else if (args.cur_arg_strncasecmp (ACE_TEXT("-force_connect")) == 0)
147 force_connect = true;
148 args.consume_arg ();
150 else if (args.cur_arg_strncasecmp (ACE_TEXT("-buffer_count")) == 0)
152 args.consume_arg ();
153 use_buf_constraints = true;
154 bc_count = ACE_OS::atoi (args.get_current ());
155 args.consume_arg ();
157 else if (args.cur_arg_strncasecmp (ACE_TEXT("-buffer_bytes")) == 0)
159 args.consume_arg ();
160 use_buf_constraints = true;
161 bc_bytes = ACE_OS::atoi (args.get_current ());
162 args.consume_arg ();
164 else if (args.cur_arg_strncasecmp (ACE_TEXT("-buffer_timeout")) == 0)
166 args.consume_arg ();
167 use_buf_constraints = true;
168 bc_timeout = ACE_OS::atoi (args.get_current ());
169 args.consume_arg ();
171 else if (args.cur_arg_strncasecmp (ACE_TEXT("-sync")) == 0)
173 args.consume_arg ();
174 if (args.cur_arg_strncasecmp (ACE_TEXT("delayed")) == 0)
176 sync_scope = TAO::SYNC_DELAYED_BUFFERING;
177 use_sync_scope = true;
179 else if (args.cur_arg_strncasecmp (ACE_TEXT("none")) == 0)
181 sync_scope = Messaging::SYNC_NONE;
182 use_sync_scope = true;
184 else
186 print_usage (av[0]);
187 return false;
190 args.consume_arg ();
192 else if (args.cur_arg_strncasecmp (ACE_TEXT("-make_request_queued")) == 0)
194 make_request_queued = true;
195 args.consume_arg ();
197 else if (args.cur_arg_strncasecmp (ACE_TEXT("-server_ior")) == 0)
199 args.consume_arg ();
200 server_ior = args.get_current ();
201 args.consume_arg ();
203 else
205 ACE_ERROR ((LM_ERROR, "Error: Unknown argument \"%s\"\n",
206 args.get_current ()));
207 print_usage (av[0]);
208 return false;
213 return true;
216 Tester_ptr set_request_timeout (Tester_ptr tst, ORB_ptr orb)
218 if (request_timeout <= 0)
220 return Tester::_duplicate (tst);
223 Any a;
224 a <<= static_cast<TimeBase::TimeT> (request_timeout * 10000);
225 PolicyList pols (1);
226 pols.length (1);
227 pols[0] =
228 orb->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, a);
229 Object_var obj = tst->_set_policy_overrides (pols, ADD_OVERRIDE);
230 pols[0]->destroy ();
231 return Tester::_unchecked_narrow (obj.in ());
235 void set_connect_timeout (ORB_ptr orb)
237 if (connect_timeout <= 0)
238 return;
239 Object_var obj = orb->resolve_initial_references ("PolicyCurrent");
240 PolicyCurrent_var policy_current = PolicyCurrent::_narrow (obj.in ());
241 Any a;
242 a <<= static_cast<TimeBase::TimeT> (connect_timeout * 10000);
243 PolicyList pols (1);
244 pols.length (1);
245 pols[0] = orb->create_policy (TAO::CONNECTION_TIMEOUT_POLICY_TYPE, a);
246 policy_current->set_policy_overrides (pols, ADD_OVERRIDE);
247 pols[0]->destroy ();
251 void set_buffering (ORB_ptr orb)
253 Object_var obj = orb->resolve_initial_references ("PolicyCurrent");
254 PolicyCurrent_var policy_current = PolicyCurrent::_narrow (obj.in ());
255 PolicyList pols (1);
256 pols.length (1);
258 if (use_sync_scope)
260 Any a;
261 if (make_request_queued)
262 a <<= Messaging::SYNC_NONE;
263 else
264 a <<= sync_scope;
265 pols[0] = orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, a);
266 policy_current->set_policy_overrides (pols, ADD_OVERRIDE);
267 pols[0]->destroy ();
271 if (use_buf_constraints)
273 TAO::BufferingConstraint bc;
274 if (bc_count > 0)
276 bc_mode |= TAO::BUFFER_MESSAGE_COUNT;
279 if (bc_bytes > 0)
281 bc_mode |= TAO::BUFFER_MESSAGE_BYTES;
284 if (bc_timeout > 0)
286 bc_mode |= TAO::BUFFER_TIMEOUT;
289 bc.mode = bc_mode;
290 bc.message_count = bc_count;
291 bc.message_bytes = bc_bytes;
292 bc.timeout = static_cast<TimeBase::TimeT> (bc_timeout * 10000);
293 Any a;
294 a <<= bc;
295 pols[0] =
296 orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE, a);
297 policy_current->set_policy_overrides (pols, ADD_OVERRIDE);
298 pols[0]->destroy ();
304 void reset_buffering (ORB_ptr orb)
306 Object_var obj = orb->resolve_initial_references ("PolicyCurrent");
307 PolicyCurrent_var policy_current = PolicyCurrent::_narrow (obj.in ());
308 PolicyList pols (1);
309 pols.length (1);
311 Any a;
312 a <<= sync_scope;
313 pols[0] = orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, a);
314 policy_current->set_policy_overrides (pols, ADD_OVERRIDE);
315 pols[0]->destroy ();
321 int ACE_TMAIN (int ac, ACE_TCHAR *av[])
323 ACE_Time_Value before = ACE_High_Res_Timer::gettimeofday_hr ();
325 int num_requests_sent = 0;
329 ORB_var orb = ORB_init (ac, av);
331 if (!parse_command_line (ac, av))
333 return 1;
336 set_connect_timeout (orb.in ());
337 set_buffering (orb.in ());
339 ACE_TString ior (server_ior);
340 if (force_timeout)
342 ior = non_existent_ior;
345 Object_var obj = orb->string_to_object (ior.c_str ());
347 ACE_ASSERT (! is_nil (obj.in ()));
349 Tester_var tmp_tester;
350 if (force_connect)
352 tmp_tester = Tester::_narrow (obj.in ());
353 tmp_tester->test2 (-2);
354 ACE_DEBUG ((LM_DEBUG, "Connected...\n"));
356 else
357 tmp_tester = Tester::_unchecked_narrow (obj.in ());
359 Tester_var tester = set_request_timeout (tmp_tester.in (), orb.in ());
361 ACE_ASSERT (! is_nil (tester.in ()));
363 Long i = 0;
365 // Using make_request_queued option to work around test failure due to
366 // different connection establishment behavior between OS. Some system
367 // can connect immediately and some may take longer time. With the flag on,
368 // the test sets the SYNC_NONE scope and sends a request so the transport
369 // queue is not empty for some SYNC_DELAYED_BUFFERING test case and hence
370 // the requests are all queued and will be received by server continuously
371 // during a short period.
372 if (make_request_queued)
374 //Send this message while using SYNC_NONE.
375 //This would leave the request in transport queue.
376 tester->test (-3);
377 //Set to SYNC_DELAYED_BUFFERING.
378 //The requests will be queued since queue is not
379 //empty.
380 reset_buffering (orb.in ());
383 for (; i < num_requests; ++i)
385 before = ACE_High_Res_Timer::gettimeofday_hr ();
388 if (use_twoway)
390 tester->test2 (i);
392 else
394 tester->test (i);
398 catch (const CORBA::TRANSIENT&)
400 ACE_DEBUG ((LM_DEBUG,
401 "Transient exception during test () invocation %d\n",
402 i));
403 if (retry_transients)
404 ACE_DEBUG ((LM_DEBUG,"retrying\n"));
405 else
406 throw;
408 catch (const CORBA::TIMEOUT&)
410 ACE_DEBUG ((LM_DEBUG,
411 "Timeout exception during test () invocation %d\n",
412 i));
414 // This exception is expected with forced timeouts, since the
415 // IOR is invalid. Unless we want to retry the invocation
416 // go ahead and rethrow and let the outer catch deal with it.
417 // Likewise if force_timeouts is not set, then throw it anyway
418 // because the exception should not occur because these are
419 // oneways.
420 if (retry_timeouts)
421 ACE_DEBUG ((LM_DEBUG,"retrying\n"));
422 else
423 throw;
426 ++num_requests_sent;
428 ACE_Time_Value after = ACE_High_Res_Timer::gettimeofday_hr ();
429 if (max_request_time > 0 &&
430 (after - before).msec () > max_request_time)
432 ACE_DEBUG ((LM_DEBUG,
433 "note: test() took %d ms, max is %d ms\n",
434 (after - before).msec (), max_request_time));
437 ACE_DEBUG ((LM_DEBUG, "c%d\n", i));
439 if (request_interval > 0)
441 ACE_Time_Value tv (0, request_interval * 1000);
442 ACE_Time_Value done = tv +
443 ACE_High_Res_Timer::gettimeofday_hr ();
444 if (! use_sleep)
446 orb->run (tv);
448 else
450 ACE_OS::sleep (tv);
453 while (ACE_High_Res_Timer::gettimeofday_hr () < done)
455 ACE_OS::sleep (0);
460 ACE_DEBUG ((LM_DEBUG,"request loop complete\n"));
463 if (run_orb_delay > 0)
465 ACE_Time_Value tv (0, run_orb_delay * 1000);
466 ACE_OS::sleep (tv);
470 if (run_orb_time > 0)
472 ACE_Time_Value tv (0, run_orb_time * 1000);
473 orb->run (tv);
476 ACE_DEBUG ((LM_DEBUG,"Sending synch request to shutdown server\n"));
477 use_twoway = true;
478 use_sync_scope = false;
480 if (force_timeout)
482 // we have one more invocation that may time out.
483 before = ACE_High_Res_Timer::gettimeofday_hr ();
484 tester->test2 (-1);
486 else
488 // At this point, we aren't interested in the time it takes, we
489 // want the peer to shut down, so use the non-timeout reference.
490 // BUT IF THIS DOES raise a timeout, it will be reported as an
491 // error.
492 tmp_tester->test2 (-1);
495 orb->shutdown (true);
497 orb->destroy ();
499 if (force_timeout)
501 ACE_ERROR_RETURN ((LM_ERROR,
502 "Error: Connection did not time out.\n"),
506 return 0;
508 catch (const CORBA::TRANSIENT &)
510 ACE_DEBUG ((LM_DEBUG, "caught transient exception\n"));
511 if (force_timeout)
513 ACE_Time_Value after = ACE_High_Res_Timer::gettimeofday_hr ();
514 long ms = (after - before).msec ();
515 if ((use_twoway || !use_sync_scope)
516 && request_timeout > 0
517 && request_timeout < connect_timeout)
519 connect_timeout = request_timeout;
521 else if (use_sync_scope && !use_sleep)
523 if (ms > TIME_THRESHOLD)
525 ACE_DEBUG ((LM_DEBUG,
526 "note: Buffered request took %dms\n", ms));
529 ms = num_requests_sent * request_interval;
532 if (ms - connect_timeout > TIME_THRESHOLD ||
533 connect_timeout - ms > TIME_THRESHOLD)
535 ACE_DEBUG ((LM_DEBUG,
536 "note: Timeout expected in %d ms, "
537 "but took %d ms\n", connect_timeout, ms));
540 return 0;
542 else
544 ACE_ERROR_RETURN ((LM_ERROR, "Error: Unexpected\n"), 1);
547 catch (const CORBA::TIMEOUT &)
549 ACE_DEBUG ((LM_DEBUG, "caught timeout exception\n"));
550 if (force_timeout)
552 ACE_Time_Value after = ACE_High_Res_Timer::gettimeofday_hr ();
553 long ms = (after - before).msec ();
554 if ((use_twoway || !use_sync_scope)
555 && request_timeout > 0
556 && request_timeout < connect_timeout)
558 connect_timeout = request_timeout;
560 else if (use_sync_scope && !use_sleep)
562 if (ms > TIME_THRESHOLD)
564 ACE_DEBUG ((LM_DEBUG,
565 "note: Buffered request took %d ms\n",
566 ms));
569 ms = num_requests_sent * request_interval;
572 if (ms - connect_timeout > TIME_THRESHOLD ||
573 connect_timeout - ms > TIME_THRESHOLD)
575 ACE_DEBUG ((LM_DEBUG,
576 "note: Timeout expected in %d ms, "
577 "but took %d ms\n", connect_timeout, ms));
580 return 0;
582 else
584 ACE_ERROR_RETURN ((LM_ERROR, "Error: Unexpected\n"), 1);
588 catch (Exception &ex)
590 ACE_ERROR ((LM_ERROR, "client: %s\n\nLast operation took %d ms.\n",
591 ex._name(),
592 (ACE_High_Res_Timer::gettimeofday_hr () - before).msec ()));
595 return 1;