Changes to attempt to silence bcc64x
[ACE_TAO.git] / ACE / examples / Threads / barrier2.cpp
blob112590a639271dcd99052427622ee1868cb4414d
1 // This test program illustrates how the ACE task workers/barrier
2 // synchronization mechanisms work in conjunction with the ACE_Task
3 // and the ACE_Thread_Manager. The manual flag not set simulates user
4 // input, if set input comes from stdin until RETURN only is entered
5 // which stops all workers via a message block of length 0. This is an
6 // alernative shutdown of workers compared to queue deactivate. The
7 // delay_put flag simulates a delay between the shutdown puts. All
8 // should work with this flag disabled! The BARRIER_TYPE is supposed
9 // to enable/disable barrier sync on each svc a worker has done.
11 #include "ace/OS_NS_string.h"
12 #include "ace/OS_NS_unistd.h"
13 #include "ace/OS_main.h"
14 #include "ace/Task.h"
15 #include "ace/Service_Config.h"
18 #if defined (ACE_HAS_THREADS)
20 #include "ace/Null_Barrier.h"
21 #define BARRIER_TYPE ACE_Null_Barrier
23 template <class BARRIER>
24 class Worker_Task : public ACE_Task<ACE_MT_SYNCH>
26 public:
27 Worker_Task (ACE_Thread_Manager *thr_mgr,
28 int n_threads,
29 int inp_serialize = 1);
31 virtual int producer ();
32 // produce input for workers
34 virtual int input (ACE_Message_Block *mb);
35 // Fill one message block via a certain input strategy.
37 virtual int output (ACE_Message_Block *mb);
38 // Forward one message block via a certain output strategy to the
39 // next task if any.
41 virtual int service (ACE_Message_Block *mb, int iter);
42 // Perform one message block dependant service.
44 private:
45 virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0);
47 virtual int svc ();
48 // Iterate <n_iterations> time printing off a message and "waiting"
49 // for all other threads to complete this iteration.
51 //FUZZ: disable check_for_lack_ACE_OS
52 // = Not needed for this test.
53 virtual int open (void *) { return 0; }
54 virtual int close (u_long)
56 //FUZZ: enable check_for_lack_ACE_OS
58 ACE_DEBUG ((LM_DEBUG,
59 "(%t) in close of worker\n"));
60 return 0;
63 int nt_;
64 // Number of worker threads to run.
66 int inp_serialize_;
68 BARRIER barrier_;
71 template <class BARRIER>
72 Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr,
73 int n_threads,
74 int inp_serialize)
75 : ACE_Task<ACE_MT_SYNCH> (thr_mgr),
76 barrier_ (n_threads)
78 nt_ = n_threads;
80 // Create worker threads.
81 inp_serialize_ = inp_serialize;
83 // Use the task's message queue for serialization (default) or run
84 // service in the context of the caller thread.
86 if (nt_ > 0 && inp_serialize == 1)
87 if (this->activate (THR_NEW_LWP, n_threads) == -1)
88 ACE_ERROR ((LM_ERROR,
89 "%p\n",
90 "activate failed"));
93 // Simply enqueue the Message_Block into the end of the queue.
95 template <class BARRIER> int
96 Worker_Task<BARRIER>::put (ACE_Message_Block *mb,
97 ACE_Time_Value *tv)
99 int result;
101 if (this->inp_serialize_)
102 result = this->putq (mb, tv);
103 else
105 static int iter = 0;
106 result = this->service (mb, iter++);
108 if (this->output (mb) < 0)
109 ACE_DEBUG ((LM_DEBUG,
110 "(%t) output not connected!\n"));
112 mb->release ();
114 return result;
117 template <class BARRIER> int
118 Worker_Task<BARRIER>::service (ACE_Message_Block *mb,
119 int iter)
121 size_t length = mb->length ();
123 if (length > 0)
125 ACE_DEBUG ((LM_DEBUG,
126 "(%t) in iteration %d len=%d text got:\n",
127 iter,
128 length));
129 ACE_OS::write (ACE_STDOUT,
130 mb->rd_ptr (),
131 length);
132 ACE_DEBUG ((LM_DEBUG,
133 "\n"));
135 return 0;
138 // Iterate <n_iterations> time printing off a message and "waiting"
139 // for all other threads to complete this iteration.
141 template <class BARRIER> int
142 Worker_Task<BARRIER>::svc ()
144 // Note that the <ACE_Task::svc_run> method automatically adds us to
145 // the Thread_Manager when the thread begins.
147 // Keep looping, reading a message out of the queue, until we get a
148 // message with a length == 0, which signals us to quit.
150 for (int iter = 1; ;iter++)
152 ACE_Message_Block *mb = 0;
154 int result = this->getq (mb);
156 if (result == -1)
158 ACE_ERROR ((LM_ERROR,
159 "(%t) in iteration %d\n",
160 "error waiting for message in iteration",
161 iter));
162 break;
165 size_t length = mb->length ();
166 this->service (mb,iter);
168 if (length == 0)
170 ACE_DEBUG ((LM_DEBUG,
171 "(%t) in iteration %d got quit, exit!\n",
172 iter));
173 mb->release ();
174 break;
177 this->barrier_.wait ();
178 this->output (mb);
180 mb->release ();
183 // Note that the <ACE_Task::svc_run> method automatically removes us
184 // from the Thread_Manager when the thread exits.
186 return 0;
189 template <class BARRIER> int
190 Worker_Task<BARRIER>::producer ()
192 // Keep reading stdin, until we reach EOF.
194 for (;;)
196 // Allocate a new message.
197 ACE_Message_Block *mb = 0;
199 ACE_NEW_RETURN (mb,
200 ACE_Message_Block (BUFSIZ),
201 -1);
203 if (this->input (mb) == -1)
204 return -1;
207 ACE_NOTREACHED (return 0);
210 template <class BARRIER> int
211 Worker_Task<BARRIER>::output (ACE_Message_Block *mb)
213 return this->put_next (mb);
216 template <class BARRIER> int
217 Worker_Task<BARRIER>::input (ACE_Message_Block *mb)
219 ACE_Message_Block *mb1;
221 #if !defined (manual)
222 static int l = 0;
223 char str[] = "kalle";
224 ACE_OS::strcpy (mb->rd_ptr (), str);
226 size_t n = ACE_OS::strlen (str);
228 if (l == 1000)
229 n = 1;
230 l++;
232 if (l == 0 || (l % 100 == 0))
233 ACE_OS::sleep (5);
234 if (n <= 1)
235 #else
236 ACE_DEBUG ((LM_DEBUG,
237 "(%t) press chars and enter to put a new message into task queue ...\n"));
238 n = ACE_OS::read (ACE_STDIN,
239 mb->rd_ptr (),
240 mb->size ());
241 if (n <= 1)
242 #endif /* manual */
244 // Send a shutdown message to the waiting threads and exit.
245 // cout << "\nvor loop, dump of task msg queue:\n" << endl;
246 // this->msg_queue ()->dump ();
248 for (int i = 0; i < nt_; i++)
250 ACE_DEBUG ((LM_DEBUG,
251 "(%t) eof, sending block for thread=%d\n",
252 i + 1));
254 ACE_NEW_RETURN (mb1,
255 ACE_Message_Block (2),
256 -1);
257 mb1->length (0);
259 if (this->put (mb1) == -1)
260 ACE_ERROR ((LM_ERROR,
261 "(%t) %p\n",
262 "put"));
263 #if defined (delay_put)
264 // this sleep helps to shutdown correctly -> was an error!
265 ACE_OS::sleep (1);
266 #endif /* delay_put */
268 return -1;
270 else
272 // Send a normal message to the waiting threads and continue
273 // producing.
274 mb->wr_ptr (n);
276 if (this->put (mb) == -1)
277 ACE_ERROR ((LM_ERROR,
278 "(%t) %p\n",
279 "put"));
281 return 0;
285 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
287 int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
289 ACE_DEBUG ((LM_DEBUG,
290 "(%t) worker threads running=%d\n",
291 n_threads));
293 Worker_Task<BARRIER_TYPE> worker_task (ACE_Thread_Manager::instance (),
294 /* n_threads */ 0,
296 worker_task.producer ();
298 // Wait for all the threads to reach their exit point.
299 ACE_DEBUG ((LM_DEBUG,
300 "(%t) waiting with thread manager ...\n"));
302 ACE_Thread_Manager::instance ()->wait ();
304 ACE_DEBUG ((LM_DEBUG,
305 "(%t) done correct!\n"));
306 return 0;
309 #else
311 ACE_TMAIN (int, ACE_TCHAR *[])
313 ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
314 return 0;
316 #endif /* ACE_HAS_THREADS */