ACE+TAO-7_0_8
[ACE_TAO.git] / ACE / tests / Message_Queue_Test_Ex.cpp
blob3182d2c6d0bb4d8242996fd0f80fa7d5ccbf7878
2 //=============================================================================
3 /**
4 * @file Message_Queue_Test_Ex.cpp
6 * This is:
7 * 1. A simple test of the ACE_Message_Queue_Ex that executes
8 * a performance measurement test for both single-threaded
9 * (null synch) and thread-safe ACE_Message_Queue_Ex
10 * instantiations.
11 * 2. An example of using a user-defined class to parameterize
12 * ACE_Message_Queue_Ex.
14 * @author Michael Vitlo <mvitalo@sprynet.com>
15 * @author copied the code from: Irfan Pyarali <irfan@cs.wustl.edu> and David L. Levine <levine@cs.wustl.edu>
17 //=============================================================================
20 #include "test_config.h"
21 #include "ace/Thread_Manager.h"
23 #include "ace/Auto_Ptr.h"
24 #include "ace/Message_Queue.h"
25 #include "ace/Synch_Traits.h"
26 #include "ace/Null_Mutex.h"
27 #include "ace/Null_Condition.h"
28 #include "ace/High_Res_Timer.h"
29 #include "ace/Message_Block.h"
30 #include "ace/OS_NS_sys_time.h"
31 #include "ace/Barrier.h"
32 #include "Message_Queue_Test_Ex.h" // Declares User_Class
34 const ACE_TCHAR usage[] =
35 ACE_TEXT ("usage: Message_Queue_Test_Ex <number of messages>\n");
37 using QUEUE = ACE_Message_Queue_Ex<User_Class, ACE_NULL_SYNCH>;
39 static const int MAX_MESSAGES = 10000;
40 static const char test_message[] = "ACE_Message_Queue_Ex Test Message";
42 static int max_messages = MAX_MESSAGES;
43 static int chain_limit = 4;
44 static ACE_Barrier tester_barrier (2);
46 // Dynamically allocate to avoid a static.
47 static ACE_High_Res_Timer *timer = 0;
49 // Helper printing function
50 static void
51 print_message (const ACE_TCHAR *message)
53 ACE_Time_Value tv;
54 timer->elapsed_time (tv);
55 ACE_DEBUG ((LM_INFO,
56 ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
57 message,
58 max_messages,
59 tv.msec (),
60 (double) tv.msec () / max_messages));
63 #if defined (ACE_HAS_THREADS)
64 using SYNCH_QUEUE = ACE_Message_Queue_Ex<User_Class, ACE_MT_SYNCH>;
66 /**
67 * Container for data passed to sender and receiver in
68 * performance test.
70 * For use in multithreaded performance test.
72 struct Queue_Wrapper
74 /// The message queue.
75 SYNCH_QUEUE *q_;
77 /// Pointer to messages blocks for sender to send to reciever.
78 User_Class **send_block_;
80 /// Default constructor.
81 Queue_Wrapper ()
82 : q_ (0), send_block_ (0)
87 struct MQ_Ex_N_Tester_Wrapper
89 // = TITLE
90 // Container for data passed to sender in the MQ_Ex_N_Tester
91 // performance test.
93 // = DESCRIPTION
94 // For use in multithreaded performance test.
95 MQ_Ex_N_Tester *tester_;
96 User_Class *head_send_block_;
99 #endif /* ACE_HAS_THREADS */
101 // Encapsulates the sent messages creation and destruction
102 struct Send_Messages
104 Send_Messages (int number_of_messages, int chain_limit):
105 send_block_ (0),
106 number_of_messages_ (number_of_messages),
107 chain_limit_ (chain_limit)
111 int create_messages (const char test_message[])
113 int limit = this->number_of_messages_ / this->chain_limit_;
114 ACE_NEW_RETURN (this->send_block_,
115 User_Class *[limit],
116 -1);
118 int i, j;
119 for (i = 0; i < limit; ++i)
121 User_Class *&temp1 = this->send_block_[i];
122 ACE_NEW_RETURN (temp1,
123 User_Class (test_message),
124 -1);
125 User_Class *tail = temp1;
126 for (j = 1; j < this->chain_limit_; ++j)
128 User_Class *temp2 = 0;
129 ACE_NEW_RETURN (temp2,
130 User_Class (test_message),
131 -1);
132 tail->next (temp2);
133 tail = temp2;
136 this->head_send_block_ = this->send_block_[0];
137 return 0;
140 ~Send_Messages ()
142 int j, i = 0;
143 int limit = this->number_of_messages_ / this->chain_limit_;
144 for (; i < limit; ++i)
146 User_Class *&temp1 = this->send_block_[i];
147 for (j = 0; j < this->chain_limit_; ++j)
149 User_Class *temp2 = temp1->next ();
150 delete temp1;
151 temp1 = temp2;
154 delete [] this->send_block_;
157 User_Class * head_send_block_;
158 User_Class ** send_block_;
159 int number_of_messages_;
160 int chain_limit_;
163 // Encapsulates the received messages creation and destruction
164 struct Receive_Messages
166 Receive_Messages (int number_of_messages) :
167 receive_block_ (0),
168 number_of_messages_ (number_of_messages)
172 int create ()
174 ACE_NEW_RETURN (this->receive_block_,
175 User_Class *[this->number_of_messages_],
176 -1);
177 return 0;
180 ~Receive_Messages ()
182 delete [] this->receive_block_;
185 User_Class **receive_block_;
186 int number_of_messages_;
189 static int
190 single_thread_performance_test ()
192 const char test_message[] =
193 "ACE_Message_Queue_Ex Test Message";
194 const ACE_TCHAR *message =
195 ACE_TEXT ("ACE_Message_Queue_Ex<ACE_NULL_SYNCH>, single thread");
197 // Create a message queue.
198 QUEUE *msgq = 0;
200 ACE_NEW_RETURN (msgq,
201 QUEUE,
202 -1);
204 // Create the messages. Allocate off the heap in case messages is
205 // large relative to the amount of stack space available.
206 User_Class **send_block = 0;
207 ACE_NEW_RETURN (send_block,
208 User_Class *[max_messages],
209 -1);
211 int i = 0;
213 for (i = 0; i < max_messages; ++i)
214 ACE_NEW_RETURN (send_block[i],
215 User_Class (test_message),
216 -1);
218 User_Class **receive_block_p = 0;
219 ACE_NEW_RETURN (receive_block_p,
220 User_Class *[max_messages],
221 -1);
223 timer->start ();
225 // Send/receive the messages.
226 for (i = 0; i < max_messages; ++i)
228 if (msgq->enqueue_tail (send_block[i]) == -1)
229 ACE_ERROR_RETURN ((LM_ERROR,
230 ACE_TEXT ("%p\n"),
231 ACE_TEXT ("enqueue")),
232 -1);
234 if (msgq->dequeue_head (receive_block_p[i]) == -1)
235 ACE_ERROR_RETURN ((LM_ERROR,
236 ACE_TEXT ("%p\n"),
237 ACE_TEXT ("dequeue_head")),
238 -1);
241 timer->stop ();
242 print_message (message);
243 timer->reset ();
245 delete [] receive_block_p;
247 for (i = 0; i < max_messages; ++i)
248 delete send_block[i];
249 delete [] send_block;
250 delete msgq;
252 return 0;
256 MQ_Ex_N_Tester::single_thread_performance_test ()
258 // Create the messages. Allocate off the heap in case messages is
259 // large relative to the amount of stack space available.
261 if ((0 != this->test_enqueue_tail ()) ||
262 (0 != this->test_enqueue_head ()) )
264 return -1;
267 return 0;
271 MQ_Ex_N_Tester::test_enqueue_tail ()
273 const ACE_TCHAR *message =
274 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_NULL_SYNCH>, test_enqueue_tail");
276 // Send_Messages creates messages and deletes them when it gets out of scope
277 Send_Messages messages (max_messages, chain_limit);
278 if (-1 == messages.create_messages (test_message))
280 return -1;
282 Receive_Messages r_messages (max_messages);
283 if (-1 == r_messages.create ())
285 return -1;
288 // prepare
289 int limit = max_messages / chain_limit;
290 timer->start ();
291 // Send with just one call
292 for (int i = 0; i < limit; ++i)
294 if (-1 == this->st_queue_.enqueue_tail (messages.send_block_[i]))
296 ACE_ERROR_RETURN ((LM_ERROR,
297 ACE_TEXT ("%p\n"),
298 ACE_TEXT ("enqueue_tail_n")),
299 -1);
302 for (int j = 0, k = 0; j < chain_limit; ++j, ++k)
304 if (this->st_queue_.dequeue_head (r_messages.receive_block_[k]) == -1)
306 ACE_ERROR_RETURN ((LM_ERROR,
307 ACE_TEXT ("%p\n"),
308 ACE_TEXT ("dequeue_head")),
309 -1);
313 timer->stop ();
315 print_message (message);
317 timer->reset ();
319 return 0;
323 MQ_Ex_N_Tester::test_enqueue_head ()
325 const ACE_TCHAR *message =
326 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_NULL_SYNCH>, test_enqueue_head");
328 // Send_Messages creates messages and deletes them when it gets out of scope
329 Send_Messages messages (max_messages, chain_limit);
330 if (-1 == messages.create_messages (test_message))
332 return -1;
334 Receive_Messages r_messages (max_messages);
335 if (-1 == r_messages.create ())
337 return -1;
340 // prepare
341 int i, j, k = 0;
343 int limit = max_messages / chain_limit;
344 timer->start ();
346 // Send/receive the messages.
347 // Send with just one call
348 for (i = 0; i < limit; ++i)
350 if (-1 == this->st_queue_.enqueue_head (messages.send_block_[i]))
352 ACE_ERROR_RETURN ((LM_ERROR,
353 ACE_TEXT ("%p\n"),
354 ACE_TEXT ("enqueue_tail_n")),
355 -1);
358 for (j = 0; j < chain_limit; ++j, ++k)
360 if (this->st_queue_.dequeue_head (r_messages.receive_block_[k]) == -1)
362 ACE_ERROR_RETURN ((LM_ERROR,
363 ACE_TEXT ("%p\n"),
364 ACE_TEXT ("dequeue_head")),
365 -1);
369 timer->stop ();
371 print_message (message);
373 timer->reset ();
375 return 0;
379 #if defined (ACE_HAS_THREADS)
381 static void *
382 receiver (void *arg)
384 Queue_Wrapper *queue_wrapper = reinterpret_cast<Queue_Wrapper *> (arg);
385 int i;
387 User_Class **receive_block_p = 0;
388 ACE_NEW_RETURN (receive_block_p,
389 User_Class *[max_messages],
390 (void *) -1);
392 for (i = 0; i < max_messages; ++i)
393 if (queue_wrapper->q_->dequeue_head (receive_block_p[i]) == -1)
394 ACE_ERROR_RETURN ((LM_ERROR,
395 ACE_TEXT ("%p\n"),
396 ACE_TEXT ("dequeue_head")),
398 timer->stop ();
400 delete [] receive_block_p;
402 return 0;
405 static void *
406 sender (void *arg)
408 Queue_Wrapper *queue_wrapper = reinterpret_cast<Queue_Wrapper *> (arg);
409 int i;
411 timer->start ();
413 // Send the messages.
414 for (i = 0; i < max_messages; ++i)
415 if (queue_wrapper->q_->
416 enqueue_tail (queue_wrapper->send_block_[i]) == -1)
417 ACE_ERROR_RETURN ((LM_ERROR,
418 ACE_TEXT ("%p\n"),
419 ACE_TEXT ("enqueue")),
421 return 0;
424 static int
425 performance_test ()
427 Queue_Wrapper queue_wrapper;
428 const ACE_TCHAR *message =
429 ACE_TEXT ("ACE_Message_Queue_Ex<ACE_SYNCH>");
430 int i = 0;
432 // Create the messages. Allocate off the heap in case messages is
433 // large relative to the amount of stack space available. Allocate
434 // it here instead of in the sender, so that we can delete it after
435 // the _receiver_ is done.
436 User_Class **send_block = 0;
437 ACE_NEW_RETURN (send_block,
438 User_Class *[max_messages],
439 -1);
441 for (i = 0; i < max_messages; ++i)
442 ACE_NEW_RETURN (send_block[i],
443 User_Class (test_message),
444 -1);
446 queue_wrapper.send_block_ = send_block;
448 ACE_NEW_RETURN (queue_wrapper.q_,
449 SYNCH_QUEUE,
450 -1);
452 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) sender,
453 &queue_wrapper,
454 THR_BOUND) == -1)
455 ACE_ERROR_RETURN ((LM_ERROR,
456 ACE_TEXT ("%p\n"),
457 ACE_TEXT ("spawning sender thread")),
458 -1);
460 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) receiver,
461 &queue_wrapper,
462 THR_BOUND) == -1)
463 ACE_ERROR_RETURN ((LM_ERROR,
464 ACE_TEXT ("%p\n"),
465 ACE_TEXT ("spawning receiver thread")),
466 -1);
468 ACE_Thread_Manager::instance ()->wait ();
469 print_message (message);
470 timer->reset ();
472 delete queue_wrapper.q_;
473 queue_wrapper.q_ = 0;
475 for (i = 0; i < max_messages; ++i)
476 delete send_block[i];
477 delete [] send_block;
479 return 0;
483 MQ_Ex_N_Tester::performance_test ()
485 const ACE_TCHAR *message =
486 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_SYNCH>");
488 Send_Messages messages (max_messages, chain_limit);
489 if (-1 == messages.create_messages (test_message))
491 return -1;
494 MQ_Ex_N_Tester_Wrapper tester_wrapper;
495 tester_wrapper.head_send_block_ = messages.head_send_block_;
496 tester_wrapper.tester_ = this;
498 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) &MQ_Ex_N_Tester::sender,
499 &tester_wrapper,
500 THR_BOUND) == -1)
501 ACE_ERROR_RETURN ((LM_ERROR,
502 ACE_TEXT ("%p\n"),
503 ACE_TEXT ("spawning sender thread")),
504 -1);
506 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) &MQ_Ex_N_Tester::receiver,
507 this,
508 THR_BOUND) == -1)
509 ACE_ERROR_RETURN ((LM_ERROR,
510 ACE_TEXT ("%p\n"),
511 ACE_TEXT ("spawning receiver thread")),
512 -1);
514 ACE_Thread_Manager::instance ()->wait ();
516 print_message (message);
518 timer->reset ();
520 return 0;
523 ACE_THR_FUNC_RETURN
524 MQ_Ex_N_Tester::receiver (void *args)
526 MQ_Ex_N_Tester *tester = reinterpret_cast<MQ_Ex_N_Tester *> (args);
528 User_Class **receive_block_p = 0;
529 ACE_NEW_RETURN (receive_block_p,
530 User_Class *[max_messages],
531 (ACE_THR_FUNC_RETURN) -1);
533 int i;
534 tester_barrier.wait ();
535 for (i = 0; i < max_messages; ++i)
537 if (tester->mt_queue_.dequeue_head (receive_block_p[i]) == -1)
539 ACE_ERROR ((LM_ERROR,
540 ACE_TEXT ("%p\n"),
541 ACE_TEXT ("dequeue_head")));
542 return (ACE_THR_FUNC_RETURN) -1;
545 timer->stop ();
547 delete [] receive_block_p;
549 return 0;
552 ACE_THR_FUNC_RETURN
553 MQ_Ex_N_Tester::sender (void *args)
555 MQ_Ex_N_Tester_Wrapper *tester_wrapper =
556 reinterpret_cast<MQ_Ex_N_Tester_Wrapper *> (args);
557 MQ_Ex_N_Tester *tester = tester_wrapper->tester_;
559 Send_Messages messages (max_messages, chain_limit);
560 if (-1 == messages.create_messages (test_message))
562 return (ACE_THR_FUNC_RETURN) -1;
564 int limit = max_messages / chain_limit;
565 tester_barrier.wait ();
566 timer->start ();
567 // Send/receive the messages.
568 timer->start ();
569 // Send with just one call
570 for (int i = 0; i < limit; ++i)
572 if (-1 == tester->mt_queue_.enqueue_tail (messages.send_block_[i]))
574 ACE_ERROR ((LM_ERROR,
575 ACE_TEXT ("%p\n"),
576 ACE_TEXT ("enqueue_tail_n")));
577 return (ACE_THR_FUNC_RETURN) -1;
580 return 0;
583 #endif /* ACE_HAS_THREADS */
585 int basic_queue_test (ACE_Message_Queue_Ex<User_Class, ACE_SYNCH>& q)
587 int status = 0;
588 if (!q.is_empty ())
590 ACE_ERROR ((LM_ERROR, ACE_TEXT ("New queue is not empty!\n")));
591 status = 1;
593 else
595 User_Class *b;
596 ACE_Time_Value tv (ACE_OS::gettimeofday ()); // Now
597 if (q.dequeue_head (b, &tv) != -1)
599 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Dequeued from empty queue!\n")));
600 status = 1;
602 else if (errno != EWOULDBLOCK)
604 ACE_ERROR ((LM_ERROR,
605 ACE_TEXT ("%p\n"),
606 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
607 status = 1;
609 else
611 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Timed dequeue test: OK\n")));
612 status = 0; // All is well
616 return status;
619 int queue_priority_test (ACE_Message_Queue_Ex<User_Class, ACE_SYNCH>& q)
621 int status = 0;
622 if (!q.is_empty ())
623 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Prio test queue not empty\n")), 1);
625 // Set up a few objects with names for how they should come out of the queue.
626 ACE_Auto_Basic_Ptr<User_Class> b1, b2, b3, b4;
627 b1.reset (new User_Class ("first"));
628 b2.reset (new User_Class ("second"));
629 b3.reset (new User_Class ("third"));
630 b4.reset (new User_Class ("fourth"));
631 unsigned long prio =
632 ACE_Message_Queue_Ex<User_Class, ACE_SYNCH>::DEFAULT_PRIORITY;
634 prio += 1;
635 if (-1 == q.enqueue_prio (b2.get (), 0, prio))
636 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("b2")), 1);
637 if (-1 == q.enqueue_prio (b3.get (), 0, prio))
638 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("b3")), 1);
639 prio -= 1;
640 if (-1 == q.enqueue_prio (b4.get (), 0, prio))
641 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("b4")), 1);
642 prio += 5;
643 if (-1 == q.enqueue_prio (b1.get (), 0, prio))
644 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("b1")), 1);
646 User_Class *b = 0;
647 if (q.dequeue_head (b) == -1)
649 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 1")));
650 ++status;
652 else
654 if (ACE_OS::strcmp (b->message (), "first") != 0)
656 ACE_ERROR ((LM_ERROR,
657 ACE_TEXT ("First dequeued was %C\n"),
658 b->message ()));
659 ++status;
662 if (q.dequeue_head (b) == -1)
664 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 2")));
665 ++status;
667 else
669 if (ACE_OS::strcmp (b->message (), "second") != 0)
671 ACE_ERROR ((LM_ERROR,
672 ACE_TEXT ("Second dequeued was %C\n"),
673 b->message ()));
674 ++status;
677 if (q.dequeue_head (b) == -1)
679 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 3")));
680 ++status;
682 else
684 if (ACE_OS::strcmp (b->message (), "third") != 0)
686 ACE_ERROR ((LM_ERROR,
687 ACE_TEXT ("Third dequeued was %C\n"),
688 b->message ()));
689 ++status;
692 if (q.dequeue_head (b) == -1)
694 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 4")));
695 ++status;
697 else
699 if (ACE_OS::strcmp (b->message (), "fourth") != 0)
701 ACE_ERROR ((LM_ERROR,
702 ACE_TEXT ("Fourth dequeued was %C\n"),
703 b->message ()));
704 ++status;
708 if (status == 0)
709 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Priority queueing test: OK\n")));
710 return status;
714 run_main (int argc, ACE_TCHAR *argv[])
716 ACE_START_TEST (ACE_TEXT ("Message_Queue_Test_Ex"));
718 if (argc == 2)
720 if (! ACE_OS::strcmp (argv[1], ACE_TEXT ("-?")))
722 ACE_ERROR ((LM_ERROR,
723 ACE_TEXT ("%s/n"),
724 usage));
726 else
728 max_messages = ACE_OS::atoi (argv[1]);
732 int status = 0;
734 // Be sure that the a timed out get sets the error code properly.
735 ACE_Message_Queue_Ex<User_Class, ACE_SYNCH> q1;
736 ACE_Message_Queue_Ex_N<User_Class, ACE_SYNCH> q2;
737 if (0 != basic_queue_test (q1) ||
738 0 != basic_queue_test (q2))
740 ++status;
743 // Check priority operations.
744 if (0 != queue_priority_test (q1))
746 ++status;
749 ACE_NEW_RETURN (timer,
750 ACE_High_Res_Timer,
751 -1);
753 status += single_thread_performance_test ();
755 #if defined (ACE_HAS_THREADS)
756 status += performance_test ();
757 #endif /* ACE_HAS_THREADS */
760 MQ_Ex_N_Tester ex_n_tester;
761 status += ex_n_tester.single_thread_performance_test ();
762 #if defined (ACE_HAS_THREADS)
763 status += ex_n_tester.performance_test ();
764 #endif /* ACE_HAS_THREADS */
767 if (status != 0)
768 ACE_ERROR ((LM_ERROR,
769 ACE_TEXT ("%p\n"),
770 ACE_TEXT ("test failed")));
771 delete timer;
772 timer = 0;
774 ACE_END_TEST;
775 return status;