1 // This short program prints the contents of stdin to stdout sorted by
2 // the length of each line via the use of an ASX Message_Queue. It
3 // illustrates how priorities can be used for ACE Message_Queues.
5 #include "ace/OS_NS_stdio.h"
6 #include "ace/Malloc_Base.h" // To get ACE_Allocator
7 #include "ace/Message_Queue.h"
8 #include "ace/Read_Buffer.h"
9 #include "ace/Thread_Manager.h"
10 #include "ace/Service_Config.h"
11 #include "ace/Truncate.h"
14 #if defined (ACE_HAS_THREADS)
16 // Global thread manager; the threaded main below calls spawn () on it
// to start the producer function.
17 static ACE_Thread_Manager thr_mgr
;
19 // Make the queue be capable of being *very* large. This value is
// passed to the ACE_Message_Queue constructor in the threaded main
// (presumably as the queue's high-water mark -- confirm against the
// ACE_Message_Queue documentation).
20 static const long max_queue
= LONG_MAX
;
22 // The consumer dequeues a message from the ACE_Message_Queue, writes
23 // the message to the stdout stream, and frees the message. The
24 // producer sends a 0-sized message to tell the consumer to stop.
// NOTE(review): this excerpt does not show the function's return type,
// braces, loop construct or exit statements; the comments below describe
// only the statements that are visible.
28 consumer (ACE_Message_Queue
<ACE_MT_SYNCH
> *msg_queue
)
30 // Keep looping, reading a message out of the queue, until we
31 // time out or get a message with a length == 0, which signals us to
// stop. Note that no timeout argument is passed to dequeue_head () below.
36 ACE_Message_Block
*mb
= 0;
// dequeue_head () is handed mb and its result is compared against -1;
// the action taken on -1 is not visible in this excerpt.
38 if (msg_queue
->dequeue_head (mb
) == -1)
// Narrow the block's length to int; a length of 0 is the stop marker
// described above.
41 int length
= ACE_Utils::truncate_cast
<int> (mb
->length ());
// Print the data at the block's read pointer (presumably to stdout with
// a trailing newline, as puts () does -- confirm for ACE_OS::puts).
44 ACE_OS::puts (mb
->rd_ptr ());
46 // Free up the buffer memory and the Message_Block.
// NOTE(review): only the buffer free through ACE_Allocator is visible
// here; the release of the Message_Block itself is not shown.
47 ACE_Allocator::instance ()->free (mb
->rd_ptr ());
57 // The producer reads data from the stdin stream, creates a message,
58 // and then queues the message in the message list, where it is
59 // removed by the consumer. A 0-sized message is enqueued when there
60 // is no more data to read; the consumer treats it as a stop signal.
// NOTE(review): this excerpt does not show the function's return type,
// braces, loop construct, branch conditions or exit statements; the
// comments below describe only the statements that are visible.
64 producer (ACE_Message_Queue
<ACE_MT_SYNCH
> *msg_queue
)
// Read buffer constructed over ACE_STDIN.
66 ACE_Read_Buffer
rb (ACE_STDIN
);
68 // Keep reading stdin, until we reach EOF.
72 // Allocate a new buffer.
// read () is given the newline character as its terminator; presumably
// it returns a null pointer at EOF -- confirm against ACE_Read_Buffer.
73 char *buffer
= rb
.read ('\n');
75 ACE_Message_Block
*mb
= 0;
79 // Send a 0-sized shutdown message to the consumer.
// The trailing 0 is presumably the value returned from this function if
// the allocation fails -- confirm against the ACE_NEW_RETURN macro.
82 ACE_NEW_RETURN (mb
, ACE_Message_Block ((size_t) 0), 0);
// NOTE(review): the error label passed to ACE_ERROR below names put_next,
// but the call being checked is enqueue_tail.
84 if (msg_queue
->enqueue_tail (mb
) == -1)
85 ACE_ERROR ((LM_ERROR
, "(%t) %p\n", "put_next"));
89 // Enqueue the message in priority order.
92 // Allocate a new message, but have it "borrow" its memory
// (the allocation macro and the remaining constructor arguments are not
// visible in this excerpt).
95 ACE_Message_Block (rb
.size (),
96 ACE_Message_Block::MB_DATA
,
// The size reported by rb becomes the message priority, which is what
// makes the priority ordering equal to ordering by line size.
100 mb
->msg_priority (ACE_Utils::truncate_cast
<unsigned long> (rb
.size ()));
// Presumably marks rb.size () bytes of the block as written -- confirm
// against ACE_Message_Block::wr_ptr.
101 mb
->wr_ptr (rb
.size ());
// NOTE(review): the format below uses %d for msg_priority (), while the
// setter above is given an unsigned long; the conversion specifier may
// not match the argument type -- confirm against ACE_Log_Msg.
103 ACE_DEBUG ((LM_DEBUG
,
104 "enqueueing message of size %d\n",
105 mb
->msg_priority ()));
107 // Enqueue in priority order.
// NOTE(review): as above, the error label names put_next although the
// call being checked is enqueue_prio.
108 if (msg_queue
->enqueue_prio (mb
) == -1)
109 ACE_ERROR ((LM_ERROR
, "(%t) %p\n", "put_next"));
113 // Now read all the items out in priority order (i.e., ordered by
114 // the size of the lines!).
// consumer () is invoked as a direct call on the same queue; no separate
// spawn of consumer is visible in this file.
115 consumer (msg_queue
);
120 // Spawn off one thread that copies stdin to stdout in order of the
121 // size of each line.
// Program entry point for builds where ACE_HAS_THREADS is defined.
// NOTE(review): the return type, braces, wait call and return statement
// are not visible in this excerpt.
124 ACE_TMAIN (int, ACE_TCHAR
*[])
// Queue handed to the producer, constructed with max_queue.
127 ACE_Message_Queue
<ACE_MT_SYNCH
> msg_queue (max_queue
);
// Start producer through the global thread manager, passing the address
// of msg_queue as its argument. A result of -1 is reported with the
// spawn label; ACE_ERROR_RETURN presumably then returns 1 -- confirm.
129 if (thr_mgr
.spawn (ACE_THR_FUNC (producer
), (void *) &msg_queue
,
130 THR_NEW_LWP
| THR_DETACHED
) == -1)
131 ACE_ERROR_RETURN ((LM_ERROR
, "%p\n", "spawn"), 1);
133 // Wait for producer and consumer threads to exit.
// (The wait call itself is not visible in this excerpt.)
// Fallback entry point: its only visible statement logs that threads are
// not supported. NOTE(review): presumably this sits under the #else of
// the ACE_HAS_THREADS check; the #else line, return type, braces and
// return statement are not visible in this excerpt.
139 ACE_TMAIN (int, ACE_TCHAR
*[])
141 ACE_ERROR ((LM_ERROR
, "threads not supported on this platform\n"));
144 #endif /* ACE_HAS_THREADS */