2 #include "tao/Messaging/Messaging.h"
3 #include "tao/AnyTypeCode/Any.h"
5 #include "tao/AnyTypeCode/TAOA.h"
6 #include "ace/Get_Opt.h"
7 #include "ace/OS_NS_sys_time.h"
8 #include "ace/OS_NS_unistd.h"
// File-level settings and tuning constants for the oneway buffering client.
// NOTE(review): the integer that starts most lines in this file looks like a
// line number left behind by an extraction tool rather than a C++ token --
// confirm against the pristine source before building.

// Stringified object references; parse_args() overwrites both from option
// arguments.
10 const ACE_TCHAR
*server_ior
= ACE_TEXT("file://server.ior");
11 const ACE_TCHAR
*admin_ior
= ACE_TEXT("file://admin.ior");
// Test selectors. The entry point checks them in this order (message count,
// timeout, timeout reactive, buffer size) and runs the first non-zero one.
14 int run_message_count_test
= 0;
15 int run_timeout_test
= 0;
16 int run_timeout_reactive_test
= 0;
17 int run_buffer_size_test
= 0;
// Number of octets in every Test::Payload the tests build.
19 const int PAYLOAD_LENGTH
= 1024;
// message_count threshold installed by run_message_count().
20 const int BUFFERED_MESSAGES_COUNT
= 10;
// Flush timeout compared against elapsed.msec () in the timeout tests; it is
// multiplied by 10000 before being stored in BufferingConstraint::timeout
// (presumably a conversion from milliseconds to 100 ns units -- confirm).
21 const unsigned int TIMEOUT_MILLISECONDS
= 50;
// message_bytes threshold installed by run_buffer_size(): ten payloads.
22 const int BUFFER_SIZE
= 10 * PAYLOAD_LENGTH
;
24 /// Check that no more than 10% of the messages are not sent.
25 const double LIVENESS_TOLERANCE
= 0.9;
27 /// Limit the depth of the liveness test, avoid blowing up the stack
29 const int LIVENESS_MAX_DEPTH
= 256;
31 /// Factor in GIOP overhead in the buffer size test
32 const double GIOP_OVERHEAD
= 0.9;
// Pause handed to ACE_OS::sleep () after a CORBA::TRANSIENT is caught.
34 const ACE_Time_Value
TRANSIENT_HOLDOFF (0, 500); // 0.5ms delay
// Bound on the per-call TRANSIENT counter used by the helper functions; a
// debug message is logged when the counter reaches half of this value.
35 const int TRANSIENT_LIMIT
= 10;
// Parses the command line with ACE_Get_Opt (option string "k:a:i:ctbr") and
// stores the results in the file-level settings.
// NOTE(review): the return type, the braces and whatever selects each
// assignment below (presumably a switch on c) are not visible in this
// extract, so the option letter behind each assignment cannot be confirmed.
38 parse_args (int argc
, ACE_TCHAR
*argv
[])
40 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("k:a:i:ctbr"));
// Keep reading options until get_opts () returns -1.
43 while ((c
= get_opts ()) != -1)
// Options that carry an argument: the two IORs and the iteration count.
47 server_ior
= get_opts
.opt_arg ();
51 admin_ior
= get_opts
.opt_arg ();
// The argument is converted with ACE_OS::atoi; no range check is visible.
55 iterations
= ACE_OS::atoi (get_opts
.opt_arg ());
// Flag options: each one selects a test by setting its selector to 1.
59 run_message_count_test
= 1;
67 run_buffer_size_test
= 1;
71 run_timeout_reactive_test
= 1;
// Error return taken for input the visible cases do not handle (presumably
// the default/usage branch -- confirm).
76 ACE_ERROR_RETURN ((LM_ERROR
,
86 // Indicates successful parsing of the command line
// Forward declarations of the four test runners invoked by the entry point.
// All four take the ORB, the object under test and its admin object.
// NOTE(review): the return types are on lines not visible in this extract.

// Test flushing driven by a buffered message count.
91 run_message_count (CORBA::ORB_ptr orb
,
92 Test::Oneway_Buffering_ptr oneway_buffering
,
93 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
);
// Test flushing driven by a timeout.
96 run_timeout (CORBA::ORB_ptr orb
,
97 Test::Oneway_Buffering_ptr oneway_buffering
,
98 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
);
// Timeout variant labelled "(reactive)" in the log output.
101 run_timeout_reactive (CORBA::ORB_ptr orb
,
102 Test::Oneway_Buffering_ptr oneway_buffering
,
103 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
);
// Test flushing driven by the buffered byte count.
106 run_buffer_size (CORBA::ORB_ptr orb
,
107 Test::Oneway_Buffering_ptr oneway_buffering
,
108 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
);
// Program entry point: initialises the ORB, parses the arguments, resolves
// and narrows the two object references, runs the first selected test and
// then asks both remote objects to shut down.
// NOTE(review): the try block, braces, return statements and some
// assignment targets are on lines not visible in this extract.
111 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
117 CORBA::ORB_init (argc
, argv
);
// parse_args () is called after ORB_init has been given argc/argv.
119 if (parse_args (argc
, argv
) != 0)
// Resolve the object under test from server_ior and narrow it.
122 CORBA::Object_var tmp
=
123 orb
->string_to_object(server_ior
);
125 Test::Oneway_Buffering_var oneway_buffering
=
126 Test::Oneway_Buffering::_narrow(tmp
.in ());
128 if (CORBA::is_nil (oneway_buffering
.in ()))
// NOTE(review): this error return is logged at LM_DEBUG priority, unlike the
// LM_ERROR used in parse_args () -- confirm whether that is intended.
130 ACE_ERROR_RETURN ((LM_DEBUG
,
131 "Nil Test::Oneway_Buffering reference <%s>\n",
// Same steps for the admin object, this time from admin_ior.
137 orb
->string_to_object(admin_ior
);
139 Test::Oneway_Buffering_Admin_var oneway_buffering_admin
=
140 Test::Oneway_Buffering_Admin::_narrow(tmp
.in ());
142 if (CORBA::is_nil (oneway_buffering_admin
.in ()))
// NOTE(review): also logged at LM_DEBUG priority.
144 ACE_ERROR_RETURN ((LM_DEBUG
,
145 "Nil Test::Oneway_Buffering_Admin reference <%s>\n",
// Test dispatch: an if / else-if chain, so at most one test runs even when
// several selectors are set.
150 if (run_message_count_test
)
152 ACE_DEBUG ((LM_DEBUG
,
153 "Running message count flushing test\n"));
155 run_message_count (orb
.in (),
156 oneway_buffering
.in (),
157 oneway_buffering_admin
.in ());
159 else if (run_timeout_test
)
161 ACE_DEBUG ((LM_DEBUG
,
162 "Running timeout flushing test\n"));
164 run_timeout (orb
.in (),
165 oneway_buffering
.in (),
166 oneway_buffering_admin
.in ());
168 else if (run_timeout_reactive_test
)
170 ACE_DEBUG ((LM_DEBUG
,
171 "Running timeout (reactive) flushing test\n"));
173 run_timeout_reactive (orb
.in (),
174 oneway_buffering
.in (),
175 oneway_buffering_admin
.in ());
177 else if (run_buffer_size_test
)
179 ACE_DEBUG ((LM_DEBUG
,
180 "Running buffer size flushing test\n"));
182 run_buffer_size (orb
.in (),
183 oneway_buffering
.in (),
184 oneway_buffering_admin
.in ());
// Reported when no selector was set (presumably the final else -- confirm).
188 ACE_ERROR ((LM_ERROR
,
189 "ERROR: No test was configured\n"));
// Ask both remote objects to shut down.
192 oneway_buffering
->shutdown ();
193 oneway_buffering_admin
->shutdown ();
// CORBA exceptions are logged with the process id prefix and then printed.
197 catch (const CORBA::Exception
& ex
)
199 ACE_DEBUG ((LM_DEBUG
, "(%P) Client: "));
200 ex
._tao_print_exception ("CORBA Exception caught:");
// Message for any other exception type (the handler line is not visible).
205 ACE_DEBUG ((LM_DEBUG
, "(%P) Client caught unknown exception\n"));
// Installs the buffering policies a test needs.
//
// orb                  - used to resolve "PolicyCurrent" and create policies.
// buffering_constraint - constraint installed through the PolicyCurrent.
// oneway_buffering     - object whose policy overrides are set to produce
//                        the flushing reference.
// flusher              - out parameter (presumably receives the narrowed
//                        BUFFER_FLUSH reference; the assignment line is not
//                        visible -- confirm).
//
// NOTE(review): the return type, the return statements and the lines that
// assign policies[0] / policies[1] are not visible in this extract. Callers
// test a variable named test_failed after calling this function.
213 configure_policies (CORBA::ORB_ptr orb
,
214 const TAO::BufferingConstraint
&buffering_constraint
,
215 Test::Oneway_Buffering_ptr oneway_buffering
,
216 Test::Oneway_Buffering_out flusher
)
218 CORBA::Object_var object
=
219 orb
->resolve_initial_references ("PolicyCurrent");
221 CORBA::PolicyCurrent_var policy_current
=
222 CORBA::PolicyCurrent::_narrow (object
.in ());
224 if (CORBA::is_nil (policy_current
.in ()))
226 ACE_ERROR ((LM_ERROR
, "ERROR: Nil policy current\n"));
// Wrap the two policy values in Anys: the SYNC_NONE scope and the caller's
// buffering constraint.
229 CORBA::Any scope_as_any
;
230 scope_as_any
<<= Messaging::SYNC_NONE
;
232 CORBA::Any buffering_as_any
;
233 buffering_as_any
<<= buffering_constraint
;
// Two-element policy list: one sync-scope policy, one buffering policy.
235 CORBA::PolicyList
policies (2); policies
.length (2);
237 orb
->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE
,
240 orb
->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE
,
243 policy_current
->set_policy_overrides (policies
, CORBA::ADD_OVERRIDE
);
// Both policy objects are destroyed once the overrides have been set.
245 policies
[0]->destroy ();
246 policies
[1]->destroy ();
// Second constraint: BUFFER_FLUSH mode with every limit set to zero.
248 TAO::BufferingConstraint flush_constraint
;
249 flush_constraint
.mode
= TAO::BUFFER_FLUSH
;
250 flush_constraint
.message_count
= 0;
251 flush_constraint
.message_bytes
= 0;
252 flush_constraint
.timeout
= 0;
// buffering_as_any is reused to carry the flush constraint.
254 buffering_as_any
<<= flush_constraint
;
257 orb
->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE
,
// This override is applied to the object reference itself, not to the
// PolicyCurrent.
261 oneway_buffering
->_set_policy_overrides (policies
,
262 CORBA::ADD_OVERRIDE
);
264 policies
[0]->destroy ();
// object is narrowed to Test::Oneway_Buffering (presumably the reference
// returned by _set_policy_overrides above -- confirm).
267 Test::Oneway_Buffering::_narrow (object
.in ());
// Brings the client back in step with the server through the flushing
// reference. The log messages show two remote calls, flush() and sync(),
// each with its own CORBA::TRANSIENT handler; one transient_count is shared
// by both handlers.
// NOTE(review): the return type, the remote calls themselves and the
// enclosing loops/try blocks are not visible in this extract.
273 sync_server (Test::Oneway_Buffering_ptr flusher
)
275 // Get back in sync with the server...
276 int transient_count
= 0;
// Handler for the flush() call: below the limit, sleep for TRANSIENT_HOLDOFF.
284 catch (const CORBA::TRANSIENT
&)
286 if (++transient_count
< TRANSIENT_LIMIT
)
// Log once, when the counter reaches half of the limit.
288 if (transient_count
== TRANSIENT_LIMIT
/ 2)
290 ACE_DEBUG ((LM_DEBUG
,
291 "(%P) Client large TRANSIENTS encountered calling flush().\n"));
293 ACE_OS::sleep (TRANSIENT_HOLDOFF
);
// Presumably the else branch: the TRANSIENT is rethrown to the caller.
297 throw; // Abort the message sending.
// Handler for the sync() call: same policy as above.
308 catch (const CORBA::TRANSIENT
&)
310 if (++transient_count
< TRANSIENT_LIMIT
)
312 if (transient_count
== TRANSIENT_LIMIT
/ 2)
314 ACE_DEBUG ((LM_DEBUG
,
315 "(%P) Client large TRANSIENTS encountered calling sync().\n"));
317 ACE_OS::sleep (TRANSIENT_HOLDOFF
);
321 throw; // Abort the message sending.
// Parameter list and body of a helper that calls
// oneway_buffering_admin->request_count () and handles CORBA::TRANSIENT.
// NOTE(review): the line carrying the function name and return type is not
// visible; the log text and the callers suggest it is request_count ().
//
// oneway_buffering_admin - admin object that is queried.
// expected_request_count - forwarded unchanged to the remote call; defaults
//                          to 0.
329 const Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
,
330 const CORBA::ULong expected_request_count
= static_cast<CORBA::ULong
>(0))
333 int transient_count
= 0;
338 count
= oneway_buffering_admin
->request_count (expected_request_count
);
// Below the limit, sleep for TRANSIENT_HOLDOFF (presumably before retrying;
// the loop is not visible).
341 catch (const CORBA::TRANSIENT
&)
343 if (++transient_count
< TRANSIENT_LIMIT
)
// Log once, when the counter reaches half of the limit.
345 if (transient_count
== TRANSIENT_LIMIT
/ 2)
347 ACE_DEBUG ((LM_DEBUG
,
348 "(%P) Client large TRANSIENTS encountered calling request_count().\n"));
350 ACE_OS::sleep (TRANSIENT_HOLDOFF
);
// Presumably the else branch: the TRANSIENT is rethrown to the caller.
354 throw; // Abort the message sending.
// Parameter list and body of a helper that forwards payload to
// oneway_buffering->receive_data () and handles CORBA::TRANSIENT.
// NOTE(review): the line carrying the function name and return type is not
// visible; the log text and the callers suggest it is receive_data ().
//
// oneway_buffering - object that receives the payload.
// payload          - data passed unchanged to the remote operation.
364 const Test::Oneway_Buffering_ptr oneway_buffering
,
365 const Test::Payload
&payload
)
367 int transient_count
= 0;
// The result of the remote call is returned directly on success.
372 return oneway_buffering
->receive_data (payload
);
// Below the limit, sleep for TRANSIENT_HOLDOFF (presumably before retrying;
// the loop is not visible).
374 catch (const CORBA::TRANSIENT
&)
376 if (++transient_count
< TRANSIENT_LIMIT
)
// Log once, when the counter reaches half of the limit.
378 if (transient_count
== TRANSIENT_LIMIT
/ 2)
380 ACE_DEBUG ((LM_DEBUG
,
381 "(%P) Client large TRANSIENTS encountered calling receive_data().\n"));
383 ACE_OS::sleep (TRANSIENT_HOLDOFF
);
// Presumably the else branch: the TRANSIENT is rethrown to the caller.
387 throw; // Abort the message sending.
// Calls oneway_buffering_admin->bytes_received_count () and handles
// CORBA::TRANSIENT.
//
// oneway_buffering_admin        - admin object that is queried.
// expected_bytes_received_count - forwarded unchanged to the remote call;
//                                 defaults to 0.
//
// NOTE(review): the return type, the loop/try lines and the return
// statement are not visible in this extract.
394 bytes_received_count (
395 const Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
,
396 const CORBA::ULong expected_bytes_received_count
= static_cast<CORBA::ULong
>(0))
399 int transient_count
= 0;
404 count
= oneway_buffering_admin
->bytes_received_count (expected_bytes_received_count
);
// Below the limit, sleep for TRANSIENT_HOLDOFF (presumably before retrying;
// the loop is not visible).
407 catch (const CORBA::TRANSIENT
&)
409 if (++transient_count
< TRANSIENT_LIMIT
)
// Log once, when the counter reaches half of the limit.
411 if (transient_count
== TRANSIENT_LIMIT
/ 2)
413 ACE_DEBUG ((LM_DEBUG
,
414 "(%P) Client large TRANSIENTS encountered calling bytes_received_count().\n"));
416 ACE_OS::sleep (TRANSIENT_HOLDOFF
);
// Presumably the else branch: the TRANSIENT is rethrown to the caller.
420 throw; // Abort the message sending.
// Liveness check shared by the four tests: after syncing with the server it
// sends as many payloads as the server has already counted and verifies
// that the server does not fall too far behind.
//
// orb                    - ORB handle (its use is on lines not visible here).
// oneway_buffering       - buffered reference the payloads are sent through.
// flusher                - reference handed to sync_server ().
// oneway_buffering_admin - admin object queried for the request count.
//
// NOTE(review): the return type, the declaration of depth, the failure flag
// and the return statement are not visible in this extract. Callers treat a
// non-zero result as failure.
429 run_liveness_test (CORBA::ORB_ptr orb
,
430 Test::Oneway_Buffering_ptr oneway_buffering
,
431 Test::Oneway_Buffering_ptr flusher
,
432 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
)
434 ACE_DEBUG ((LM_DEBUG
, ".... checking for liveness\n"));
437 sync_server (flusher
);
// The count read after the sync is the baseline for the check.
439 CORBA::ULong send_count
=
440 request_count (oneway_buffering_admin
);
// The same number of messages is sent again by the loop below.
442 int liveness_test_iterations
= static_cast<int> (send_count
);
// Payload of PAYLOAD_LENGTH octets filled with the pattern j % 256.
444 Test::Payload
payload (PAYLOAD_LENGTH
);
445 payload
.length (PAYLOAD_LENGTH
);
446 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
447 payload
[j
] = CORBA::Octet(j
% 256);
450 for (int i
= 0; i
!= liveness_test_iterations
; ++i
)
452 receive_data (oneway_buffering
, payload
);
// 1000 usec time value; the call that consumes it is not visible here.
455 ACE_Time_Value
tv (0, 1000);
458 CORBA::ULong receive_count
=
459 request_count (oneway_buffering_admin
);
461 // Once the system has sent enough messages we don't
462 // expect it to fall too far behind, i.e. at least 90% of the
463 // messages should be delivered....
// NOTE(review): expected is LIVENESS_TOLERANCE * send_count, where
// send_count is the baseline read before this loop.
464 CORBA::ULong expected
=
465 CORBA::ULong (LIVENESS_TOLERANCE
* send_count
);
467 if (receive_count
< expected
)
470 ACE_DEBUG ((LM_DEBUG
,
471 "DEBUG: Iteration %d "
472 "not enough messages received %u "
474 i
, receive_count
, expected
));
476 sync_server (flusher
);
// Re-sync whenever depth reaches LIVENESS_MAX_DEPTH.
479 if (depth
++ == LIVENESS_MAX_DEPTH
)
481 sync_server (flusher
);
// Message-count flushing test: installs a BUFFER_MESSAGE_COUNT constraint
// of BUFFERED_MESSAGES_COUNT messages, then for each iteration sends
// payloads and checks when the server's request count changes.
//
// orb                    - ORB handle passed to configure_policies ().
// oneway_buffering       - reference the payloads are sent through.
// oneway_buffering_admin - admin object queried for the request count.
//
// NOTE(review): the return type, the inner send loop header, the increment
// of send_count and the failure bookkeeping are not visible in this extract.
492 run_message_count (CORBA::ORB_ptr orb
,
493 Test::Oneway_Buffering_ptr oneway_buffering
,
494 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
)
496 TAO::BufferingConstraint buffering_constraint
;
497 buffering_constraint
.mode
= TAO::BUFFER_MESSAGE_COUNT
;
498 buffering_constraint
.message_count
= BUFFERED_MESSAGES_COUNT
;
499 buffering_constraint
.message_bytes
= 0;
500 buffering_constraint
.timeout
= 0;
502 Test::Oneway_Buffering_var flusher
;
504 configure_policies (orb
, buffering_constraint
,
505 oneway_buffering
, flusher
.out ());
// test_failed is presumably the result of configure_policies () -- confirm.
507 if (test_failed
!= 0)
// Payload of PAYLOAD_LENGTH octets filled with the pattern j % 256.
510 Test::Payload
payload (PAYLOAD_LENGTH
);
511 payload
.length (PAYLOAD_LENGTH
);
512 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
513 payload
[j
] = CORBA::Octet(j
% 256);
515 CORBA::ULong send_count
= 0;
516 for (int i
= 0; i
!= iterations
; ++i
)
// Start each iteration in sync; the server count must equal send_count.
518 sync_server (flusher
.in ());
520 CORBA::ULong initial_receive_count
=
521 request_count (oneway_buffering_admin
, send_count
);
523 if (initial_receive_count
!= send_count
)
526 ACE_DEBUG ((LM_DEBUG
,
527 "DEBUG: Iteration %d message lost (%u != %u)\n",
528 i
, initial_receive_count
, send_count
));
533 receive_data (oneway_buffering
, payload
);
// 1000 usec time value; the call that consumes it is not visible here.
536 ACE_Time_Value
tv (0, 1000);
// Messages sent so far in this iteration.
539 CORBA::ULong iteration_count
=
540 send_count
- initial_receive_count
;
// Below the threshold the initial count is passed as the expected value;
// otherwise the initial count plus one full buffer is passed.
543 too_few
= (iteration_count
< static_cast<CORBA::ULong
> (BUFFERED_MESSAGES_COUNT
));
545 CORBA::ULong receive_count
=
547 oneway_buffering_admin
,
548 static_cast<CORBA::ULong
> (too_few
?
549 initial_receive_count
550 : (initial_receive_count
551 + static_cast<CORBA::ULong
> (BUFFERED_MESSAGES_COUNT
))));
// A changed count means the queue was flushed.
553 if (receive_count
!= initial_receive_count
)
// NOTE(review): BUFFERED_MESSAGES_COUNT is an int but is printed with %u
// in both messages below.
558 ACE_DEBUG ((LM_DEBUG
,
559 "DEBUG: Iteration %d flush before "
560 "message count reached. "
561 "Iteration count = %u, Threshold = %u\n",
563 iteration_count
, BUFFERED_MESSAGES_COUNT
));
// No flush after three times the threshold is also reported.
568 if (iteration_count
> 3 * BUFFERED_MESSAGES_COUNT
)
571 ACE_DEBUG ((LM_DEBUG
,
572 "DEBUG: Iteration %d no flush past "
573 "message count threshold. "
574 "Iteration count = %u, Threshold = %u\n",
576 iteration_count
, BUFFERED_MESSAGES_COUNT
));
// Finish with the shared liveness check; non-zero is treated as failure.
582 int liveness_test_failed
=
583 run_liveness_test (orb
,
586 oneway_buffering_admin
);
588 if (liveness_test_failed
)
// Timeout flushing test: installs a BUFFER_TIMEOUT constraint, then for
// each iteration sends payloads and measures how long it takes before the
// server's request count changes.
//
// orb                    - ORB handle passed to configure_policies ().
// oneway_buffering       - reference the payloads are sent through.
// oneway_buffering_admin - admin object queried for the request count.
//
// NOTE(review): the return type, the inner while loop, the increment of
// send_count and the failure bookkeeping are not visible in this extract.
595 run_timeout (CORBA::ORB_ptr orb
,
596 Test::Oneway_Buffering_ptr oneway_buffering
,
597 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
)
599 TAO::BufferingConstraint buffering_constraint
;
600 buffering_constraint
.mode
= TAO::BUFFER_TIMEOUT
;
601 buffering_constraint
.message_count
= 0;
602 buffering_constraint
.message_bytes
= 0;
// Presumably converts milliseconds to the 100 ns units of the timeout
// field -- confirm against the BufferingConstraint definition.
603 buffering_constraint
.timeout
= TIMEOUT_MILLISECONDS
* 10000;
605 Test::Oneway_Buffering_var flusher
;
607 configure_policies (orb
, buffering_constraint
,
608 oneway_buffering
, flusher
.out ());
// test_failed is presumably the result of configure_policies () -- confirm.
610 if (test_failed
!= 0)
// Payload of PAYLOAD_LENGTH octets filled with the pattern j % 256.
613 Test::Payload
payload (PAYLOAD_LENGTH
);
614 payload
.length (PAYLOAD_LENGTH
);
615 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
616 payload
[j
] = CORBA::Octet(j
% 256);
618 CORBA::ULong send_count
= 0;
619 for (int i
= 0; i
!= iterations
; ++i
)
// Start each iteration in sync; the server count must equal send_count.
621 sync_server (flusher
.in ());
623 CORBA::ULong initial_receive_count
=
624 request_count (oneway_buffering_admin
, send_count
);
626 if (initial_receive_count
!= send_count
)
629 ACE_DEBUG ((LM_DEBUG
,
630 "DEBUG: Iteration %d message lost (%u != %u)\n",
631 i
, initial_receive_count
, send_count
));
// Reference time for the elapsed-time checks below.
634 ACE_Time_Value start
= ACE_OS::gettimeofday ();
637 receive_data (oneway_buffering
, payload
);
// 1000 usec time value; the call that consumes it is not visible here.
640 ACE_Time_Value
tv (0, 1000);
643 CORBA::ULong receive_count
=
644 request_count (oneway_buffering_admin
);
646 ACE_Time_Value elapsed
= ACE_OS::gettimeofday () - start
;
// A changed count means the queue was flushed.
647 if (receive_count
!= initial_receive_count
)
// A flush earlier than the timeout is reported as an error.
649 if (elapsed
.msec () < TIMEOUT_MILLISECONDS
)
// NOTE(review): %d is used for elapsed.msec () and for the unsigned
// TIMEOUT_MILLISECONDS -- confirm the specifiers match the argument types.
652 ACE_ERROR ((LM_ERROR
,
653 "ERROR: Iteration %d flush before "
655 "Elapsed = %d, Timeout = %d msecs\n",
657 elapsed
.msec (), TIMEOUT_MILLISECONDS
));
659 // terminate the while loop.
// No flush after three times the timeout is also reported as an error.
663 if (elapsed
.msec () > 3 * TIMEOUT_MILLISECONDS
)
666 ACE_ERROR ((LM_ERROR
,
667 "ERROR: Iteration %d no flush past "
668 "timeout threshold. "
669 "Elapsed = %d, Timeout = %d msecs\n",
671 elapsed
.msec (), TIMEOUT_MILLISECONDS
));
// Finish with the shared liveness check; non-zero is treated as failure.
677 int liveness_test_failed
=
678 run_liveness_test (orb
, oneway_buffering
,
680 oneway_buffering_admin
);
682 if (liveness_test_failed
)
// "Reactive" variant of the timeout flushing test. It installs the same
// BUFFER_TIMEOUT constraint as run_timeout (), but sends a burst of 20
// payloads per iteration and reports timing violations through ACE_DEBUG
// rather than ACE_ERROR.
//
// orb                    - ORB handle passed to configure_policies ().
// oneway_buffering       - reference the payloads are sent through.
// oneway_buffering_admin - admin object queried for the request count.
//
// NOTE(review): the return type, the inner while loop, the increment of
// send_count and the failure bookkeeping are not visible in this extract.
690 run_timeout_reactive (CORBA::ORB_ptr orb
,
691 Test::Oneway_Buffering_ptr oneway_buffering
,
692 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
)
694 TAO::BufferingConstraint buffering_constraint
;
695 buffering_constraint
.mode
= TAO::BUFFER_TIMEOUT
;
696 buffering_constraint
.message_count
= 0;
697 buffering_constraint
.message_bytes
= 0;
// Presumably converts milliseconds to the 100 ns units of the timeout
// field -- confirm against the BufferingConstraint definition.
698 buffering_constraint
.timeout
= TIMEOUT_MILLISECONDS
* 10000;
700 Test::Oneway_Buffering_var flusher
;
702 configure_policies (orb
, buffering_constraint
,
703 oneway_buffering
, flusher
.out ());
// test_failed is presumably the result of configure_policies () -- confirm.
705 if (test_failed
!= 0)
// Payload of PAYLOAD_LENGTH octets filled with the pattern j % 256.
708 Test::Payload
payload (PAYLOAD_LENGTH
);
709 payload
.length (PAYLOAD_LENGTH
);
710 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
711 payload
[j
] = CORBA::Octet(j
% 256);
713 CORBA::ULong send_count
= 0;
714 for (int i
= 0; i
!= iterations
; ++i
)
// Start each iteration in sync; the server count must equal send_count.
716 sync_server (flusher
.in ());
718 CORBA::ULong initial_receive_count
=
719 request_count (oneway_buffering_admin
, send_count
);
721 if (initial_receive_count
!= send_count
)
724 ACE_DEBUG ((LM_DEBUG
,
725 "DEBUG: Iteration %d message lost (%u != %u)\n",
726 i
, initial_receive_count
, send_count
));
// Reference time, followed by a burst of 20 sends.
729 ACE_Time_Value start
= ACE_OS::gettimeofday ();
730 for (int j
= 0; j
!= 20; ++j
)
732 receive_data (oneway_buffering
, payload
);
// 1000 usec time value; the call that consumes it is not visible here.
736 ACE_Time_Value
tv (0, 1000);
741 CORBA::ULong receive_count
=
742 request_count (oneway_buffering_admin
);
744 //FUZZ: disable check_for_lack_ACE_OS
// 10000 usec time value named sleep; the call that consumes it is not
// visible here.
745 ACE_Time_Value
sleep (0, 10000);
747 //FUZZ: enable check_for_lack_ACE_OS
749 ACE_Time_Value elapsed
= ACE_OS::gettimeofday () - start
;
// A changed count means the queue was flushed.
750 if (receive_count
!= initial_receive_count
)
// A flush earlier than the timeout is logged.
752 if (elapsed
.msec () < TIMEOUT_MILLISECONDS
)
// NOTE(review): %d is used for elapsed.msec () and for the unsigned
// TIMEOUT_MILLISECONDS -- confirm the specifiers match the argument types.
755 ACE_DEBUG ((LM_DEBUG
,
756 "DEBUG: Iteration %d flush before "
758 "Elapsed = %d, Timeout = %d msecs\n",
760 elapsed
.msec (), TIMEOUT_MILLISECONDS
));
762 // terminate the while loop.
// No flush after three times the timeout is also logged.
766 if (elapsed
.msec () > 3 * TIMEOUT_MILLISECONDS
)
769 ACE_DEBUG ((LM_DEBUG
,
770 "DEBUG: Iteration %d no flush past "
771 "timeout threshold. "
772 "Elapsed = %d, Timeout = %d msecs\n",
774 elapsed
.msec (), TIMEOUT_MILLISECONDS
));
// Finish with the shared liveness check; non-zero is treated as failure.
780 int liveness_test_failed
=
781 run_liveness_test (orb
, oneway_buffering
,
783 oneway_buffering_admin
);
785 if (liveness_test_failed
)
// Buffer-size flushing test: installs a BUFFER_MESSAGE_BYTES constraint of
// BUFFER_SIZE bytes, then for each iteration sends payloads and checks how
// many payload bytes were sent before the server's byte count changes.
//
// orb                    - ORB handle passed to configure_policies ().
// oneway_buffering       - reference the payloads are sent through.
// oneway_buffering_admin - admin object queried for the received byte count.
//
// NOTE(review): the return type, the inner while loop and the failure
// bookkeeping are not visible, and the definition continues past the end of
// this extract.
793 run_buffer_size (CORBA::ORB_ptr orb
,
794 Test::Oneway_Buffering_ptr oneway_buffering
,
795 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
)
797 TAO::BufferingConstraint buffering_constraint
;
798 buffering_constraint
.mode
= TAO::BUFFER_MESSAGE_BYTES
;
799 buffering_constraint
.message_count
= 0;
800 buffering_constraint
.message_bytes
= BUFFER_SIZE
;
801 buffering_constraint
.timeout
= 0;
803 Test::Oneway_Buffering_var flusher
;
805 configure_policies (orb
, buffering_constraint
,
806 oneway_buffering
, flusher
.out ());
// test_failed is presumably the result of configure_policies () -- confirm.
808 if (test_failed
!= 0)
// Payload of PAYLOAD_LENGTH octets filled with the pattern j % 256.
811 Test::Payload
payload (PAYLOAD_LENGTH
);
812 payload
.length (PAYLOAD_LENGTH
);
813 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
814 payload
[j
] = CORBA::Octet(j
% 256);
816 CORBA::ULong bytes_sent
= 0;
// Minimum payload bytes expected before a flush: BUFFER_SIZE scaled by
// GIOP_OVERHEAD.
818 expected_size
= static_cast<CORBA::ULong
>(GIOP_OVERHEAD
* BUFFER_SIZE
);
820 for (int i
= 0; i
!= iterations
; ++i
)
// Start each iteration in sync; the server byte count must equal bytes_sent.
822 sync_server (flusher
.in ());
824 CORBA::ULong initial_bytes_received
=
825 bytes_received_count (oneway_buffering_admin
, bytes_sent
);
827 if (initial_bytes_received
!= bytes_sent
)
830 ACE_DEBUG ((LM_DEBUG
,
831 "DEBUG: Iteration %d data lost (%u != %u)\n",
832 i
, initial_bytes_received
, bytes_sent
));
// Each send adds PAYLOAD_LENGTH to the running total.
837 receive_data (oneway_buffering
, payload
);
838 bytes_sent
+= PAYLOAD_LENGTH
;
// 1000 usec time value; the call that consumes it is not visible here.
840 ACE_Time_Value
tv (0, 1000);
// Payload bytes sent so far in this iteration.
843 CORBA::ULong payload_delta
=
844 bytes_sent
- initial_bytes_received
;
846 CORBA::ULong bytes_received
=
847 bytes_received_count (oneway_buffering_admin
);
849 if (bytes_received
!= initial_bytes_received
)
851 // The queue has been flushed, check that enough data
852 // has been sent. The check cannot be precise because
853 // the ORB counts the GIOP message overhead, in this
854 // test we assume the overhead to be less than 10%
856 if (payload_delta
< expected_size
)
// NOTE(review): the message prints BUFFER_SIZE (an int, with %u) although
// the comparison above uses expected_size -- confirm which value is meant.
859 ACE_DEBUG ((LM_DEBUG
,
860 "DEBUG: Iteration %d flush before "
861 "minimum buffer size was reached. "
862 "Sent = %u, Minimum buffer = %u bytes\n",
864 payload_delta
, BUFFER_SIZE
));
// No flush after three times the buffer size is also logged.
869 if (payload_delta
> 3 * BUFFER_SIZE
)
872 ACE_DEBUG ((LM_DEBUG
,
873 "DEBUG: Iteration %d no flush past "
874 "buffer size threshold. "
875 "Sent = %u, Minimum buffer = %u bytes\n",
877 payload_delta
, BUFFER_SIZE
));
// Finish with the shared liveness check; non-zero is treated as failure.
883 int liveness_test_failed
=
884 run_liveness_test (orb
, oneway_buffering
,
886 oneway_buffering_admin
);
888 if (liveness_test_failed
)