Also use Objects as part of an operation but as a result don't generate Any operation...
[ACE_TAO.git] / ACE / tests / Message_Queue_Test.cpp
blob391d73f284ae417a4a36fe9830edfa6245157cb9
2 //=============================================================================
3 /**
4 * @file Message_Queue_Test.cpp
6 * This is:
7 * 0) a test that ensures key ACE_Message_Queue features are
8 * working properly, including timeouts and priorities
9 * 1) a simple test of the ACE_Message_Queue that illustrates how to
10 * use the forward and reverse iterators
11 * 2) a simple performance measurement test for both single-threaded
12 * (null synch), thread-safe ACE_Message_Queues, and
13 * ACE_Message_Queue_Vx, which wraps VxWorks message queues
14 * 3) a test/usage example of ACE_Message_Queue_Vx
15 * 4) a test of the message counting in a message queue under load.
17 * @author Irfan Pyarali <irfan@cs.wustl.edu>
18 * @author David L. Levine <levine@cs.wustl.edu>
19 * @author and Douglas C. Schmidt <schmidt@vanderbilt.edu>
21 //=============================================================================
24 #include "test_config.h"
25 #include "ace/Atomic_Op.h"
26 #include "ace/Thread_Manager.h"
27 #include "ace/Message_Queue.h"
28 #include "ace/Message_Queue_NT.h"
29 #include "ace/Message_Queue_Vx.h"
30 #include "ace/Synch_Traits.h"
31 #include "ace/Null_Mutex.h"
32 #include "ace/Null_Condition.h"
33 #include "ace/High_Res_Timer.h"
34 #include "ace/Task.h"
35 #include "ace/OS_NS_stdio.h"
36 #include "ace/OS_NS_string.h"
37 #include "ace/OS_NS_sys_time.h"
38 #include "ace/OS_NS_unistd.h"
42 const ACE_TCHAR usage[] = ACE_TEXT ("usage: Message_Queue_Test <number of messages>\n");
44 typedef ACE_Message_Queue<ACE_NULL_SYNCH> QUEUE;
45 typedef ACE_Message_Queue_Iterator<ACE_NULL_SYNCH> ITERATOR;
46 typedef ACE_Message_Queue_Reverse_Iterator<ACE_NULL_SYNCH> REVERSE_ITERATOR;
48 #if defined (ACE_HAS_WINCE)
49 static const int MESSAGE_FACTOR = 10000;
50 #else
51 static const int MESSAGE_FACTOR = 100000;
52 #endif
53 static const int MAX_MESSAGES = 10000;
54 static const int MAX_MESSAGE_SIZE = 32;
55 static const char test_message[] = "ACE_Message_Queue Test Message";
57 static int max_messages = MAX_MESSAGES;
59 // Dynamically allocate to avoid a static.
60 static ACE_High_Res_Timer *timer = 0;
62 #if defined (ACE_HAS_THREADS)
63 typedef ACE_Message_Queue<ACE_MT_SYNCH> SYNCH_QUEUE;
65 struct Queue_Wrapper
67 // = TITLE
68 // Container for data passed to sender and receiver in
69 // performance test.
71 // = DESCRIPTION
72 // For use in multithreaded performance test.
74 ACE_Message_Queue_Base *q_;
75 // The message queue.
77 ACE_Message_Block **send_block_;
78 // Pointer to messages blocks for sender to send to reciever.
80 Queue_Wrapper (void)
81 : q_ (0), send_block_ (0)
84 // Default constructor.
87 // For the message counting test, there are two tasks, producer and consumer.
88 // Each will spawn a number of threads, and the two tasks share a queue.
89 class Counting_Test_Producer : public ACE_Task<ACE_MT_SYNCH>
91 public:
92 Counting_Test_Producer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
93 : ACE_Task<ACE_MT_SYNCH> (0, queue), sequence_ (0), produced_ (0) {}
94 virtual int svc (void);
96 ACE_Atomic_Op<ACE_Thread_Mutex, long> sequence_;
97 ACE_Atomic_Op<ACE_Thread_Mutex, long> produced_;
100 class Counting_Test_Consumer : public ACE_Task<ACE_MT_SYNCH>
102 public:
103 Counting_Test_Consumer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
104 : ACE_Task<ACE_MT_SYNCH> (0, queue), consumed_ (0) {}
105 virtual int svc (void);
107 ACE_Atomic_Op<ACE_Thread_Mutex, long> consumed_;
111 Counting_Test_Producer::svc (void)
113 // Going to produce a lot of blocks. Since we don't necessarily want them
114 // all consumed, there's no arrangement with the consumer to be sure that
115 // the same number produced will be consumed; the test check will compare
116 // the number produced, consumed, and remaining to be sure it ends up
117 // correct.
118 // Also, to be sure there's not just 1 producer and 1 consumer pinging
119 // back and forth, make the producers randomly delay between blocks.
120 ACE_OS::srand (static_cast<unsigned int> (ACE_OS::time ()));
121 int multiple = ACE_OS::rand () % 10;
122 int delay_ms = (ACE_OS::rand () % 10) / 2;
123 // The delay usually causes the test to time out in the automated
124 // regression testing. I just left it here in case it's needed someday.
125 delay_ms = 0;
126 long count = MESSAGE_FACTOR * (multiple ? multiple : 1);
127 long produced = 0;
128 // Some of the threads enqueue single blocks, others sequences.
129 long lsequence = ++(this->sequence_);
130 int seq = static_cast<int> (lsequence);
131 ACE_DEBUG ((LM_DEBUG,
132 ACE_TEXT ("(%t) Producer will enqueue %B blocks in seq of %d, ")
133 ACE_TEXT ("%d msec delay\n"),
134 (size_t)count,
135 seq,
136 delay_ms));
138 ACE_Message_Block *first = 0, *prev = 0, *b = 0;
139 ACE_Time_Value delay (0, delay_ms);
140 ACE_Time_Value timeout (10);
141 while (produced < count)
143 ACE_NEW_NORETURN (b, ACE_Message_Block (1));
144 if (b == 0)
146 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer out of memory\n")));
147 break;
149 first = b;
150 prev = first;
151 for (int s = 1; s < seq; ++s)
153 ACE_NEW_NORETURN (b, ACE_Message_Block (1));
154 if (b == 0)
155 break;
156 prev->next (b);
157 b->prev (prev);
158 prev = b;
160 if (b == 0)
162 if (first != b)
164 while (first->next () != 0)
166 b = first->next ();
167 first->release ();
168 first = b;
170 first->release ();
172 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer out of memory\n")));
173 break;
175 // To be sure we can keep going on slow or completed consumers, but not
176 // delay excessively if the consumers have stopped, limit the time
177 // spent waiting to 10 seconds.
178 ACE_Time_Value block = ACE_OS::gettimeofday ();
179 block += timeout;
180 if (this->putq (first, &block) == -1)
182 ACE_DEBUG ((LM_DEBUG,
183 ACE_TEXT ("(%t) Producer cannot putq; giving up\n")));
184 while (first->next () != 0)
186 b = first->next ();
187 first->release ();
188 first = b;
190 first->release ();
191 break;
193 produced += seq;
194 if (delay_ms)
195 ACE_OS::sleep (delay);
197 this->produced_ += produced;
198 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer done\n")));
199 return 0;
203 Counting_Test_Consumer::svc (void)
205 // Consume lots of blocks and release them. To mimic a thread with work
206 // to do, put a small random delay between dequeuing the blocks. Consume
207 // a calculated number of blocks then stop; the test checker will determine
208 // if the number consumed plus the number remaining is correct for the
209 // number produced.
210 unsigned int seed = static_cast<unsigned int> (ACE_OS::time ());
212 int multiple = ACE_OS::rand_r (&seed) % 10;
213 int delay_ms = ACE_OS::rand_r (&seed) % 10;
214 // The delay usually causes the test to time out in the automated
215 // regression testing. I just left it here in case it's needed someday.
216 delay_ms = 0;
217 long count = MESSAGE_FACTOR * (multiple ? multiple : 1);
218 long consumed = 0;
219 ACE_DEBUG ((LM_DEBUG,
220 ACE_TEXT ("(%t) Consumer will dequeue %B blocks, ")
221 ACE_TEXT ("%d msec delay\n"),
222 (size_t)count,
223 delay_ms));
224 ACE_Message_Block *b = 0;
225 ACE_Time_Value delay (0, delay_ms);
226 ACE_Time_Value timeout (2);
227 while (consumed < count)
229 // To be sure we can wait in the case of an empty queue, but not
230 // delay excessively if the producers have stopped, limit the time
231 // spent waiting to 2 seconds.
232 ACE_Time_Value block = ACE_OS::gettimeofday ();
233 block += timeout;
234 if (this->getq (b, &block) == -1)
236 if (errno == EWOULDBLOCK)
237 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Consumer timed out\n")));
238 else
239 ACE_ERROR ((LM_ERROR,
240 ACE_TEXT ("(%t) Consumer %p\n"),
241 ACE_TEXT ("getq")));
242 break;
244 ++consumed;
245 b->release ();
246 if (delay_ms)
247 ACE_OS::sleep (delay);
249 this->consumed_ += consumed;
250 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Consumer done\n")));
251 return 0;
254 static int
255 counting_test (void)
257 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Starting counting test\n")));
259 ACE_Message_Queue<ACE_MT_SYNCH> q (2 * 1024 * 1024); // 2MB high water
260 Counting_Test_Producer p (&q);
261 Counting_Test_Consumer c (&q);
262 // Activate consumers first; if the producers fail to start, consumers will
263 // stop quicker.
264 if (c.activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, 5) == -1)
265 ACE_ERROR_RETURN ((LM_ERROR,
266 ACE_TEXT ("Consumers %p\n"),
267 ACE_TEXT ("activate")),
268 -1);
269 if (p.activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, 5) == -1)
271 ACE_ERROR ((LM_ERROR,
272 ACE_TEXT ("Producers %p\n"),
273 ACE_TEXT ("activate")));
274 c.wait ();
275 return -1;
277 // Producers and consumers are both running; wait for them then
278 // check the results.
279 p.wait ();
280 c.wait ();
281 // This compare relies on the flush() method counting blocks as it
282 // walks the chain releasing them, and doesn't rely on the count.
283 int status = 0;
284 long q_count = static_cast<long> (q.message_count ());
285 long remaining = q.flush ();
286 ACE_DEBUG ((LM_DEBUG,
287 ACE_TEXT ("Queue message_count is %b; %b flushed\n"),
288 (ssize_t)q_count,
289 (ssize_t)remaining));
290 if (q_count != remaining)
292 status = -1;
293 ACE_ERROR ((LM_ERROR,
294 ACE_TEXT ("message_count and flushed should be equal!\n")));
296 long expected = p.produced_.value () - c.consumed_.value ();
297 ACE_DEBUG ((LM_DEBUG,
298 ACE_TEXT ("Produced %b, consumed %b; diff %b\n"),
299 (ssize_t)p.produced_.value (),
300 (ssize_t)c.consumed_.value (),
301 (ssize_t)expected));
302 if (expected != remaining)
304 status = -1;
305 ACE_ERROR ((LM_ERROR,
306 ACE_TEXT ("Producer-consumer diff is %b; should be %b\n"),
307 (ssize_t)expected,
308 (ssize_t)remaining));
310 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Ending counting test\n")));
311 return status;
314 #endif /* ACE_HAS_THREADS */
316 static int
317 iterator_test (void)
319 const int ITERATIONS = 5;
320 ACE_TCHAR buffer[ITERATIONS][BUFSIZ];
321 // Use queue size from of 32 Kb (more if using wide-char), instead of the
322 // default of 16 Kb (defined by ACE_Message_Queue_Base::DEFAULT_HWM),
323 // so that the test runs on machines with 8Kb pagesizes.
325 // QUEUE queue (32 * 1024 * sizeof (ACE_TCHAR));
326 QUEUE queue (sizeof(buffer));
328 int i;
330 for (i = 0; i < ITERATIONS; i++)
332 ACE_OS::snprintf (buffer[i], BUFSIZ, ACE_TEXT ("%d"), i + 1);
334 ACE_Message_Block *entry = 0;
335 ACE_NEW_RETURN (entry,
336 ACE_Message_Block ((char *) buffer[i],
337 sizeof buffer[i]),
338 -1);
340 if (queue.is_full ())
341 ACE_ERROR_RETURN ((LM_ERROR,
342 ACE_TEXT ("QUEUE:: the message queue is full on iteration %u!\n"),
343 i + 1),
344 -1);
346 if (queue.enqueue (entry) == -1)
347 ACE_ERROR_RETURN ((LM_ERROR,
348 ACE_TEXT ("QUEUE::enqueue\n")),
349 -1);
352 ACE_DEBUG ((LM_DEBUG,
353 ACE_TEXT ("\nForward Iterations\n")));
355 ITERATOR iterator (queue);
357 for (ACE_Message_Block *entry = 0;
358 iterator.next (entry) != 0;
359 iterator.advance ())
360 ACE_DEBUG ((LM_DEBUG,
361 ACE_TEXT ("%s\n"),
362 entry->base ()));
365 ACE_DEBUG ((LM_DEBUG,
366 ACE_TEXT ("\nReverse Iterations\n")));
368 REVERSE_ITERATOR iterator (queue);
370 for (ACE_Message_Block *entry = 0;
371 iterator.next (entry) != 0;
372 iterator.advance ())
373 ACE_DEBUG ((LM_DEBUG,
374 ACE_TEXT ("%s\n"),
375 entry->base ()));
378 ACE_DEBUG ((LM_DEBUG,
379 ACE_TEXT ("\nForward Iterations\n")));
381 QUEUE::ITERATOR iterator (queue);
383 for (ACE_Message_Block *entry = 0;
384 iterator.next (entry) != 0;
385 iterator.advance ())
386 ACE_DEBUG ((LM_DEBUG,
387 ACE_TEXT ("%s\n"),
388 entry->base ()));
391 ACE_DEBUG ((LM_DEBUG,
392 ACE_TEXT ("\nReverse Iterations\n")));
394 QUEUE::REVERSE_ITERATOR iterator (queue);
396 for (ACE_Message_Block *entry = 0;
397 iterator.next (entry) != 0;
398 iterator.advance ())
399 ACE_DEBUG ((LM_DEBUG,
400 ACE_TEXT ("%s\n"),
401 entry->base ()));
404 return 0;
407 #if defined (ACE_HAS_THREADS)
409 static int
410 chained_block_test (void)
413 QUEUE q;
414 const char * s = "123456789"; // Will be length 10 when copied to block
415 const size_t slen = 10;
416 const size_t num_blks = 10;
417 ACE_Message_Block b[num_blks];
418 size_t i;
419 int status = 0;
421 for (i = 0; i < num_blks; ++i)
423 b[i].init (slen);
424 b[i].copy (s);
427 // Test enqueueing single and chained blocks and be sure they end up with
428 // the proper enqueued block count and sizes. Then be sure they are dequeued
429 // in the proper order.
430 b[0].next (&b[1]);
431 b[1].next (&b[2]);
432 // b[3] and b[4] are unchained.
433 b[5].next (&b[6]);
434 b[6].next (&b[7]);
435 b[7].next (&b[8]);
436 // b[9] is unchained
437 q.enqueue_tail (&b[3]);
438 q.enqueue_tail (&b[4]);
439 int num = q.enqueue_head (&b[0]);
440 if (num != 5)
442 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 5; has %d\n"),
443 num));
444 status = -1;
446 num = q.enqueue_tail (&b[5]);
447 if (num != 9)
449 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 9; has %d\n"),
450 num));
451 status = -1;
453 num = q.enqueue_tail (&b[9]);
454 if (num != 10)
456 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 10; has %d\n"),
457 num));
458 status = -1;
460 size_t msgs, bytes;
461 msgs = q.message_count ();
462 bytes = q.message_bytes ();
463 if (msgs != 10 || bytes != 100)
465 ACE_ERROR ((LM_ERROR,
466 ACE_TEXT ("Chained enqueue totals: %d msgs, %d bytes; ")
467 ACE_TEXT ("should be 10 msgs, 100 bytes\n"),
468 (int)msgs, (int)bytes));
469 status = -1;
472 // Now see if we can dequeue them, checking the order.
473 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
474 ACE_Message_Block *bp;
475 int qstat;
476 for (i = 0; i < num_blks; ++i)
478 qstat = q.dequeue_head (bp, &nowait);
479 if (qstat == -1)
481 ACE_ERROR ((LM_ERROR,
482 ACE_TEXT ("Checking chained blocks, pass %d: %p\n"),
483 (int)i, ACE_TEXT ("dequeue_head")));
484 status = -1;
486 else if (bp != &b[i])
488 ACE_ERROR ((LM_ERROR,
489 ACE_TEXT ("Checking chained blocks, pass %d: ")
490 ACE_TEXT ("block out of order\n"),
491 (int)i));
492 status = -1;
496 if (status == 0)
497 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Chained block test OK\n")));
498 return status;
501 static int
502 single_thread_performance_test (int queue_type = 0)
504 const char test_message[] =
505 "ACE_Message_Queue Test Message";
506 const ACE_TCHAR *message =
507 ACE_TEXT ("ACE_Message_Queue<ACE_NULL_SYNCH>, single thread");
508 int i = 0;
510 // Create a message queue.
511 ACE_Message_Queue_Base *msgq = 0;
513 if (queue_type == 0)
514 ACE_NEW_RETURN (msgq,
515 QUEUE,
516 -1);
517 #if defined (ACE_VXWORKS)
518 else
520 ACE_NEW_RETURN (msgq,
521 ACE_Message_Queue_Vx (max_messages,
522 MAX_MESSAGE_SIZE),
523 -1);
524 message = "ACE_Message_Queue_Vx, single thread test";
526 #elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
527 else
529 ACE_NEW_RETURN (msgq,
530 ACE_Message_Queue_NT,
531 -1);
532 message = ACE_TEXT ("ACE_Message_Queue_NT, single thread test");
534 #endif /* ACE_VXWORKS */
536 // Create the messages. Allocate off the heap in case messages
537 // is large relative to the amount of stack space available.
538 ACE_Message_Block **send_block = 0;
539 ACE_NEW_RETURN (send_block,
540 ACE_Message_Block *[max_messages],
541 -1);
543 for (i = 0; i < max_messages; ++i)
544 ACE_NEW_RETURN (send_block[i],
545 ACE_Message_Block (test_message,
546 MAX_MESSAGE_SIZE),
547 -1);
549 ACE_Message_Block **receive_block_p = 0;
550 ACE_NEW_RETURN (receive_block_p,
551 ACE_Message_Block *[max_messages],
552 -1);
554 #if defined (ACE_VXWORKS)
555 // Set up blocks to receive the messages. Allocate these off the
556 // heap in case messages is large relative to the amount of
557 // stack space available.
558 ACE_Message_Block *receive_block = 0;
559 ACE_NEW_RETURN (receive_block,
560 ACE_Message_Block[max_messages],
561 -1);
563 for (i = 0; i < max_messages; ++i)
565 receive_block[i].init (MAX_MESSAGE_SIZE);
567 // For VxWorks Message Queues, the receive block pointer must be
568 // assigned. It will be used by dequeue_head ().
569 receive_block_p[i] = &receive_block[i];
571 #endif /* ACE_VXWORKS */
573 timer->start ();
575 // Send/receive the messages.
576 for (i = 0; i < max_messages; ++i)
578 if (msgq->enqueue_tail (send_block[i]) == -1)
579 ACE_ERROR_RETURN ((LM_ERROR,
580 ACE_TEXT ("%p\n"),
581 ACE_TEXT ("enqueue")),
582 -1);
584 if (msgq->dequeue_head (receive_block_p[i]) == -1)
585 ACE_ERROR_RETURN ((LM_ERROR,
586 ACE_TEXT ("%p\n"),
587 ACE_TEXT ("dequeue_head")),
588 -1);
591 timer->stop ();
593 ACE_Time_Value tv;
594 timer->elapsed_time (tv);
595 ACE_DEBUG ((LM_INFO,
596 ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
597 message,
598 max_messages,
599 tv.msec (),
600 (double) tv.msec () / max_messages));
601 timer->reset ();
603 delete [] receive_block_p;
604 #if defined (ACE_VXWORKS)
605 delete [] receive_block;
606 #endif /* ACE_VXWORKS */
608 for (i = 0; i < max_messages; ++i)
609 delete send_block[i];
610 delete [] send_block;
611 delete msgq;
613 return 0;
616 static void *
617 receiver (void *arg)
619 Queue_Wrapper *queue_wrapper =
620 reinterpret_cast<Queue_Wrapper *> (arg);
621 int i;
623 ACE_Message_Block **receive_block_p = 0;
624 ACE_NEW_RETURN (receive_block_p,
625 ACE_Message_Block *[max_messages],
626 (void *) -1);
628 #if defined (ACE_VXWORKS)
629 // Set up blocks to receive the messages. Allocate these off the
630 // heap in case messages is large relative to the amount of stack
631 // space available.
632 ACE_Message_Block *receive_block;
633 ACE_NEW_RETURN (receive_block,
634 ACE_Message_Block[max_messages],
635 (void *) -1);
637 for (i = 0; i < max_messages; ++i)
639 receive_block[i].init (MAX_MESSAGE_SIZE);
641 // For VxWorks Message Queues, the receive block pointer must be
642 // assigned. It will be used by <dequeue_head>.
643 receive_block_p[i] = &receive_block[i];
645 #endif /* ACE_VXWORKS */
647 for (i = 0; i < max_messages; ++i)
648 if (queue_wrapper->q_->dequeue_head (receive_block_p[i]) == -1)
649 ACE_ERROR_RETURN ((LM_ERROR,
650 ACE_TEXT ("%p\n"),
651 ACE_TEXT ("dequeue_head")),
653 timer->stop ();
655 delete [] receive_block_p;
656 #if defined (ACE_VXWORKS)
657 delete [] receive_block;
658 #endif /* ACE_VXWORKS */
660 return 0;
663 static void *
664 sender (void *arg)
666 Queue_Wrapper *queue_wrapper =
667 reinterpret_cast<Queue_Wrapper *> (arg);
668 int i;
670 timer->start ();
672 // Send the messages.
673 for (i = 0; i < max_messages; ++i)
674 if (queue_wrapper->q_->
675 enqueue_tail (queue_wrapper->send_block_[i]) == -1)
676 ACE_ERROR_RETURN ((LM_ERROR,
677 ACE_TEXT ("%p\n"),
678 ACE_TEXT ("enqueue")),
680 return 0;
683 static
685 performance_test (int queue_type = 0)
687 Queue_Wrapper queue_wrapper;
688 const ACE_TCHAR *message =
689 ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH>");
690 int i = 0;
692 // Create the messages. Allocate off the heap in case messages is
693 // large relative to the amount of stack space available. Allocate
694 // it here instead of in the sender, so that we can delete it after
695 // the _receiver_ is done.
696 ACE_Message_Block **send_block = 0;
697 ACE_NEW_RETURN (send_block,
698 ACE_Message_Block *[max_messages],
699 -1);
701 for (i = 0; i < max_messages; ++i)
702 ACE_NEW_RETURN (send_block[i],
703 ACE_Message_Block (test_message,
704 MAX_MESSAGE_SIZE),
705 -1);
707 queue_wrapper.send_block_ = send_block;
709 if (queue_type == 0)
710 ACE_NEW_RETURN (queue_wrapper.q_,
711 SYNCH_QUEUE,
712 -1);
713 #if defined (ACE_VXWORKS)
714 else
716 ACE_NEW_RETURN (queue_wrapper.q_,
717 ACE_Message_Queue_Vx (max_messages,
718 MAX_MESSAGE_SIZE),
719 -1);
720 message = "ACE_Message_Queue_Vx";
722 #elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
723 else
725 ACE_NEW_RETURN (queue_wrapper.q_,
726 ACE_Message_Queue_NT,
727 -1);
728 message = ACE_TEXT ("ACE_Message_Queue_NT");
730 #endif /* ACE_VXWORKS */
732 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) sender,
733 &queue_wrapper,
734 THR_BOUND) == -1)
735 ACE_ERROR_RETURN ((LM_ERROR,
736 ACE_TEXT ("%p\n"),
737 ACE_TEXT ("spawning sender thread")),
738 -1);
740 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) receiver,
741 &queue_wrapper,
742 THR_BOUND) == -1)
743 ACE_ERROR_RETURN ((LM_ERROR,
744 ACE_TEXT ("%p\n"),
745 ACE_TEXT ("spawning receiver thread")),
746 -1);
748 ACE_Thread_Manager::instance ()->wait ();
749 ACE_Time_Value tv;
750 timer->elapsed_time (tv);
751 ACE_DEBUG ((LM_INFO, ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
752 message,
753 max_messages,
754 tv.msec (),
755 (double) tv.msec () / max_messages));
756 timer->reset ();
758 delete queue_wrapper.q_;
759 queue_wrapper.q_ = 0;
761 for (i = 0; i < max_messages; ++i)
762 delete send_block[i];
763 delete [] send_block;
765 return 0;
768 // Ensure that the timedout dequeue_head() sets errno code properly.
770 static int
771 timeout_test (void)
773 SYNCH_QUEUE mq;
774 int status = 0;
776 if (!mq.is_empty ())
778 ACE_ERROR ((LM_ERROR,
779 ACE_TEXT ("New queue is not empty!\n")));
780 status = 1;
782 else
784 ACE_Message_Block *b;
785 ACE_Time_Value tv (ACE_OS::gettimeofday ()); // Now
787 if (mq.dequeue_head (b, &tv) != -1)
789 ACE_ERROR ((LM_ERROR,
790 ACE_TEXT ("Dequeued from empty queue!\n")));
791 status = 1;
793 else if (errno != EWOULDBLOCK)
795 ACE_ERROR ((LM_ERROR,
796 ACE_TEXT ("%p\n"),
797 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
798 status = 1;
800 else
802 ACE_DEBUG ((LM_DEBUG,
803 ACE_TEXT ("Timed dequeue test: OK\n")));
804 status = 0; // All is well
808 return status;
810 #endif /* ACE_HAS_THREADS */
812 // Check to make sure that dequeue_prio() respects FIFO ordering.
813 // @@ At some point, this function should be enhanced to do a more
814 // thorough check...
816 static int
817 prio_test (void)
819 const char S1[] = "first";
820 const char S2[] = "second";
821 const int PRIORITY = 50;
822 QUEUE mq;
823 int status;
825 ACE_Message_Block mb1 (S1, sizeof S1, PRIORITY);
826 ACE_Message_Block mb2 (S2, sizeof S2, PRIORITY);
828 mq.enqueue_prio (&mb1);
829 mq.enqueue_prio (&mb2);
831 ACE_Message_Block *mb1p = 0;
832 ACE_Message_Block *mb2p = 0;
834 mq.dequeue_prio (mb1p);
835 mq.dequeue_prio (mb2p);
837 ACE_DEBUG ((LM_DEBUG, "message 1 = %C\nmessage 2 = %C\n",
838 mb1p->rd_ptr (),
839 mb2p->rd_ptr ()));
841 if (ACE_OS::strcmp (mb1p->rd_ptr (), S1) == 0
842 && ACE_OS::strcmp (mb2p->rd_ptr (), S2) == 0)
843 status = 0;
844 else
845 status = 1;
847 return status;
850 static int
851 close_test (void)
853 int status = 0;
855 int flushed_messages;
857 QUEUE mq1;
858 flushed_messages = mq1.close ();
860 if (flushed_messages != 0)
862 ACE_ERROR ((LM_ERROR,
863 ACE_TEXT ("Closing queue should flush 0 messages, close() reports - %d\n"),
864 flushed_messages ));
865 status = 1;
866 return status;
869 // There was a bug that return previous queue state instead of
870 // number of flushed messages. Thus, insert 2 messages != ACTIVATE
871 // queue state
872 ACE_Message_Block *pMB1;
873 ACE_Message_Block *pMB2;
874 ACE_NEW_NORETURN (pMB1, ACE_Message_Block (1));
875 ACE_NEW_NORETURN (pMB2, ACE_Message_Block (1));
876 QUEUE mq2;
877 mq2.enqueue_head (pMB1);
878 mq2.enqueue_head (pMB2);
879 flushed_messages = mq2.close ();
881 if (flushed_messages != 2)
883 ACE_ERROR ((LM_ERROR,
884 ACE_TEXT ("Closing queue should flush 2 messages, close() reports - %d\n"),
885 flushed_messages ));
886 status = 1;
887 return status;
889 return status;
893 run_main (int argc, ACE_TCHAR *argv[])
895 ACE_START_TEST (ACE_TEXT ("Message_Queue_Test"));
897 if (argc == 2)
899 if (!ACE_OS::strcmp (argv[1], ACE_TEXT ("-?")))
901 ACE_ERROR ((LM_ERROR,
902 ACE_TEXT ("%s/n"),
903 usage));
905 else
907 max_messages = ACE_OS::atoi (argv[1]);
911 int status = prio_test ();
913 // The iterator test occasionally causes a page fault or a hang on
914 // VxWorks.
915 if (status == 0)
916 status = iterator_test ();
918 ACE_NEW_RETURN (timer,
919 ACE_High_Res_Timer,
920 -1);
922 if (status == 0)
923 status = close_test ();
925 #if defined (ACE_HAS_THREADS)
926 if (status == 0)
927 status = timeout_test ();
929 if (status == 0)
930 status = chained_block_test ();
932 if (status == 0)
933 status = single_thread_performance_test ();
935 # if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
936 // Test ACE_Message_Queue_Vx. or ACE_Message_Queue_NT
937 if (status == 0)
938 status = single_thread_performance_test (1);
939 # endif /* ACE_VXWORKS */
941 if (status == 0)
942 status = performance_test ();
944 # if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
945 // Test ACE_Message_Queue_Vx or ACE_Message_Queue_NT
946 if (status == 0)
947 status = performance_test (1);
948 # endif /* ACE_VXWORKS */
950 if (counting_test () != 0)
951 status = -1;
952 #endif /* ACE_HAS_THREADS */
954 if (status != 0)
955 ACE_ERROR ((LM_ERROR,
956 ACE_TEXT ("%p\n"),
957 ACE_TEXT ("test failed")));
958 delete timer;
959 timer = 0;
963 ACE_END_TEST;
964 return status;