2 //=============================================================================
4 * @file Message_Queue_Test_Ex.cpp
7 * 1. A simple test of the ACE_Message_Queue_Ex that executes
8 * a performance measurement test for both single-threaded
9 * (null synch) and thread-safe ACE_Message_Queue_Ex
11 * 2. An example of using a user-defined class to parameterize
12 * ACE_Message_Queue_Ex.
14 * @author Michael Vitlo <mvitalo@sprynet.com>
15 * @author copied the code from: Irfan Pyarali <irfan@cs.wustl.edu> and David L. Levine <levine@cs.wustl.edu>
17 //=============================================================================
20 #include "test_config.h"
21 #include "ace/Thread_Manager.h"
24 #include "ace/Message_Queue.h"
25 #include "ace/Synch_Traits.h"
26 #include "ace/Null_Mutex.h"
27 #include "ace/Null_Condition.h"
28 #include "ace/High_Res_Timer.h"
29 #include "ace/Message_Block.h"
30 #include "ace/OS_NS_sys_time.h"
31 #include "ace/Barrier.h"
32 #include "Message_Queue_Test_Ex.h" // Declares User_Class
34 const ACE_TCHAR usage
[] =
35 ACE_TEXT ("usage: Message_Queue_Test_Ex <number of messages>\n");
37 using QUEUE
= ACE_Message_Queue_Ex
<User_Class
, ACE_NULL_SYNCH
>;
39 static const int MAX_MESSAGES
= 10000;
40 static const char test_message
[] = "ACE_Message_Queue_Ex Test Message";
42 static int max_messages
= MAX_MESSAGES
;
43 static int chain_limit
= 4;
44 static ACE_Barrier
tester_barrier (2);
46 // Dynamically allocate to avoid a static.
47 static ACE_High_Res_Timer
*timer
= 0;
49 // Helper printing function
51 print_message (const ACE_TCHAR
*message
)
54 timer
->elapsed_time (tv
);
56 ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
60 (double) tv
.msec () / max_messages
));
63 #if defined (ACE_HAS_THREADS)
64 using SYNCH_QUEUE
= ACE_Message_Queue_Ex
<User_Class
, ACE_MT_SYNCH
>;
67 * Container for data passed to sender and receiver in
70 * For use in multithreaded performance test.
74 /// The message queue.
77 /// Pointer to messages blocks for sender to send to reciever.
78 User_Class
**send_block_
{};
80 /// Default constructor.
81 Queue_Wrapper () = default;
85 * Container for data passed to sender in the MQ_Ex_N_Tester
88 * For use in multithreaded performance test.
90 struct MQ_Ex_N_Tester_Wrapper
92 MQ_Ex_N_Tester
*tester_
{};
93 User_Class
*head_send_block_
{};
96 #endif /* ACE_HAS_THREADS */
98 // Encapsulates the sent messages creation and destruction
101 Send_Messages (int number_of_messages
, int chain_limit
):
103 number_of_messages_ (number_of_messages
),
104 chain_limit_ (chain_limit
)
108 int create_messages (const char test_message
[])
110 int limit
= this->number_of_messages_
/ this->chain_limit_
;
111 ACE_NEW_RETURN (this->send_block_
,
116 for (i
= 0; i
< limit
; ++i
)
118 User_Class
*&temp1
= this->send_block_
[i
];
119 ACE_NEW_RETURN (temp1
,
120 User_Class (test_message
),
122 User_Class
*tail
= temp1
;
123 for (j
= 1; j
< this->chain_limit_
; ++j
)
125 User_Class
*temp2
= 0;
126 ACE_NEW_RETURN (temp2
,
127 User_Class (test_message
),
133 this->head_send_block_
= this->send_block_
[0];
140 int limit
= this->number_of_messages_
/ this->chain_limit_
;
141 for (; i
< limit
; ++i
)
143 User_Class
*&temp1
= this->send_block_
[i
];
144 for (j
= 0; j
< this->chain_limit_
; ++j
)
146 User_Class
*temp2
= temp1
->next ();
151 delete [] this->send_block_
;
154 User_Class
* head_send_block_
;
155 User_Class
** send_block_
;
156 int number_of_messages_
;
160 // Encapsulates the received messages creation and destruction
161 struct Receive_Messages
163 Receive_Messages (int number_of_messages
) :
165 number_of_messages_ (number_of_messages
)
171 ACE_NEW_RETURN (this->receive_block_
,
172 User_Class
*[this->number_of_messages_
],
179 delete [] this->receive_block_
;
182 User_Class
**receive_block_
;
183 int number_of_messages_
;
187 single_thread_performance_test ()
189 const char test_message
[] =
190 "ACE_Message_Queue_Ex Test Message";
191 const ACE_TCHAR
*message
=
192 ACE_TEXT ("ACE_Message_Queue_Ex<ACE_NULL_SYNCH>, single thread");
194 // Create a message queue.
197 ACE_NEW_RETURN (msgq
,
201 // Create the messages. Allocate off the heap in case messages is
202 // large relative to the amount of stack space available.
203 User_Class
**send_block
= 0;
204 ACE_NEW_RETURN (send_block
,
205 User_Class
*[max_messages
],
210 for (i
= 0; i
< max_messages
; ++i
)
211 ACE_NEW_RETURN (send_block
[i
],
212 User_Class (test_message
),
215 User_Class
**receive_block_p
= 0;
216 ACE_NEW_RETURN (receive_block_p
,
217 User_Class
*[max_messages
],
222 // Send/receive the messages.
223 for (i
= 0; i
< max_messages
; ++i
)
225 if (msgq
->enqueue_tail (send_block
[i
]) == -1)
226 ACE_ERROR_RETURN ((LM_ERROR
,
228 ACE_TEXT ("enqueue")),
231 if (msgq
->dequeue_head (receive_block_p
[i
]) == -1)
232 ACE_ERROR_RETURN ((LM_ERROR
,
234 ACE_TEXT ("dequeue_head")),
239 print_message (message
);
242 delete [] receive_block_p
;
244 for (i
= 0; i
< max_messages
; ++i
)
245 delete send_block
[i
];
246 delete [] send_block
;
253 MQ_Ex_N_Tester::single_thread_performance_test ()
255 // Create the messages. Allocate off the heap in case messages is
256 // large relative to the amount of stack space available.
258 if ((0 != this->test_enqueue_tail ()) ||
259 (0 != this->test_enqueue_head ()) )
268 MQ_Ex_N_Tester::test_enqueue_tail ()
270 const ACE_TCHAR
*message
=
271 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_NULL_SYNCH>, test_enqueue_tail");
273 // Send_Messages creates messages and deletes them when it gets out of scope
274 Send_Messages
messages (max_messages
, chain_limit
);
275 if (-1 == messages
.create_messages (test_message
))
279 Receive_Messages
r_messages (max_messages
);
280 if (-1 == r_messages
.create ())
286 int limit
= max_messages
/ chain_limit
;
288 // Send with just one call
289 for (int i
= 0; i
< limit
; ++i
)
291 if (-1 == this->st_queue_
.enqueue_tail (messages
.send_block_
[i
]))
293 ACE_ERROR_RETURN ((LM_ERROR
,
295 ACE_TEXT ("enqueue_tail_n")),
299 for (int j
= 0, k
= 0; j
< chain_limit
; ++j
, ++k
)
301 if (this->st_queue_
.dequeue_head (r_messages
.receive_block_
[k
]) == -1)
303 ACE_ERROR_RETURN ((LM_ERROR
,
305 ACE_TEXT ("dequeue_head")),
312 print_message (message
);
320 MQ_Ex_N_Tester::test_enqueue_head ()
322 const ACE_TCHAR
*message
=
323 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_NULL_SYNCH>, test_enqueue_head");
325 // Send_Messages creates messages and deletes them when it gets out of scope
326 Send_Messages
messages (max_messages
, chain_limit
);
327 if (-1 == messages
.create_messages (test_message
))
331 Receive_Messages
r_messages (max_messages
);
332 if (-1 == r_messages
.create ())
340 int limit
= max_messages
/ chain_limit
;
343 // Send/receive the messages.
344 // Send with just one call
345 for (i
= 0; i
< limit
; ++i
)
347 if (-1 == this->st_queue_
.enqueue_head (messages
.send_block_
[i
]))
349 ACE_ERROR_RETURN ((LM_ERROR
,
351 ACE_TEXT ("enqueue_tail_n")),
355 for (j
= 0; j
< chain_limit
; ++j
, ++k
)
357 if (this->st_queue_
.dequeue_head (r_messages
.receive_block_
[k
]) == -1)
359 ACE_ERROR_RETURN ((LM_ERROR
,
361 ACE_TEXT ("dequeue_head")),
368 print_message (message
);
375 #if defined (ACE_HAS_THREADS)
380 Queue_Wrapper
*queue_wrapper
= reinterpret_cast<Queue_Wrapper
*> (arg
);
383 User_Class
**receive_block_p
= 0;
384 ACE_NEW_RETURN (receive_block_p
,
385 User_Class
*[max_messages
],
388 for (i
= 0; i
< max_messages
; ++i
)
389 if (queue_wrapper
->q_
->dequeue_head (receive_block_p
[i
]) == -1)
390 ACE_ERROR_RETURN ((LM_ERROR
,
392 ACE_TEXT ("dequeue_head")),
396 delete [] receive_block_p
;
404 Queue_Wrapper
*queue_wrapper
= reinterpret_cast<Queue_Wrapper
*> (arg
);
409 // Send the messages.
410 for (i
= 0; i
< max_messages
; ++i
)
411 if (queue_wrapper
->q_
->
412 enqueue_tail (queue_wrapper
->send_block_
[i
]) == -1)
413 ACE_ERROR_RETURN ((LM_ERROR
,
415 ACE_TEXT ("enqueue")),
423 Queue_Wrapper queue_wrapper
;
424 const ACE_TCHAR
*message
=
425 ACE_TEXT ("ACE_Message_Queue_Ex<ACE_SYNCH>");
428 // Create the messages. Allocate off the heap in case messages is
429 // large relative to the amount of stack space available. Allocate
430 // it here instead of in the sender, so that we can delete it after
431 // the _receiver_ is done.
432 User_Class
**send_block
= 0;
433 ACE_NEW_RETURN (send_block
,
434 User_Class
*[max_messages
],
437 for (i
= 0; i
< max_messages
; ++i
)
438 ACE_NEW_RETURN (send_block
[i
],
439 User_Class (test_message
),
442 queue_wrapper
.send_block_
= send_block
;
444 ACE_NEW_RETURN (queue_wrapper
.q_
,
448 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) sender
,
451 ACE_ERROR_RETURN ((LM_ERROR
,
453 ACE_TEXT ("spawning sender thread")),
456 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) receiver
,
459 ACE_ERROR_RETURN ((LM_ERROR
,
461 ACE_TEXT ("spawning receiver thread")),
464 ACE_Thread_Manager::instance ()->wait ();
465 print_message (message
);
468 delete queue_wrapper
.q_
;
469 queue_wrapper
.q_
= 0;
471 for (i
= 0; i
< max_messages
; ++i
)
472 delete send_block
[i
];
473 delete [] send_block
;
479 MQ_Ex_N_Tester::performance_test ()
481 const ACE_TCHAR
*message
=
482 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_SYNCH>");
484 Send_Messages
messages (max_messages
, chain_limit
);
485 if (-1 == messages
.create_messages (test_message
))
490 MQ_Ex_N_Tester_Wrapper tester_wrapper
;
491 tester_wrapper
.head_send_block_
= messages
.head_send_block_
;
492 tester_wrapper
.tester_
= this;
494 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) &MQ_Ex_N_Tester::sender
,
497 ACE_ERROR_RETURN ((LM_ERROR
,
499 ACE_TEXT ("spawning sender thread")),
502 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) &MQ_Ex_N_Tester::receiver
,
505 ACE_ERROR_RETURN ((LM_ERROR
,
507 ACE_TEXT ("spawning receiver thread")),
510 ACE_Thread_Manager::instance ()->wait ();
512 print_message (message
);
520 MQ_Ex_N_Tester::receiver (void *args
)
522 MQ_Ex_N_Tester
*tester
= reinterpret_cast<MQ_Ex_N_Tester
*> (args
);
524 User_Class
**receive_block_p
= 0;
525 ACE_NEW_RETURN (receive_block_p
,
526 User_Class
*[max_messages
],
527 (ACE_THR_FUNC_RETURN
) -1);
530 tester_barrier
.wait ();
531 for (i
= 0; i
< max_messages
; ++i
)
533 if (tester
->mt_queue_
.dequeue_head (receive_block_p
[i
]) == -1)
535 ACE_ERROR ((LM_ERROR
,
537 ACE_TEXT ("dequeue_head")));
538 return (ACE_THR_FUNC_RETURN
) -1;
543 delete [] receive_block_p
;
549 MQ_Ex_N_Tester::sender (void *args
)
551 MQ_Ex_N_Tester_Wrapper
*tester_wrapper
=
552 reinterpret_cast<MQ_Ex_N_Tester_Wrapper
*> (args
);
553 MQ_Ex_N_Tester
*tester
= tester_wrapper
->tester_
;
555 Send_Messages
messages (max_messages
, chain_limit
);
556 if (-1 == messages
.create_messages (test_message
))
558 return (ACE_THR_FUNC_RETURN
) -1;
560 int limit
= max_messages
/ chain_limit
;
561 tester_barrier
.wait ();
563 // Send/receive the messages.
565 // Send with just one call
566 for (int i
= 0; i
< limit
; ++i
)
568 if (-1 == tester
->mt_queue_
.enqueue_tail (messages
.send_block_
[i
]))
570 ACE_ERROR ((LM_ERROR
,
572 ACE_TEXT ("enqueue_tail_n")));
573 return (ACE_THR_FUNC_RETURN
) -1;
579 #endif /* ACE_HAS_THREADS */
581 int basic_queue_test (ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
>& q
)
586 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("New queue is not empty!\n")));
592 ACE_Time_Value
tv (ACE_OS::gettimeofday ()); // Now
593 if (q
.dequeue_head (b
, &tv
) != -1)
595 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Dequeued from empty queue!\n")));
598 else if (errno
!= EWOULDBLOCK
)
600 ACE_ERROR ((LM_ERROR
,
602 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
607 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Timed dequeue test: OK\n")));
608 status
= 0; // All is well
615 int queue_priority_test (ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
>& q
)
619 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("Prio test queue not empty\n")), 1);
621 // Set up a few objects with names for how they should come out of the queue.
622 std::unique_ptr
<User_Class
> b1
, b2
, b3
, b4
;
623 b1
.reset (new User_Class ("first"));
624 b2
.reset (new User_Class ("second"));
625 b3
.reset (new User_Class ("third"));
626 b4
.reset (new User_Class ("fourth"));
628 ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
>::DEFAULT_PRIORITY
;
631 if (-1 == q
.enqueue_prio (b2
.get (), 0, prio
))
632 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b2")), 1);
633 if (-1 == q
.enqueue_prio (b3
.get (), 0, prio
))
634 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b3")), 1);
636 if (-1 == q
.enqueue_prio (b4
.get (), 0, prio
))
637 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b4")), 1);
639 if (-1 == q
.enqueue_prio (b1
.get (), 0, prio
))
640 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b1")), 1);
643 if (q
.dequeue_head (b
) == -1)
645 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 1")));
650 if (ACE_OS::strcmp (b
->message (), "first") != 0)
652 ACE_ERROR ((LM_ERROR
,
653 ACE_TEXT ("First dequeued was %C\n"),
658 if (q
.dequeue_head (b
) == -1)
660 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 2")));
665 if (ACE_OS::strcmp (b
->message (), "second") != 0)
667 ACE_ERROR ((LM_ERROR
,
668 ACE_TEXT ("Second dequeued was %C\n"),
673 if (q
.dequeue_head (b
) == -1)
675 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 3")));
680 if (ACE_OS::strcmp (b
->message (), "third") != 0)
682 ACE_ERROR ((LM_ERROR
,
683 ACE_TEXT ("Third dequeued was %C\n"),
688 if (q
.dequeue_head (b
) == -1)
690 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 4")));
695 if (ACE_OS::strcmp (b
->message (), "fourth") != 0)
697 ACE_ERROR ((LM_ERROR
,
698 ACE_TEXT ("Fourth dequeued was %C\n"),
705 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Priority queueing test: OK\n")));
709 class Queue_Ex_Iterator_No_Lock
710 : public ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
>
713 typedef ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
, ACE_System_Time_Policy
> MESSAGE_QUEUE_EX_T
;
715 explicit Queue_Ex_Iterator_No_Lock (MESSAGE_QUEUE_EX_T
& queue_in
)
716 : ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
> (queue_in
.queue ())
718 virtual ~Queue_Ex_Iterator_No_Lock () = default;
720 int next (User_Class
*& message_inout
)
722 if (ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
>::curr_
)
725 reinterpret_cast<User_Class
*> (ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
>::curr_
->base ());
734 return (!ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
>::curr_
? 1 : 0);
739 if (ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
>::curr_
)
741 ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
>::curr_
=
742 ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
>::curr_
->next ();
745 return (ACE_Message_Queue_Iterator
<ACE_SYNCH
, ACE_System_Time_Policy
>::curr_
? 1 : 0);
749 int queue_iterator_test (ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
>& q
)
753 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("Iterator test queue not empty\n")), 1);
755 // Set up a few objects with names for how they should come out of the queue.
756 std::unique_ptr
<User_Class
> b1
= std::make_unique
<User_Class
> ("first");
757 std::unique_ptr
<User_Class
> b2
= std::make_unique
<User_Class
> ("second");
758 std::unique_ptr
<User_Class
> b3
= std::make_unique
<User_Class
> ("third");
759 std::unique_ptr
<User_Class
> b4
= std::make_unique
<User_Class
> ("fourth");
760 if (-1 == q
.enqueue_tail (b1
.get (), 0))
761 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b1")), 1);
762 if (-1 == q
.enqueue_tail (b2
.get (), 0))
763 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b2")), 1);
764 if (-1 == q
.enqueue_tail (b3
.get (), 0))
765 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b3")), 1);
766 if (-1 == q
.enqueue_tail (b4
.get (), 0))
767 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b4")), 1);
769 User_Class
* b
= nullptr;
771 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, aGuard
, q
.lock (), 1);
773 for (Queue_Ex_Iterator_No_Lock
iterator (q
);
781 if (ACE_OS::strcmp (b
->message (), "first") != 0)
783 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("First message was %C\n"), b
->message ()));
787 else if (counter
== 4)
789 if (ACE_OS::strcmp (b
->message (), "fourth") != 0)
791 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Fourth message was %C\n"), b
->message ()));
800 while (!q
.is_empty ())
801 q
.dequeue_head (b
, 0);
804 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Iterator test: OK\n")));
809 run_main (int argc
, ACE_TCHAR
*argv
[])
811 ACE_START_TEST (ACE_TEXT ("Message_Queue_Test_Ex"));
815 if (! ACE_OS::strcmp (argv
[1], ACE_TEXT ("-?")))
817 ACE_ERROR ((LM_ERROR
,
823 max_messages
= ACE_OS::atoi (argv
[1]);
829 // Be sure that the a timed out get sets the error code properly.
830 ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
> q1
;
831 ACE_Message_Queue_Ex_N
<User_Class
, ACE_SYNCH
> q2
;
832 if (0 != basic_queue_test (q1
) ||
833 0 != basic_queue_test (q2
))
838 // Check priority operations.
839 if (0 != queue_priority_test (q1
))
844 // Check iterator operations.
845 if (0 != queue_iterator_test (q1
))
850 ACE_NEW_RETURN (timer
,
854 status
+= single_thread_performance_test ();
856 #if defined (ACE_HAS_THREADS)
857 status
+= performance_test ();
858 #endif /* ACE_HAS_THREADS */
861 MQ_Ex_N_Tester ex_n_tester
;
862 status
+= ex_n_tester
.single_thread_performance_test ();
863 #if defined (ACE_HAS_THREADS)
864 status
+= ex_n_tester
.performance_test ();
865 #endif /* ACE_HAS_THREADS */
869 ACE_ERROR ((LM_ERROR
,
871 ACE_TEXT ("test failed")));