Also use Objects as part of an operation but as a result don't generate Any operation...
[ACE_TAO.git] / ACE / tests / Buffer_Stream_Test.cpp
blob5f543167bd62ed49b2a6fc91c4d46145c7c21a01
2 //=============================================================================
3 /**
4 * @file Buffer_Stream_Test.cpp
6 * This program illustrates an implementation of the classic
7 * "bounded buffer" program using an ASX STREAM containing two
8 * Modules. Each ACE_Module contains two Tasks. Each ACE_Task
9 * contains a ACE_Message_Queue and a pointer to a
10 * ACE_Thread_Manager. Note how the use of these reusable
11 * components reduces the reliance on global variables.
13 * @author Prashant Jain <pjain@cs.wustl.edu> and Doug Schmidt <d.schmidt@vanderbilt.edu>
15 //=============================================================================
18 #include "test_config.h"
19 #include "ace/Stream.h"
20 #include "ace/Module.h"
21 #include "ace/Task.h"
22 #include "ace/OS_NS_string.h"
23 #include "ace/OS_NS_time.h"
27 #if defined (ACE_HAS_THREADS)
29 static const char ACE_ALPHABET[] = "abcdefghijklmnopqrstuvwxyz";
31 typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
32 typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
33 typedef ACE_Task<ACE_MT_SYNCH> MT_Task;
35 /**
36 * @class Common_Task
38 * @brief Methods that are common to the Supplier and consumer.
40 class Common_Task : public MT_Task
42 public:
43 Common_Task (void) {}
45 //FUZZ: disable check_for_lack_ACE_OS
46 // = ACE_Task hooks.
47 ///FUZZ: enable check_for_lack_ACE_OS
48 virtual int open (void * = 0);
49 virtual int close (u_long = 0);
52 /**
53 * @class Supplier
55 * @brief Define the Supplier interface.
57 class Supplier : public Common_Task
59 public:
60 Supplier (void) {}
62 /// Read data from stdin and pass to consumer.
63 virtual int svc (void);
66 /**
67 * @class Consumer
69 * @brief Define the Consumer interface.
71 class Consumer : public Common_Task
73 public:
74 Consumer (void) {}
76 /// Enqueue the message on the ACE_Message_Queue for subsequent
77 /// handling in the svc() method.
78 virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
80 /// Receive message from Supplier and print to stdout.
81 virtual int svc (void);
82 private:
84 /// Amount of time to wait for a timeout.
85 ACE_Time_Value timeout_;
88 // Spawn off a new thread.
90 int
91 Common_Task::open (void *)
93 if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
94 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), -1);
95 return 0;
98 int
99 Common_Task::close (u_long exit_status)
101 ACE_DEBUG ((LM_DEBUG,
102 ACE_TEXT ("(%t) thread is exiting with status %d in module %s\n"),
103 exit_status,
104 this->name ()));
106 // Can do anything here that is required when a thread exits, e.g.,
107 // storing thread-specific information in some other storage
108 // location, etc.
109 return 0;
112 // The Supplier reads data from the stdin stream, creates a message,
113 // and then queues the message in the message list, where it is
114 // removed by the consumer thread. A 0-sized message is enqueued when
115 // there is no more data to read. The consumer uses this as a flag to
116 // know when to exit.
119 Supplier::svc (void)
121 ACE_Message_Block *mb = 0;
123 // Send one message for each letter of the alphabet, then send an empty
124 // message to mark the end.
125 for (const char *c = ACE_ALPHABET; *c != '\0'; c++)
127 // Allocate a new message.
128 char d[2];
129 d[0] = *c;
130 d[1] = '\0';
132 ACE_NEW_RETURN (mb,
133 ACE_Message_Block (2),
134 -1);
135 ACE_OS::strcpy (mb->wr_ptr (), d);
137 mb->wr_ptr (2);
139 if (this->put_next (mb) == -1)
140 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
141 ACE_TEXT ("put_next")));
144 ACE_NEW_RETURN(mb, ACE_Message_Block, -1);
145 if (this->put_next (mb) == -1)
146 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("put_next")));
148 return 0;
152 Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
154 // Simply enqueue the Message_Block into the end of the queue.
155 return this->putq (mb, tv);
158 // The consumer dequeues a message from the ACE_Message_Queue, writes
159 // the message to the stderr stream, and deletes the message. The
160 // Consumer sends a 0-sized message to inform the consumer to stop
161 // reading and exit.
164 Consumer::svc (void)
166 ACE_Message_Block *mb = 0;
167 int result;
168 const char *c = ACE_ALPHABET;
169 char *output = 0;
171 // Keep looping, reading a message out of the queue, until we
172 // timeout or get a message with a length == 0, which signals us to
173 // quit.
175 for (;;)
177 this->timeout_.set (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds
179 result = this->getq (mb, &this->timeout_);
181 if (result == -1)
182 break;
184 size_t const length = mb->length ();
186 if (length > 0)
188 output = mb->rd_ptr ();
189 ACE_TEST_ASSERT (*c == output[0]);
190 c++;
192 mb->release ();
194 if (length == 0)
195 break;
198 ACE_TEST_ASSERT (result == 0 || errno == EWOULDBLOCK);
199 return 0;
202 #endif /* ACE_HAS_THREADS */
204 // Main driver function.
207 run_main (int, ACE_TCHAR *[])
209 ACE_START_TEST (ACE_TEXT ("Buffer_Stream_Test"));
211 #if defined (ACE_HAS_THREADS)
212 // Control hierachically-related active objects.
213 MT_Stream stream;
214 MT_Module *cm = 0;
215 MT_Module *sm = 0;
217 // Allocate the Consumer and Supplier modules.
218 ACE_NEW_RETURN (cm, MT_Module (ACE_TEXT ("Consumer"), new Consumer), -1);
219 ACE_NEW_RETURN (sm, MT_Module (ACE_TEXT ("Supplier"), new Supplier), -1);
221 // Create Supplier and Consumer Modules and push them onto the
222 // Stream. All processing is performed in the Stream.
224 if (stream.push (cm) == -1)
225 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("push")), 1);
226 else if (stream.push (sm) == -1)
227 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("push")), 1);
229 // Barrier synchronization: wait for the threads to exit, then exit
230 // ourselves.
231 ACE_Thread_Manager::instance ()->wait ();
232 #else
233 ACE_ERROR ((LM_INFO,
234 ACE_TEXT ("threads not supported on this platform\n")));
235 #endif /* ACE_HAS_THREADS */
236 ACE_END_TEST;
237 return 0;