1 #include "Reply_Handler.h"
2 #include "Client_Task.h"
3 #include "tao/Messaging/Messaging.h"
5 #include "tao/AnyTypeCode/TAOA.h"
6 #include "tao/AnyTypeCode/Any.h"
7 #include "ace/Get_Opt.h"
// File-scope test configuration: object references, test-selection flags
// and the numeric limits the individual tests compare against.

/// Default stringified reference of the Test::AMI_Buffering object.
/// main() hands it to string_to_object(); parse_args() may overwrite it.
9 const ACE_TCHAR
*server_ior
= ACE_TEXT("file://server.ior");
/// Default stringified reference of the Test::AMI_Buffering_Admin object.
/// Handled the same way as server_ior.
10 const ACE_TCHAR
*admin_ior
= ACE_TEXT("file://admin.ior");
/// Test-selection flags, all off by default. main() checks them in an
/// else-if chain (message count, timeout, reactive timeout, buffer size),
/// so at most one test runs per invocation even if several are set.
13 int run_message_count_test
= 0;
14 int run_timeout_test
= 0;
15 int run_timeout_reactive_test
= 0;
16 int run_buffer_size_test
= 0;
/// Length, in octets, of every Test::Payload the tests send.
18 const int PAYLOAD_LENGTH
= 1024;
/// message_count threshold placed in run_message_count()'s constraint.
19 const int BUFFERED_MESSAGES_COUNT
= 10;
/// Base timeout for the two timeout tests; compared against
/// ACE_Time_Value::msec() results, i.e. interpreted as milliseconds.
20 const unsigned int TIMEOUT_MILLISECONDS
= 50;
/// message_bytes threshold placed in run_buffer_size()'s constraint:
/// ten payloads' worth of bytes.
21 const int BUFFER_SIZE
= 10 * PAYLOAD_LENGTH
;
23 /// Allow a larger timeout to occur due to scheduler differences
// Upper bound on elapsed milliseconds before the timeout tests report
// "no flush past timeout threshold".
24 const unsigned int TIMEOUT_TOLERANCE
= 4 * TIMEOUT_MILLISECONDS
;
26 /// Check that no more than 10% of the messages are not sent.
// run_liveness_test() multiplies the send count by this factor to get the
// minimum acceptable receive count.
27 const double LIVENESS_TOLERANCE
= 0.9;
29 /// Limit the depth of the liveness test, avoid blowing up the stack
31 const int LIVENESS_MAX_DEPTH
= 256;
33 /// Factor in GIOP overhead in the buffer size test
// run_buffer_size() scales BUFFER_SIZE by this factor to get the smallest
// payload total at which a flush is still accepted.
34 const double GIOP_OVERHEAD
= 0.9;
/// Parse the command line into the file-scope settings.
///
/// Options are read with ACE_Get_Opt using the option string "k:a:i:ctbr".
/// The visible assignments update server_ior, admin_ior, iterations and the
/// run_*_test flags, and an error is reported through ACE_ERROR_RETURN.
///
/// NOTE(review): the case labels are not visible in this listing, so the
/// mapping from option letter to setting must be confirmed against the
/// complete file. Presumably k/a/i take the three option arguments (they are
/// the letters followed by ':' in the option string) - TODO confirm.
///
/// @param argc argument count, forwarded to ACE_Get_Opt
/// @param argv argument vector, forwarded to ACE_Get_Opt
37 parse_args (int argc
, ACE_TCHAR
*argv
[])
39 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("k:a:i:ctbr"));
// Calling the ACE_Get_Opt object returns the next option; -1 ends the scan.
42 while ((c
= get_opts ()) != -1)
// The option argument is stored as a pointer, not copied.
46 server_ior
= get_opts
.opt_arg ();
50 admin_ior
= get_opts
.opt_arg ();
// NOTE(review): the atoi result is stored with no visible range check. The
// test loops below run "for (i = 0; i != iterations; ++i)", so a negative
// value would not terminate as intended - confirm whether validation exists
// in lines not shown here.
54 iterations
= ACE_OS::atoi (get_opts
.opt_arg ());
58 run_message_count_test
= 1;
66 run_buffer_size_test
= 1;
70 run_timeout_reactive_test
= 1;
// Error report and early return; the message text is not visible in this
// listing (presumably a usage message for unrecognised options).
75 ACE_ERROR_RETURN ((LM_ERROR
,
85 // Indicates successful parsing of the command line
// Forward declarations of the four test drivers defined later in this file.
// Each receives the ORB, the root POA (used by the definitions to activate
// a Reply_Handler servant), the buffering object under test and the admin
// object that the definitions query for request/byte counts.

/// Test flushing driven by a message-count buffering constraint.
90 run_message_count (CORBA::ORB_ptr orb
,
91 PortableServer::POA_ptr root_poa
,
92 Test::AMI_Buffering_ptr ami_buffering
,
93 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
);
/// Test flushing driven by a timeout buffering constraint.
95 run_timeout (CORBA::ORB_ptr orb
,
96 PortableServer::POA_ptr root_poa
,
97 Test::AMI_Buffering_ptr ami_buffering
,
98 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
);
/// Timeout test variant that sends a burst of requests per iteration.
// NOTE(review): the parameters are named oneway_buffering* here but
// ami_buffering* in the definition; harmless to the compiler, but the names
// should be made consistent.
101 run_timeout_reactive (CORBA::ORB_ptr orb
,
102 PortableServer::POA_ptr root_poa
,
103 Test::AMI_Buffering_ptr oneway_buffering
,
104 Test::AMI_Buffering_Admin_ptr oneway_buffering_admin
);
/// Test flushing driven by a message-bytes buffering constraint.
107 run_buffer_size (CORBA::ORB_ptr orb
,
108 PortableServer::POA_ptr root_poa
,
109 Test::AMI_Buffering_ptr ami_buffering
,
110 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
);
/// Program entry point.
///
/// Visible sequence: initialise the ORB, resolve "RootPOA" and activate its
/// POA manager, parse the command line, turn server_ior and admin_ior into
/// narrowed object references, start a Client_Task, run the one selected
/// test, then stop the task, wait for its threads, shut the server down and
/// destroy the root POA. CORBA exceptions are caught and printed.
113 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
119 CORBA::ORB_init (argc
, argv
);
121 CORBA::Object_var poa_object
=
122 orb
->resolve_initial_references("RootPOA");
124 PortableServer::POA_var root_poa
=
125 PortableServer::POA::_narrow (poa_object
.in ());
127 if (CORBA::is_nil (root_poa
.in ()))
128 ACE_ERROR_RETURN ((LM_ERROR
,
129 " (%P|%t) Panic: nil RootPOA\n"),
132 PortableServer::POAManager_var poa_manager
=
133 root_poa
->the_POAManager ();
135 poa_manager
->activate ();
// Command-line parsing happens after ORB_init has seen argc/argv.
137 if (parse_args (argc
, argv
) != 0)
140 CORBA::Object_var tmp
=
141 orb
->string_to_object(server_ior
);
143 Test::AMI_Buffering_var ami_buffering
=
144 Test::AMI_Buffering::_narrow(tmp
.in ());
146 if (CORBA::is_nil (ami_buffering
.in ()))
// NOTE(review): this is an error path, but the message is logged with
// LM_DEBUG priority, whereas the nil-RootPOA check above uses LM_ERROR.
148 ACE_ERROR_RETURN ((LM_DEBUG
,
149 "Nil Test::AMI_Buffering reference <%s>\n",
// Presumably the result is assigned back to tmp (the assignment target is
// not visible in this listing); tmp is what gets narrowed next.
155 orb
->string_to_object(admin_ior
);
157 Test::AMI_Buffering_Admin_var ami_buffering_admin
=
158 Test::AMI_Buffering_Admin::_narrow(tmp
.in ());
160 if (CORBA::is_nil (ami_buffering_admin
.in ()))
// NOTE(review): same LM_DEBUG-for-error inconsistency as above.
162 ACE_ERROR_RETURN ((LM_DEBUG
,
163 "Nil Test::AMI_Buffering_Admin reference <%s>\n",
// Start the helper task on a joinable thread; it is stopped and joined
// after the selected test returns.
168 Client_Task
client_task (orb
.in ());
169 if (client_task
.activate (THR_NEW_LWP
| THR_JOINABLE
) == -1)
// NOTE(review): ACE_ERROR only logs. Whether execution returns or continues
// after a failed activation is not visible here - confirm.
171 ACE_ERROR ((LM_ERROR
, "Error activating client task\n"));
// Test dispatch: an else-if chain, so the first flag that is set wins.
174 if (run_message_count_test
)
176 ACE_DEBUG ((LM_DEBUG
,
177 "Running message count flushing test\n"));
179 run_message_count (orb
.in (),
182 ami_buffering_admin
.in ());
184 else if (run_timeout_test
)
186 ACE_DEBUG ((LM_DEBUG
,
187 "Running timeout flushing test\n"));
189 run_timeout (orb
.in (),
192 ami_buffering_admin
.in ());
194 else if (run_timeout_reactive_test
)
196 ACE_DEBUG ((LM_DEBUG
,
197 "Running timeout (reactive) flushing test\n"));
199 run_timeout_reactive (orb
.in (),
202 ami_buffering_admin
.in ());
204 else if (run_buffer_size_test
)
206 ACE_DEBUG ((LM_DEBUG
,
207 "Running buffer size flushing test\n"));
209 run_buffer_size (orb
.in (),
212 ami_buffering_admin
.in ());
// Reached when none of the four flags is set.
216 ACE_ERROR ((LM_ERROR
,
217 "ERROR: No test was configured\n"));
// Teardown: stop the helper task, join its threads, ask the server to shut
// down, then destroy the root POA.
220 client_task
.terminate_loop ();
222 client_task
.thr_mgr ()->wait ();
224 ami_buffering
->shutdown ();
226 root_poa
->destroy (true, true);
230 catch (const CORBA::Exception
& ex
)
232 ex
._tao_print_exception ("Exception caught in client:");
/// Install the buffering policies used by a test and produce a "flusher"
/// reference.
///
/// Visible steps:
///  1. Resolve and narrow "PolicyCurrent"; log an error if it is nil.
///  2. Create a SYNC_SCOPE policy (value Messaging::SYNC_NONE) and a
///     BUFFERING_CONSTRAINT policy (value @a buffering_constraint) and add
///     them as overrides on the PolicyCurrent.
///  3. Build a second constraint with mode TAO::BUFFER_FLUSH and all limits
///     zero, apply it through ami_buffering->_set_policy_overrides(), and
///     narrow an object to Test::AMI_Buffering.
///
/// @param orb                  ORB used to resolve PolicyCurrent and to
///                             create the policies
/// @param buffering_constraint constraint inserted into the first
///                             buffering policy
/// @param ami_buffering        reference on which the flush override is set
/// @param flusher              out parameter; presumably receives the
///                             narrowed reference from step 3 (the
///                             assignment is not visible) - TODO confirm
240 configure_policies (CORBA::ORB_ptr orb
,
241 const TAO::BufferingConstraint
&buffering_constraint
,
242 Test::AMI_Buffering_ptr ami_buffering
,
243 Test::AMI_Buffering_out flusher
)
245 CORBA::Object_var object
=
246 orb
->resolve_initial_references ("PolicyCurrent");
248 CORBA::PolicyCurrent_var policy_current
=
249 CORBA::PolicyCurrent::_narrow (object
.in ());
251 if (CORBA::is_nil (policy_current
.in ()))
253 ACE_ERROR ((LM_ERROR
, "ERROR: Nil policy current\n"));
// Policy values travel as CORBA::Any.
256 CORBA::Any scope_as_any
;
257 scope_as_any
<<= Messaging::SYNC_NONE
;
259 CORBA::Any buffering_as_any
;
260 buffering_as_any
<<= buffering_constraint
;
262 CORBA::PolicyList
policies (2); policies
.length (2);
264 orb
->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE
,
267 orb
->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE
,
270 policy_current
->set_policy_overrides (policies
, CORBA::ADD_OVERRIDE
);
// The local policy objects are destroyed once the overrides are installed.
272 policies
[0]->destroy ();
273 policies
[1]->destroy ();
// Second constraint: flush mode with every limit cleared.
275 TAO::BufferingConstraint flush_constraint
;
276 flush_constraint
.mode
= TAO::BUFFER_FLUSH
;
277 flush_constraint
.message_count
= 0;
278 flush_constraint
.message_bytes
= 0;
279 flush_constraint
.timeout
= 0;
// buffering_as_any is reused for the flush constraint.
281 buffering_as_any
<<= flush_constraint
;
// NOTE(review): only one policy is created and destroyed from here on, yet
// the list was sized to 2 above. A length reset to 1 is not visible in this
// listing - confirm that it exists, otherwise element [1] would still refer
// to a destroyed policy when the list is passed below.
284 orb
->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE
,
288 ami_buffering
->_set_policy_overrides (policies
,
289 CORBA::ADD_OVERRIDE
);
291 policies
[0]->destroy ();
// "object" is presumably reassigned from _set_policy_overrides() before
// this narrow (the assignment is not visible) - TODO confirm.
294 Test::AMI_Buffering::_narrow (object
.in ());
/// Bring the client back in step with the server and drain queued replies.
///
/// Only the two comments and the time value below are visible in this
/// listing; the calls made on @a flusher and @a orb are not shown, so their
/// exact effect must be confirmed against the complete file.
///
/// @param orb     ORB; presumably run for the duration of tv to drain
///                replies - TODO confirm
/// @param flusher reference configured with the flush buffering policy;
///                presumably invoked to force queued requests out - TODO
///                confirm
300 sync_server (CORBA::ORB_ptr orb
,
301 Test::AMI_Buffering_ptr flusher
)
303 // Get back in sync with the server...
307 // Drain responses from the queue
// Presumably (seconds, microseconds), i.e. a 100 ms budget - confirm
// against the ACE_Time_Value constructor.
308 ACE_Time_Value
tv (0, 100000);
/// Check that the buffered connection keeps delivering messages.
///
/// After syncing with the server, the current request count is read from
/// the admin object and used as the number of loop iterations. Each
/// iteration sends one asynchronous request via sendc_receive_data(), reads
/// the request count again and compares it with LIVENESS_TOLERANCE times the
/// send count. A shortfall is logged and followed by a sync_server() call.
///
/// @param orb                 ORB handed to sync_server()
/// @param reply_handler       AMI reply handler passed to
///                            sendc_receive_data()
/// @param ami_buffering       object under test
/// @param flusher             reference handed to sync_server()
/// @param ami_buffering_admin admin object queried for request_count()
313 run_liveness_test (CORBA::ORB_ptr orb
,
314 Test::AMI_AMI_BufferingHandler_ptr reply_handler
,
315 Test::AMI_Buffering_ptr ami_buffering
,
316 Test::AMI_Buffering_ptr flusher
,
317 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
)
319 ACE_DEBUG ((LM_DEBUG
, ".... checking for liveness\n"));
322 // Get back in sync with the server...
323 sync_server (orb
, flusher
);
325 CORBA::ULong send_count
=
326 ami_buffering_admin
->request_count ();
// NOTE(review): narrowing from CORBA::ULong to int. Fine for the counts
// this test produces, but a value above INT_MAX would not convert
// meaningfully.
328 int liveness_test_iterations
= int(send_count
);
329 ACE_DEBUG ((LM_DEBUG
, " liveness_test_iterations = %d\n",
330 liveness_test_iterations
));
// Fill the payload with the repeating octet pattern 0..255.
332 Test::Payload
payload (PAYLOAD_LENGTH
);
333 payload
.length (PAYLOAD_LENGTH
);
334 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
335 payload
[j
] = CORBA::Octet(j
% 256);
338 for (int i
= 0; i
!= liveness_test_iterations
; ++i
)
340 ami_buffering
->sendc_receive_data (reply_handler
,
344 CORBA::ULong receive_count
=
345 ami_buffering_admin
->request_count ();
// How tv is consumed is not visible in this listing.
347 ACE_Time_Value
tv (0, 10 * 1000);
350 // Once the system has sent enough messages we don't
351 // expect it to fall too far behind, i.e. at least 90% of the
352 // messages should be delivered....
353 CORBA::ULong expected
=
354 CORBA::ULong (LIVENESS_TOLERANCE
* send_count
);
356 if (receive_count
< expected
)
359 ACE_DEBUG ((LM_DEBUG
,
360 "DEBUG: Iteration %d "
361 "not enough messages received %u "
363 i
, receive_count
, expected
));
365 sync_server (orb
, flusher
);
// Every LIVENESS_MAX_DEPTH-th pass triggers an extra sync. The declaration
// and reset of "depth" are not visible in this listing.
368 if (depth
++ == LIVENESS_MAX_DEPTH
)
370 sync_server (orb
, flusher
);
/// Test that queued requests are flushed once a message count is reached.
///
/// The constraint uses mode TAO::BUFFER_MESSAGE_COUNT with message_count
/// set to BUFFERED_MESSAGES_COUNT. A Reply_Handler servant is activated on
/// @a root_poa, then for each of "iterations" rounds the function syncs with
/// the server, checks that nothing was lost, sends asynchronous requests
/// and watches the admin object's request_count() to see when a flush
/// happens. A liveness check runs at the visible end of each round.
///
/// @param orb                 ORB used for policy setup and sync_server()
/// @param root_poa            POA on which the reply handler is activated
/// @param ami_buffering       object under test
/// @param ami_buffering_admin admin object queried for request_count()
380 run_message_count (CORBA::ORB_ptr orb
,
381 PortableServer::POA_ptr root_poa
,
382 Test::AMI_Buffering_ptr ami_buffering
,
383 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
)
385 TAO::BufferingConstraint buffering_constraint
;
386 buffering_constraint
.mode
= TAO::BUFFER_MESSAGE_COUNT
;
387 buffering_constraint
.message_count
= BUFFERED_MESSAGES_COUNT
;
388 buffering_constraint
.message_bytes
= 0;
389 buffering_constraint
.timeout
= 0;
391 Test::AMI_Buffering_var flusher
;
// test_failed presumably holds this call's result (the declaration is not
// visible in this listing) - TODO confirm.
393 configure_policies (orb
, buffering_constraint
,
394 ami_buffering
, flusher
.out ());
396 if (test_failed
!= 0)
// Payload filled with the repeating octet pattern 0..255.
399 Test::Payload
payload (PAYLOAD_LENGTH
);
400 payload
.length (PAYLOAD_LENGTH
);
401 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
402 payload
[j
] = CORBA::Octet(j
% 256);
// Create the reply-handler servant; the ServantBase_var takes over the
// reference so the servant is released when this scope ends.
404 Reply_Handler
*reply_handler_impl
= 0;
405 ACE_NEW_RETURN (reply_handler_impl
,
408 PortableServer::ServantBase_var
owner_transfer(reply_handler_impl
);
410 PortableServer::ObjectId_var id
=
411 root_poa
->activate_object (reply_handler_impl
);
413 CORBA::Object_var object_act
= root_poa
->id_to_reference (id
.in ());
415 Test::AMI_AMI_BufferingHandler_var reply_handler
=
416 Test::AMI_AMI_BufferingHandler::_narrow (object_act
.in ());
418 CORBA::ULong send_count
= 0;
419 for (int i
= 0; i
!= iterations
; ++i
)
421 sync_server (orb
, flusher
.in ());
// After a sync the server should have seen every request sent so far.
423 CORBA::ULong initial_receive_count
=
424 ami_buffering_admin
->request_count ();
426 if (initial_receive_count
!= send_count
)
429 ACE_DEBUG ((LM_DEBUG
,
430 "DEBUG: Iteration %d message lost (%u != %u)\n",
431 i
, initial_receive_count
, send_count
));
436 ami_buffering
->sendc_receive_data (reply_handler
.in (),
440 CORBA::ULong receive_count
=
441 ami_buffering_admin
->request_count ();
// How tv is consumed is not visible in this listing.
443 ACE_Time_Value
tv (0, 10 * 1000);
// Requests sent since the last sync.
446 CORBA::ULong iteration_count
=
447 send_count
- initial_receive_count
;
// A changed server count means a flush occurred; it must not happen before
// the threshold is reached.
448 if (receive_count
!= initial_receive_count
)
450 if (iteration_count
< CORBA::ULong(BUFFERED_MESSAGES_COUNT
))
// NOTE(review): "%u" is paired with BUFFERED_MESSAGES_COUNT, a const int.
// Harmless for this positive value, but the conversion specifier and the
// argument type disagree.
453 ACE_DEBUG ((LM_DEBUG
,
454 "DEBUG: Iteration %d flush before "
455 "message count reached. "
456 "Iteration count = %u, Threshold = %u\n",
458 iteration_count
, BUFFERED_MESSAGES_COUNT
));
// No flush after twice the threshold is reported as a failure.
// NOTE(review): mixed ULong/int comparison; the check above casts
// explicitly, this one relies on implicit conversion.
463 if (iteration_count
> 2 * BUFFERED_MESSAGES_COUNT
)
466 ACE_DEBUG ((LM_DEBUG
,
467 "DEBUG: Iteration %d no flush past "
468 "message count threshold. "
469 "Iteration count = %u, Threshold = %u\n",
471 iteration_count
, BUFFERED_MESSAGES_COUNT
));
477 int liveness_test_failed
=
478 run_liveness_test (orb
,
482 ami_buffering_admin
);
484 if (liveness_test_failed
)
/// Test that queued requests are flushed once a timeout elapses.
///
/// The constraint uses mode TAO::BUFFER_TIMEOUT. Each of "iterations"
/// rounds syncs with the server, checks that nothing was lost, records a
/// start time, sends asynchronous requests and measures how long it takes
/// for the admin object's request_count() to change. A flush earlier than
/// TIMEOUT_MILLISECONDS or later than TIMEOUT_TOLERANCE is logged. A
/// liveness check runs at the visible end of each round.
///
/// @param orb                 ORB used for policy setup and sync_server()
/// @param root_poa            POA on which the reply handler is activated
/// @param ami_buffering       object under test
/// @param ami_buffering_admin admin object queried for request_count()
491 run_timeout (CORBA::ORB_ptr orb
,
492 PortableServer::POA_ptr root_poa
,
493 Test::AMI_Buffering_ptr ami_buffering
,
494 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
)
496 TAO::BufferingConstraint buffering_constraint
;
497 buffering_constraint
.mode
= TAO::BUFFER_TIMEOUT
;
498 buffering_constraint
.message_count
= 0;
499 buffering_constraint
.message_bytes
= 0;
// Presumably converts milliseconds to the 100-nanosecond units of the
// timeout field (1 ms = 10000 x 100 ns) - confirm against the
// TAO::BufferingConstraint definition.
500 buffering_constraint
.timeout
= TIMEOUT_MILLISECONDS
* 10000;
502 Test::AMI_Buffering_var flusher
;
// test_failed presumably holds this call's result (the declaration is not
// visible in this listing) - TODO confirm.
504 configure_policies (orb
, buffering_constraint
,
505 ami_buffering
, flusher
.out ());
507 if (test_failed
!= 0)
// Payload filled with the repeating octet pattern 0..255.
510 Test::Payload
payload (PAYLOAD_LENGTH
);
511 payload
.length (PAYLOAD_LENGTH
);
512 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
513 payload
[j
] = CORBA::Octet(j
% 256);
// Create and activate the reply-handler servant.
515 Reply_Handler
*reply_handler_impl
= 0;
516 ACE_NEW_RETURN (reply_handler_impl
,
519 PortableServer::ServantBase_var
owner_transfer(reply_handler_impl
);
521 PortableServer::ObjectId_var id
=
522 root_poa
->activate_object (reply_handler_impl
);
524 CORBA::Object_var object_act
= root_poa
->id_to_reference (id
.in ());
526 Test::AMI_AMI_BufferingHandler_var reply_handler
=
527 Test::AMI_AMI_BufferingHandler::_narrow (object_act
.in ());
529 CORBA::ULong send_count
= 0;
// Declared outside the iteration loop, so the budget of 10 retries below is
// shared by the whole run, not granted per iteration.
530 int retry_attempt
= 0;
531 for (int i
= 0; i
!= iterations
; ++i
)
533 sync_server (orb
, flusher
.in ());
// After a sync the server should have seen every request sent so far.
535 CORBA::ULong initial_receive_count
=
536 ami_buffering_admin
->request_count ();
538 if (initial_receive_count
!= send_count
)
541 ACE_DEBUG ((LM_DEBUG
,
542 "DEBUG: Iteration %d message lost (%u != %u)\n",
543 i
, initial_receive_count
, send_count
));
// Reference point for the elapsed-time checks below.
546 ACE_Time_Value start
= ACE_OS::gettimeofday ();
549 ami_buffering
->sendc_receive_data (reply_handler
.in (),
553 CORBA::ULong receive_count
=
554 ami_buffering_admin
->request_count ();
// How tv is consumed is not visible in this listing.
556 ACE_Time_Value
tv (0, 10 * 1000);
559 ACE_Time_Value elapsed
= ACE_OS::gettimeofday () - start
;
// A changed server count means a flush occurred; it must not happen before
// the configured timeout.
560 if (receive_count
!= initial_receive_count
)
562 if (elapsed
.msec () < TIMEOUT_MILLISECONDS
)
// NOTE(review): "%d" is paired with elapsed.msec() and an unsigned int
// constant. The specifier is signed and may not match the width of msec()'s
// return type - confirm and adjust the specifier or cast.
565 ACE_DEBUG ((LM_DEBUG
,
566 "DEBUG: Iteration %d flush before "
568 "Elapsed = %d, Timeout = %d msecs\n",
570 elapsed
.msec (), TIMEOUT_MILLISECONDS
));
572 // terminate the while loop.
// No flush within the tolerance window: retry while the shared budget
// lasts, otherwise report the overrun.
576 if (elapsed
.msec () > TIMEOUT_TOLERANCE
)
578 if (retry_attempt
++ < 10) {
579 ACE_DEBUG ((LM_DEBUG
, "DEBUG: Retry attempt %d beyond TIMEOUT_TOLERANCE.\n",
// NOTE(review): same "%d" versus unsigned argument mismatch as above.
585 ACE_DEBUG ((LM_DEBUG
,
586 "DEBUG: Iteration %d no flush past "
587 "timeout threshold. "
588 "Elapsed = %d, Timeout = %d msecs\n",
590 elapsed
.msec (), TIMEOUT_TOLERANCE
));
596 int liveness_test_failed
=
597 run_liveness_test (orb
,
601 ami_buffering_admin
);
603 if (liveness_test_failed
)
/// Timeout flushing test that sends requests in bursts.
///
/// Setup and checks mirror run_timeout(): mode TAO::BUFFER_TIMEOUT, the same
/// timeout value, the same early-flush and late-flush checks and the same
/// shared retry budget. The visible difference is an inner loop of 20
/// passes around sendc_receive_data() in every iteration.
///
/// NOTE(review): this function and run_timeout() are near-duplicates in the
/// visible lines; a shared helper would keep the two from drifting apart.
///
/// @param orb                 ORB used for policy setup and sync_server()
/// @param root_poa            POA on which the reply handler is activated
/// @param ami_buffering       object under test
/// @param ami_buffering_admin admin object queried for request_count()
610 run_timeout_reactive (CORBA::ORB_ptr orb
,
611 PortableServer::POA_ptr root_poa
,
612 Test::AMI_Buffering_ptr ami_buffering
,
613 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
)
615 TAO::BufferingConstraint buffering_constraint
;
616 buffering_constraint
.mode
= TAO::BUFFER_TIMEOUT
;
617 buffering_constraint
.message_count
= 0;
618 buffering_constraint
.message_bytes
= 0;
// Presumably converts milliseconds to the 100-nanosecond units of the
// timeout field - confirm against the TAO::BufferingConstraint definition.
619 buffering_constraint
.timeout
= TIMEOUT_MILLISECONDS
* 10000;
621 Test::AMI_Buffering_var flusher
;
// test_failed presumably holds this call's result (the declaration is not
// visible in this listing) - TODO confirm.
623 configure_policies (orb
, buffering_constraint
,
624 ami_buffering
, flusher
.out ());
626 if (test_failed
!= 0)
// Payload filled with the repeating octet pattern 0..255.
629 Test::Payload
payload (PAYLOAD_LENGTH
);
630 payload
.length (PAYLOAD_LENGTH
);
631 for (int j
= 0; j
!= PAYLOAD_LENGTH
; ++j
)
632 payload
[j
] = CORBA::Octet(j
% 256);
// Create and activate the reply-handler servant.
634 Reply_Handler
*reply_handler_impl
= 0;
635 ACE_NEW_RETURN (reply_handler_impl
,
638 PortableServer::ServantBase_var
owner_transfer(reply_handler_impl
);
640 PortableServer::ObjectId_var id
=
641 root_poa
->activate_object (reply_handler_impl
);
643 CORBA::Object_var object_act
= root_poa
->id_to_reference (id
.in ());
645 Test::AMI_AMI_BufferingHandler_var reply_handler
=
646 Test::AMI_AMI_BufferingHandler::_narrow (object_act
.in ());
648 CORBA::ULong send_count
= 0;
// Declared outside the iteration loop: the 10-retry budget is shared by the
// whole run.
649 int retry_attempt
= 0;
650 for (int i
= 0; i
!= iterations
; ++i
)
652 sync_server (orb
, flusher
.in ());
// After a sync the server should have seen every request sent so far.
654 CORBA::ULong initial_receive_count
=
655 ami_buffering_admin
->request_count ();
657 if (initial_receive_count
!= send_count
)
660 ACE_DEBUG ((LM_DEBUG
,
661 "DEBUG: Iteration %d message lost (%u != %u)\n",
662 i
, initial_receive_count
, send_count
));
664 ACE_Time_Value start
= ACE_OS::gettimeofday ();
// Burst of 20 sends per iteration; this is the part that differs from
// run_timeout(). The hard-coded 20 has no named constant.
665 for (int j
= 0; j
!= 20; ++j
)
667 ami_buffering
->sendc_receive_data (reply_handler
.in (),
673 CORBA::ULong receive_count
=
674 ami_buffering_admin
->request_count ();
// How tv is consumed is not visible in this listing.
676 ACE_Time_Value
tv (0, 10 * 1000);
679 ACE_Time_Value elapsed
= ACE_OS::gettimeofday () - start
;
// A changed server count means a flush occurred; it must not happen before
// the configured timeout.
680 if (receive_count
!= initial_receive_count
)
682 if (elapsed
.msec () < TIMEOUT_MILLISECONDS
)
// NOTE(review): "%d" is paired with elapsed.msec() and an unsigned int
// constant - confirm the specifier matches the argument types.
685 ACE_DEBUG ((LM_DEBUG
,
686 "DEBUG: Iteration %d flush before "
688 "Elapsed = %d, Timeout = %d msecs\n",
690 elapsed
.msec (), TIMEOUT_MILLISECONDS
));
692 // terminate the while loop.
// No flush within the tolerance window: retry while the shared budget
// lasts, otherwise report the overrun.
696 if (elapsed
.msec () > TIMEOUT_TOLERANCE
)
698 if (retry_attempt
++ < 10) {
699 ACE_DEBUG ((LM_DEBUG
, "DEBUG: Retry attempt %d beyond TIMEOUT_TOLERANCE.\n",
// NOTE(review): same "%d" versus unsigned argument mismatch as above.
705 ACE_DEBUG ((LM_DEBUG
,
706 "DEBUG: Iteration %d no flush past "
707 "timeout threshold. "
708 "Elapsed = %d, Timeout = %d msecs\n",
710 elapsed
.msec (), TIMEOUT_TOLERANCE
));
717 int liveness_test_failed
=
718 run_liveness_test (orb
,
722 ami_buffering_admin
);
724 if (liveness_test_failed
)
/// Test that queued requests are flushed once a byte threshold is reached.
///
/// The constraint uses mode TAO::BUFFER_MESSAGE_BYTES with message_bytes set
/// to BUFFER_SIZE. Each of "iterations" rounds syncs with the server,
/// compares the admin object's bytes_received_count() with the bytes sent so
/// far, sends asynchronous requests (adding PAYLOAD_LENGTH to bytes_sent per
/// send) and checks at what payload total a flush is observed. A liveness
/// check runs at the visible end of each round.
///
/// @param orb                 ORB used for policy setup and sync_server()
/// @param root_poa            POA on which the reply handler is activated
/// @param ami_buffering       object under test
/// @param ami_buffering_admin admin object queried for
///                            bytes_received_count()
733 run_buffer_size (CORBA::ORB_ptr orb
,
734 PortableServer::POA_ptr root_poa
,
735 Test::AMI_Buffering_ptr ami_buffering
,
736 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
)
738 TAO::BufferingConstraint buffering_constraint
;
739 buffering_constraint
.mode
= TAO::BUFFER_MESSAGE_BYTES
;
740 buffering_constraint
.message_count
= 0;
741 buffering_constraint
.message_bytes
= BUFFER_SIZE
;
742 buffering_constraint
.timeout
= 0;
744 Test::AMI_Buffering_var flusher
;
// test_failed presumably holds this call's result (the declaration is not
// visible in this listing) - TODO confirm.
746 configure_policies (orb
, buffering_constraint
,
747 ami_buffering
, flusher
.out ());
749 if (test_failed
!= 0)
// Unlike the other three tests, no loop fills the payload here; only its
// length is set, so the octet values are whatever length() leaves in them.
752 Test::Payload
payload (PAYLOAD_LENGTH
);
753 payload
.length (PAYLOAD_LENGTH
);
// NOTE(review): declared without an initializer, while the three sibling
// tests write "= 0". ACE_NEW_RETURN presumably assigns it before any use,
// but initialising it would match the rest of the file.
755 Reply_Handler
*reply_handler_impl
;
756 ACE_NEW_RETURN (reply_handler_impl
,
759 PortableServer::ServantBase_var
owner_transfer(reply_handler_impl
);
761 PortableServer::ObjectId_var id
=
762 root_poa
->activate_object (reply_handler_impl
);
764 CORBA::Object_var object_act
= root_poa
->id_to_reference (id
.in ());
766 Test::AMI_AMI_BufferingHandler_var reply_handler
=
767 Test::AMI_AMI_BufferingHandler::_narrow (object_act
.in ());
769 CORBA::ULong bytes_sent
= 0;
770 for (int i
= 0; i
!= iterations
; ++i
)
772 sync_server (orb
, flusher
.in ());
// After a sync the server should have received every byte sent so far.
774 CORBA::ULong initial_bytes_received
=
775 ami_buffering_admin
->bytes_received_count ();
777 if (initial_bytes_received
!= bytes_sent
)
780 ACE_DEBUG ((LM_DEBUG
,
781 "DEBUG: Iteration %d data lost (%u != %u)\n",
782 i
, initial_bytes_received
, bytes_sent
));
787 ami_buffering
->sendc_receive_data (reply_handler
.in (),
// Only payload octets are counted, not protocol overhead.
789 bytes_sent
+= PAYLOAD_LENGTH
;
791 CORBA::ULong bytes_received
=
792 ami_buffering_admin
->bytes_received_count ();
// How tv is consumed is not visible in this listing.
794 ACE_Time_Value
tv (0, 10 * 1000);
// Payload bytes sent since the last sync.
797 CORBA::ULong payload_delta
=
798 bytes_sent
- initial_bytes_received
;
799 if (bytes_received
!= initial_bytes_received
)
801 // The queue has been flushed, check that enough data
802 // has been sent. The check cannot be precise because
803 // the ORB counts the GIOP message overhead, in this
804 // test we assume the overhead to be less than 10%
806 if (payload_delta
< CORBA::ULong (GIOP_OVERHEAD
* BUFFER_SIZE
))
// NOTE(review): the message prints BUFFER_SIZE as the minimum, but the
// check above uses GIOP_OVERHEAD * BUFFER_SIZE. "%u" is also paired with a
// const int.
809 ACE_DEBUG ((LM_DEBUG
,
810 "DEBUG: Iteration %d flush before "
811 "minimum buffer size was reached. "
812 "Sent = %u, Minimum buffer = %u bytes\n",
814 payload_delta
, BUFFER_SIZE
));
// No flush after twice the threshold is reported as a failure.
// NOTE(review): mixed ULong/int comparison relying on implicit conversion.
819 if (payload_delta
> 2 * BUFFER_SIZE
)
822 ACE_DEBUG ((LM_DEBUG
,
823 "DEBUG: Iteration %d no flush past "
824 "buffer size threshold. "
825 "Sent = %u, Minimum buffer = %u bytes\n",
827 payload_delta
, BUFFER_SIZE
));
833 int liveness_test_failed
=
834 run_liveness_test (orb
,
838 ami_buffering_admin
);
840 if (liveness_test_failed
)