2 //=============================================================================
4 * @file Priority_Buffer_Test.cpp
6 * This is a simple test to illustrate the priority mechanism of
7 * <ACE_Message_Queue>s. The producer uses an <ACE_Message_Queue>
8 * to enqueue a bunch of messages with different priorities which
9 * are then dequeued by the consumer.
11 * @author Prashant Jain <pjain@cs.wustl.edu> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
13 //=============================================================================
16 #include "test_config.h"
17 #include "ace/Message_Queue.h"
18 #include "ace/Thread_Manager.h"
21 #if defined (ACE_HAS_THREADS)
23 static const char ACE_ALPHABET
[] = "abcdefghijklmnopqrstuvwxyz";
25 // Global message count.
26 static int message_count
= 0;
28 // Make the queue be capable of being *very* large.
29 static const long max_queue
= LONG_MAX
;
31 // The consumer dequeues a message from the ACE_Message_Queue, writes
32 // the message to the stderr stream, and deletes the message. The
33 // producer sends a 0-sized message to inform the consumer to stop
39 ACE_Message_Queue
<ACE_MT_SYNCH
> *msg_queue
=
40 reinterpret_cast<ACE_Message_Queue
<ACE_MT_SYNCH
> *> (args
);
42 u_long cur_priority
= 27;
43 ACE_UNUSED_ARG (cur_priority
);
44 // To suppress ghs warning about unused local variable
49 // Keep looping, reading a message out of the queue, until we get a
50 // message with a length == 0, which signals us to quit.
51 for (char c
= 'z'; ; c
--)
53 ACE_Message_Block
*mb
= 0;
55 int result
= msg_queue
->dequeue_head (mb
);
62 size_t length
= mb
->length ();
66 // This isn't a "shutdown" message, so process it
68 ACE_TEST_ASSERT (c
== *mb
->rd_ptr ());
69 ACE_TEST_ASSERT (mb
->msg_priority () < cur_priority
);
70 cur_priority
= mb
->msg_priority ();
73 // Free up the buffer memory and the Message_Block. Note that
74 // the destructor of Message Block will delete the the actual
79 // This was a "shutdown" message, so break out of the loop.
83 ACE_TEST_ASSERT (local_count
== message_count
);
87 // The producer reads data from the stdin stream, creates a message,
88 // and then queues the message in the message list, where it is
89 // removed by the consumer thread. A 0-sized message is enqueued when
90 // there is no more data to read. The consumer uses this as a flag to
96 ACE_Message_Queue
<ACE_MT_SYNCH
> *msg_queue
=
97 reinterpret_cast<ACE_Message_Queue
<ACE_MT_SYNCH
> *> (args
);
99 ACE_Message_Block
*mb
= 0;
101 for (const char *c
= ACE_ALPHABET
; *c
!= '\0'; c
++)
105 // Allocate a new message
108 ACE_Message_Block (1),
113 mb
->msg_priority (message_count
);
116 // Enqueue in priority order.
117 if (msg_queue
->enqueue_prio (mb
) == -1)
118 ACE_ERROR_RETURN ((LM_ERROR
,
119 ACE_TEXT ("(%t) %p\n"),
120 ACE_TEXT ("put_next")),
124 // Now send a 0-sized shutdown message to the other thread
126 ACE_Message_Block ((size_t) 0),
129 if (msg_queue
->enqueue_tail (mb
) == -1)
130 ACE_ERROR ((LM_ERROR
,
131 ACE_TEXT ("(%t) %p\n"),
132 ACE_TEXT ("put_next")));
136 // Now read all the items out in priority order (i.e., ordered by
137 // the size of the lines!).
138 consumer (msg_queue
);
143 #endif /* ACE_HAS_THREADS */
145 // Spawn off one thread that copies stdin to stdout in order of the
146 // size of each line.
149 run_main (int, ACE_TCHAR
*[])
151 ACE_START_TEST (ACE_TEXT ("Priority_Buffer_Test"));
153 #if defined (ACE_HAS_THREADS)
155 ACE_Message_Queue
<ACE_MT_SYNCH
> msg_queue (max_queue
);
157 if (ACE_Thread_Manager::instance ()->spawn
158 (ACE_THR_FUNC (producer
),
160 THR_NEW_LWP
| THR_DETACHED
) == -1)
161 ACE_ERROR_RETURN ((LM_ERROR
,
166 // Wait for producer and consumer threads to exit.
167 ACE_Thread_Manager::instance ()->wait ();
170 ACE_TEXT ("threads not supported on this platform\n")));
171 #endif /* ACE_HAS_THREADS */