Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / ASX / Message_Queue / bounded_buffer.cpp
blob1024ab549da728617e910dc3581d9afa304a17b7
1 // This short program copies stdin to stdout via the use of an ASX
2 // Message_Queue. It illustrates an implementation of the classic
3 // "bounded buffer" program.
5 #include "ace/Message_Queue.h"
6 #include "ace/Thread_Manager.h"
7 #include "ace/Truncate.h"
8 #include "ace/OS_NS_time.h"
9 #include "ace/OS_NS_unistd.h"
12 #if defined (ACE_HAS_THREADS)
14 // The producer reads data from the stdin stream, creates a message,
15 // and then queues the message in the message list, where it is
16 // removed by the consumer thread. A 0-sized message is enqueued when
17 // there is no more data to read. The consumer uses this as a flag to
18 // know when to exit.
20 static void *
21 producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
23 // Keep reading stdin, until we reach EOF.
25 for (int n; ; )
27 // Allocate a new message.
28 ACE_Message_Block *mb = 0;
30 ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ), 0);
32 n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), mb->size ());
34 if (n <= 0)
36 // Send a shutdown message to the other thread and exit.
37 mb->length (0);
38 if (msg_queue->enqueue_tail (mb) == -1)
39 ACE_ERROR ((LM_ERROR,
40 "(%t) %p\n",
41 "put_next"));
42 break;
44 // Send the message to the other thread.
45 else
47 mb->msg_priority (n);
48 mb->wr_ptr (n);
49 if (msg_queue->enqueue_tail (mb) == -1)
50 ACE_ERROR ((LM_ERROR,
51 "(%t) %p\n",
52 "put_next"));
56 return 0;
59 // The consumer dequeues a message from the ACE_Message_Queue, writes
60 // the message to the stderr stream, and deletes the message. The
61 // producer sends a 0-sized message to inform the consumer to stop
62 // reading and exit.
64 static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
66 int result = 0;
68 // Keep looping, reading a message out of the queue, until we timeout
69 // or get a message with a length == 0, which signals us to quit.
71 for (;;)
73 ACE_Message_Block *mb = 0;
75 ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds
77 result = msg_queue->dequeue_head (mb, &timeout);
79 if (result == -1)
80 break;
82 int length = ACE_Utils::truncate_cast<int> (mb->length ());
84 if (length > 0)
85 ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
87 mb->release ();
89 if (length == 0)
90 break;
93 if (result == -1 && errno == EWOULDBLOCK)
94 ACE_ERROR ((LM_ERROR,
95 "(%t) %p\n%a",
96 "timed out waiting for message",
97 1));
98 return 0;
101 // Spawn off two threads that copy stdin to stdout.
104 ACE_TMAIN (int, ACE_TCHAR *[])
106 // Message list.
107 ACE_Message_Queue<ACE_MT_SYNCH> msg_queue;
109 if (ACE_Thread_Manager::instance ()->spawn
110 (ACE_THR_FUNC (producer),
111 (void *) &msg_queue,
112 THR_NEW_LWP | THR_DETACHED) == -1)
113 ACE_ERROR_RETURN ((LM_ERROR,
114 "%p\n",
115 "spawn"),
117 else if (ACE_Thread_Manager::instance ()->spawn
118 (ACE_THR_FUNC (consumer),
119 (void *) &msg_queue,
120 THR_NEW_LWP | THR_DETACHED) == -1)
121 ACE_ERROR_RETURN ((LM_ERROR,
122 "%p\n",
123 "spawn"),
126 // Wait for producer and consumer threads to exit.
127 ACE_Thread_Manager::instance ()->wait ();
128 return 0;
130 #else
132 ACE_TMAIN (int, ACE_TCHAR *[])
134 ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
135 return 0;
137 #endif /* ACE_HAS_THREADS */