2 //=============================================================================
4 * @file Message_Queue_Test.cpp
7 * 0) a test that ensures key ACE_Message_Queue features are
8 * working properly, including timeouts and priorities
9 * 1) a simple test of the ACE_Message_Queue that illustrates how to
10 * use the forward and reverse iterators
11 * 2) a simple performance measurement test for both single-threaded
12 * (null synch), thread-safe ACE_Message_Queues, and
13 * ACE_Message_Queue_Vx, which wraps VxWorks message queues
14 * 3) a test/usage example of ACE_Message_Queue_Vx
15 * 4) a test of the message counting in a message queue under load.
17 * @author Irfan Pyarali <irfan@cs.wustl.edu>
18 * @author David L. Levine <levine@cs.wustl.edu>
19 * @author Douglas C. Schmidt <schmidt@vanderbilt.edu>
21 //=============================================================================
24 #include "test_config.h"
25 #include "ace/Atomic_Op.h"
26 #include "ace/Thread_Manager.h"
27 #include "ace/Message_Queue.h"
28 #include "ace/Message_Queue_NT.h"
29 #include "ace/Message_Queue_Vx.h"
30 #include "ace/Synch_Traits.h"
31 #include "ace/Null_Mutex.h"
32 #include "ace/Null_Condition.h"
33 #include "ace/High_Res_Timer.h"
35 #include "ace/OS_NS_stdio.h"
36 #include "ace/OS_NS_string.h"
37 #include "ace/OS_NS_sys_time.h"
38 #include "ace/OS_NS_unistd.h"
// Usage text for this test program; it documents the single optional
// command-line argument (the number of messages).
42 const ACE_TCHAR usage
[] = ACE_TEXT ("usage: Message_Queue_Test <number of messages>\n");
// Shorthand names for the ACE_NULL_SYNCH instantiations of the message
// queue and of its forward and reverse iterators.
44 typedef ACE_Message_Queue
<ACE_NULL_SYNCH
> QUEUE
;
45 typedef ACE_Message_Queue_Iterator
<ACE_NULL_SYNCH
> ITERATOR
;
46 typedef ACE_Message_Queue_Reverse_Iterator
<ACE_NULL_SYNCH
> REVERSE_ITERATOR
;
// Base block count for the counting test: each producer and consumer
// thread multiplies MESSAGE_FACTOR by a small random factor to decide how
// many blocks to handle. The smaller value applies when ACE_HAS_WINCE is
// defined.
// NOTE(review): the second definition presumably sits in an #else branch
// whose preprocessor lines are not visible in this excerpt - confirm.
48 #if defined (ACE_HAS_WINCE)
49 static const int MESSAGE_FACTOR
= 10000;
51 static const int MESSAGE_FACTOR
= 100000;
// Default value for max_messages (below).
53 static const int MAX_MESSAGES
= 10000;
// Size passed to ACE_Message_Block::init() for the VxWorks receive blocks.
54 static const int MAX_MESSAGE_SIZE
= 32;
// Payload text used when constructing the message blocks that are sent.
55 static const char test_message
[] = "ACE_Message_Queue Test Message";
// Number of messages used by the performance tests; run_main() overwrites
// it with the value parsed from argv[1].
57 static int max_messages
= MAX_MESSAGES
;
59 // Dynamically allocate to avoid a static.
// The timer object itself is created in run_main().
60 static ACE_High_Res_Timer
*timer
= 0;
// Thread-safe (ACE_MT_SYNCH) queue type, only declared when threads are
// available.
62 #if defined (ACE_HAS_THREADS)
63 typedef ACE_Message_Queue
<ACE_MT_SYNCH
> SYNCH_QUEUE
;
68 // Container for data passed to sender and receiver in
72 // For use in multithreaded performance test.
74 ACE_Message_Queue_Base
*q_
;
77 ACE_Message_Block
**send_block_
;
78 // Pointer to message blocks for sender to send to receiver.
81 : q_ (0), send_block_ (0)
84 // Default constructor.
87 // For the message counting test, there are two tasks, producer and consumer.
88 // Each will spawn a number of threads, and the two tasks share a queue.
//
// Producer side of the counting test: an ACE_Task<ACE_MT_SYNCH> whose
// svc() enqueues message blocks on the shared queue.
89 class Counting_Test_Producer
: public ACE_Task
<ACE_MT_SYNCH
>
// Constructor: hands the shared queue to the ACE_Task base class and
// zeroes both counters.
92 Counting_Test_Producer (ACE_Message_Queue
<ACE_MT_SYNCH
> *queue
)
93 : ACE_Task
<ACE_MT_SYNCH
> (0, queue
), sequence_ (0), produced_ (0) {}
// Producer thread body; see Counting_Test_Producer::svc().
94 virtual int svc (void);
// Incremented once by every svc() invocation; the resulting value is that
// thread's sequence (chain) length.
96 ACE_Atomic_Op
<ACE_Thread_Mutex
, long> sequence_
;
// Running total of blocks produced; each svc() invocation adds its own
// count before finishing.
97 ACE_Atomic_Op
<ACE_Thread_Mutex
, long> produced_
;
// Consumer side of the counting test: an ACE_Task<ACE_MT_SYNCH> whose
// svc() dequeues message blocks from the queue it shares with the
// producer task.
100 class Counting_Test_Consumer
: public ACE_Task
<ACE_MT_SYNCH
>
// Constructor: hands the shared queue to the ACE_Task base class and
// zeroes the consumed counter.
103 Counting_Test_Consumer (ACE_Message_Queue
<ACE_MT_SYNCH
> *queue
)
104 : ACE_Task
<ACE_MT_SYNCH
> (0, queue
), consumed_ (0) {}
// Consumer thread body; see Counting_Test_Consumer::svc().
105 virtual int svc (void);
// Running total of blocks consumed; each svc() invocation adds its own
// count before finishing.
107 ACE_Atomic_Op
<ACE_Thread_Mutex
, long> consumed_
;
// Producer thread body. Picks a target block count of MESSAGE_FACTOR
// times a random factor (1..9), then repeatedly allocates a block (plus
// further blocks when this thread's sequence number is greater than 1)
// and enqueues it with putq() under a deadline. The number of blocks
// produced is added to produced_ before the thread finishes.
111 Counting_Test_Producer::svc (void)
113 // Going to produce a lot of blocks. Since we don't necessarily want them
114 // all consumed, there's no arrangement with the consumer to be sure that
115 // the same number produced will be consumed; the test check will compare
116 // the number produced, consumed, and remaining to be sure it ends up
118 // Also, to be sure there's not just 1 producer and 1 consumer pinging
119 // back and forth, make the producers randomly delay between blocks.
// NOTE(review): this seeds and uses the process-wide rand() state from
// every producer thread, whereas Counting_Test_Consumer::svc() uses
// rand_r() with a thread-local seed - confirm whether that difference is
// intentional.
120 ACE_OS::srand (static_cast<unsigned int> (ACE_OS::time ()));
121 int multiple
= ACE_OS::rand () % 10;
122 int delay_ms
= (ACE_OS::rand () % 10) / 2;
123 // The delay usually causes the test to time out in the automated
124 // regression testing. I just left it here in case it's needed someday.
// A random factor of 0 is treated as 1 so that at least MESSAGE_FACTOR
// blocks are targeted.
126 long count
= MESSAGE_FACTOR
* (multiple
? multiple
: 1);
128 // Some of the threads enqueue single blocks, others sequences.
// The sequence length is this thread's ticket from the shared atomic
// counter, so each producer thread gets a different length.
129 long lsequence
= ++(this->sequence_
);
130 int seq
= static_cast<int> (lsequence
);
131 ACE_DEBUG ((LM_DEBUG
,
132 ACE_TEXT ("(%t) Producer will enqueue %B blocks in seq of %d, ")
133 ACE_TEXT ("%d msec delay\n"),
138 ACE_Message_Block
*first
= 0, *prev
= 0, *b
= 0;
// NOTE(review): ACE_Time_Value's two-argument constructor is (seconds,
// microseconds), so delay_ms would be taken as microseconds here -
// confirm the intended unit.
139 ACE_Time_Value
delay (0, delay_ms
);
140 ACE_Time_Value
timeout (10);
141 while (produced
< count
)
// Allocate the first (or only) block of this enqueue.
143 ACE_NEW_NORETURN (b
, ACE_Message_Block (1));
146 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Producer out of memory\n")));
// Allocate the remaining seq - 1 blocks of the sequence.
151 for (int s
= 1; s
< seq
; ++s
)
153 ACE_NEW_NORETURN (b
, ACE_Message_Block (1));
// Walks the chain starting at 'first'; presumably this is the clean-up
// path after an allocation failure (the loop body is not visible here).
164 while (first
->next () != 0)
172 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Producer out of memory\n")));
175 // To be sure we can keep going on slow or completed consumers, but not
176 // delay excessively if the consumers have stopped, limit the time
177 // spent waiting to 10 seconds.
// NOTE(review): 'timeout' is presumably added to this value on a line
// that is not visible in this excerpt.
178 ACE_Time_Value block
= ACE_OS::gettimeofday ();
180 if (this->putq (first
, &block
) == -1)
182 ACE_DEBUG ((LM_DEBUG
,
183 ACE_TEXT ("(%t) Producer cannot putq; giving up\n")));
184 while (first
->next () != 0)
195 ACE_OS::sleep (delay
);
// Publish this thread's total to the shared counter.
197 this->produced_
+= produced
;
198 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Producer done\n")));
// Consumer thread body. Picks a target block count of MESSAGE_FACTOR
// times a random factor (1..9), then dequeues blocks with getq() under a
// deadline until that many have been consumed. The number of blocks
// consumed is added to consumed_ before the thread finishes.
203 Counting_Test_Consumer::svc (void)
205 // Consume lots of blocks and release them. To mimic a thread with work
206 // to do, put a small random delay between dequeuing the blocks. Consume
207 // a calculated number of blocks then stop; the test checker will determine
208 // if the number consumed plus the number remaining is correct for the
// Thread-local seed for the reentrant rand_r() calls below.
210 unsigned int seed
= static_cast<unsigned int> (ACE_OS::time ());
212 int multiple
= ACE_OS::rand_r (&seed
) % 10;
213 int delay_ms
= ACE_OS::rand_r (&seed
) % 10;
214 // The delay usually causes the test to time out in the automated
215 // regression testing. I just left it here in case it's needed someday.
// A random factor of 0 is treated as 1 so that at least MESSAGE_FACTOR
// blocks are targeted.
217 long count
= MESSAGE_FACTOR
* (multiple
? multiple
: 1);
219 ACE_DEBUG ((LM_DEBUG
,
220 ACE_TEXT ("(%t) Consumer will dequeue %B blocks, ")
221 ACE_TEXT ("%d msec delay\n"),
224 ACE_Message_Block
*b
= 0;
// NOTE(review): ACE_Time_Value's two-argument constructor is (seconds,
// microseconds), so delay_ms would be taken as microseconds here -
// confirm the intended unit.
225 ACE_Time_Value
delay (0, delay_ms
);
226 ACE_Time_Value
timeout (2);
227 while (consumed
< count
)
229 // To be sure we can wait in the case of an empty queue, but not
230 // delay excessively if the producers have stopped, limit the time
231 // spent waiting to 2 seconds.
// NOTE(review): 'timeout' is presumably added to this value on a line
// that is not visible in this excerpt.
232 ACE_Time_Value block
= ACE_OS::gettimeofday ();
234 if (this->getq (b
, &block
) == -1)
// A timeout (EWOULDBLOCK) is only logged at debug level; any other errno
// is reported as an error.
236 if (errno
== EWOULDBLOCK
)
237 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Consumer timed out\n")));
239 ACE_ERROR ((LM_ERROR
,
240 ACE_TEXT ("(%t) Consumer %p\n"),
247 ACE_OS::sleep (delay
);
// Publish this thread's total to the shared counter.
249 this->consumed_
+= consumed
;
250 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Consumer done\n")));
257 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Starting counting test\n")));
259 ACE_Message_Queue
<ACE_MT_SYNCH
> q (2 * 1024 * 1024); // 2MB high water
260 Counting_Test_Producer
p (&q
);
261 Counting_Test_Consumer
c (&q
);
262 // Activate consumers first; if the producers fail to start, consumers will
264 if (c
.activate (THR_NEW_LWP
| THR_JOINABLE
| THR_INHERIT_SCHED
, 5) == -1)
265 ACE_ERROR_RETURN ((LM_ERROR
,
266 ACE_TEXT ("Consumers %p\n"),
267 ACE_TEXT ("activate")),
269 if (p
.activate (THR_NEW_LWP
| THR_JOINABLE
| THR_INHERIT_SCHED
, 5) == -1)
271 ACE_ERROR ((LM_ERROR
,
272 ACE_TEXT ("Producers %p\n"),
273 ACE_TEXT ("activate")));
277 // Producers and consumers are both running; wait for them then
278 // check the results.
281 // This compare relies on the flush() method counting blocks as it
282 // walks the chain releasing them, and doesn't rely on the count.
284 long q_count
= static_cast<long> (q
.message_count ());
285 long remaining
= q
.flush ();
286 ACE_DEBUG ((LM_DEBUG
,
287 ACE_TEXT ("Queue message_count is %b; %b flushed\n"),
289 (ssize_t
)remaining
));
290 if (q_count
!= remaining
)
293 ACE_ERROR ((LM_ERROR
,
294 ACE_TEXT ("message_count and flushed should be equal!\n")));
296 long expected
= p
.produced_
.value () - c
.consumed_
.value ();
297 ACE_DEBUG ((LM_DEBUG
,
298 ACE_TEXT ("Produced %b, consumed %b; diff %b\n"),
299 (ssize_t
)p
.produced_
.value (),
300 (ssize_t
)c
.consumed_
.value (),
302 if (expected
!= remaining
)
305 ACE_ERROR ((LM_ERROR
,
306 ACE_TEXT ("Producer-consumer diff is %b; should be %b\n"),
308 (ssize_t
)remaining
));
310 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Ending counting test\n")));
314 #endif /* ACE_HAS_THREADS */
319 const int ITERATIONS
= 5;
320 ACE_TCHAR buffer
[ITERATIONS
][BUFSIZ
];
321 // Use a queue size of 32 Kb (more if using wide-char), instead of the
322 // default of 16 Kb (defined by ACE_Message_Queue_Base::DEFAULT_HWM),
323 // so that the test runs on machines with 8Kb pagesizes.
325 // QUEUE queue (32 * 1024 * sizeof (ACE_TCHAR));
326 QUEUE
queue (sizeof(buffer
));
330 for (i
= 0; i
< ITERATIONS
; i
++)
332 ACE_OS::snprintf (buffer
[i
], BUFSIZ
, ACE_TEXT ("%d"), i
+ 1);
334 ACE_Message_Block
*entry
= 0;
335 ACE_NEW_RETURN (entry
,
336 ACE_Message_Block ((char *) buffer
[i
],
340 if (queue
.is_full ())
341 ACE_ERROR_RETURN ((LM_ERROR
,
342 ACE_TEXT ("QUEUE:: the message queue is full on iteration %u!\n"),
346 if (queue
.enqueue (entry
) == -1)
347 ACE_ERROR_RETURN ((LM_ERROR
,
348 ACE_TEXT ("QUEUE::enqueue\n")),
352 ACE_DEBUG ((LM_DEBUG
,
353 ACE_TEXT ("\nForward Iterations\n")));
355 ITERATOR
iterator (queue
);
357 for (ACE_Message_Block
*entry
= 0;
358 iterator
.next (entry
) != 0;
360 ACE_DEBUG ((LM_DEBUG
,
365 ACE_DEBUG ((LM_DEBUG
,
366 ACE_TEXT ("\nReverse Iterations\n")));
368 REVERSE_ITERATOR
iterator (queue
);
370 for (ACE_Message_Block
*entry
= 0;
371 iterator
.next (entry
) != 0;
373 ACE_DEBUG ((LM_DEBUG
,
378 ACE_DEBUG ((LM_DEBUG
,
379 ACE_TEXT ("\nForward Iterations\n")));
381 QUEUE::ITERATOR
iterator (queue
);
383 for (ACE_Message_Block
*entry
= 0;
384 iterator
.next (entry
) != 0;
386 ACE_DEBUG ((LM_DEBUG
,
391 ACE_DEBUG ((LM_DEBUG
,
392 ACE_TEXT ("\nReverse Iterations\n")));
394 QUEUE::REVERSE_ITERATOR
iterator (queue
);
396 for (ACE_Message_Block
*entry
= 0;
397 iterator
.next (entry
) != 0;
399 ACE_DEBUG ((LM_DEBUG
,
407 #if defined (ACE_HAS_THREADS)
// Checks that enqueueing a mix of single and chained message blocks
// yields the expected running message counts (5, 9, then 10), the
// expected totals (10 messages, 100 bytes), and that the blocks are then
// dequeued in the order b[0] .. b[9].
410 chained_block_test (void)
414 const char * s
= "123456789"; // Will be length 10 when copied to block
415 const size_t slen
= 10;
416 const size_t num_blks
= 10;
417 ACE_Message_Block b
[num_blks
];
// Per-block set-up loop (the loop body is not visible in this excerpt).
421 for (i
= 0; i
< num_blks
; ++i
)
427 // Test enqueueing single and chained blocks and be sure they end up with
428 // the proper enqueued block count and sizes. Then be sure they are dequeued
429 // in the proper order.
432 // b[3] and b[4] are unchained.
437 q
.enqueue_tail (&b
[3]);
438 q
.enqueue_tail (&b
[4]);
// With b[3] and b[4] already queued, the check below expects a count of 5
// after this call, so b[0] presumably heads a chain of three blocks (the
// chaining code is not visible in this excerpt).
439 int num
= q
.enqueue_head (&b
[0]);
442 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Chained enqueue expected 5; has %d\n"),
// Expected count rises from 5 to 9 here, i.e. four more blocks.
446 num
= q
.enqueue_tail (&b
[5]);
449 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Chained enqueue expected 9; has %d\n"),
// The final single block brings the expected count to 10.
453 num
= q
.enqueue_tail (&b
[9]);
456 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Chained enqueue expected 10; has %d\n"),
// Cross-check the queue's own totals: 10 blocks of slen (10) bytes each.
461 msgs
= q
.message_count ();
462 bytes
= q
.message_bytes ();
463 if (msgs
!= 10 || bytes
!= 100)
465 ACE_ERROR ((LM_ERROR
,
466 ACE_TEXT ("Chained enqueue totals: %d msgs, %d bytes; ")
467 ACE_TEXT ("should be 10 msgs, 100 bytes\n"),
468 (int)msgs
, (int)bytes
));
472 // Now see if we can dequeue them, checking the order.
// The deadline is the current time, so dequeue_head() is not expected to
// wait (hence the name).
473 ACE_Time_Value
nowait (ACE_OS::gettimeofday ());
474 ACE_Message_Block
*bp
;
476 for (i
= 0; i
< num_blks
; ++i
)
478 qstat
= q
.dequeue_head (bp
, &nowait
);
481 ACE_ERROR ((LM_ERROR
,
482 ACE_TEXT ("Checking chained blocks, pass %d: %p\n"),
483 (int)i
, ACE_TEXT ("dequeue_head")));
// The i-th block dequeued must be exactly b[i].
486 else if (bp
!= &b
[i
])
488 ACE_ERROR ((LM_ERROR
,
489 ACE_TEXT ("Checking chained blocks, pass %d: ")
490 ACE_TEXT ("block out of order\n"),
497 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Chained block test OK\n")));
// Single-threaded throughput measurement: creates a message queue,
// allocates max_messages message blocks, then enqueues and immediately
// dequeues each one, and finally reports the elapsed time and the
// per-message cost.
//
// queue_type selects the queue implementation. The default of 0 is used
// by run_main() for the plain ACE_Message_Queue; run_main() passes 1 for
// the platform-specific queue (ACE_Message_Queue_Vx or
// ACE_Message_Queue_NT, depending on the preprocessor branch).
502 single_thread_performance_test (int queue_type
= 0)
// NOTE(review): this local shadows the file-level test_message, which
// holds the same text.
504 const char test_message
[] =
505 "ACE_Message_Queue Test Message";
// Label printed in the timing report; replaced below when a
// platform-specific queue is used.
506 const ACE_TCHAR
*message
=
507 ACE_TEXT ("ACE_Message_Queue<ACE_NULL_SYNCH>, single thread");
510 // Create a message queue.
511 ACE_Message_Queue_Base
*msgq
= 0;
514 ACE_NEW_RETURN (msgq
,
517 #if defined (ACE_VXWORKS)
520 ACE_NEW_RETURN (msgq
,
521 ACE_Message_Queue_Vx (max_messages
,
// NOTE(review): unlike the NT branch below, this literal is not wrapped
// in ACE_TEXT although 'message' is a const ACE_TCHAR * - confirm this is
// acceptable for wide-character builds.
524 message
= "ACE_Message_Queue_Vx, single thread test";
526 #elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
529 ACE_NEW_RETURN (msgq
,
530 ACE_Message_Queue_NT
,
532 message
= ACE_TEXT ("ACE_Message_Queue_NT, single thread test");
534 #endif /* ACE_VXWORKS */
536 // Create the messages. Allocate off the heap in case messages
537 // is large relative to the amount of stack space available.
538 ACE_Message_Block
**send_block
= 0;
539 ACE_NEW_RETURN (send_block
,
540 ACE_Message_Block
*[max_messages
],
543 for (i
= 0; i
< max_messages
; ++i
)
544 ACE_NEW_RETURN (send_block
[i
],
545 ACE_Message_Block (test_message
,
// Array of pointers that dequeue_head() fills in (or, on VxWorks, uses as
// pre-assigned destinations - see below).
549 ACE_Message_Block
**receive_block_p
= 0;
550 ACE_NEW_RETURN (receive_block_p
,
551 ACE_Message_Block
*[max_messages
],
554 #if defined (ACE_VXWORKS)
555 // Set up blocks to receive the messages. Allocate these off the
556 // heap in case messages is large relative to the amount of
557 // stack space available.
558 ACE_Message_Block
*receive_block
= 0;
559 ACE_NEW_RETURN (receive_block
,
560 ACE_Message_Block
[max_messages
],
563 for (i
= 0; i
< max_messages
; ++i
)
565 receive_block
[i
].init (MAX_MESSAGE_SIZE
);
567 // For VxWorks Message Queues, the receive block pointer must be
568 // assigned. It will be used by dequeue_head ().
569 receive_block_p
[i
] = &receive_block
[i
];
571 #endif /* ACE_VXWORKS */
575 // Send/receive the messages.
// Each message is enqueued and then dequeued straight away, so the queue
// never holds more than one message in this test.
576 for (i
= 0; i
< max_messages
; ++i
)
578 if (msgq
->enqueue_tail (send_block
[i
]) == -1)
579 ACE_ERROR_RETURN ((LM_ERROR
,
581 ACE_TEXT ("enqueue")),
584 if (msgq
->dequeue_head (receive_block_p
[i
]) == -1)
585 ACE_ERROR_RETURN ((LM_ERROR
,
587 ACE_TEXT ("dequeue_head")),
// Read the elapsed time from the shared high-resolution timer and report
// total and per-message cost.
594 timer
->elapsed_time (tv
);
596 ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
600 (double) tv
.msec () / max_messages
));
// Release the receive-side arrays, then every sent block and the array
// that held them.
603 delete [] receive_block_p
;
604 #if defined (ACE_VXWORKS)
605 delete [] receive_block
;
606 #endif /* ACE_VXWORKS */
608 for (i
= 0; i
< max_messages
; ++i
)
609 delete send_block
[i
];
610 delete [] send_block
;
619 Queue_Wrapper
*queue_wrapper
=
620 reinterpret_cast<Queue_Wrapper
*> (arg
);
623 ACE_Message_Block
**receive_block_p
= 0;
624 ACE_NEW_RETURN (receive_block_p
,
625 ACE_Message_Block
*[max_messages
],
628 #if defined (ACE_VXWORKS)
629 // Set up blocks to receive the messages. Allocate these off the
630 // heap in case messages is large relative to the amount of stack
632 ACE_Message_Block
*receive_block
;
633 ACE_NEW_RETURN (receive_block
,
634 ACE_Message_Block
[max_messages
],
637 for (i
= 0; i
< max_messages
; ++i
)
639 receive_block
[i
].init (MAX_MESSAGE_SIZE
);
641 // For VxWorks Message Queues, the receive block pointer must be
642 // assigned. It will be used by <dequeue_head>.
643 receive_block_p
[i
] = &receive_block
[i
];
645 #endif /* ACE_VXWORKS */
647 for (i
= 0; i
< max_messages
; ++i
)
648 if (queue_wrapper
->q_
->dequeue_head (receive_block_p
[i
]) == -1)
649 ACE_ERROR_RETURN ((LM_ERROR
,
651 ACE_TEXT ("dequeue_head")),
655 delete [] receive_block_p
;
656 #if defined (ACE_VXWORKS)
657 delete [] receive_block
;
658 #endif /* ACE_VXWORKS */
666 Queue_Wrapper
*queue_wrapper
=
667 reinterpret_cast<Queue_Wrapper
*> (arg
);
672 // Send the messages.
673 for (i
= 0; i
< max_messages
; ++i
)
674 if (queue_wrapper
->q_
->
675 enqueue_tail (queue_wrapper
->send_block_
[i
]) == -1)
676 ACE_ERROR_RETURN ((LM_ERROR
,
678 ACE_TEXT ("enqueue")),
// Multi-threaded throughput measurement: allocates max_messages message
// blocks, creates a queue, spawns one sender thread and one receiver
// thread, waits for them to finish, and reports the elapsed time and the
// per-message cost.
//
// queue_type selects the queue implementation. The default of 0 is used
// by run_main() for the plain ACE_Message_Queue; run_main() passes 1 for
// the platform-specific queue (ACE_Message_Queue_Vx or
// ACE_Message_Queue_NT, depending on the preprocessor branch).
685 performance_test (int queue_type
= 0)
// Carries the queue pointer and the blocks to send for the two threads.
687 Queue_Wrapper queue_wrapper
;
// Label printed in the timing report; replaced below when a
// platform-specific queue is used.
688 const ACE_TCHAR
*message
=
689 ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH>");
692 // Create the messages. Allocate off the heap in case messages is
693 // large relative to the amount of stack space available. Allocate
694 // it here instead of in the sender, so that we can delete it after
695 // the _receiver_ is done.
696 ACE_Message_Block
**send_block
= 0;
697 ACE_NEW_RETURN (send_block
,
698 ACE_Message_Block
*[max_messages
],
701 for (i
= 0; i
< max_messages
; ++i
)
702 ACE_NEW_RETURN (send_block
[i
],
703 ACE_Message_Block (test_message
,
707 queue_wrapper
.send_block_
= send_block
;
// Create the queue under test.
710 ACE_NEW_RETURN (queue_wrapper
.q_
,
713 #if defined (ACE_VXWORKS)
716 ACE_NEW_RETURN (queue_wrapper
.q_
,
717 ACE_Message_Queue_Vx (max_messages
,
// NOTE(review): unlike the NT branch below, this literal is not wrapped
// in ACE_TEXT although 'message' is a const ACE_TCHAR * - confirm this is
// acceptable for wide-character builds.
720 message
= "ACE_Message_Queue_Vx";
722 #elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
725 ACE_NEW_RETURN (queue_wrapper
.q_
,
726 ACE_Message_Queue_NT
,
728 message
= ACE_TEXT ("ACE_Message_Queue_NT");
730 #endif /* ACE_VXWORKS */
// Start the sender thread, then the receiver thread.
732 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) sender
,
735 ACE_ERROR_RETURN ((LM_ERROR
,
737 ACE_TEXT ("spawning sender thread")),
740 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) receiver
,
743 ACE_ERROR_RETURN ((LM_ERROR
,
745 ACE_TEXT ("spawning receiver thread")),
// Block until the spawned threads have finished before reading the timer.
748 ACE_Thread_Manager::instance ()->wait ();
750 timer
->elapsed_time (tv
);
751 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
755 (double) tv
.msec () / max_messages
));
// Both threads are done: release the queue, every sent block and the
// array that held them.
758 delete queue_wrapper
.q_
;
759 queue_wrapper
.q_
= 0;
761 for (i
= 0; i
< max_messages
; ++i
)
762 delete send_block
[i
];
763 delete [] send_block
;
768 // Ensure that a timed-out dequeue_head() sets the errno code properly.
778 ACE_ERROR ((LM_ERROR
,
779 ACE_TEXT ("New queue is not empty!\n")));
784 ACE_Message_Block
*b
;
785 ACE_Time_Value
tv (ACE_OS::gettimeofday ()); // Now
787 if (mq
.dequeue_head (b
, &tv
) != -1)
789 ACE_ERROR ((LM_ERROR
,
790 ACE_TEXT ("Dequeued from empty queue!\n")));
793 else if (errno
!= EWOULDBLOCK
)
795 ACE_ERROR ((LM_ERROR
,
797 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
802 ACE_DEBUG ((LM_DEBUG
,
803 ACE_TEXT ("Timed dequeue test: OK\n")));
804 status
= 0; // All is well
810 #endif /* ACE_HAS_THREADS */
812 // Check to make sure that dequeue_prio() respects FIFO ordering.
813 // @@ At some point, this function should be enhanced to do a more
819 const char S1
[] = "first";
820 const char S2
[] = "second";
821 const int PRIORITY
= 50;
825 ACE_Message_Block
mb1 (S1
, sizeof S1
, PRIORITY
);
826 ACE_Message_Block
mb2 (S2
, sizeof S2
, PRIORITY
);
828 mq
.enqueue_prio (&mb1
);
829 mq
.enqueue_prio (&mb2
);
831 ACE_Message_Block
*mb1p
= 0;
832 ACE_Message_Block
*mb2p
= 0;
834 mq
.dequeue_prio (mb1p
);
835 mq
.dequeue_prio (mb2p
);
837 ACE_DEBUG ((LM_DEBUG
, "message 1 = %C\nmessage 2 = %C\n",
841 if (ACE_OS::strcmp (mb1p
->rd_ptr (), S1
) == 0
842 && ACE_OS::strcmp (mb2p
->rd_ptr (), S2
) == 0)
855 int flushed_messages
;
858 flushed_messages
= mq1
.close ();
860 if (flushed_messages
!= 0)
862 ACE_ERROR ((LM_ERROR
,
863 ACE_TEXT ("Closing queue should flush 0 messages, close() reports - %d\n"),
869 // There was a bug that returned the previous queue state instead of the
870 // number of flushed messages. Thus, insert 2 messages != ACTIVATE
872 ACE_Message_Block
*pMB1
;
873 ACE_Message_Block
*pMB2
;
874 ACE_NEW_NORETURN (pMB1
, ACE_Message_Block (1));
875 ACE_NEW_NORETURN (pMB2
, ACE_Message_Block (1));
877 mq2
.enqueue_head (pMB1
);
878 mq2
.enqueue_head (pMB2
);
879 flushed_messages
= mq2
.close ();
881 if (flushed_messages
!= 2)
883 ACE_ERROR ((LM_ERROR
,
884 ACE_TEXT ("Closing queue should flush 2 messages, close() reports - %d\n"),
// Test entry point. Handles the optional command-line argument ("-?" or
// a message count stored in max_messages), then runs the individual
// tests in turn, keeping each result in 'status'. The tests that need
// threads are only compiled when ACE_HAS_THREADS is defined.
893 run_main (int argc
, ACE_TCHAR
*argv
[])
895 ACE_START_TEST (ACE_TEXT ("Message_Queue_Test"));
// "-?" is handled separately from a numeric argument.
899 if (!ACE_OS::strcmp (argv
[1], ACE_TEXT ("-?")))
901 ACE_ERROR ((LM_ERROR
,
// Otherwise the argument overrides the default message count.
907 max_messages
= ACE_OS::atoi (argv
[1]);
911 int status
= prio_test ();
913 // The iterator test occasionally causes a page fault or a hang on
916 status
= iterator_test ();
// Create the shared high-resolution timer used by the performance tests.
918 ACE_NEW_RETURN (timer
,
923 status
= close_test ();
925 #if defined (ACE_HAS_THREADS)
927 status
= timeout_test ();
930 status
= chained_block_test ();
933 status
= single_thread_performance_test ();
935 # if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
936 // Test ACE_Message_Queue_Vx or ACE_Message_Queue_NT
938 status
= single_thread_performance_test (1);
939 # endif /* ACE_VXWORKS || ACE_HAS_WIN32_OVERLAPPED_IO */
942 status
= performance_test ();
944 # if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
945 // Test ACE_Message_Queue_Vx or ACE_Message_Queue_NT
947 status
= performance_test (1);
948 # endif /* ACE_VXWORKS || ACE_HAS_WIN32_OVERLAPPED_IO */
// The counting test reports failure through a non-zero return value.
950 if (counting_test () != 0)
952 #endif /* ACE_HAS_THREADS */
955 ACE_ERROR ((LM_ERROR
,
957 ACE_TEXT ("test failed")));