1 // This short program copies stdin to stdout via the use of an ASX
2 // Message_Queue. It illustrates an implementation of the classic
3 // "bounded buffer" program.
5 #include "ace/Message_Queue.h"
6 #include "ace/Thread_Manager.h"
7 #include "ace/Truncate.h"
8 #include "ace/OS_NS_time.h"
9 #include "ace/OS_NS_unistd.h"
12 #if defined (ACE_HAS_THREADS)
14 // The producer reads data from the stdin stream, creates a message,
15 // and then queues the message in the message list, where it is
16 // removed by the consumer thread. A 0-sized message is enqueued when
17 // there is no more data to read. The consumer uses this as a flag to
21 producer (ACE_Message_Queue
<ACE_MT_SYNCH
> *msg_queue
)
23 // Keep reading stdin, until we reach EOF.
27 // Allocate a new message.
28 ACE_Message_Block
*mb
= 0;
30 ACE_NEW_RETURN (mb
, ACE_Message_Block (BUFSIZ
), 0);
32 n
= ACE_OS::read (ACE_STDIN
, mb
->wr_ptr (), mb
->size ());
36 // Send a shutdown message to the other thread and exit.
38 if (msg_queue
->enqueue_tail (mb
) == -1)
44 // Send the message to the other thread.
49 if (msg_queue
->enqueue_tail (mb
) == -1)
59 // The consumer dequeues a message from the ACE_Message_Queue, writes
60 // the message to the stderr stream, and deletes the message. The
61 // producer sends a 0-sized message to inform the consumer to stop
64 static void *consumer (ACE_Message_Queue
<ACE_MT_SYNCH
> *msg_queue
)
68 // Keep looping, reading a message out of the queue, until we timeout
69 // or get a message with a length == 0, which signals us to quit.
73 ACE_Message_Block
*mb
= 0;
75 ACE_Time_Value
timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds
77 result
= msg_queue
->dequeue_head (mb
, &timeout
);
82 int length
= ACE_Utils::truncate_cast
<int> (mb
->length ());
85 ACE_OS::write (ACE_STDOUT
, mb
->rd_ptr (), length
);
93 if (result
== -1 && errno
== EWOULDBLOCK
)
96 "timed out waiting for message",
101 // Spawn off two threads that copy stdin to stdout.
104 ACE_TMAIN (int, ACE_TCHAR
*[])
107 ACE_Message_Queue
<ACE_MT_SYNCH
> msg_queue
;
109 if (ACE_Thread_Manager::instance ()->spawn
110 (ACE_THR_FUNC (producer
),
112 THR_NEW_LWP
| THR_DETACHED
) == -1)
113 ACE_ERROR_RETURN ((LM_ERROR
,
117 else if (ACE_Thread_Manager::instance ()->spawn
118 (ACE_THR_FUNC (consumer
),
120 THR_NEW_LWP
| THR_DETACHED
) == -1)
121 ACE_ERROR_RETURN ((LM_ERROR
,
126 // Wait for producer and consumer threads to exit.
127 ACE_Thread_Manager::instance ()->wait ();
132 ACE_TMAIN (int, ACE_TCHAR
*[])
134 ACE_ERROR ((LM_ERROR
, "threads not supported on this platform\n"));
137 #endif /* ACE_HAS_THREADS */