Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / ASX / Message_Queue / priority_buffer.cpp
blobc622d99844b79b4c314b4f5cc3643285e7774366
1 // This short program prints the contents of stdin to stdout sorted by
2 // the length of each line via the use of an ASX Message_Queue. It
3 // illustrates how priorities can be used for ACE Message_Queues.
5 #include "ace/OS_NS_stdio.h"
6 #include "ace/Malloc_Base.h" // To get ACE_Allocator
7 #include "ace/Message_Queue.h"
8 #include "ace/Read_Buffer.h"
9 #include "ace/Thread_Manager.h"
10 #include "ace/Service_Config.h"
11 #include "ace/Truncate.h"
14 #if defined (ACE_HAS_THREADS)
16 // Global thread manager.
17 static ACE_Thread_Manager thr_mgr;
19 // Make the queue be capable of being *very* large.
20 static const long max_queue = LONG_MAX;
22 // The consumer dequeues a message from the ACE_Message_Queue, writes
23 // the message to the stderr stream, and deletes the message. The
24 // producer sends a 0-sized message to inform the consumer to stop
25 // reading and exit.
27 static void *
28 consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
30 // Keep looping, reading a message out of the queue, until we
31 // timeout or get a message with a length == 0, which signals us to
32 // quit.
34 for (;;)
36 ACE_Message_Block *mb = 0;
38 if (msg_queue->dequeue_head (mb) == -1)
39 break;
41 int length = ACE_Utils::truncate_cast<int> (mb->length ());
43 if (length > 0)
44 ACE_OS::puts (mb->rd_ptr ());
46 // Free up the buffer memory and the Message_Block.
47 ACE_Allocator::instance ()->free (mb->rd_ptr ());
48 mb->release ();
50 if (length == 0)
51 break;
54 return 0;
57 // The producer reads data from the stdin stream, creates a message,
58 // and then queues the message in the message list, where it is
59 // removed by the consumer thread. A 0-sized message is enqueued when
60 // there is no more data to read. The consumer uses this as a flag to
61 // know when to exit.
63 static void *
64 producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
66 ACE_Read_Buffer rb (ACE_STDIN);
68 // Keep reading stdin, until we reach EOF.
70 for (;;)
72 // Allocate a new buffer.
73 char *buffer = rb.read ('\n');
75 ACE_Message_Block *mb = 0;
77 if (buffer == 0)
79 // Send a 0-sized shutdown message to the other thread and
80 // exit.
82 ACE_NEW_RETURN (mb, ACE_Message_Block ((size_t) 0), 0);
84 if (msg_queue->enqueue_tail (mb) == -1)
85 ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
86 break;
89 // Enqueue the message in priority order.
90 else
92 // Allocate a new message, but have it "borrow" its memory
93 // from the buffer.
94 ACE_NEW_RETURN (mb,
95 ACE_Message_Block (rb.size (),
96 ACE_Message_Block::MB_DATA,
98 buffer),
99 0);
100 mb->msg_priority (ACE_Utils::truncate_cast<unsigned long> (rb.size ()));
101 mb->wr_ptr (rb.size ());
103 ACE_DEBUG ((LM_DEBUG,
104 "enqueueing message of size %d\n",
105 mb->msg_priority ()));
107 // Enqueue in priority order.
108 if (msg_queue->enqueue_prio (mb) == -1)
109 ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
113 // Now read all the items out in priority order (i.e., ordered by
114 // the size of the lines!).
115 consumer (msg_queue);
117 return 0;
120 // Spawn off one thread that copies stdin to stdout in order of the
121 // size of each line.
124 ACE_TMAIN (int, ACE_TCHAR *[])
126 // Message queue.
127 ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue);
129 if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue,
130 THR_NEW_LWP | THR_DETACHED) == -1)
131 ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
133 // Wait for producer and consumer threads to exit.
134 thr_mgr.wait ();
135 return 0;
137 #else
139 ACE_TMAIN (int, ACE_TCHAR *[])
141 ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
142 return 0;
144 #endif /* ACE_HAS_THREADS */