Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tests / AMI_Buffering / client.cpp
blob1a0ca510390ae8b798d42ea4f7ad088864aa6d21
1 #include "Reply_Handler.h"
2 #include "Client_Task.h"
3 #include "tao/Messaging/Messaging.h"
4 #include "tao/TAOC.h"
5 #include "tao/AnyTypeCode/TAOA.h"
6 #include "tao/AnyTypeCode/Any.h"
7 #include "ace/Get_Opt.h"
9 const ACE_TCHAR *server_ior = ACE_TEXT("file://server.ior");
10 const ACE_TCHAR *admin_ior = ACE_TEXT("file://admin.ior");
11 int iterations = 20;
13 int run_message_count_test = 0;
14 int run_timeout_test = 0;
15 int run_timeout_reactive_test = 0;
16 int run_buffer_size_test = 0;
18 const int PAYLOAD_LENGTH = 1024;
19 const int BUFFERED_MESSAGES_COUNT = 10;
20 const unsigned int TIMEOUT_MILLISECONDS = 50;
21 const int BUFFER_SIZE = 10 * PAYLOAD_LENGTH;
23 /// Allow a larger timeout to occur due to scheduler differences
24 const unsigned int TIMEOUT_TOLERANCE = 4 * TIMEOUT_MILLISECONDS;
26 /// Check that no more than 10% of the messages are not sent.
27 const double LIVENESS_TOLERANCE = 0.9;
29 /// Limit the depth of the liveness test, avoid blowing up the stack
30 /// on the server
31 const int LIVENESS_MAX_DEPTH = 256;
33 /// Factor in GIOP overhead in the buffer size test
34 const double GIOP_OVERHEAD = 0.9;
36 int
37 parse_args (int argc, ACE_TCHAR *argv[])
39 ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("k:a:i:ctbr"));
40 int c;
42 while ((c = get_opts ()) != -1)
43 switch (c)
45 case 'k':
46 server_ior = get_opts.opt_arg ();
47 break;
49 case 'a':
50 admin_ior = get_opts.opt_arg ();
51 break;
53 case 'i':
54 iterations = ACE_OS::atoi (get_opts.opt_arg ());
55 break;
57 case 'c':
58 run_message_count_test = 1;
59 break;
61 case 't':
62 run_timeout_test = 1;
63 break;
65 case 'b':
66 run_buffer_size_test = 1;
67 break;
69 case 'r':
70 run_timeout_reactive_test = 1;
71 break;
73 case '?':
74 default:
75 ACE_ERROR_RETURN ((LM_ERROR,
76 "usage: %s "
77 "-k <server_ior> "
78 "-a <admin_ior> "
79 "-i <iterations> "
80 "<-c|-t|-b|-r> "
81 "\n",
82 argv [0]),
83 -1);
85 // Indicates successful parsing of the command line
86 return 0;
89 int
90 run_message_count (CORBA::ORB_ptr orb,
91 PortableServer::POA_ptr root_poa,
92 Test::AMI_Buffering_ptr ami_buffering,
93 Test::AMI_Buffering_Admin_ptr ami_buffering_admin);
94 int
95 run_timeout (CORBA::ORB_ptr orb,
96 PortableServer::POA_ptr root_poa,
97 Test::AMI_Buffering_ptr ami_buffering,
98 Test::AMI_Buffering_Admin_ptr ami_buffering_admin);
101 run_timeout_reactive (CORBA::ORB_ptr orb,
102 PortableServer::POA_ptr root_poa,
103 Test::AMI_Buffering_ptr oneway_buffering,
104 Test::AMI_Buffering_Admin_ptr oneway_buffering_admin);
107 run_buffer_size (CORBA::ORB_ptr orb,
108 PortableServer::POA_ptr root_poa,
109 Test::AMI_Buffering_ptr ami_buffering,
110 Test::AMI_Buffering_Admin_ptr ami_buffering_admin);
// Program entry point of the AMI buffering test client.
//
// Visible flow: initialise the ORB, resolve the RootPOA and activate its
// POA manager, parse the command line, resolve the server and the admin
// references, start a Client_Task, run the single test selected on the
// command line, then stop the task, call shutdown () on the server
// reference and destroy the POA and the ORB.
//
// Returns test_failed (the result of the selected test, still 0 if none
// ran), or 1 when parse_args () fails or a CORBA::Exception is caught.
//
// NOTE(review): this listing carries embedded line numbers and skips some
// of them (e.g. 112, 116, 130, 151, 165).  The return type, the try keyword
// that the catch below needs, and the return-value arguments of the three
// ACE_ERROR_RETURN calls are therefore not visible here -- confirm them
// against the upstream file.
113 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
115 int test_failed = 0;
118 CORBA::ORB_var orb =
119 CORBA::ORB_init (argc, argv);
121 CORBA::Object_var poa_object =
122 orb->resolve_initial_references("RootPOA");
124 PortableServer::POA_var root_poa =
125 PortableServer::POA::_narrow (poa_object.in ());
127 if (CORBA::is_nil (root_poa.in ()))
128 ACE_ERROR_RETURN ((LM_ERROR,
129 " (%P|%t) Panic: nil RootPOA\n"),
132 PortableServer::POAManager_var poa_manager =
133 root_poa->the_POAManager ();
135 poa_manager->activate ();
// The command line is parsed only after ORB_init () has seen argc/argv.
137 if (parse_args (argc, argv) != 0)
138 return 1;
140 CORBA::Object_var tmp =
141 orb->string_to_object(server_ior);
143 Test::AMI_Buffering_var ami_buffering =
144 Test::AMI_Buffering::_narrow(tmp.in ());
146 if (CORBA::is_nil (ami_buffering.in ()))
// NOTE(review): this fatal condition is logged with LM_DEBUG rather than
// LM_ERROR (the nil RootPOA case above uses LM_ERROR) -- confirm intent.
148 ACE_ERROR_RETURN ((LM_DEBUG,
149 "Nil Test::AMI_Buffering reference <%s>\n",
150 server_ior),
// tmp is reused for the admin reference.
154 tmp =
155 orb->string_to_object(admin_ior);
157 Test::AMI_Buffering_Admin_var ami_buffering_admin =
158 Test::AMI_Buffering_Admin::_narrow(tmp.in ());
160 if (CORBA::is_nil (ami_buffering_admin.in ()))
// NOTE(review): same LM_DEBUG severity question as for the server reference.
162 ACE_ERROR_RETURN ((LM_DEBUG,
163 "Nil Test::AMI_Buffering_Admin reference <%s>\n",
164 admin_ior),
// Presumably Client_Task runs ORB work in its own thread -- confirm against
// Client_Task.h.
168 Client_Task client_task (orb.in ());
169 if (client_task.activate (THR_NEW_LWP | THR_JOINABLE) == -1)
// NOTE(review): an activation failure is only logged; no return statement
// is visible in this listing, so execution appears to continue.
171 ACE_ERROR ((LM_ERROR, "Error activating client task\n"));
// Exactly one test runs: the flags are checked in the order
// message count, timeout, reactive timeout, buffer size.
174 if (run_message_count_test)
176 ACE_DEBUG ((LM_DEBUG,
177 "Running message count flushing test\n"));
178 test_failed =
179 run_message_count (orb.in (),
180 root_poa.in (),
181 ami_buffering.in (),
182 ami_buffering_admin.in ());
184 else if (run_timeout_test)
186 ACE_DEBUG ((LM_DEBUG,
187 "Running timeout flushing test\n"));
188 test_failed =
189 run_timeout (orb.in (),
190 root_poa.in (),
191 ami_buffering.in (),
192 ami_buffering_admin.in ());
194 else if (run_timeout_reactive_test)
196 ACE_DEBUG ((LM_DEBUG,
197 "Running timeout (reactive) flushing test\n"));
198 test_failed =
199 run_timeout_reactive (orb.in (),
200 root_poa.in (),
201 ami_buffering.in (),
202 ami_buffering_admin.in ());
204 else if (run_buffer_size_test)
206 ACE_DEBUG ((LM_DEBUG,
207 "Running buffer size flushing test\n"));
208 test_failed =
209 run_buffer_size (orb.in (),
210 root_poa.in (),
211 ami_buffering.in (),
212 ami_buffering_admin.in ());
214 else
// NOTE(review): test_failed is not changed on this path, so the client
// appears to still return 0 when no test flag was given -- confirm intent.
216 ACE_ERROR ((LM_ERROR,
217 "ERROR: No test was configured\n"));
// Stop the helper task and wait for its thread(s) before tearing down.
220 client_task.terminate_loop ();
222 client_task.thr_mgr ()->wait ();
224 ami_buffering->shutdown ();
226 root_poa->destroy (true, true);
228 orb->destroy ();
230 catch (const CORBA::Exception& ex)
232 ex._tao_print_exception ("Exception caught in client:");
233 return 1;
236 return test_failed;
240 configure_policies (CORBA::ORB_ptr orb,
241 const TAO::BufferingConstraint &buffering_constraint,
242 Test::AMI_Buffering_ptr ami_buffering,
243 Test::AMI_Buffering_out flusher)
245 CORBA::Object_var object =
246 orb->resolve_initial_references ("PolicyCurrent");
248 CORBA::PolicyCurrent_var policy_current =
249 CORBA::PolicyCurrent::_narrow (object.in ());
251 if (CORBA::is_nil (policy_current.in ()))
253 ACE_ERROR ((LM_ERROR, "ERROR: Nil policy current\n"));
254 return 1;
256 CORBA::Any scope_as_any;
257 scope_as_any <<= Messaging::SYNC_NONE;
259 CORBA::Any buffering_as_any;
260 buffering_as_any <<= buffering_constraint;
262 CORBA::PolicyList policies (2); policies.length (2);
263 policies[0] =
264 orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
265 scope_as_any);
266 policies[1] =
267 orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
268 buffering_as_any);
270 policy_current->set_policy_overrides (policies, CORBA::ADD_OVERRIDE);
272 policies[0]->destroy ();
273 policies[1]->destroy ();
275 TAO::BufferingConstraint flush_constraint;
276 flush_constraint.mode = TAO::BUFFER_FLUSH;
277 flush_constraint.message_count = 0;
278 flush_constraint.message_bytes = 0;
279 flush_constraint.timeout = 0;
281 buffering_as_any <<= flush_constraint;
282 policies.length (1);
283 policies[0] =
284 orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
285 buffering_as_any);
287 object =
288 ami_buffering->_set_policy_overrides (policies,
289 CORBA::ADD_OVERRIDE);
291 policies[0]->destroy ();
293 flusher =
294 Test::AMI_Buffering::_narrow (object.in ());
296 return 0;
299 void
300 sync_server (CORBA::ORB_ptr orb,
301 Test::AMI_Buffering_ptr flusher)
303 // Get back in sync with the server...
304 flusher->flush ();
305 flusher->sync ();
307 // Drain responses from the queue
308 ACE_Time_Value tv (0, 100000);
309 orb->run (tv);
313 run_liveness_test (CORBA::ORB_ptr orb,
314 Test::AMI_AMI_BufferingHandler_ptr reply_handler,
315 Test::AMI_Buffering_ptr ami_buffering,
316 Test::AMI_Buffering_ptr flusher,
317 Test::AMI_Buffering_Admin_ptr ami_buffering_admin)
319 ACE_DEBUG ((LM_DEBUG, ".... checking for liveness\n"));
320 int test_failed = 0;
322 // Get back in sync with the server...
323 sync_server (orb, flusher);
325 CORBA::ULong send_count =
326 ami_buffering_admin->request_count ();
328 int liveness_test_iterations = int(send_count);
329 ACE_DEBUG ((LM_DEBUG, " liveness_test_iterations = %d\n",
330 liveness_test_iterations));
332 Test::Payload payload (PAYLOAD_LENGTH);
333 payload.length (PAYLOAD_LENGTH);
334 for (int j = 0; j != PAYLOAD_LENGTH; ++j)
335 payload[j] = CORBA::Octet(j % 256);
337 int depth = 0;
338 for (int i = 0; i != liveness_test_iterations; ++i)
340 ami_buffering->sendc_receive_data (reply_handler,
341 payload);
342 send_count++;
344 CORBA::ULong receive_count =
345 ami_buffering_admin->request_count ();
347 ACE_Time_Value tv (0, 10 * 1000);
348 orb->run (tv);
350 // Once the system has sent enough messages we don't
351 // expect it to fall too far behind, i.e. at least 90% of the
352 // messages should be delivered....
353 CORBA::ULong expected =
354 CORBA::ULong (LIVENESS_TOLERANCE * send_count);
356 if (receive_count < expected)
358 test_failed = 1;
359 ACE_DEBUG ((LM_DEBUG,
360 "DEBUG: Iteration %d "
361 "not enough messages received %u "
362 "expected %u\n",
363 i, receive_count, expected));
365 sync_server (orb, flusher);
368 if (depth++ == LIVENESS_MAX_DEPTH)
370 sync_server (orb, flusher);
372 depth = 0;
376 return test_failed;
// Message count flushing test.
//
// Installs a TAO::BUFFER_MESSAGE_COUNT constraint whose message_count is
// BUFFERED_MESSAGES_COUNT.  For each of `iterations` rounds it then sends
// AMI requests one at a time until the admin object's request_count ()
// changes.  A round fails if the count changes after fewer than
// BUFFERED_MESSAGES_COUNT sends, or has not changed after more than
// 2 * BUFFERED_MESSAGES_COUNT sends.  Finishes with run_liveness_test ().
//
// Returns test_failed: 0 when every check passed, 1 when any check or the
// liveness test failed.  A non-zero result from configure_policies () is
// returned immediately.
//
// NOTE(review): this listing carries embedded line numbers and skips some
// of them (e.g. 379, 407, 457, 470).  The return type, the return-value
// argument of ACE_NEW_RETURN and the first argument of two ACE_DEBUG calls
// are therefore not visible here -- confirm them against the upstream file.
380 run_message_count (CORBA::ORB_ptr orb,
381 PortableServer::POA_ptr root_poa,
382 Test::AMI_Buffering_ptr ami_buffering,
383 Test::AMI_Buffering_Admin_ptr ami_buffering_admin)
385 TAO::BufferingConstraint buffering_constraint;
386 buffering_constraint.mode = TAO::BUFFER_MESSAGE_COUNT;
387 buffering_constraint.message_count = BUFFERED_MESSAGES_COUNT;
388 buffering_constraint.message_bytes = 0;
389 buffering_constraint.timeout = 0;
391 Test::AMI_Buffering_var flusher;
392 int test_failed =
393 configure_policies (orb, buffering_constraint,
394 ami_buffering, flusher.out ());
396 if (test_failed != 0)
397 return test_failed;
// Payload of PAYLOAD_LENGTH octets holding the pattern 0..255 repeated.
399 Test::Payload payload (PAYLOAD_LENGTH);
400 payload.length (PAYLOAD_LENGTH);
401 for (int j = 0; j != PAYLOAD_LENGTH; ++j)
402 payload[j] = CORBA::Octet(j % 256);
// Create the reply handler servant; owner_transfer holds it in a
// ServantBase_var for the rest of the function.
404 Reply_Handler *reply_handler_impl = 0;
405 ACE_NEW_RETURN (reply_handler_impl,
406 Reply_Handler,
408 PortableServer::ServantBase_var owner_transfer(reply_handler_impl);
410 PortableServer::ObjectId_var id =
411 root_poa->activate_object (reply_handler_impl);
413 CORBA::Object_var object_act = root_poa->id_to_reference (id.in ());
415 Test::AMI_AMI_BufferingHandler_var reply_handler =
416 Test::AMI_AMI_BufferingHandler::_narrow (object_act.in ());
// send_count accumulates over all rounds.
418 CORBA::ULong send_count = 0;
419 for (int i = 0; i != iterations; ++i)
421 sync_server (orb, flusher.in ());
// After a sync the server must have seen everything sent so far.
423 CORBA::ULong initial_receive_count =
424 ami_buffering_admin->request_count ();
426 if (initial_receive_count != send_count)
428 test_failed = 1;
429 ACE_DEBUG ((LM_DEBUG,
430 "DEBUG: Iteration %d message lost (%u != %u)\n",
431 i, initial_receive_count, send_count));
// Send one request per pass until the server-side count moves.
434 while (1)
436 ami_buffering->sendc_receive_data (reply_handler.in (),
437 payload);
438 send_count++;
// The count is sampled before the ORB is given 10 * 1000 microseconds
// (assuming the usual seconds/microseconds constructor) to run.
440 CORBA::ULong receive_count =
441 ami_buffering_admin->request_count ();
443 ACE_Time_Value tv (0, 10 * 1000);
444 orb->run (tv);
// Number of requests sent since the start of this round.
446 CORBA::ULong iteration_count =
447 send_count - initial_receive_count;
448 if (receive_count != initial_receive_count)
450 if (iteration_count < CORBA::ULong(BUFFERED_MESSAGES_COUNT))
452 test_failed = 1;
453 ACE_DEBUG ((LM_DEBUG,
454 "DEBUG: Iteration %d flush before "
455 "message count reached. "
456 "Iteration count = %u, Threshold = %u\n",
458 iteration_count, BUFFERED_MESSAGES_COUNT));
460 break;
// No flush seen yet: give up on this round once twice the threshold has
// been exceeded.
463 if (iteration_count > 2 * BUFFERED_MESSAGES_COUNT)
465 test_failed = 1;
466 ACE_DEBUG ((LM_DEBUG,
467 "DEBUG: Iteration %d no flush past "
468 "message count threshold. "
469 "Iteration count = %u, Threshold = %u\n",
471 iteration_count, BUFFERED_MESSAGES_COUNT));
472 break;
477 int liveness_test_failed =
478 run_liveness_test (orb,
479 reply_handler.in (),
480 ami_buffering,
481 flusher.in (),
482 ami_buffering_admin);
484 if (liveness_test_failed)
485 test_failed = 1;
487 return test_failed;
// Timeout flushing test.
//
// Installs a TAO::BUFFER_TIMEOUT constraint with
// timeout = TIMEOUT_MILLISECONDS * 10000 (presumably the policy counts in
// 100 ns units, which would make this TIMEOUT_MILLISECONDS ms -- confirm).
// For each of `iterations` rounds it keeps sending AMI requests until the
// admin object's request_count () changes.  A round fails if the change is
// seen less than TIMEOUT_MILLISECONDS ms after the round started, or if no
// change is seen more than TIMEOUT_TOLERANCE ms after the start once the
// retry budget is used up.  Finishes with run_liveness_test ().
//
// Returns test_failed: 0 when every check passed, 1 otherwise.  A non-zero
// result from configure_policies () is returned immediately.
//
// NOTE(review): this listing carries embedded line numbers and skips some
// of them (e.g. 490, 518, 569, 589).  The return type, the return-value
// argument of ACE_NEW_RETURN and the first argument of two ACE_DEBUG calls
// are therefore not visible here -- confirm them against the upstream file.
491 run_timeout (CORBA::ORB_ptr orb,
492 PortableServer::POA_ptr root_poa,
493 Test::AMI_Buffering_ptr ami_buffering,
494 Test::AMI_Buffering_Admin_ptr ami_buffering_admin)
496 TAO::BufferingConstraint buffering_constraint;
497 buffering_constraint.mode = TAO::BUFFER_TIMEOUT;
498 buffering_constraint.message_count = 0;
499 buffering_constraint.message_bytes = 0;
500 buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000;
502 Test::AMI_Buffering_var flusher;
503 int test_failed =
504 configure_policies (orb, buffering_constraint,
505 ami_buffering, flusher.out ());
507 if (test_failed != 0)
508 return test_failed;
// Payload of PAYLOAD_LENGTH octets holding the pattern 0..255 repeated.
510 Test::Payload payload (PAYLOAD_LENGTH);
511 payload.length (PAYLOAD_LENGTH);
512 for (int j = 0; j != PAYLOAD_LENGTH; ++j)
513 payload[j] = CORBA::Octet(j % 256);
// Create the reply handler servant; owner_transfer holds it in a
// ServantBase_var for the rest of the function.
515 Reply_Handler *reply_handler_impl = 0;
516 ACE_NEW_RETURN (reply_handler_impl,
517 Reply_Handler,
519 PortableServer::ServantBase_var owner_transfer(reply_handler_impl);
521 PortableServer::ObjectId_var id =
522 root_poa->activate_object (reply_handler_impl);
524 CORBA::Object_var object_act = root_poa->id_to_reference (id.in ());
526 Test::AMI_AMI_BufferingHandler_var reply_handler =
527 Test::AMI_AMI_BufferingHandler::_narrow (object_act.in ());
// send_count and retry_attempt both accumulate over all rounds;
// retry_attempt is never reset.
529 CORBA::ULong send_count = 0;
530 int retry_attempt = 0;
531 for (int i = 0; i != iterations; ++i)
533 sync_server (orb, flusher.in ());
// After a sync the server must have seen everything sent so far.
535 CORBA::ULong initial_receive_count =
536 ami_buffering_admin->request_count ();
538 if (initial_receive_count != send_count)
540 test_failed = 1;
541 ACE_DEBUG ((LM_DEBUG,
542 "DEBUG: Iteration %d message lost (%u != %u)\n",
543 i, initial_receive_count, send_count));
// Elapsed time for this round is measured from here.
546 ACE_Time_Value start = ACE_OS::gettimeofday ();
547 while (1)
549 ami_buffering->sendc_receive_data (reply_handler.in (),
550 payload);
551 ++send_count;
// The count is sampled before the ORB is given time to run.
553 CORBA::ULong receive_count =
554 ami_buffering_admin->request_count ();
556 ACE_Time_Value tv (0, 10 * 1000);
557 orb->run (tv);
559 ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
560 if (receive_count != initial_receive_count)
562 if (elapsed.msec () < TIMEOUT_MILLISECONDS)
564 test_failed = 1;
// NOTE(review): elapsed.msec () and the unsigned TIMEOUT_MILLISECONDS are
// passed to %d conversions; if msec () returns an unsigned long the
// argument types do not match the format -- confirm against ACE_Time_Value.
565 ACE_DEBUG ((LM_DEBUG,
566 "DEBUG: Iteration %d flush before "
567 "timeout expired. "
568 "Elapsed = %d, Timeout = %d msecs\n",
570 elapsed.msec (), TIMEOUT_MILLISECONDS));
572 // terminate the while loop.
573 break;
// No flush seen yet and the tolerance is exceeded.  The first 10 such
// occurrences (counted across all rounds) only log and go round the while
// loop again, which sends another request; later ones fail the round.
576 if (elapsed.msec () > TIMEOUT_TOLERANCE)
578 if (retry_attempt++ < 10) {
579 ACE_DEBUG ((LM_DEBUG, "DEBUG: Retry attempt %d beyond TIMEOUT_TOLERANCE.\n",
580 retry_attempt));
581 continue;
584 test_failed = 1;
585 ACE_DEBUG ((LM_DEBUG,
586 "DEBUG: Iteration %d no flush past "
587 "timeout threshold. "
588 "Elapsed = %d, Timeout = %d msecs\n",
590 elapsed.msec (), TIMEOUT_TOLERANCE));
591 break;
596 int liveness_test_failed =
597 run_liveness_test (orb,
598 reply_handler.in (),
599 ami_buffering,
600 flusher.in (),
601 ami_buffering_admin);
603 if (liveness_test_failed)
604 test_failed = 1;
606 return test_failed;
// Timeout flushing test, reactive variant.
//
// Uses the same TAO::BUFFER_TIMEOUT constraint as run_timeout ()
// (timeout = TIMEOUT_MILLISECONDS * 10000; presumably 100 ns units --
// confirm).  The difference visible here is that each round first sends a
// burst of 20 AMI requests and then only polls request_count () while
// running the ORB, without sending anything further.  A round fails if the
// count changes less than TIMEOUT_MILLISECONDS ms after the round started,
// or if no change is seen more than TIMEOUT_TOLERANCE ms after the start
// once the retry budget is used up.  The liveness test is compiled out
// with #if 0.
//
// Returns test_failed: 0 when every check passed, 1 otherwise.  A non-zero
// result from configure_policies () is returned immediately.
//
// NOTE(review): this listing carries embedded line numbers and skips some
// of them (e.g. 609, 637, 689, 709).  The return type, the return-value
// argument of ACE_NEW_RETURN and the first argument of two ACE_DEBUG calls
// are therefore not visible here -- confirm them against the upstream file.
610 run_timeout_reactive (CORBA::ORB_ptr orb,
611 PortableServer::POA_ptr root_poa,
612 Test::AMI_Buffering_ptr ami_buffering,
613 Test::AMI_Buffering_Admin_ptr ami_buffering_admin)
615 TAO::BufferingConstraint buffering_constraint;
616 buffering_constraint.mode = TAO::BUFFER_TIMEOUT;
617 buffering_constraint.message_count = 0;
618 buffering_constraint.message_bytes = 0;
619 buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000;
621 Test::AMI_Buffering_var flusher;
622 int test_failed =
623 configure_policies (orb, buffering_constraint,
624 ami_buffering, flusher.out ());
626 if (test_failed != 0)
627 return test_failed;
// Payload of PAYLOAD_LENGTH octets holding the pattern 0..255 repeated.
629 Test::Payload payload (PAYLOAD_LENGTH);
630 payload.length (PAYLOAD_LENGTH);
631 for (int j = 0; j != PAYLOAD_LENGTH; ++j)
632 payload[j] = CORBA::Octet(j % 256);
// Create the reply handler servant; owner_transfer holds it in a
// ServantBase_var for the rest of the function.
634 Reply_Handler *reply_handler_impl = 0;
635 ACE_NEW_RETURN (reply_handler_impl,
636 Reply_Handler,
638 PortableServer::ServantBase_var owner_transfer(reply_handler_impl);
640 PortableServer::ObjectId_var id =
641 root_poa->activate_object (reply_handler_impl);
643 CORBA::Object_var object_act = root_poa->id_to_reference (id.in ());
645 Test::AMI_AMI_BufferingHandler_var reply_handler =
646 Test::AMI_AMI_BufferingHandler::_narrow (object_act.in ());
// send_count and retry_attempt both accumulate over all rounds;
// retry_attempt is never reset.
648 CORBA::ULong send_count = 0;
649 int retry_attempt = 0;
650 for (int i = 0; i != iterations; ++i)
652 sync_server (orb, flusher.in ());
// After a sync the server must have seen everything sent so far.
654 CORBA::ULong initial_receive_count =
655 ami_buffering_admin->request_count ();
657 if (initial_receive_count != send_count)
659 test_failed = 1;
660 ACE_DEBUG ((LM_DEBUG,
661 "DEBUG: Iteration %d message lost (%u != %u)\n",
662 i, initial_receive_count, send_count));
// Elapsed time is measured from before the burst is sent.
664 ACE_Time_Value start = ACE_OS::gettimeofday ();
// Burst of 20 requests; nothing else is sent during this round.
665 for (int j = 0; j != 20; ++j)
667 ami_buffering->sendc_receive_data (reply_handler.in (),
668 payload);
669 send_count++;
// Poll the server-side count, giving the ORB time to run on every pass.
671 while (1)
673 CORBA::ULong receive_count =
674 ami_buffering_admin->request_count ();
676 ACE_Time_Value tv (0, 10 * 1000);
677 orb->run (tv);
679 ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
680 if (receive_count != initial_receive_count)
682 if (elapsed.msec () < TIMEOUT_MILLISECONDS)
684 test_failed = 1;
// NOTE(review): elapsed.msec () and the unsigned TIMEOUT_MILLISECONDS are
// passed to %d conversions; if msec () returns an unsigned long the
// argument types do not match the format -- confirm against ACE_Time_Value.
685 ACE_DEBUG ((LM_DEBUG,
686 "DEBUG: Iteration %d flush before "
687 "timeout expired. "
688 "Elapsed = %d, Timeout = %d msecs\n",
690 elapsed.msec (), TIMEOUT_MILLISECONDS));
692 // terminate the while loop.
693 break;
// No flush seen yet and the tolerance is exceeded.  The first 10 such
// occurrences (counted across all rounds) only log and poll again; later
// ones fail the round.
696 if (elapsed.msec () > TIMEOUT_TOLERANCE)
698 if (retry_attempt++ < 10) {
699 ACE_DEBUG ((LM_DEBUG, "DEBUG: Retry attempt %d beyond TIMEOUT_TOLERANCE.\n",
700 retry_attempt));
701 continue;
704 test_failed = 1;
705 ACE_DEBUG ((LM_DEBUG,
706 "DEBUG: Iteration %d no flush past "
707 "timeout threshold. "
708 "Elapsed = %d, Timeout = %d msecs\n",
710 elapsed.msec (), TIMEOUT_TOLERANCE));
711 break;
// The liveness check is disabled for this variant; no reason is recorded
// in this listing.
716 #if 0
717 int liveness_test_failed =
718 run_liveness_test (orb,
719 reply_handler.in (),
720 ami_buffering,
721 flusher.in (),
722 ami_buffering_admin);
724 if (liveness_test_failed)
725 test_failed = 1;
726 #endif /* 0 */
729 return test_failed;
// Buffer size flushing test.
//
// Installs a TAO::BUFFER_MESSAGE_BYTES constraint whose message_bytes is
// BUFFER_SIZE.  For each of `iterations` rounds it sends AMI requests of
// PAYLOAD_LENGTH octets one at a time until the admin object's
// bytes_received_count () changes.  A round fails if the change is seen
// after fewer than GIOP_OVERHEAD * BUFFER_SIZE payload bytes were sent in
// the round, or if no change is seen after more than 2 * BUFFER_SIZE bytes.
// Finishes with run_liveness_test ().
//
// Returns test_failed: 0 when every check passed, 1 otherwise.  A non-zero
// result from configure_policies () is returned immediately.
//
// NOTE(review): this listing carries embedded line numbers and skips some
// of them (e.g. 732, 758, 813, 826).  The return type, the return-value
// argument of ACE_NEW_RETURN and the first argument of two ACE_DEBUG calls
// are therefore not visible here -- confirm them against the upstream file.
733 run_buffer_size (CORBA::ORB_ptr orb,
734 PortableServer::POA_ptr root_poa,
735 Test::AMI_Buffering_ptr ami_buffering,
736 Test::AMI_Buffering_Admin_ptr ami_buffering_admin)
738 TAO::BufferingConstraint buffering_constraint;
739 buffering_constraint.mode = TAO::BUFFER_MESSAGE_BYTES;
740 buffering_constraint.message_count = 0;
741 buffering_constraint.message_bytes = BUFFER_SIZE;
742 buffering_constraint.timeout = 0;
744 Test::AMI_Buffering_var flusher;
745 int test_failed =
746 configure_policies (orb, buffering_constraint,
747 ami_buffering, flusher.out ());
749 if (test_failed != 0)
750 return test_failed;
// NOTE(review): unlike the other tests in this file, the payload contents
// are not filled in here; only the length is set -- confirm intent.
752 Test::Payload payload (PAYLOAD_LENGTH);
753 payload.length (PAYLOAD_LENGTH);
// NOTE(review): the pointer has no initialiser here, whereas the sibling
// tests initialise theirs to 0 before ACE_NEW_RETURN.
755 Reply_Handler *reply_handler_impl;
756 ACE_NEW_RETURN (reply_handler_impl,
757 Reply_Handler,
759 PortableServer::ServantBase_var owner_transfer(reply_handler_impl);
761 PortableServer::ObjectId_var id =
762 root_poa->activate_object (reply_handler_impl);
764 CORBA::Object_var object_act = root_poa->id_to_reference (id.in ());
766 Test::AMI_AMI_BufferingHandler_var reply_handler =
767 Test::AMI_AMI_BufferingHandler::_narrow (object_act.in ());
// bytes_sent accumulates payload bytes over all rounds.
769 CORBA::ULong bytes_sent = 0;
770 for (int i = 0; i != iterations; ++i)
772 sync_server (orb, flusher.in ());
// After a sync the server must have received every payload byte sent so far.
774 CORBA::ULong initial_bytes_received =
775 ami_buffering_admin->bytes_received_count ();
777 if (initial_bytes_received != bytes_sent)
779 test_failed = 1;
780 ACE_DEBUG ((LM_DEBUG,
781 "DEBUG: Iteration %d data lost (%u != %u)\n",
782 i, initial_bytes_received, bytes_sent));
// Send one request per pass until the server-side byte count moves.
785 while (1)
787 ami_buffering->sendc_receive_data (reply_handler.in (),
788 payload);
789 bytes_sent += PAYLOAD_LENGTH;
// The byte count is sampled before the ORB is given time to run.
791 CORBA::ULong bytes_received =
792 ami_buffering_admin->bytes_received_count ();
794 ACE_Time_Value tv (0, 10 * 1000);
795 orb->run (tv);
// Payload bytes sent since the start of this round.
797 CORBA::ULong payload_delta =
798 bytes_sent - initial_bytes_received;
799 if (bytes_received != initial_bytes_received)
801 // The queue has been flushed, check that enough data
802 // has been sent. The check cannot be precise because
803 // the ORB counts the GIOP message overhead, in this
804 // test we assume the overhead to be less than 10%
806 if (payload_delta < CORBA::ULong (GIOP_OVERHEAD * BUFFER_SIZE))
808 test_failed = 1;
809 ACE_DEBUG ((LM_DEBUG,
810 "DEBUG: Iteration %d flush before "
811 "minimum buffer size was reached. "
812 "Sent = %u, Minimum buffer = %u bytes\n",
814 payload_delta, BUFFER_SIZE));
816 break;
// No flush seen yet: give up on this round once twice the buffer size has
// been exceeded.
819 if (payload_delta > 2 * BUFFER_SIZE)
821 test_failed = 1;
822 ACE_DEBUG ((LM_DEBUG,
823 "DEBUG: Iteration %d no flush past "
824 "buffer size threshold. "
825 "Sent = %u, Minimum buffer = %u bytes\n",
827 payload_delta, BUFFER_SIZE));
828 break;
833 int liveness_test_failed =
834 run_liveness_test (orb,
835 reply_handler.in (),
836 ami_buffering,
837 flusher.in (),
838 ami_buffering_admin);
840 if (liveness_test_failed)
841 test_failed = 1;
843 return test_failed;