Changes to attempt to silence bcc64x
[ACE_TAO.git] / ACE / tests / Message_Queue_Test.cpp
blobbe63d886a1a6d3775a6b706645cae17a2904d444
2 //=============================================================================
3 /**
4 * @file Message_Queue_Test.cpp
6 * This is:
7 * 0) a test that ensures key ACE_Message_Queue features are
8 * working properly, including timeouts and priorities
9 * 1) a simple test of the ACE_Message_Queue that illustrates how to
10 * use the forward and reverse iterators
11 * 2) a simple performance measurement test for both single-threaded
12 * (null synch), thread-safe ACE_Message_Queues, and
13 * ACE_Message_Queue_Vx, which wraps VxWorks message queues
14 * 3) a test/usage example of ACE_Message_Queue_Vx
15 * 4) a test of the message counting in a message queue under load.
17 * @author Irfan Pyarali <irfan@cs.wustl.edu>
18 * @author David L. Levine <levine@cs.wustl.edu>
19 * @author and Douglas C. Schmidt <schmidt@vanderbilt.edu>
21 //=============================================================================
24 #include "test_config.h"
25 #include "ace/Atomic_Op.h"
26 #include "ace/Thread_Manager.h"
27 #include "ace/Message_Queue.h"
28 #include "ace/Message_Queue_NT.h"
29 #include "ace/Message_Queue_Vx.h"
30 #include "ace/Synch_Traits.h"
31 #include "ace/Null_Mutex.h"
32 #include "ace/Null_Condition.h"
33 #include "ace/High_Res_Timer.h"
34 #include "ace/Task.h"
35 #include "ace/OS_NS_stdio.h"
36 #include "ace/OS_NS_string.h"
37 #include "ace/OS_NS_sys_time.h"
38 #include "ace/OS_NS_unistd.h"
41 const ACE_TCHAR usage[] = ACE_TEXT ("usage: Message_Queue_Test <number of messages>\n");
43 using QUEUE = ACE_Message_Queue<ACE_NULL_SYNCH>;
44 using ITERATOR = ACE_Message_Queue_Iterator<ACE_NULL_SYNCH>;
45 using REVERSE_ITERATOR = ACE_Message_Queue_Reverse_Iterator<ACE_NULL_SYNCH>;
47 static const int MESSAGE_FACTOR = 100000;
48 static const int MAX_MESSAGES = 10000;
49 static const int MAX_MESSAGE_SIZE = 32;
50 static const char test_message[] = "ACE_Message_Queue Test Message";
52 static int max_messages = MAX_MESSAGES;
54 // Dynamically allocate to avoid a static.
55 static ACE_High_Res_Timer *timer = 0;
57 #if defined (ACE_HAS_THREADS)
58 using SYNCH_QUEUE = ACE_Message_Queue<ACE_MT_SYNCH>;
60 struct Queue_Wrapper
62 // = TITLE
63 // Container for data passed to sender and receiver in
64 // performance test.
66 // = DESCRIPTION
67 // For use in multithreaded performance test.
69 ACE_Message_Queue_Base *q_;
70 // The message queue.
72 ACE_Message_Block **send_block_;
73 // Pointer to messages blocks for sender to send to reciever.
75 Queue_Wrapper ()
76 : q_ (0), send_block_ (0)
79 // Default constructor.
82 // For the message counting test, there are two tasks, producer and consumer.
83 // Each will spawn a number of threads, and the two tasks share a queue.
84 class Counting_Test_Producer : public ACE_Task<ACE_MT_SYNCH>
86 public:
87 Counting_Test_Producer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
88 : ACE_Task<ACE_MT_SYNCH> (0, queue), sequence_ (0), produced_ (0) {}
89 int svc () override;
91 ACE_Atomic_Op<ACE_Thread_Mutex, long> sequence_;
92 ACE_Atomic_Op<ACE_Thread_Mutex, long> produced_;
95 class Counting_Test_Consumer : public ACE_Task<ACE_MT_SYNCH>
97 public:
98 Counting_Test_Consumer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
99 : ACE_Task<ACE_MT_SYNCH> (0, queue), consumed_ (0) {}
100 int svc () override;
102 ACE_Atomic_Op<ACE_Thread_Mutex, long> consumed_;
106 Counting_Test_Producer::svc ()
108 // Going to produce a lot of blocks. Since we don't necessarily want them
109 // all consumed, there's no arrangement with the consumer to be sure that
110 // the same number produced will be consumed; the test check will compare
111 // the number produced, consumed, and remaining to be sure it ends up
112 // correct.
113 // Also, to be sure there's not just 1 producer and 1 consumer pinging
114 // back and forth, make the producers randomly delay between blocks.
115 ACE_OS::srand (static_cast<unsigned int> (ACE_OS::time ()));
116 int multiple = ACE_OS::rand () % 10;
117 int delay_ms = (ACE_OS::rand () % 10) / 2;
118 // The delay usually causes the test to time out in the automated
119 // regression testing. I just left it here in case it's needed someday.
120 delay_ms = 0;
121 long count = MESSAGE_FACTOR * (multiple ? multiple : 1);
122 long produced = 0;
123 // Some of the threads enqueue single blocks, others sequences.
124 long lsequence = ++(this->sequence_);
125 int seq = static_cast<int> (lsequence);
126 ACE_DEBUG ((LM_DEBUG,
127 ACE_TEXT ("(%t) Producer will enqueue %B blocks in seq of %d, ")
128 ACE_TEXT ("%d msec delay\n"),
129 (size_t)count,
130 seq,
131 delay_ms));
133 ACE_Message_Block *first = 0, *prev = 0, *b = 0;
134 ACE_Time_Value delay (0, delay_ms);
135 ACE_Time_Value timeout (10);
136 while (produced < count)
138 ACE_NEW_NORETURN (b, ACE_Message_Block (1));
139 if (b == 0)
141 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer out of memory\n")));
142 break;
144 first = b;
145 prev = first;
146 for (int s = 1; s < seq; ++s)
148 ACE_NEW_NORETURN (b, ACE_Message_Block (1));
149 if (b == 0)
150 break;
151 prev->next (b);
152 b->prev (prev);
153 prev = b;
155 if (b == 0)
157 if (first != b)
159 while (first->next () != 0)
161 b = first->next ();
162 first->release ();
163 first = b;
165 first->release ();
167 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer out of memory\n")));
168 break;
170 // To be sure we can keep going on slow or completed consumers, but not
171 // delay excessively if the consumers have stopped, limit the time
172 // spent waiting to 10 seconds.
173 ACE_Time_Value block = ACE_OS::gettimeofday ();
174 block += timeout;
175 if (this->putq (first, &block) == -1)
177 ACE_DEBUG ((LM_DEBUG,
178 ACE_TEXT ("(%t) Producer cannot putq; giving up\n")));
179 while (first->next () != 0)
181 b = first->next ();
182 first->release ();
183 first = b;
185 first->release ();
186 break;
188 produced += seq;
189 if (delay_ms)
190 ACE_OS::sleep (delay);
192 this->produced_ += produced;
193 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Producer done\n")));
194 return 0;
198 Counting_Test_Consumer::svc ()
200 // Consume lots of blocks and release them. To mimic a thread with work
201 // to do, put a small random delay between dequeuing the blocks. Consume
202 // a calculated number of blocks then stop; the test checker will determine
203 // if the number consumed plus the number remaining is correct for the
204 // number produced.
205 unsigned int seed = static_cast<unsigned int> (ACE_OS::time ());
207 int multiple = ACE_OS::rand_r (&seed) % 10;
208 int delay_ms = ACE_OS::rand_r (&seed) % 10;
209 // The delay usually causes the test to time out in the automated
210 // regression testing. I just left it here in case it's needed someday.
211 delay_ms = 0;
212 long count = MESSAGE_FACTOR * (multiple ? multiple : 1);
213 long consumed = 0;
214 ACE_DEBUG ((LM_DEBUG,
215 ACE_TEXT ("(%t) Consumer will dequeue %B blocks, ")
216 ACE_TEXT ("%d msec delay\n"),
217 (size_t)count,
218 delay_ms));
219 ACE_Message_Block *b = 0;
220 ACE_Time_Value delay (0, delay_ms);
221 ACE_Time_Value timeout (2);
222 while (consumed < count)
224 // To be sure we can wait in the case of an empty queue, but not
225 // delay excessively if the producers have stopped, limit the time
226 // spent waiting to 2 seconds.
227 ACE_Time_Value block = ACE_OS::gettimeofday ();
228 block += timeout;
229 if (this->getq (b, &block) == -1)
231 if (errno == EWOULDBLOCK)
232 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Consumer timed out\n")));
233 else
234 ACE_ERROR ((LM_ERROR,
235 ACE_TEXT ("(%t) Consumer %p\n"),
236 ACE_TEXT ("getq")));
237 break;
239 ++consumed;
240 b->release ();
241 if (delay_ms)
242 ACE_OS::sleep (delay);
244 this->consumed_ += consumed;
245 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Consumer done\n")));
246 return 0;
249 static int
250 counting_test ()
252 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Starting counting test\n")));
254 ACE_Message_Queue<ACE_MT_SYNCH> q (2 * 1024 * 1024); // 2MB high water
255 Counting_Test_Producer p (&q);
256 Counting_Test_Consumer c (&q);
257 // Activate consumers first; if the producers fail to start, consumers will
258 // stop quicker.
259 if (c.activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, 5) == -1)
260 ACE_ERROR_RETURN ((LM_ERROR,
261 ACE_TEXT ("Consumers %p\n"),
262 ACE_TEXT ("activate")),
263 -1);
264 if (p.activate (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, 5) == -1)
266 ACE_ERROR ((LM_ERROR,
267 ACE_TEXT ("Producers %p\n"),
268 ACE_TEXT ("activate")));
269 c.wait ();
270 return -1;
272 // Producers and consumers are both running; wait for them then
273 // check the results.
274 p.wait ();
275 c.wait ();
276 // This compare relies on the flush() method counting blocks as it
277 // walks the chain releasing them, and doesn't rely on the count.
278 int status = 0;
279 long q_count = static_cast<long> (q.message_count ());
280 long remaining = q.flush ();
281 ACE_DEBUG ((LM_DEBUG,
282 ACE_TEXT ("Queue message_count is %b; %b flushed\n"),
283 (ssize_t)q_count,
284 (ssize_t)remaining));
285 if (q_count != remaining)
287 status = -1;
288 ACE_ERROR ((LM_ERROR,
289 ACE_TEXT ("message_count and flushed should be equal!\n")));
291 long expected = p.produced_.value () - c.consumed_.value ();
292 ACE_DEBUG ((LM_DEBUG,
293 ACE_TEXT ("Produced %b, consumed %b; diff %b\n"),
294 (ssize_t)p.produced_.value (),
295 (ssize_t)c.consumed_.value (),
296 (ssize_t)expected));
297 if (expected != remaining)
299 status = -1;
300 ACE_ERROR ((LM_ERROR,
301 ACE_TEXT ("Producer-consumer diff is %b; should be %b\n"),
302 (ssize_t)expected,
303 (ssize_t)remaining));
305 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Ending counting test\n")));
306 return status;
309 #endif /* ACE_HAS_THREADS */
311 static int
312 iterator_test ()
314 const int ITERATIONS = 5;
315 ACE_TCHAR buffer[ITERATIONS][BUFSIZ];
316 // Use queue size from of 32 Kb (more if using wide-char), instead of the
317 // default of 16 Kb (defined by ACE_Message_Queue_Base::DEFAULT_HWM),
318 // so that the test runs on machines with 8Kb pagesizes.
320 // QUEUE queue (32 * 1024 * sizeof (ACE_TCHAR));
321 QUEUE queue (sizeof(buffer));
323 int i;
325 for (i = 0; i < ITERATIONS; i++)
327 ACE_OS::snprintf (buffer[i], BUFSIZ, ACE_TEXT ("%d"), i + 1);
329 ACE_Message_Block *entry = 0;
330 ACE_NEW_RETURN (entry,
331 ACE_Message_Block ((char *) buffer[i],
332 sizeof buffer[i]),
333 -1);
335 if (queue.is_full ())
336 ACE_ERROR_RETURN ((LM_ERROR,
337 ACE_TEXT ("QUEUE:: the message queue is full on iteration %u!\n"),
338 i + 1),
339 -1);
341 if (queue.enqueue (entry) == -1)
342 ACE_ERROR_RETURN ((LM_ERROR,
343 ACE_TEXT ("QUEUE::enqueue\n")),
344 -1);
347 ACE_DEBUG ((LM_DEBUG,
348 ACE_TEXT ("\nForward Iterations\n")));
350 ITERATOR iterator (queue);
352 for (ACE_Message_Block *entry = 0;
353 iterator.next (entry) != 0;
354 iterator.advance ())
355 ACE_DEBUG ((LM_DEBUG,
356 ACE_TEXT ("%s\n"),
357 entry->base ()));
360 ACE_DEBUG ((LM_DEBUG,
361 ACE_TEXT ("\nReverse Iterations\n")));
363 REVERSE_ITERATOR iterator (queue);
365 for (ACE_Message_Block *entry = 0;
366 iterator.next (entry) != 0;
367 iterator.advance ())
368 ACE_DEBUG ((LM_DEBUG,
369 ACE_TEXT ("%s\n"),
370 entry->base ()));
373 ACE_DEBUG ((LM_DEBUG,
374 ACE_TEXT ("\nForward Iterations\n")));
376 QUEUE::ITERATOR iterator (queue);
378 for (ACE_Message_Block *entry = 0;
379 iterator.next (entry) != 0;
380 iterator.advance ())
381 ACE_DEBUG ((LM_DEBUG,
382 ACE_TEXT ("%s\n"),
383 entry->base ()));
386 ACE_DEBUG ((LM_DEBUG,
387 ACE_TEXT ("\nReverse Iterations\n")));
389 QUEUE::REVERSE_ITERATOR iterator (queue);
391 for (ACE_Message_Block *entry = 0;
392 iterator.next (entry) != 0;
393 iterator.advance ())
394 ACE_DEBUG ((LM_DEBUG,
395 ACE_TEXT ("%s\n"),
396 entry->base ()));
399 return 0;
402 #if defined (ACE_HAS_THREADS)
404 static int
405 chained_block_test ()
407 QUEUE q;
408 const char * s = "123456789"; // Will be length 10 when copied to block
409 const size_t slen = 10;
410 const size_t num_blks = 10;
411 ACE_Message_Block b[num_blks];
412 size_t i;
413 int status = 0;
415 for (i = 0; i < num_blks; ++i)
417 b[i].init (slen);
418 b[i].copy (s);
421 // Test enqueueing single and chained blocks and be sure they end up with
422 // the proper enqueued block count and sizes. Then be sure they are dequeued
423 // in the proper order.
424 b[0].next (&b[1]);
425 b[1].next (&b[2]);
426 // b[3] and b[4] are unchained.
427 b[5].next (&b[6]);
428 b[6].next (&b[7]);
429 b[7].next (&b[8]);
430 // b[9] is unchained
431 q.enqueue_tail (&b[3]);
432 q.enqueue_tail (&b[4]);
433 int num = q.enqueue_head (&b[0]);
434 if (num != 5)
436 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 5; has %d\n"),
437 num));
438 status = -1;
440 num = q.enqueue_tail (&b[5]);
441 if (num != 9)
443 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 9; has %d\n"),
444 num));
445 status = -1;
447 num = q.enqueue_tail (&b[9]);
448 if (num != 10)
450 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 10; has %d\n"),
451 num));
452 status = -1;
454 size_t msgs, bytes;
455 msgs = q.message_count ();
456 bytes = q.message_bytes ();
457 if (msgs != 10 || bytes != 100)
459 ACE_ERROR ((LM_ERROR,
460 ACE_TEXT ("Chained enqueue totals: %d msgs, %d bytes; ")
461 ACE_TEXT ("should be 10 msgs, 100 bytes\n"),
462 (int)msgs, (int)bytes));
463 status = -1;
466 // Now see if we can dequeue them, checking the order.
467 ACE_Time_Value nowait (ACE_OS::gettimeofday ());
468 ACE_Message_Block *bp;
469 int qstat;
470 for (i = 0; i < num_blks; ++i)
472 qstat = q.dequeue_head (bp, &nowait);
473 if (qstat == -1)
475 ACE_ERROR ((LM_ERROR,
476 ACE_TEXT ("Checking chained blocks, pass %d: %p\n"),
477 (int)i, ACE_TEXT ("dequeue_head")));
478 status = -1;
480 else if (bp != &b[i])
482 ACE_ERROR ((LM_ERROR,
483 ACE_TEXT ("Checking chained blocks, pass %d: ")
484 ACE_TEXT ("block out of order\n"),
485 (int)i));
486 status = -1;
490 if (status == 0)
491 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Chained block test OK\n")));
492 return status;
495 static int
496 single_thread_performance_test (int queue_type = 0)
498 const char test_message[] =
499 "ACE_Message_Queue Test Message";
500 const ACE_TCHAR *message =
501 ACE_TEXT ("ACE_Message_Queue<ACE_NULL_SYNCH>, single thread");
502 int i = 0;
504 // Create a message queue.
505 ACE_Message_Queue_Base *msgq = 0;
507 if (queue_type == 0)
508 ACE_NEW_RETURN (msgq,
509 QUEUE,
510 -1);
511 #if defined (ACE_VXWORKS)
512 else
514 ACE_NEW_RETURN (msgq,
515 ACE_Message_Queue_Vx (max_messages,
516 MAX_MESSAGE_SIZE),
517 -1);
518 message = "ACE_Message_Queue_Vx, single thread test";
520 #elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
521 else
523 ACE_NEW_RETURN (msgq,
524 ACE_Message_Queue_NT,
525 -1);
526 message = ACE_TEXT ("ACE_Message_Queue_NT, single thread test");
528 #endif /* ACE_VXWORKS */
530 // Create the messages. Allocate off the heap in case messages
531 // is large relative to the amount of stack space available.
532 ACE_Message_Block **send_block = 0;
533 ACE_NEW_RETURN (send_block,
534 ACE_Message_Block *[max_messages],
535 -1);
537 for (i = 0; i < max_messages; ++i)
538 ACE_NEW_RETURN (send_block[i],
539 ACE_Message_Block (test_message,
540 MAX_MESSAGE_SIZE),
541 -1);
543 ACE_Message_Block **receive_block_p = 0;
544 ACE_NEW_RETURN (receive_block_p,
545 ACE_Message_Block *[max_messages],
546 -1);
548 #if defined (ACE_VXWORKS)
549 // Set up blocks to receive the messages. Allocate these off the
550 // heap in case messages is large relative to the amount of
551 // stack space available.
552 ACE_Message_Block *receive_block = 0;
553 ACE_NEW_RETURN (receive_block,
554 ACE_Message_Block[max_messages],
555 -1);
557 for (i = 0; i < max_messages; ++i)
559 receive_block[i].init (MAX_MESSAGE_SIZE);
561 // For VxWorks Message Queues, the receive block pointer must be
562 // assigned. It will be used by dequeue_head ().
563 receive_block_p[i] = &receive_block[i];
565 #endif /* ACE_VXWORKS */
567 timer->start ();
569 // Send/receive the messages.
570 for (i = 0; i < max_messages; ++i)
572 if (msgq->enqueue_tail (send_block[i]) == -1)
573 ACE_ERROR_RETURN ((LM_ERROR,
574 ACE_TEXT ("%p\n"),
575 ACE_TEXT ("enqueue")),
576 -1);
578 if (msgq->dequeue_head (receive_block_p[i]) == -1)
579 ACE_ERROR_RETURN ((LM_ERROR,
580 ACE_TEXT ("%p\n"),
581 ACE_TEXT ("dequeue_head")),
582 -1);
585 timer->stop ();
587 ACE_Time_Value tv;
588 timer->elapsed_time (tv);
589 ACE_DEBUG ((LM_INFO,
590 ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
591 message,
592 max_messages,
593 tv.msec (),
594 (double) tv.msec () / max_messages));
595 timer->reset ();
597 delete [] receive_block_p;
598 #if defined (ACE_VXWORKS)
599 delete [] receive_block;
600 #endif /* ACE_VXWORKS */
602 for (i = 0; i < max_messages; ++i)
603 delete send_block[i];
604 delete [] send_block;
605 delete msgq;
607 return 0;
610 static void *
611 receiver (void *arg)
613 Queue_Wrapper *queue_wrapper =
614 reinterpret_cast<Queue_Wrapper *> (arg);
615 int i;
617 ACE_Message_Block **receive_block_p = 0;
618 ACE_NEW_RETURN (receive_block_p,
619 ACE_Message_Block *[max_messages],
620 (void *) -1);
622 #if defined (ACE_VXWORKS)
623 // Set up blocks to receive the messages. Allocate these off the
624 // heap in case messages is large relative to the amount of stack
625 // space available.
626 ACE_Message_Block *receive_block;
627 ACE_NEW_RETURN (receive_block,
628 ACE_Message_Block[max_messages],
629 (void *) -1);
631 for (i = 0; i < max_messages; ++i)
633 receive_block[i].init (MAX_MESSAGE_SIZE);
635 // For VxWorks Message Queues, the receive block pointer must be
636 // assigned. It will be used by <dequeue_head>.
637 receive_block_p[i] = &receive_block[i];
639 #endif /* ACE_VXWORKS */
641 for (i = 0; i < max_messages; ++i)
642 if (queue_wrapper->q_->dequeue_head (receive_block_p[i]) == -1)
643 ACE_ERROR_RETURN ((LM_ERROR,
644 ACE_TEXT ("%p\n"),
645 ACE_TEXT ("dequeue_head")),
647 timer->stop ();
649 delete [] receive_block_p;
650 #if defined (ACE_VXWORKS)
651 delete [] receive_block;
652 #endif /* ACE_VXWORKS */
654 return 0;
657 static void *
658 sender (void *arg)
660 Queue_Wrapper *queue_wrapper =
661 reinterpret_cast<Queue_Wrapper *> (arg);
662 int i;
664 timer->start ();
666 // Send the messages.
667 for (i = 0; i < max_messages; ++i)
668 if (queue_wrapper->q_->
669 enqueue_tail (queue_wrapper->send_block_[i]) == -1)
670 ACE_ERROR_RETURN ((LM_ERROR,
671 ACE_TEXT ("%p\n"),
672 ACE_TEXT ("enqueue")),
674 return 0;
677 static
679 performance_test (int queue_type = 0)
681 Queue_Wrapper queue_wrapper;
682 const ACE_TCHAR *message =
683 ACE_TEXT ("ACE_Message_Queue<ACE_SYNCH>");
684 int i = 0;
686 // Create the messages. Allocate off the heap in case messages is
687 // large relative to the amount of stack space available. Allocate
688 // it here instead of in the sender, so that we can delete it after
689 // the _receiver_ is done.
690 ACE_Message_Block **send_block = 0;
691 ACE_NEW_RETURN (send_block,
692 ACE_Message_Block *[max_messages],
693 -1);
695 for (i = 0; i < max_messages; ++i)
696 ACE_NEW_RETURN (send_block[i],
697 ACE_Message_Block (test_message,
698 MAX_MESSAGE_SIZE),
699 -1);
701 queue_wrapper.send_block_ = send_block;
703 if (queue_type == 0)
704 ACE_NEW_RETURN (queue_wrapper.q_,
705 SYNCH_QUEUE,
706 -1);
707 #if defined (ACE_VXWORKS)
708 else
710 ACE_NEW_RETURN (queue_wrapper.q_,
711 ACE_Message_Queue_Vx (max_messages,
712 MAX_MESSAGE_SIZE),
713 -1);
714 message = "ACE_Message_Queue_Vx";
716 #elif defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
717 else
719 ACE_NEW_RETURN (queue_wrapper.q_,
720 ACE_Message_Queue_NT,
721 -1);
722 message = ACE_TEXT ("ACE_Message_Queue_NT");
724 #endif /* ACE_VXWORKS */
726 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) sender,
727 &queue_wrapper,
728 THR_BOUND) == -1)
729 ACE_ERROR_RETURN ((LM_ERROR,
730 ACE_TEXT ("%p\n"),
731 ACE_TEXT ("spawning sender thread")),
732 -1);
734 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) receiver,
735 &queue_wrapper,
736 THR_BOUND) == -1)
737 ACE_ERROR_RETURN ((LM_ERROR,
738 ACE_TEXT ("%p\n"),
739 ACE_TEXT ("spawning receiver thread")),
740 -1);
742 ACE_Thread_Manager::instance ()->wait ();
743 ACE_Time_Value tv;
744 timer->elapsed_time (tv);
745 ACE_DEBUG ((LM_INFO, ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
746 message,
747 max_messages,
748 tv.msec (),
749 (double) tv.msec () / max_messages));
750 timer->reset ();
752 delete queue_wrapper.q_;
753 queue_wrapper.q_ = 0;
755 for (i = 0; i < max_messages; ++i)
756 delete send_block[i];
757 delete [] send_block;
759 return 0;
762 // Ensure that the timedout dequeue_head() sets errno code properly.
764 static int
765 timeout_test ()
767 SYNCH_QUEUE mq;
768 int status = 0;
770 if (!mq.is_empty ())
772 ACE_ERROR ((LM_ERROR,
773 ACE_TEXT ("New queue is not empty!\n")));
774 status = 1;
776 else
778 ACE_Message_Block *b;
779 ACE_Time_Value tv (ACE_OS::gettimeofday ()); // Now
781 if (mq.dequeue_head (b, &tv) != -1)
783 ACE_ERROR ((LM_ERROR,
784 ACE_TEXT ("Dequeued from empty queue!\n")));
785 status = 1;
787 else if (errno != EWOULDBLOCK)
789 ACE_ERROR ((LM_ERROR,
790 ACE_TEXT ("%p\n"),
791 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
792 status = 1;
794 else
796 ACE_DEBUG ((LM_DEBUG,
797 ACE_TEXT ("Timed dequeue test: OK\n")));
798 status = 0; // All is well
802 return status;
804 #endif /* ACE_HAS_THREADS */
806 // Check to make sure that dequeue_prio() respects FIFO ordering.
807 // @@ At some point, this function should be enhanced to do a more
808 // thorough check...
810 static int
811 prio_test ()
813 const char S1[] = "first";
814 const char S2[] = "second";
815 const int PRIORITY = 50;
816 QUEUE mq;
817 int status;
819 ACE_Message_Block mb1 (S1, sizeof S1, PRIORITY);
820 ACE_Message_Block mb2 (S2, sizeof S2, PRIORITY);
822 mq.enqueue_prio (&mb1);
823 mq.enqueue_prio (&mb2);
825 ACE_Message_Block *mb1p = 0;
826 ACE_Message_Block *mb2p = 0;
828 mq.dequeue_prio (mb1p);
829 mq.dequeue_prio (mb2p);
831 ACE_DEBUG ((LM_DEBUG, "message 1 = %C\nmessage 2 = %C\n",
832 mb1p->rd_ptr (),
833 mb2p->rd_ptr ()));
835 if (ACE_OS::strcmp (mb1p->rd_ptr (), S1) == 0
836 && ACE_OS::strcmp (mb2p->rd_ptr (), S2) == 0)
837 status = 0;
838 else
839 status = 1;
841 return status;
844 static int
845 close_test ()
847 int status = 0;
849 int flushed_messages;
851 QUEUE mq1;
852 flushed_messages = mq1.close ();
854 if (flushed_messages != 0)
856 ACE_ERROR ((LM_ERROR,
857 ACE_TEXT ("Closing queue should flush 0 messages, close() reports - %d\n"),
858 flushed_messages ));
859 status = 1;
860 return status;
863 // There was a bug that return previous queue state instead of
864 // number of flushed messages. Thus, insert 2 messages != ACTIVATE
865 // queue state
866 ACE_Message_Block *pMB1;
867 ACE_Message_Block *pMB2;
868 ACE_NEW_NORETURN (pMB1, ACE_Message_Block (1));
869 ACE_NEW_NORETURN (pMB2, ACE_Message_Block (1));
870 QUEUE mq2;
871 mq2.enqueue_head (pMB1);
872 mq2.enqueue_head (pMB2);
873 flushed_messages = mq2.close ();
875 if (flushed_messages != 2)
877 ACE_ERROR ((LM_ERROR,
878 ACE_TEXT ("Closing queue should flush 2 messages, close() reports - %d\n"),
879 flushed_messages ));
880 status = 1;
881 return status;
883 return status;
887 run_main (int argc, ACE_TCHAR *argv[])
889 ACE_START_TEST (ACE_TEXT ("Message_Queue_Test"));
891 if (argc == 2)
893 if (!ACE_OS::strcmp (argv[1], ACE_TEXT ("-?")))
895 ACE_ERROR ((LM_ERROR,
896 ACE_TEXT ("%s/n"),
897 usage));
899 else
901 max_messages = ACE_OS::atoi (argv[1]);
905 int status = prio_test ();
907 // The iterator test occasionally causes a page fault or a hang on
908 // VxWorks.
909 if (status == 0)
910 status = iterator_test ();
912 ACE_NEW_RETURN (timer,
913 ACE_High_Res_Timer,
914 -1);
916 if (status == 0)
917 status = close_test ();
919 #if defined (ACE_HAS_THREADS)
920 if (status == 0)
921 status = timeout_test ();
923 if (status == 0)
924 status = chained_block_test ();
926 if (status == 0)
927 status = single_thread_performance_test ();
929 # if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
930 // Test ACE_Message_Queue_Vx. or ACE_Message_Queue_NT
931 if (status == 0)
932 status = single_thread_performance_test (1);
933 # endif /* ACE_VXWORKS */
935 if (status == 0)
936 status = performance_test ();
938 # if defined (ACE_VXWORKS) || defined (ACE_HAS_WIN32_OVERLAPPED_IO)
939 // Test ACE_Message_Queue_Vx or ACE_Message_Queue_NT
940 if (status == 0)
941 status = performance_test (1);
942 # endif /* ACE_VXWORKS */
944 if (counting_test () != 0)
945 status = -1;
946 #endif /* ACE_HAS_THREADS */
948 if (status != 0)
949 ACE_ERROR ((LM_ERROR,
950 ACE_TEXT ("%p\n"),
951 ACE_TEXT ("test failed")));
952 delete timer;
953 timer = 0;
956 ACE_END_TEST;
957 return status;