Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / ASX / Message_Queue / buffer_stream.cpp
blob77c1437ff54850e7bf106705df698a4cec753b4a
1 // This short program copies stdin to stdout via the use of an ASX
2 // Stream. It illustrates an implementation of the classic "bounded
3 // buffer" program using an ASX Stream containing two Modules. Each
4 // ACE_Module contains two Tasks. Each ACE_Task contains a
5 // ACE_Message_Queue and a pointer to an ACE_Thread_Manager. Note how
6 // the use of these reusable components reduces the reliance on global
7 // variables, as compared with the bounded_buffer.C example.
9 #include "ace/OS_main.h"
10 #include "ace/OS_NS_stdio.h"
11 #include "ace/OS_NS_string.h"
12 #include "ace/OS_NS_time.h"
13 #include "ace/OS_NS_unistd.h"
14 #include "ace/Service_Config.h"
15 #include "ace/Stream.h"
16 #include "ace/Module.h"
17 #include "ace/Task.h"
18 #include "ace/Truncate.h"
21 #if defined (ACE_HAS_THREADS)
23 typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
24 typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
25 typedef ACE_Task<ACE_MT_SYNCH> MT_Task;
27 class Common_Task : public MT_Task
28 // = TITLE
29 // Methods that are common to the producer and consumer.
31 public:
32 Common_Task () {}
34 //FUZZ: disable check_for_lack_ACE_OS
35 // ACE_Task hooks
36 virtual int open (void * = 0);
37 virtual int close (u_long = 0);
38 //FUZZ: enable check_for_lack_ACE_OS
41 // Define the Producer interface.
43 class Producer : public Common_Task
45 public:
46 Producer () {}
48 // Read data from stdin and pass to consumer.
49 virtual int svc ();
52 class Consumer : public Common_Task
53 // = TITLE
54 // Define the Consumer interface.
56 public:
57 Consumer () {}
59 virtual int put (ACE_Message_Block *mb,
60 ACE_Time_Value *tv = 0);
61 // Enqueue the message on the ACE_Message_Queue for subsequent
62 // handling in the svc() method.
64 virtual int svc ();
65 // Receive message from producer and print to stdout.
67 private:
68 ACE_Time_Value timeout_;
71 class Filter : public MT_Task
72 // = TITLE
73 // Defines a Filter that prepends a line number in front of each
74 // line.
76 public:
77 Filter (): count_ (1) {}
79 virtual int put (ACE_Message_Block *mb,
80 ACE_Time_Value *tv = 0);
81 // Change the size of the message before passing it downstream.
83 private:
84 size_t count_;
85 // Count the number of lines passing through the filter.
88 // Spawn off a new thread.
90 int
91 Common_Task::open (void *)
93 if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
94 ACE_ERROR_RETURN ((LM_ERROR,
95 ACE_TEXT ("%p\n"),
96 ACE_TEXT ("spawn")),
97 -1);
98 return 0;
102 Common_Task::close (u_long exit_status)
104 ACE_DEBUG ((LM_DEBUG,
105 ACE_TEXT ("(%t) thread is exiting with status %d in module %s\n"),
106 exit_status,
107 this->name ()));
109 // Can do anything here that is required when a thread exits, e.g.,
110 // storing thread-specific information in some other storage
111 // location, etc.
112 return 0;
115 // The Consumer reads data from the stdin stream, creates a message,
116 // and then queues the message in the message list, where it is
117 // removed by the consumer thread. A 0-sized message is enqueued when
118 // there is no more data to read. The consumer uses this as a flag to
119 // know when to exit.
122 Producer::svc ()
124 // Keep reading stdin, until we reach EOF.
126 for (int n; ; )
128 // Allocate a new message (add one to avoid nasty boundary
129 // conditions).
131 ACE_Message_Block *mb = 0;
133 ACE_NEW_RETURN (mb,
134 ACE_Message_Block (BUFSIZ + 1),
135 -1);
137 n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), BUFSIZ);
139 if (n <= 0)
141 // Send a shutdown message to the other thread and exit.
142 mb->length (0);
144 if (this->put_next (mb) == -1)
145 ACE_ERROR ((LM_ERROR,
146 ACE_TEXT ("(%t) %p\n"),
147 ACE_TEXT ("put_next")));
148 break;
151 // Send the message to the other thread.
152 else
154 mb->wr_ptr (n);
155 // NUL-terminate the string (since we use strlen() on it
156 // later).
157 mb->rd_ptr ()[n] = '\0';
159 if (this->put_next (mb) == -1)
160 ACE_ERROR ((LM_ERROR,
161 ACE_TEXT ("(%t) %p\n"),
162 ACE_TEXT ("put_next")));
166 return 0;
169 // Simply enqueue the Message_Block into the end of the queue.
172 Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
174 return this->putq (mb, tv);
177 // The consumer dequeues a message from the ACE_Message_Queue, writes
178 // the message to the stderr stream, and deletes the message. The
179 // Consumer sends a 0-sized message to inform the consumer to stop
180 // reading and exit.
183 Consumer::svc ()
185 int result = 0;
187 // Keep looping, reading a message out of the queue, until we
188 // timeout or get a message with a length == 0, which signals us to
189 // quit.
191 for (;;)
193 ACE_Message_Block *mb = 0;
195 // Wait for upto 4 seconds.
196 this->timeout_.sec (ACE_OS::time (0) + 4);
198 result = this->getq (mb, &this->timeout_);
200 if (result == -1)
201 break;
203 int length = ACE_Utils::truncate_cast<int> (mb->length ());
205 if (length > 0)
206 ACE_OS::write (ACE_STDOUT,
207 mb->rd_ptr (),
208 ACE_OS::strlen (mb->rd_ptr ()));
210 mb->release ();
212 if (length == 0)
213 break;
216 if (result == -1 && errno == EWOULDBLOCK)
217 ACE_ERROR ((LM_ERROR,
218 ACE_TEXT ("(%t) %p\n%a"),
219 ACE_TEXT ("timed out waiting for message"),
220 1));
221 return 0;
225 Filter::put (ACE_Message_Block *mb,
226 ACE_Time_Value *tv)
228 if (mb->length () == 0)
229 return this->put_next (mb, tv);
230 else
232 char buf[BUFSIZ];
234 // Stash a copy of the buffer away.
235 ACE_OS::strncpy (buf, mb->rd_ptr (), sizeof buf);
237 // Increase the size of the buffer large enough that it will be
238 // reallocated (in order to test the reallocation mechanisms).
240 mb->size (mb->length () + BUFSIZ);
241 mb->length (mb->size ());
243 // Prepend the line count in front of the buffer.
244 ACE_OS::sprintf (mb->rd_ptr (),
245 ACE_SIZE_T_FORMAT_SPECIFIER
246 ": %s",
247 this->count_++,
248 buf);
249 return this->put_next (mb, tv);
253 // Main driver function.
256 ACE_TMAIN (int, ACE_TCHAR *argv[])
258 ACE_Service_Config daemon (argv[0]);
260 // This Stream controls hierachically-related active objects.
261 MT_Stream stream;
263 MT_Module *pm = 0;
264 MT_Module *fm = 0;
265 MT_Module *cm = 0;
267 ACE_NEW_RETURN (cm,
268 MT_Module (ACE_TEXT ("Consumer"),
269 new Consumer),
270 -1);
271 ACE_NEW_RETURN (fm,
272 MT_Module (ACE_TEXT ("Filter"),
273 new Filter),
274 -1);
275 ACE_NEW_RETURN (pm,
276 MT_Module (ACE_TEXT ("Producer"),
277 new Producer),
278 -1);
280 // Create Consumer, Filter, and Producer Modules and push them onto
281 // the Stream. All processing is performed in the Stream.
283 if (stream.push (cm) == -1)
284 ACE_ERROR_RETURN ((LM_ERROR,
285 ACE_TEXT ("%p\n"),
286 ACE_TEXT ("push")),
288 else if (stream.push (fm) == -1)
289 ACE_ERROR_RETURN ((LM_ERROR,
290 ACE_TEXT ("%p\n"),
291 ACE_TEXT ("push")),
293 else if (stream.push (pm) == -1)
294 ACE_ERROR_RETURN ((LM_ERROR,
295 ACE_TEXT ("%p\n"),
296 ACE_TEXT ("push")),
298 // Barrier synchronization: wait for the threads to exit, then exit
299 // ourselves.
300 ACE_Thread_Manager::instance ()->wait ();
301 return 0;
303 #else
305 ACE_TMAIN (int, ACE_TCHAR *[])
307 ACE_ERROR ((LM_ERROR,
308 ACE_TEXT ("threads not supported on this platform\n")));
309 return 0;
311 #endif /* ACE_HAS_THREADS */