2 //=============================================================================
4 * @file Message_Queue_Test_Ex.cpp
7 * 1. A simple test of the ACE_Message_Queue_Ex that executes
8 * a performance measurement test for both single-threaded
9 * (null synch) and thread-safe ACE_Message_Queue_Ex
11 * 2. An example of using a user-defined class to parameterize
12 * ACE_Message_Queue_Ex.
14 * @author Michael Vitlo <mvitalo@sprynet.com>
15 * @author copied the code from: Irfan Pyarali <irfan@cs.wustl.edu> and David L. Levine <levine@cs.wustl.edu>
17 //=============================================================================
20 #include "test_config.h"
21 #include "ace/Thread_Manager.h"
23 #include "ace/Auto_Ptr.h"
24 #include "ace/Message_Queue.h"
25 #include "ace/Synch_Traits.h"
26 #include "ace/Null_Mutex.h"
27 #include "ace/Null_Condition.h"
28 #include "ace/High_Res_Timer.h"
29 #include "ace/Message_Block.h"
30 #include "ace/OS_NS_sys_time.h"
31 #include "ace/Barrier.h"
32 #include "Message_Queue_Test_Ex.h" // Declares User_Class
34 const ACE_TCHAR usage
[] =
35 ACE_TEXT ("usage: Message_Queue_Test_Ex <number of messages>\n");
37 using QUEUE
= ACE_Message_Queue_Ex
<User_Class
, ACE_NULL_SYNCH
>;
39 static const int MAX_MESSAGES
= 10000;
40 static const char test_message
[] = "ACE_Message_Queue_Ex Test Message";
42 static int max_messages
= MAX_MESSAGES
;
43 static int chain_limit
= 4;
44 static ACE_Barrier
tester_barrier (2);
46 // Dynamically allocate to avoid a static.
47 static ACE_High_Res_Timer
*timer
= 0;
49 // Helper printing function
51 print_message (const ACE_TCHAR
*message
)
54 timer
->elapsed_time (tv
);
56 ACE_TEXT ("%s: %u messages took %u msec (%f msec/message)\n"),
60 (double) tv
.msec () / max_messages
));
63 #if defined (ACE_HAS_THREADS)
64 using SYNCH_QUEUE
= ACE_Message_Queue_Ex
<User_Class
, ACE_MT_SYNCH
>;
67 * Container for data passed to sender and receiver in
70 * For use in multithreaded performance test.
74 /// The message queue.
77 /// Pointer to messages blocks for sender to send to reciever.
78 User_Class
**send_block_
;
80 /// Default constructor.
82 : q_ (0), send_block_ (0)
87 struct MQ_Ex_N_Tester_Wrapper
90 // Container for data passed to sender in the MQ_Ex_N_Tester
94 // For use in multithreaded performance test.
95 MQ_Ex_N_Tester
*tester_
;
96 User_Class
*head_send_block_
;
99 #endif /* ACE_HAS_THREADS */
101 // Encapsulates the sent messages creation and destruction
104 Send_Messages (int number_of_messages
, int chain_limit
):
106 number_of_messages_ (number_of_messages
),
107 chain_limit_ (chain_limit
)
111 int create_messages (const char test_message
[])
113 int limit
= this->number_of_messages_
/ this->chain_limit_
;
114 ACE_NEW_RETURN (this->send_block_
,
119 for (i
= 0; i
< limit
; ++i
)
121 User_Class
*&temp1
= this->send_block_
[i
];
122 ACE_NEW_RETURN (temp1
,
123 User_Class (test_message
),
125 User_Class
*tail
= temp1
;
126 for (j
= 1; j
< this->chain_limit_
; ++j
)
128 User_Class
*temp2
= 0;
129 ACE_NEW_RETURN (temp2
,
130 User_Class (test_message
),
136 this->head_send_block_
= this->send_block_
[0];
143 int limit
= this->number_of_messages_
/ this->chain_limit_
;
144 for (; i
< limit
; ++i
)
146 User_Class
*&temp1
= this->send_block_
[i
];
147 for (j
= 0; j
< this->chain_limit_
; ++j
)
149 User_Class
*temp2
= temp1
->next ();
154 delete [] this->send_block_
;
157 User_Class
* head_send_block_
;
158 User_Class
** send_block_
;
159 int number_of_messages_
;
163 // Encapsulates the received messages creation and destruction
164 struct Receive_Messages
166 Receive_Messages (int number_of_messages
) :
168 number_of_messages_ (number_of_messages
)
174 ACE_NEW_RETURN (this->receive_block_
,
175 User_Class
*[this->number_of_messages_
],
182 delete [] this->receive_block_
;
185 User_Class
**receive_block_
;
186 int number_of_messages_
;
190 single_thread_performance_test ()
192 const char test_message
[] =
193 "ACE_Message_Queue_Ex Test Message";
194 const ACE_TCHAR
*message
=
195 ACE_TEXT ("ACE_Message_Queue_Ex<ACE_NULL_SYNCH>, single thread");
197 // Create a message queue.
200 ACE_NEW_RETURN (msgq
,
204 // Create the messages. Allocate off the heap in case messages is
205 // large relative to the amount of stack space available.
206 User_Class
**send_block
= 0;
207 ACE_NEW_RETURN (send_block
,
208 User_Class
*[max_messages
],
213 for (i
= 0; i
< max_messages
; ++i
)
214 ACE_NEW_RETURN (send_block
[i
],
215 User_Class (test_message
),
218 User_Class
**receive_block_p
= 0;
219 ACE_NEW_RETURN (receive_block_p
,
220 User_Class
*[max_messages
],
225 // Send/receive the messages.
226 for (i
= 0; i
< max_messages
; ++i
)
228 if (msgq
->enqueue_tail (send_block
[i
]) == -1)
229 ACE_ERROR_RETURN ((LM_ERROR
,
231 ACE_TEXT ("enqueue")),
234 if (msgq
->dequeue_head (receive_block_p
[i
]) == -1)
235 ACE_ERROR_RETURN ((LM_ERROR
,
237 ACE_TEXT ("dequeue_head")),
242 print_message (message
);
245 delete [] receive_block_p
;
247 for (i
= 0; i
< max_messages
; ++i
)
248 delete send_block
[i
];
249 delete [] send_block
;
256 MQ_Ex_N_Tester::single_thread_performance_test ()
258 // Create the messages. Allocate off the heap in case messages is
259 // large relative to the amount of stack space available.
261 if ((0 != this->test_enqueue_tail ()) ||
262 (0 != this->test_enqueue_head ()) )
271 MQ_Ex_N_Tester::test_enqueue_tail ()
273 const ACE_TCHAR
*message
=
274 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_NULL_SYNCH>, test_enqueue_tail");
276 // Send_Messages creates messages and deletes them when it gets out of scope
277 Send_Messages
messages (max_messages
, chain_limit
);
278 if (-1 == messages
.create_messages (test_message
))
282 Receive_Messages
r_messages (max_messages
);
283 if (-1 == r_messages
.create ())
289 int limit
= max_messages
/ chain_limit
;
291 // Send with just one call
292 for (int i
= 0; i
< limit
; ++i
)
294 if (-1 == this->st_queue_
.enqueue_tail (messages
.send_block_
[i
]))
296 ACE_ERROR_RETURN ((LM_ERROR
,
298 ACE_TEXT ("enqueue_tail_n")),
302 for (int j
= 0, k
= 0; j
< chain_limit
; ++j
, ++k
)
304 if (this->st_queue_
.dequeue_head (r_messages
.receive_block_
[k
]) == -1)
306 ACE_ERROR_RETURN ((LM_ERROR
,
308 ACE_TEXT ("dequeue_head")),
315 print_message (message
);
323 MQ_Ex_N_Tester::test_enqueue_head ()
325 const ACE_TCHAR
*message
=
326 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_NULL_SYNCH>, test_enqueue_head");
328 // Send_Messages creates messages and deletes them when it gets out of scope
329 Send_Messages
messages (max_messages
, chain_limit
);
330 if (-1 == messages
.create_messages (test_message
))
334 Receive_Messages
r_messages (max_messages
);
335 if (-1 == r_messages
.create ())
343 int limit
= max_messages
/ chain_limit
;
346 // Send/receive the messages.
347 // Send with just one call
348 for (i
= 0; i
< limit
; ++i
)
350 if (-1 == this->st_queue_
.enqueue_head (messages
.send_block_
[i
]))
352 ACE_ERROR_RETURN ((LM_ERROR
,
354 ACE_TEXT ("enqueue_tail_n")),
358 for (j
= 0; j
< chain_limit
; ++j
, ++k
)
360 if (this->st_queue_
.dequeue_head (r_messages
.receive_block_
[k
]) == -1)
362 ACE_ERROR_RETURN ((LM_ERROR
,
364 ACE_TEXT ("dequeue_head")),
371 print_message (message
);
378 #if defined (ACE_HAS_THREADS)
383 Queue_Wrapper
*queue_wrapper
= reinterpret_cast<Queue_Wrapper
*> (arg
);
386 User_Class
**receive_block_p
= 0;
387 ACE_NEW_RETURN (receive_block_p
,
388 User_Class
*[max_messages
],
391 for (i
= 0; i
< max_messages
; ++i
)
392 if (queue_wrapper
->q_
->dequeue_head (receive_block_p
[i
]) == -1)
393 ACE_ERROR_RETURN ((LM_ERROR
,
395 ACE_TEXT ("dequeue_head")),
399 delete [] receive_block_p
;
407 Queue_Wrapper
*queue_wrapper
= reinterpret_cast<Queue_Wrapper
*> (arg
);
412 // Send the messages.
413 for (i
= 0; i
< max_messages
; ++i
)
414 if (queue_wrapper
->q_
->
415 enqueue_tail (queue_wrapper
->send_block_
[i
]) == -1)
416 ACE_ERROR_RETURN ((LM_ERROR
,
418 ACE_TEXT ("enqueue")),
426 Queue_Wrapper queue_wrapper
;
427 const ACE_TCHAR
*message
=
428 ACE_TEXT ("ACE_Message_Queue_Ex<ACE_SYNCH>");
431 // Create the messages. Allocate off the heap in case messages is
432 // large relative to the amount of stack space available. Allocate
433 // it here instead of in the sender, so that we can delete it after
434 // the _receiver_ is done.
435 User_Class
**send_block
= 0;
436 ACE_NEW_RETURN (send_block
,
437 User_Class
*[max_messages
],
440 for (i
= 0; i
< max_messages
; ++i
)
441 ACE_NEW_RETURN (send_block
[i
],
442 User_Class (test_message
),
445 queue_wrapper
.send_block_
= send_block
;
447 ACE_NEW_RETURN (queue_wrapper
.q_
,
451 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) sender
,
454 ACE_ERROR_RETURN ((LM_ERROR
,
456 ACE_TEXT ("spawning sender thread")),
459 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) receiver
,
462 ACE_ERROR_RETURN ((LM_ERROR
,
464 ACE_TEXT ("spawning receiver thread")),
467 ACE_Thread_Manager::instance ()->wait ();
468 print_message (message
);
471 delete queue_wrapper
.q_
;
472 queue_wrapper
.q_
= 0;
474 for (i
= 0; i
< max_messages
; ++i
)
475 delete send_block
[i
];
476 delete [] send_block
;
482 MQ_Ex_N_Tester::performance_test ()
484 const ACE_TCHAR
*message
=
485 ACE_TEXT ("ACE_Message_Queue_Ex_N<ACE_SYNCH>");
487 Send_Messages
messages (max_messages
, chain_limit
);
488 if (-1 == messages
.create_messages (test_message
))
493 MQ_Ex_N_Tester_Wrapper tester_wrapper
;
494 tester_wrapper
.head_send_block_
= messages
.head_send_block_
;
495 tester_wrapper
.tester_
= this;
497 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) &MQ_Ex_N_Tester::sender
,
500 ACE_ERROR_RETURN ((LM_ERROR
,
502 ACE_TEXT ("spawning sender thread")),
505 if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC
) &MQ_Ex_N_Tester::receiver
,
508 ACE_ERROR_RETURN ((LM_ERROR
,
510 ACE_TEXT ("spawning receiver thread")),
513 ACE_Thread_Manager::instance ()->wait ();
515 print_message (message
);
523 MQ_Ex_N_Tester::receiver (void *args
)
525 MQ_Ex_N_Tester
*tester
= reinterpret_cast<MQ_Ex_N_Tester
*> (args
);
527 User_Class
**receive_block_p
= 0;
528 ACE_NEW_RETURN (receive_block_p
,
529 User_Class
*[max_messages
],
530 (ACE_THR_FUNC_RETURN
) -1);
533 tester_barrier
.wait ();
534 for (i
= 0; i
< max_messages
; ++i
)
536 if (tester
->mt_queue_
.dequeue_head (receive_block_p
[i
]) == -1)
538 ACE_ERROR ((LM_ERROR
,
540 ACE_TEXT ("dequeue_head")));
541 return (ACE_THR_FUNC_RETURN
) -1;
546 delete [] receive_block_p
;
552 MQ_Ex_N_Tester::sender (void *args
)
554 MQ_Ex_N_Tester_Wrapper
*tester_wrapper
=
555 reinterpret_cast<MQ_Ex_N_Tester_Wrapper
*> (args
);
556 MQ_Ex_N_Tester
*tester
= tester_wrapper
->tester_
;
558 Send_Messages
messages (max_messages
, chain_limit
);
559 if (-1 == messages
.create_messages (test_message
))
561 return (ACE_THR_FUNC_RETURN
) -1;
563 int limit
= max_messages
/ chain_limit
;
564 tester_barrier
.wait ();
566 // Send/receive the messages.
568 // Send with just one call
569 for (int i
= 0; i
< limit
; ++i
)
571 if (-1 == tester
->mt_queue_
.enqueue_tail (messages
.send_block_
[i
]))
573 ACE_ERROR ((LM_ERROR
,
575 ACE_TEXT ("enqueue_tail_n")));
576 return (ACE_THR_FUNC_RETURN
) -1;
582 #endif /* ACE_HAS_THREADS */
584 int basic_queue_test (ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
>& q
)
589 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("New queue is not empty!\n")));
595 ACE_Time_Value
tv (ACE_OS::gettimeofday ()); // Now
596 if (q
.dequeue_head (b
, &tv
) != -1)
598 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Dequeued from empty queue!\n")));
601 else if (errno
!= EWOULDBLOCK
)
603 ACE_ERROR ((LM_ERROR
,
605 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
610 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Timed dequeue test: OK\n")));
611 status
= 0; // All is well
618 int queue_priority_test (ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
>& q
)
622 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("Prio test queue not empty\n")), 1);
624 // Set up a few objects with names for how they should come out of the queue.
625 ACE_Auto_Basic_Ptr
<User_Class
> b1
, b2
, b3
, b4
;
626 b1
.reset (new User_Class ("first"));
627 b2
.reset (new User_Class ("second"));
628 b3
.reset (new User_Class ("third"));
629 b4
.reset (new User_Class ("fourth"));
631 ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
>::DEFAULT_PRIORITY
;
634 if (-1 == q
.enqueue_prio (b2
.get (), 0, prio
))
635 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b2")), 1);
636 if (-1 == q
.enqueue_prio (b3
.get (), 0, prio
))
637 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b3")), 1);
639 if (-1 == q
.enqueue_prio (b4
.get (), 0, prio
))
640 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b4")), 1);
642 if (-1 == q
.enqueue_prio (b1
.get (), 0, prio
))
643 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("b1")), 1);
646 if (q
.dequeue_head (b
) == -1)
648 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 1")));
653 if (ACE_OS::strcmp (b
->message (), "first") != 0)
655 ACE_ERROR ((LM_ERROR
,
656 ACE_TEXT ("First dequeued was %C\n"),
661 if (q
.dequeue_head (b
) == -1)
663 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 2")));
668 if (ACE_OS::strcmp (b
->message (), "second") != 0)
670 ACE_ERROR ((LM_ERROR
,
671 ACE_TEXT ("Second dequeued was %C\n"),
676 if (q
.dequeue_head (b
) == -1)
678 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 3")));
683 if (ACE_OS::strcmp (b
->message (), "third") != 0)
685 ACE_ERROR ((LM_ERROR
,
686 ACE_TEXT ("Third dequeued was %C\n"),
691 if (q
.dequeue_head (b
) == -1)
693 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("dequeue 4")));
698 if (ACE_OS::strcmp (b
->message (), "fourth") != 0)
700 ACE_ERROR ((LM_ERROR
,
701 ACE_TEXT ("Fourth dequeued was %C\n"),
708 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Priority queueing test: OK\n")));
713 run_main (int argc
, ACE_TCHAR
*argv
[])
715 ACE_START_TEST (ACE_TEXT ("Message_Queue_Test_Ex"));
719 if (! ACE_OS::strcmp (argv
[1], ACE_TEXT ("-?")))
721 ACE_ERROR ((LM_ERROR
,
727 max_messages
= ACE_OS::atoi (argv
[1]);
733 // Be sure that the a timed out get sets the error code properly.
734 ACE_Message_Queue_Ex
<User_Class
, ACE_SYNCH
> q1
;
735 ACE_Message_Queue_Ex_N
<User_Class
, ACE_SYNCH
> q2
;
736 if (0 != basic_queue_test (q1
) ||
737 0 != basic_queue_test (q2
))
742 // Check priority operations.
743 if (0 != queue_priority_test (q1
))
748 ACE_NEW_RETURN (timer
,
752 status
+= single_thread_performance_test ();
754 #if defined (ACE_HAS_THREADS)
755 status
+= performance_test ();
756 #endif /* ACE_HAS_THREADS */
759 MQ_Ex_N_Tester ex_n_tester
;
760 status
+= ex_n_tester
.single_thread_performance_test ();
761 #if defined (ACE_HAS_THREADS)
762 status
+= ex_n_tester
.performance_test ();
763 #endif /* ACE_HAS_THREADS */
767 ACE_ERROR ((LM_ERROR
,
769 ACE_TEXT ("test failed")));