Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Priority_Buffer_Test.cpp
blob57d257d8b3761aa4d2f5be246bd70e1b7c7fe2cd
2 //=============================================================================
3 /**
4 * @file Priority_Buffer_Test.cpp
6 * This is a simple test to illustrate the priority mechanism of
7 * <ACE_Message_Queue>s. The producer uses an <ACE_Message_Queue>
8 * to enqueue a bunch of messages with different priorities which
9 * are then dequeued by the consumer.
11 * @author Prashant Jain <pjain@cs.wustl.edu> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
13 //=============================================================================
16 #include "test_config.h"
17 #include "ace/Message_Queue.h"
18 #include "ace/Thread_Manager.h"
22 #if defined (ACE_HAS_THREADS)
// Payload source: the producer enqueues one single-character message
// for each letter of this string, in order.
24 static const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz";
26 // Running count of messages created by the producer (one per letter plus the shutdown message).
27 static int message_count = 0;
29 // Make the queue be capable of being *very* large.
// This value is passed to the ACE_Message_Queue constructor in run_main().
30 static const long max_queue = LONG_MAX;
32 // The consumer dequeues a message from the ACE_Message_Queue, writes
33 // the message to the stderr stream, and deletes the message. The
34 // producer sends a 0-sized message to inform the consumer to stop
35 // reading and exit.
37 static void *
38 consumer (void *args)
40 ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue =
41 reinterpret_cast<ACE_Message_Queue<ACE_MT_SYNCH> *> (args);
43 u_long cur_priority = 27;
44 ACE_UNUSED_ARG (cur_priority);
45 // To suppress ghs warning about unused local variable
46 // "cur_priority".
48 int local_count = 0;
50 // Keep looping, reading a message out of the queue, until we get a
51 // message with a length == 0, which signals us to quit.
52 for (char c = 'z'; ; c--)
54 ACE_Message_Block *mb = 0;
56 int result = msg_queue->dequeue_head (mb);
58 if (result == -1)
59 break;
61 local_count++;
63 size_t length = mb->length ();
65 if (length > 0)
67 // This isn't a "shutdown" message, so process it
68 // "normally."
69 ACE_TEST_ASSERT (c == *mb->rd_ptr ());
70 ACE_TEST_ASSERT (mb->msg_priority () < cur_priority);
71 cur_priority = mb->msg_priority ();
74 // Free up the buffer memory and the Message_Block. Note that
75 // the destructor of Message Block will delete the the actual
76 // buffer.
77 mb->release ();
79 if (length == 0)
80 // This was a "shutdown" message, so break out of the loop.
81 break;
84 ACE_TEST_ASSERT (local_count == message_count);
85 return 0;
88 // The producer reads data from the stdin stream, creates a message,
89 // and then queues the message in the message list, where it is
90 // removed by the consumer thread. A 0-sized message is enqueued when
91 // there is no more data to read. The consumer uses this as a flag to
92 // know when to exit.
94 static void *
95 producer (void *args)
97 ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue =
98 reinterpret_cast<ACE_Message_Queue<ACE_MT_SYNCH> *> (args);
100 ACE_Message_Block *mb = 0;
102 for (const char *c = ACE_ALPHABET; *c != '\0'; c++)
104 ++message_count;
106 // Allocate a new message
108 ACE_NEW_RETURN (mb,
109 ACE_Message_Block (1),
111 *mb->wr_ptr () = *c;
113 // Set the priority.
114 mb->msg_priority (message_count);
115 mb->wr_ptr (1);
117 // Enqueue in priority order.
118 if (msg_queue->enqueue_prio (mb) == -1)
119 ACE_ERROR_RETURN ((LM_ERROR,
120 ACE_TEXT ("(%t) %p\n"),
121 ACE_TEXT ("put_next")),
125 // Now send a 0-sized shutdown message to the other thread
126 ACE_NEW_RETURN (mb,
127 ACE_Message_Block ((size_t) 0),
130 if (msg_queue->enqueue_tail (mb) == -1)
131 ACE_ERROR ((LM_ERROR,
132 ACE_TEXT ("(%t) %p\n"),
133 ACE_TEXT ("put_next")));
135 ++message_count;
137 // Now read all the items out in priority order (i.e., ordered by
138 // the size of the lines!).
139 consumer (msg_queue);
141 return 0;
144 #endif /* ACE_HAS_THREADS */
146 // Test entry point.  Spawns one detached thread that runs producer(),
147 // which fills the queue and then calls consumer() to drain it.
150 run_main (int, ACE_TCHAR *[])
152 ACE_START_TEST (ACE_TEXT ("Priority_Buffer_Test"));
154 #if defined (ACE_HAS_THREADS)
155 // Message queue.
156 ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue);
// The address of the queue is handed to the new thread as its sole
// argument; a spawn failure is logged and ends the test early.
158 if (ACE_Thread_Manager::instance ()->spawn
159 (ACE_THR_FUNC (producer),
160 (void *) &msg_queue,
161 THR_NEW_LWP | THR_DETACHED) == -1)
162 ACE_ERROR_RETURN ((LM_ERROR,
163 ACE_TEXT ("%p\n"),
164 ACE_TEXT ("spawn")),
167 // Wait for the spawned thread (which runs both producer and consumer) to exit.
168 ACE_Thread_Manager::instance ()->wait ();
169 #else
// No thread support: only log a notice; the queue test itself is skipped.
170 ACE_ERROR ((LM_INFO,
171 ACE_TEXT ("threads not supported on this platform\n")));
172 #endif /* ACE_HAS_THREADS */
173 ACE_END_TEST;
174 return 0;