2 //=============================================================================
4 * @file Buffer_Stream_Test.cpp
6 * This program illustrates an implementation of the classic
7 * "bounded buffer" program using an ASX STREAM containing two
8 * Modules. Each ACE_Module contains two Tasks. Each ACE_Task
9 * contains an ACE_Message_Queue and a pointer to a
10 * ACE_Thread_Manager. Note how the use of these reusable
11 * components reduces the reliance on global variables.
13 * @author Prashant Jain <pjain@cs.wustl.edu> and Doug Schmidt <d.schmidt@vanderbilt.edu>
15 //=============================================================================
18 #include "test_config.h"
19 #include "ace/Stream.h"
20 #include "ace/Module.h"
22 #include "ace/OS_NS_string.h"
23 #include "ace/OS_NS_time.h"
26 #if defined (ACE_HAS_THREADS)
28 static const char ACE_ALPHABET
[] = "abcdefghijklmnopqrstuvwxyz";
30 using MT_Stream
= ACE_Stream
<ACE_MT_SYNCH
>;
31 using MT_Module
= ACE_Module
<ACE_MT_SYNCH
>;
32 using MT_Task
= ACE_Task
<ACE_MT_SYNCH
>;
37 * @brief Methods that are common to the Supplier and Consumer.
39 class Common_Task
: public MT_Task
44 //FUZZ: disable check_for_lack_ACE_OS
46 ///FUZZ: enable check_for_lack_ACE_OS
47 int open (void * = 0) override
;
48 int close (u_long
= 0) override
;
54 * @brief Define the Supplier interface.
56 class Supplier
: public Common_Task
61 /// Create one message per letter of the alphabet and pass each to the consumer.
68 * @brief Define the Consumer interface.
70 class Consumer
: public Common_Task
75 /// Enqueue the message on the ACE_Message_Queue for subsequent
76 /// handling in the svc() method.
77 int put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
= 0) override
;
79 /// Receive message from Supplier and print to stdout.
82 /// Amount of time to wait for a timeout.
83 ACE_Time_Value timeout_
;
86 // Spawn off a new thread.
89 Common_Task::open (void *)
91 if (this->activate (THR_NEW_LWP
| THR_DETACHED
) == -1)
92 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), -1);
97 Common_Task::close (u_long exit_status
)
100 ACE_TEXT ("(%t) thread is exiting with status %d in module %s\n"),
104 // Can do anything here that is required when a thread exits, e.g.,
105 // storing thread-specific information in some other storage
110 // The Supplier iterates over the letters of the alphabet, creates a
111 // message for each one, and passes it to the next module, where it is
112 // queued and later removed by the consumer thread. A 0-sized message
113 // is sent when there are no more letters. The consumer uses this as a
114 // flag to know when to exit.
119 ACE_Message_Block
*mb
= 0;
121 // Send one message for each letter of the alphabet, then send an empty
122 // message to mark the end.
123 for (const char *c
= ACE_ALPHABET
; *c
!= '\0'; c
++)
125 // Allocate a new message.
131 ACE_Message_Block (2),
133 ACE_OS::strcpy (mb
->wr_ptr (), d
);
137 if (this->put_next (mb
) == -1)
138 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"),
139 ACE_TEXT ("put_next")));
142 ACE_NEW_RETURN(mb
, ACE_Message_Block
, -1);
143 if (this->put_next (mb
) == -1)
144 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("put_next")));
150 Consumer::put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
)
152 // Simply enqueue the Message_Block into the end of the queue.
153 return this->putq (mb
, tv
);
156 // The consumer dequeues a message from the ACE_Message_Queue, writes
157 // the message to the stderr stream, and deletes the message. The
158 // Supplier sends a 0-sized message to inform the consumer to stop
164 ACE_Message_Block
*mb
= 0;
166 const char *c
= ACE_ALPHABET
;
169 // Keep looping, reading a message out of the queue, until we
170 // time out or get a message with a length == 0, which signals us to
175 this->timeout_
.set (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds
177 result
= this->getq (mb
, &this->timeout_
);
182 size_t const length
= mb
->length ();
186 output
= mb
->rd_ptr ();
187 ACE_TEST_ASSERT (*c
== output
[0]);
196 ACE_TEST_ASSERT (result
== 0 || errno
== EWOULDBLOCK
);
200 #endif /* ACE_HAS_THREADS */
202 // Main driver function.
205 run_main (int, ACE_TCHAR
*[])
207 ACE_START_TEST (ACE_TEXT ("Buffer_Stream_Test"));
209 #if defined (ACE_HAS_THREADS)
210 // Control hierarchically-related active objects.
215 // Allocate the Consumer and Supplier modules.
216 ACE_NEW_RETURN (cm
, MT_Module (ACE_TEXT ("Consumer"), new Consumer
), -1);
217 ACE_NEW_RETURN (sm
, MT_Module (ACE_TEXT ("Supplier"), new Supplier
), -1);
219 // Create Supplier and Consumer Modules and push them onto the
220 // Stream. All processing is performed in the Stream.
222 if (stream
.push (cm
) == -1)
223 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("push")), 1);
224 else if (stream
.push (sm
) == -1)
225 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("push")), 1);
227 // Barrier synchronization: wait for the threads to exit, then exit
229 ACE_Thread_Manager::instance ()->wait ();
232 ACE_TEXT ("threads not supported on this platform\n")));
233 #endif /* ACE_HAS_THREADS */