Make x.0.10 publicly available
[ACE_TAO.git] / ACE / tests / Message_Queue_Test.cpp
blobce5163e5be476c0b994cadf7652d26ee33f6b650
//=============================================================================
/**
 *  @file    Message_Queue_Test.cpp
 *
 *  This is:
 *  0) a test that ensures key ACE_Message_Queue features are
 *     working properly, including timeouts and priorities
 *  1) a simple test of the ACE_Message_Queue that illustrates how to
 *     use the forward and reverse iterators
 *  2) a simple performance measurement test for single-threaded
 *     (null synch) and thread-safe ACE_Message_Queues, and for
 *     ACE_Message_Queue_Vx, which wraps VxWorks message queues
 *  3) a test/usage example of ACE_Message_Queue_Vx
 *  4) a test of the message counting in a message queue under load.
 *
 *  @author Irfan Pyarali <irfan@cs.wustl.edu>
 *  @author David L. Levine <levine@cs.wustl.edu>
 *  @author Douglas C. Schmidt <schmidt@vanderbilt.edu>
 */
//=============================================================================
24 #include "test_config.h"
25 #include "ace/Atomic_Op.h"
26 #include "ace/Thread_Manager.h"
27 #include "ace/Message_Queue.h"
28 #include "ace/Message_Queue_NT.h"
29 #include "ace/Message_Queue_Vx.h"
30 #include "ace/Synch_Traits.h"
31 #include "ace/Null_Mutex.h"
32 #include "ace/Null_Condition.h"
33 #include "ace/High_Res_Timer.h"
34 #include "ace/Task.h"
35 #include "ace/OS_NS_stdio.h"
36 #include "ace/OS_NS_string.h"
37 #include "ace/OS_NS_sys_time.h"
38 #include "ace/OS_NS_unistd.h"
41 const ACE_TCHAR usage[] = ACE_TEXT ("usage: Message_Queue_Test <number of messages>\n");
43 using QUEUE = ACE_Message_Queue<ACE_NULL_SYNCH>;
44 using ITERATOR = ACE_Message_Queue_Iterator<ACE_NULL_SYNCH>;
45 using REVERSE_ITERATOR = ACE_Message_Queue_Reverse_Iterator<ACE_NULL_SYNCH>;
47 #if defined (ACE_HAS_WINCE)
48 static const int MESSAGE_FACTOR = 10000;
49 #else
50 static const int MESSAGE_FACTOR = 100000;
51 #endif
52 static const int MAX_MESSAGES = 10000;
53 static const int MAX_MESSAGE_SIZE = 32;
54 static const char test_message[] = "ACE_Message_Queue Test Message";
56 static int max_messages = MAX_MESSAGES;
58 // Dynamically allocate to avoid a static.
59 static ACE_High_Res_Timer *timer = 0;
61 #if defined (ACE_HAS_THREADS)
62 using SYNCH_QUEUE = ACE_Message_Queue<ACE_MT_SYNCH>;
64 struct Queue_Wrapper
66 // = TITLE
67 // Container for data passed to sender and receiver in
68 // performance test.
70 // = DESCRIPTION
71 // For use in multithreaded performance test.
73 ACE_Message_Queue_Base *q_;
74 // The message queue.
76 ACE_Message_Block **send_block_;
77 // Pointer to messages blocks for sender to send to reciever.
79 Queue_Wrapper ()
80 : q_ (0), send_block_ (0)
83 // Default constructor.
86 // For the message counting test, there are two tasks, producer and consumer.
87 // Each will spawn a number of threads, and the two tasks share a queue.
88 class Counting_Test_Producer : public ACE_Task<ACE_MT_SYNCH>
90 public:
91 Counting_Test_Producer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
92 : ACE_Task<ACE_MT_SYNCH> (0, queue), sequence_ (0), produced_ (0) {}
93 int svc () override;
95 ACE_Atomic_Op<ACE_Thread_Mutex, long> sequence_;
96 ACE_Atomic_Op<ACE_Thread_Mutex, long> produced_;
99 class Counting_Test_Consumer : public ACE_Task<ACE_MT_SYNCH>
101 public:
102 Counting_Test_Consumer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
103 : ACE_Task<ACE_MT_SYNCH> (0, queue), consumed_ (0) {}
104 int svc () override;
106 ACE_Atomic_Op<ACE_Thread_Mutex, long> consumed_;
110 Counting_Test_Producer::svc ()
112 // Going to produce a lot of blocks. Since we don't necessarily want them
113 // all consumed, there's no arrangement with the consumer to be sure that
114 // the same number produced will be consumed; the test check will compare
115 // the number produced, consumed, and remaining to be sure it ends up
116 // correct.
117 // Also, to be sure there's not just 1 producer and 1 consumer pinging
118 // back and forth, make the producers randomly delay between blocks.
119 ACE_OS::srand (static_cast<unsigned int> (ACE_OS::time ()));
120 int multiple = ACE_OS::rand () % 10;
121 int delay_ms = (ACE_OS::rand () % 10) / 2;
122 // The delay usually causes the test to time out in the automated
123 // regression testing. I just left it here in case it's needed someday.
124 delay_ms = 0;
125 long count = MESSAGE_FACTOR * (multiple ? multiple : 1);
126 long produced = 0;
127 // Some of the threads enqueue single blocks, others sequences.
128 long lsequence = ++(this->sequence_);
129 int seq = static_cast<int> (lsequence);
130 ACE_DEBUG ((LM_DEBUG,
131 ACE_TEXT ("(%t) Producer will enqueue %B blocks in seq of %d, ")
132 ACE_TEXT ("%d msec delay\n"),
133 (size_t)count,
134 seq,
135 delay_ms));
137 ACE_Message_Block *first = 0, *prev = 0, *b = 0;
138 ACE_Time_Value delay (0, delay_ms);
139 ACE_Time_Value timeout (10);
140 while (produced < count)
142 ACE_NEW_NORETURN (b, ACE_Message_Block (1));
143 if (b == 0)
145 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer out of memory\n")));
146 break;
148 first = b;
149 prev = first;
150 for (int s = 1; s < seq; ++s)
152 ACE_NEW_NORETURN (b, ACE_Message_Block (1));
153 if (b == 0)
154 break;
155 prev->next (b);
156 b->prev (prev);
157 prev = b;
159 if (b == 0)
161 if (first != b)
163 while (first->next () != 0)
165 b = first->next ();
166 first->release ();
167 first = b;
169 first->release ();
171 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer out of memory\n")));
172 break;
174 // To be sure we can keep going on slow or completed consumers, but not
175 // delay excessively if the consumers have stopped, limit the time
176 // spent waiting to 10 seconds.
177 ACE_Time_Value block = ACE_OS::gettimeofday ();
178 block += timeout;
179 if (this->putq (first, &block) == -1)
181 ACE_DEBUG ((LM_DEBUG,
182 ACE_TEXT ("(%t) Producer cannot putq; giving up\n")));
183 while (first->next () != 0)
185 b = first->next ();
186 first->release ();
187 first = b;
189 first->release ();
190 break;
192 produced += seq;
193 if (delay_ms)
194 ACE_OS::sleep (delay);
196 this->produced_ += produced;
197 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer done\n")));
198 return 0;
202 Counting_Test_Consumer::svc ()
204 // Consume lots of blocks and release them. To mimic a thread with work
205 // to do, put a small random delay between dequeuing the blocks. Consume
206 // a calculated number of blocks then stop; the test checker will determine
207 // if the number consumed plus the number remaining is correct for the
208 // number produced.
209 unsigned int seed = static_cast<unsigned int> (ACE_OS::time ());
211 int multiple = ACE_OS::rand_r (&seed) % 10;
212 int delay_ms = ACE_OS::rand_r (&seed) % 10;
213 // The delay usually causes the test to time out in the automated
214 // regression testing. I just left it here in case it's needed someday.
215 delay_ms = 0;
216 long count = MESSAGE_FACTOR * (multiple ? multiple : 1);
217 long consumed = 0;
218 ACE_DEBUG ((LM_DEBUG,
219 ACE_TEXT ("(%t) Consumer will dequeue %B blocks, ")
220 ACE_TEXT ("%d msec delay\n"),
221 (size_t)count,
222 delay_ms));
223 ACE_Message_Block *b = 0;
224 ACE_Time_Value delay (0, delay_ms);
225 ACE_Time_Value timeout (2);
226 while (consumed < count)
228 // To be sure we can wait in the case of an empty queue, but not
229 // delay excessively if the producers have stopped, limit the time
230 // spent waiting to 2 seconds.
231 ACE_Time_Value block = ACE_OS::gettimeofday ();
232 block += timeout;
233 if (this->getq (b, &block) == -1)
235 if (errno == EWOULDBLOCK)
236 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Consumer timed out\n")));
237 else
238 ACE_ERROR ((LM_ERROR,
239 ACE_TEXT ("(%t) Consumer %p\n"),
240 ACE_TEXT ("getq")));
241 break;
243 ++consumed;
244 b->release ();
245 if (delay_ms)
246 ACE_OS::sleep (delay);
248 this->consumed_ += consumed;
249 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Consumer done\n")));
250 return 0;
253 static int
254 counting_test ()
256 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Starting counting test\n")));
258 ACE_Message_Queue<ACE_MT_SYNCH> q (2 * 1024 * 1024); // 2MB high water
259 Counting_Test_Producer p (&q);
260 Counting_Test_Consumer c (&q);
261 // Activate consumers first; if the producers fail to start, consumers will
262 // stop quicker.
263 if (c.activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, 5) == -1)
264 ACE_ERROR_RETURN ((LM_ERROR,
265 ACE_TEXT ("Consumers %p\n"),
266 ACE_TEXT ("activate")),
267 -1);
268 if (p.activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, 5) == -1)
270 ACE_ERROR ((LM_ERROR,
271 ACE_TEXT ("Producers %p\n"),
272 ACE_TEXT ("activate")));
273 c.wait ();
274 return -1;
276 // Producers and consumers are both running; wait for them then
277 // check the results.
278 p.wait ();
279 c.wait ();
280 // This compare relies on the flush() method counting blocks as it
281 // walks the chain releasing them, and doesn't rely on the count.
282 int status = 0;
283 long q_count = static_cast<long> (q.message_count ());
284 long remaining = q.flush ();
285 ACE_DEBUG ((LM_DEBUG,
286 ACE_TEXT ("Queue message_count is %b; %b flushed\n"),
287 (ssize_t)q_count,
288 (ssize_t)remaining));
289 if (q_count != remaining)
291 status = -1;
292 ACE_ERROR ((LM_ERROR,
293 ACE_TEXT ("message_count and flushed should be equal!\n")));
295 long expected = p.produced_.value () - c.consumed_.value ();
296 ACE_DEBUG ((LM_DEBUG,
297 ACE_TEXT ("Produced %b, consumed %b; diff %b\n"),
298 (ssize_t)p.produced_.value (),
299 (ssize_t)c.consumed_.value (),
300 (ssize_t)expected));
301 if (expected != remaining)
303 status = -1;
304 ACE_ERROR ((LM_ERROR,
305 ACE_TEXT ("Producer-consumer diff is %b; should be %b\n"),
306 (ssize_t)expected,
307 (ssize_t)remaining));
309 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Ending counting test\n")));
310 return status;
313 #endif /* ACE_HAS_THREADS */
315 static int
316 iterator_test ()
318 const int ITERATIONS = 5;
319 ACE_TCHAR buffer[ITERATIONS][BUFSIZ];
320 // Use queue size from of 32 Kb (more if using wide-char), instead of the
321 // default of 16 Kb (defined by ACE_Message_Queue_Base::DEFAULT_HWM),
322 // so that the test runs on machines with 8Kb pagesizes.
324 // QUEUE queue (32 * 1024 * sizeof (ACE_TCHAR));
325 QUEUE queue (sizeof(buffer));
327 int i;
329 for (i = 0; i < ITERATIONS; i++)
331 ACE_OS::snprintf (buffer[i], BUFSIZ, ACE_TEXT ("%d"), i + 1);
333 ACE_Message_Block *entry = 0;
334 ACE_NEW_RETURN (entry,
335 ACE_Message_Block ((char *) buffer[i],
336 sizeof buffer[i]),
337 -1);
339 if (queue.is_full ())
340 ACE_ERROR_RETURN ((LM_ERROR,
341 ACE_TEXT ("QUEUE:: the message queue is full on iteration %u!\n"),
342 i + 1),
343 -1);
345 if (queue.enqueue (entry) == -1)
346 ACE_ERROR_RETURN ((LM_ERROR,
347 ACE_TEXT ("QUEUE::enqueue\n")),
348 -1);
351 ACE_DEBUG ((LM_DEBUG,
352 ACE_TEXT ("\nForward Iterations\n")));
354 ITERATOR iterator (queue);
356 for (ACE_Message_Block *entry = 0;
357 iterator.next (entry) != 0;
358 iterator.advance ())
359 ACE_DEBUG ((LM_DEBUG,
360 ACE_TEXT ("%s\n"),
361 entry->base ()));
364 ACE_DEBUG ((LM_DEBUG,
365 ACE_TEXT ("\nReverse Iterations\n")));
367 REVERSE_ITERATOR iterator (queue);
369 for (ACE_Message_Block *entry = 0;
370 iterator.next (entry) != 0;
371 iterator.advance ())
372 ACE_DEBUG ((LM_DEBUG,
373 ACE_TEXT ("%s\n"),
374 entry->base ()));
377 ACE_DEBUG ((LM_DEBUG,
378 ACE_TEXT ("\nForward Iterations\n")));
380 QUEUE::ITERATOR iterator (queue);
382 for (ACE_Message_Block *entry = 0;
383 iterator.next (entry) != 0;
384 iterator.advance ())
385 ACE_DEBUG ((LM_DEBUG,
386 ACE_TEXT ("%s\n"),
387 entry->base ()));
390 ACE_DEBUG ((LM_DEBUG,
391 ACE_TEXT ("\nReverse Iterations\n")));
393 QUEUE::REVERSE_ITERATOR iterator (queue);
395 for (ACE_Message_Block *entry = 0;
396 iterator.next (entry) != 0;
397 iterator.advance ())
398 ACE_DEBUG ((LM_DEBUG,
399 ACE_TEXT ("%s\n"),
400 entry->base ()));
403 return 0;
406 #if defined (ACE_HAS_THREADS)
408 static int
409 chained_block_test ()
411 QUEUE q;
412 const char * s = "123456789"; // Will be length 10 when copied to block
413 const size_t slen = 10;
414 const size_t num_blks = 10;
415 ACE_Message_Block b[num_blks];
416 size_t i;
417 int status = 0;
419 for (i = 0; i < num_blks; ++i)
421 b[i].init (slen);
422 b[i].copy (s);
425 // Test enqueueing single and chained blocks and be sure they end up with
426 // the proper enqueued block count and sizes. Then be sure they are dequeued
427 // in the proper order.
428 b[0].next (&b[1]);
429 b[1].next (&b[2]);
430 // b[3] and b[4] are unchained.
431 b[5].next (&b[6]);
432 b[6].next (&b[7]);
433 b[7].next (&b[8]);
434 // b[9] is unchained
435 q.enqueue_tail (&b[3]);
436 q.enqueue_tail (&b[4]);
437 int num = q.enqueue_head (&b[0]);
438 if (num != 5)
440 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 5; has %d\n"),
441 num));
442 status = -1;
444 num = q.enqueue_tail (&b[5]);
445 if (num != 9)
447 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 9; has %d\n"),
448 num));
449 status = -1;
451 num = q.enqueue_tail (&b[9]);
452 if (num != 10)
454 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 10; has %d\n"),
455 num));
456 status = -1;
458 size_t msgs, bytes;
459 msgs = q.message_count ();
460 bytes = q.message_bytes ();
461 if (msgs != 10 || bytes != 100)
463 ACE_ERROR ((LM_ERROR,
464 ACE_TEXT ("Chained enqueue totals: %d msgs, %d bytes; ")
465 ACE_TEXT ("should be 10 msgs, 100 bytes\n"),
466 (int)msgs, (int)bytes));
467 status = -1;
470 // Now see if we can dequeue them, checking the order.
471 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
472 ACE_Message_Block *bp;
473 int qstat;
474 for (i = 0; i < num_blks; ++i)
476 qstat = q.dequeue_head (bp, &nowait);
477 if (qstat == -1)
479 ACE_ERROR ((LM_ERROR,
480 ACE_TEXT ("Checking chained blocks, pass %d: %p\n"),
481 (int)i, ACE_TEXT ("dequeue_head")));
482 status = -1;
484 else if (bp != &b[i])
486 ACE_ERROR ((LM_ERROR,
487 ACE_TEXT ("Checking chained blocks, pass %d: ")
488 ACE_TEXT ("block out of order\n"),
489 (int)i));
490 status = -1;
494 if (status == 0)
495 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Chained block test OK\n")));
496 return status;
499 static int
500 single_thread_performance_test (int queue_type = 0)
502 const char test_message[] =
503 "ACE_Message_Queue Test Message";
504 const ACE_TCHAR *message =
505 ACE_TEXT ("ACE_Message_Queue<ACE_NULL_SYNCH>, single thread");
506 int i = 0;
508 // Create a message queue.
509 ACE_Message_Queue_Base *msgq = 0;
511 if (queue_type == 0)
512 ACE_NEW_RETURN (msgq,
513 QUEUE,
514 -1);
515 #if defined (ACE_VXWORKS)
516 else
518 ACE_NEW_RETURN (msgq,
519 ACE_Message_Queue_Vx (max_messages,
520 MAX_MESSAGE_SIZE),
521 -1);
522 message = "ACE_Message_Queue_Vx, single thread test";
524 #elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
525 else
527 ACE_NEW_RETURN (msgq,
528 ACE_Message_Queue_NT,
529 -1);
530 message = ACE_TEXT ("ACE_Message_Queue_NT, single thread test");
532 #endif /* ACE_VXWORKS */
534 // Create the messages. Allocate off the heap in case messages
535 // is large relative to the amount of stack space available.
536 ACE_Message_Block **send_block = 0;
537 ACE_NEW_RETURN (send_block,
538 ACE_Message_Block *[max_messages],
539 -1);
541 for (i = 0; i < max_messages; ++i)
542 ACE_NEW_RETURN (send_block[i],
543 ACE_Message_Block (test_message,
544 MAX_MESSAGE_SIZE),
545 -1);
547 ACE_Message_Block **receive_block_p = 0;
548 ACE_NEW_RETURN (receive_block_p,
549 ACE_Message_Block *[max_messages],
550 -1);
552 #if defined (ACE_VXWORKS)
553 // Set up blocks to receive the messages. Allocate these off the
554 // heap in case messages is large relative to the amount of
555 // stack space available.
556 ACE_Message_Block *receive_block = 0;
557 ACE_NEW_RETURN (receive_block,
558 ACE_Message_Block[max_messages],
559 -1);
561 for (i = 0; i < max_messages; ++i)
563 receive_block[i].init (MAX_MESSAGE_SIZE);
565 // For VxWorks Message Queues, the receive block pointer must be
566 // assigned. It will be used by dequeue_head ().
567 receive_block_p[i] = &receive_block[i];
569 #endif /* ACE_VXWORKS */
571 timer->start ();
573 // Send/receive the messages.
574 for (i = 0; i < max_messages; ++i)
576 if (msgq->enqueue_tail (send_block[i]) == -1)
577 ACE_ERROR_RETURN ((LM_ERROR,
578 ACE_TEXT ("%p\n"),
579 ACE_TEXT ("enqueue")),
580 -1);
582 if (msgq->dequeue_head (receive_block_p[i]) == -1)
583 ACE_ERROR_RETURN ((LM_ERROR,
584 ACE_TEXT ("%p\n"),
585 ACE_TEXT ("dequeue_head")),
586 -1);
589 timer->stop ();
591 ACE_Time_Value tv;
592 timer->elapsed_time (tv);
593 ACE_DEBUG ((LM_INFO,
594 ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
595 message,
596 max_messages,
597 tv.msec (),
598 (double) tv.msec () / max_messages));
599 timer->reset ();
601 delete [] receive_block_p;
602 #if defined (ACE_VXWORKS)
603 delete [] receive_block;
604 #endif /* ACE_VXWORKS */
606 for (i = 0; i < max_messages; ++i)
607 delete send_block[i];
608 delete [] send_block;
609 delete msgq;
611 return 0;
614 static void *
615 receiver (void *arg)
617 Queue_Wrapper *queue_wrapper =
618 reinterpret_cast<Queue_Wrapper *> (arg);
619 int i;
621 ACE_Message_Block **receive_block_p = 0;
622 ACE_NEW_RETURN (receive_block_p,
623 ACE_Message_Block *[max_messages],
624 (void *) -1);
626 #if defined (ACE_VXWORKS)
627 // Set up blocks to receive the messages. Allocate these off the
628 // heap in case messages is large relative to the amount of stack
629 // space available.
630 ACE_Message_Block *receive_block;
631 ACE_NEW_RETURN (receive_block,
632 ACE_Message_Block[max_messages],
633 (void *) -1);
635 for (i = 0; i < max_messages; ++i)
637 receive_block[i].init (MAX_MESSAGE_SIZE);
639 // For VxWorks Message Queues, the receive block pointer must be
640 // assigned. It will be used by <dequeue_head>.
641 receive_block_p[i] = &receive_block[i];
643 #endif /* ACE_VXWORKS */
645 for (i = 0; i < max_messages; ++i)
646 if (queue_wrapper->q_->dequeue_head (receive_block_p[i]) == -1)
647 ACE_ERROR_RETURN ((LM_ERROR,
648 ACE_TEXT ("%p\n"),
649 ACE_TEXT ("dequeue_head")),
651 timer->stop ();
653 delete [] receive_block_p;
654 #if defined (ACE_VXWORKS)
655 delete [] receive_block;
656 #endif /* ACE_VXWORKS */
658 return 0;
661 static void *
662 sender (void *arg)
664 Queue_Wrapper *queue_wrapper =
665 reinterpret_cast<Queue_Wrapper *> (arg);
666 int i;
668 timer->start ();
670 // Send the messages.
671 for (i = 0; i < max_messages; ++i)
672 if (queue_wrapper->q_->
673 enqueue_tail (queue_wrapper->send_block_[i]) == -1)
674 ACE_ERROR_RETURN ((LM_ERROR,
675 ACE_TEXT ("%p\n"),
676 ACE_TEXT ("enqueue")),
678 return 0;
681 static
683 performance_test (int queue_type = 0)
685 Queue_Wrapper queue_wrapper;
686 const ACE_TCHAR *message =
687 ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH>");
688 int i = 0;
690 // Create the messages. Allocate off the heap in case messages is
691 // large relative to the amount of stack space available. Allocate
692 // it here instead of in the sender, so that we can delete it after
693 // the _receiver_ is done.
694 ACE_Message_Block **send_block = 0;
695 ACE_NEW_RETURN (send_block,
696 ACE_Message_Block *[max_messages],
697 -1);
699 for (i = 0; i < max_messages; ++i)
700 ACE_NEW_RETURN (send_block[i],
701 ACE_Message_Block (test_message,
702 MAX_MESSAGE_SIZE),
703 -1);
705 queue_wrapper.send_block_ = send_block;
707 if (queue_type == 0)
708 ACE_NEW_RETURN (queue_wrapper.q_,
709 SYNCH_QUEUE,
710 -1);
711 #if defined (ACE_VXWORKS)
712 else
714 ACE_NEW_RETURN (queue_wrapper.q_,
715 ACE_Message_Queue_Vx (max_messages,
716 MAX_MESSAGE_SIZE),
717 -1);
718 message = "ACE_Message_Queue_Vx";
720 #elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
721 else
723 ACE_NEW_RETURN (queue_wrapper.q_,
724 ACE_Message_Queue_NT,
725 -1);
726 message = ACE_TEXT ("ACE_Message_Queue_NT");
728 #endif /* ACE_VXWORKS */
730 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) sender,
731 &queue_wrapper,
732 THR_BOUND) == -1)
733 ACE_ERROR_RETURN ((LM_ERROR,
734 ACE_TEXT ("%p\n"),
735 ACE_TEXT ("spawning sender thread")),
736 -1);
738 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) receiver,
739 &queue_wrapper,
740 THR_BOUND) == -1)
741 ACE_ERROR_RETURN ((LM_ERROR,
742 ACE_TEXT ("%p\n"),
743 ACE_TEXT ("spawning receiver thread")),
744 -1);
746 ACE_Thread_Manager::instance ()->wait ();
747 ACE_Time_Value tv;
748 timer->elapsed_time (tv);
749 ACE_DEBUG ((LM_INFO, ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
750 message,
751 max_messages,
752 tv.msec (),
753 (double) tv.msec () / max_messages));
754 timer->reset ();
756 delete queue_wrapper.q_;
757 queue_wrapper.q_ = 0;
759 for (i = 0; i < max_messages; ++i)
760 delete send_block[i];
761 delete [] send_block;
763 return 0;
766 // Ensure that the timedout dequeue_head() sets errno code properly.
768 static int
769 timeout_test ()
771 SYNCH_QUEUE mq;
772 int status = 0;
774 if (!mq.is_empty ())
776 ACE_ERROR ((LM_ERROR,
777 ACE_TEXT ("New queue is not empty!\n")));
778 status = 1;
780 else
782 ACE_Message_Block *b;
783 ACE_Time_Value tv (ACE_OS::gettimeofday ()); // Now
785 if (mq.dequeue_head (b, &tv) != -1)
787 ACE_ERROR ((LM_ERROR,
788 ACE_TEXT ("Dequeued from empty queue!\n")));
789 status = 1;
791 else if (errno != EWOULDBLOCK)
793 ACE_ERROR ((LM_ERROR,
794 ACE_TEXT ("%p\n"),
795 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
796 status = 1;
798 else
800 ACE_DEBUG ((LM_DEBUG,
801 ACE_TEXT ("Timed dequeue test: OK\n")));
802 status = 0; // All is well
806 return status;
808 #endif /* ACE_HAS_THREADS */
810 // Check to make sure that dequeue_prio() respects FIFO ordering.
811 // @@ At some point, this function should be enhanced to do a more
812 // thorough check...
814 static int
815 prio_test ()
817 const char S1[] = "first";
818 const char S2[] = "second";
819 const int PRIORITY = 50;
820 QUEUE mq;
821 int status;
823 ACE_Message_Block mb1 (S1, sizeof S1, PRIORITY);
824 ACE_Message_Block mb2 (S2, sizeof S2, PRIORITY);
826 mq.enqueue_prio (&mb1);
827 mq.enqueue_prio (&mb2);
829 ACE_Message_Block *mb1p = 0;
830 ACE_Message_Block *mb2p = 0;
832 mq.dequeue_prio (mb1p);
833 mq.dequeue_prio (mb2p);
835 ACE_DEBUG ((LM_DEBUG, "message 1 = %C\nmessage 2 = %C\n",
836 mb1p->rd_ptr (),
837 mb2p->rd_ptr ()));
839 if (ACE_OS::strcmp (mb1p->rd_ptr (), S1) == 0
840 && ACE_OS::strcmp (mb2p->rd_ptr (), S2) == 0)
841 status = 0;
842 else
843 status = 1;
845 return status;
848 static int
849 close_test ()
851 int status = 0;
853 int flushed_messages;
855 QUEUE mq1;
856 flushed_messages = mq1.close ();
858 if (flushed_messages != 0)
860 ACE_ERROR ((LM_ERROR,
861 ACE_TEXT ("Closing queue should flush 0 messages, close() reports - %d\n"),
862 flushed_messages ));
863 status = 1;
864 return status;
867 // There was a bug that return previous queue state instead of
868 // number of flushed messages. Thus, insert 2 messages != ACTIVATE
869 // queue state
870 ACE_Message_Block *pMB1;
871 ACE_Message_Block *pMB2;
872 ACE_NEW_NORETURN (pMB1, ACE_Message_Block (1));
873 ACE_NEW_NORETURN (pMB2, ACE_Message_Block (1));
874 QUEUE mq2;
875 mq2.enqueue_head (pMB1);
876 mq2.enqueue_head (pMB2);
877 flushed_messages = mq2.close ();
879 if (flushed_messages != 2)
881 ACE_ERROR ((LM_ERROR,
882 ACE_TEXT ("Closing queue should flush 2 messages, close() reports - %d\n"),
883 flushed_messages ));
884 status = 1;
885 return status;
887 return status;
891 run_main (int argc, ACE_TCHAR *argv[])
893 ACE_START_TEST (ACE_TEXT ("Message_Queue_Test"));
895 if (argc == 2)
897 if (!ACE_OS::strcmp (argv[1], ACE_TEXT ("-?")))
899 ACE_ERROR ((LM_ERROR,
900 ACE_TEXT ("%s/n"),
901 usage));
903 else
905 max_messages = ACE_OS::atoi (argv[1]);
909 int status = prio_test ();
911 // The iterator test occasionally causes a page fault or a hang on
912 // VxWorks.
913 if (status == 0)
914 status = iterator_test ();
916 ACE_NEW_RETURN (timer,
917 ACE_High_Res_Timer,
918 -1);
920 if (status == 0)
921 status = close_test ();
923 #if defined (ACE_HAS_THREADS)
924 if (status == 0)
925 status = timeout_test ();
927 if (status == 0)
928 status = chained_block_test ();
930 if (status == 0)
931 status = single_thread_performance_test ();
933 # if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
934 // Test ACE_Message_Queue_Vx. or ACE_Message_Queue_NT
935 if (status == 0)
936 status = single_thread_performance_test (1);
937 # endif /* ACE_VXWORKS */
939 if (status == 0)
940 status = performance_test ();
942 # if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
943 // Test ACE_Message_Queue_Vx or ACE_Message_Queue_NT
944 if (status == 0)
945 status = performance_test (1);
946 # endif /* ACE_VXWORKS */
948 if (counting_test () != 0)
949 status = -1;
950 #endif /* ACE_HAS_THREADS */
952 if (status != 0)
953 ACE_ERROR ((LM_ERROR,
954 ACE_TEXT ("%p\n"),
955 ACE_TEXT ("test failed")));
956 delete timer;
957 timer = 0;
960 ACE_END_TEST;
961 return status;