1 // This short program copies stdin to stdout via the use of an ASX
2 // Stream. It illustrates an implementation of the classic "bounded
3 // buffer" program using an ASX Stream containing two Modules. Each
4 // ACE_Module contains two Tasks. Each ACE_Task contains a
5 // ACE_Message_Queue and a pointer to an ACE_Thread_Manager. Note how
6 // the use of these reusable components reduces the reliance on global
7 // variables, as compared with the bounded_buffer.C example.
9 #include "ace/OS_main.h"
10 #include "ace/OS_NS_stdio.h"
11 #include "ace/OS_NS_string.h"
12 #include "ace/OS_NS_time.h"
13 #include "ace/OS_NS_unistd.h"
14 #include "ace/Service_Config.h"
15 #include "ace/Stream.h"
16 #include "ace/Module.h"
18 #include "ace/Truncate.h"
21 #if defined (ACE_HAS_THREADS)
23 typedef ACE_Stream
<ACE_MT_SYNCH
> MT_Stream
;
24 typedef ACE_Module
<ACE_MT_SYNCH
> MT_Module
;
25 typedef ACE_Task
<ACE_MT_SYNCH
> MT_Task
;
27 class Common_Task
: public MT_Task
29 // Methods that are common to the producer and consumer.
34 //FUZZ: disable check_for_lack_ACE_OS
36 virtual int open (void * = 0);
37 virtual int close (u_long
= 0);
38 //FUZZ: enable check_for_lack_ACE_OS
41 // Define the Producer interface.
43 class Producer
: public Common_Task
48 // Read data from stdin and pass to consumer.
52 class Consumer
: public Common_Task
54 // Define the Consumer interface.
59 virtual int put (ACE_Message_Block
*mb
,
60 ACE_Time_Value
*tv
= 0);
61 // Enqueue the message on the ACE_Message_Queue for subsequent
62 // handling in the svc() method.
65 // Receive message from producer and print to stdout.
68 ACE_Time_Value timeout_
;
71 class Filter
: public MT_Task
73 // Defines a Filter that prepends a line number in front of each
77 Filter (): count_ (1) {}
79 virtual int put (ACE_Message_Block
*mb
,
80 ACE_Time_Value
*tv
= 0);
81 // Change the size of the message before passing it downstream.
85 // Count the number of lines passing through the filter.
88 // Spawn off a new thread.
91 Common_Task::open (void *)
93 if (this->activate (THR_NEW_LWP
| THR_DETACHED
) == -1)
94 ACE_ERROR_RETURN ((LM_ERROR
,
102 Common_Task::close (u_long exit_status
)
104 ACE_DEBUG ((LM_DEBUG
,
105 ACE_TEXT ("(%t) thread is exiting with status %d in module %s\n"),
109 // Can do anything here that is required when a thread exits, e.g.,
110 // storing thread-specific information in some other storage
115 // The Producer reads data from the stdin stream, creates a message,
116 // and then queues the message in the message list, where it is
117 // removed by the consumer thread. A 0-sized message is enqueued when
118 // there is no more data to read. The consumer uses this as a flag to
119 // know when to exit.
124 // Keep reading stdin, until we reach EOF.
128 // Allocate a new message (add one to avoid nasty boundary
131 ACE_Message_Block
*mb
= 0;
134 ACE_Message_Block (BUFSIZ
+ 1),
137 n
= ACE_OS::read (ACE_STDIN
, mb
->wr_ptr (), BUFSIZ
);
141 // Send a shutdown message to the other thread and exit.
144 if (this->put_next (mb
) == -1)
145 ACE_ERROR ((LM_ERROR
,
146 ACE_TEXT ("(%t) %p\n"),
147 ACE_TEXT ("put_next")));
151 // Send the message to the other thread.
155 // NUL-terminate the string (since we use strlen() on it
157 mb
->rd_ptr ()[n
] = '\0';
159 if (this->put_next (mb
) == -1)
160 ACE_ERROR ((LM_ERROR
,
161 ACE_TEXT ("(%t) %p\n"),
162 ACE_TEXT ("put_next")));
169 // Simply enqueue the Message_Block into the end of the queue.
172 Consumer::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
174 return this->putq (mb
, tv
);
177 // The consumer dequeues a message from the ACE_Message_Queue, writes
178 // the message to the stdout stream, and deletes the message. The
179 // Producer sends a 0-sized message to inform the consumer to stop
187 // Keep looping, reading a message out of the queue, until we
188 // time out or get a message with a length == 0, which signals us to
193 ACE_Message_Block
*mb
= 0;
195 // Wait for up to 4 seconds.
196 this->timeout_
.sec (ACE_OS::time (0) + 4);
198 result
= this->getq (mb
, &this->timeout_
);
203 int length
= ACE_Utils::truncate_cast
<int> (mb
->length ());
206 ACE_OS::write (ACE_STDOUT
,
208 ACE_OS::strlen (mb
->rd_ptr ()));
216 if (result
== -1 && errno
== EWOULDBLOCK
)
217 ACE_ERROR ((LM_ERROR
,
218 ACE_TEXT ("(%t) %p\n%a"),
219 ACE_TEXT ("timed out waiting for message"),
225 Filter::put (ACE_Message_Block
*mb
,
228 if (mb
->length () == 0)
229 return this->put_next (mb
, tv
);
234 // Stash a copy of the buffer away.
235 ACE_OS::strncpy (buf
, mb
->rd_ptr (), sizeof buf
);
237 // Grow the buffer by enough that it will have to be
238 // reallocated (in order to test the reallocation mechanisms).
240 mb
->size (mb
->length () + BUFSIZ
);
241 mb
->length (mb
->size ());
243 // Prepend the line count in front of the buffer.
244 ACE_OS::sprintf (mb
->rd_ptr (),
245 ACE_SIZE_T_FORMAT_SPECIFIER
249 return this->put_next (mb
, tv
);
253 // Main driver function.
256 ACE_TMAIN (int, ACE_TCHAR
*argv
[])
258 ACE_Service_Config
daemon (argv
[0]);
260 // This Stream controls hierarchically-related active objects.
268 MT_Module (ACE_TEXT ("Consumer"),
272 MT_Module (ACE_TEXT ("Filter"),
276 MT_Module (ACE_TEXT ("Producer"),
280 // Create Consumer, Filter, and Producer Modules and push them onto
281 // the Stream. All processing is performed in the Stream.
283 if (stream
.push (cm
) == -1)
284 ACE_ERROR_RETURN ((LM_ERROR
,
288 else if (stream
.push (fm
) == -1)
289 ACE_ERROR_RETURN ((LM_ERROR
,
293 else if (stream
.push (pm
) == -1)
294 ACE_ERROR_RETURN ((LM_ERROR
,
298 // Barrier synchronization: wait for the threads to exit, then exit
300 ACE_Thread_Manager::instance ()->wait ();
305 ACE_TMAIN (int, ACE_TCHAR
*[])
307 ACE_ERROR ((LM_ERROR
,
308 ACE_TEXT ("threads not supported on this platform\n")));
311 #endif /* ACE_HAS_THREADS */