1 // This test program illustrates how the ACE task workers/barrier
2 // synchronization mechanisms work in conjunction with ACE_Task and
3 // ACE_Thread_Manager. If the manual flag is not set, user input is
4 // simulated; if set, input is read from stdin until a bare RETURN is
5 // entered, which stops all workers via a message block of length 0.
6 // This is an alternative to shutting workers down by deactivating the
7 // queue. The delay_put flag adds a delay between the shutdown puts;
8 // everything should work with this flag disabled! BARRIER_TYPE is meant
9 // to enable/disable barrier sync after each svc a worker has done.
11 #include "ace/OS_NS_string.h"
12 #include "ace/OS_NS_unistd.h"
13 #include "ace/OS_main.h"
15 #include "ace/Service_Config.h"
18 #if defined (ACE_HAS_THREADS)
20 #include "ace/Null_Barrier.h"
21 #define BARRIER_TYPE ACE_Null_Barrier
// Worker_Task: an ACE_Task<ACE_MT_SYNCH> subclass parameterized on a
// BARRIER type. This file instantiates it with BARRIER_TYPE, which is
// defined above as ACE_Null_Barrier. In this class each comment
// follows the declaration it documents.
23 template <class BARRIER
>
24 class Worker_Task
: public ACE_Task
<ACE_MT_SYNCH
>
// Constructor. thr_mgr is handed to the ACE_Task base class;
// inp_serialize (default 1) is stored and later tested by put().
27 Worker_Task (ACE_Thread_Manager
*thr_mgr
,
29 int inp_serialize
= 1);
31 virtual int producer ();
32 // Produce input for the workers.
34 virtual int input (ACE_Message_Block
*mb
);
35 // Fill one message block via a certain input strategy.
37 virtual int output (ACE_Message_Block
*mb
);
38 // Forward one message block via a certain output strategy to the
// NOTE(review): the comment above is cut off here; its continuation
// is not visible in this view.
41 virtual int service (ACE_Message_Block
*mb
, int iter
);
42 // Perform one message-block-dependent service.
45 virtual int put (ACE_Message_Block
*mb
, ACE_Time_Value
*tv
=0);
// NOTE(review): the comment below is identical to the one on the svc()
// definition; it presumably documents an svc() declaration that is not
// visible here, rather than put().
48 // Iterate <n_iterations> time printing off a message and "waiting"
49 // for all other threads to complete this iteration.
51 //FUZZ: disable check_for_lack_ACE_OS
52 // = Not needed for this test.
53 virtual int open (void *) { return 0; }
54 virtual int close (u_long
)
56 //FUZZ: enable check_for_lack_ACE_OS
59 "(%t) in close of worker\n"));
64 // Number of worker threads to run.
// Constructor: forwards thr_mgr to the ACE_Task<ACE_MT_SYNCH> base,
// records the inp_serialize choice in inp_serialize_, and -- only when
// nt_ > 0 and inp_serialize == 1 -- activates n_threads worker threads
// with the THR_NEW_LWP flag.
71 template <class BARRIER
>
72 Worker_Task
<BARRIER
>::Worker_Task (ACE_Thread_Manager
*thr_mgr
,
75 : ACE_Task
<ACE_MT_SYNCH
> (thr_mgr
),
80 // Create worker threads.
81 inp_serialize_
= inp_serialize
;
83 // Use the task's message queue for serialization (default) or run
84 // service in the context of the caller thread.
// Threads are spawned only in the serialized (queue-based) mode; a
// return of -1 from activate() is checked as the failure case.
86 if (nt_
> 0 && inp_serialize
== 1)
87 if (this->activate (THR_NEW_LWP
, n_threads
) == -1)
93 // When inp_serialize_ is set, simply enqueue the Message_Block at the
// end of the queue via putq(). The other visible path calls
// service (mb, iter++) and then output (mb), logging a debug message
// when output() returns a negative value. (Presumably that path is the
// non-serialized "else" branch; the branch keyword itself is not
// visible in this view.)
95 template <class BARRIER
> int
96 Worker_Task
<BARRIER
>::put (ACE_Message_Block
*mb
,
101 if (this->inp_serialize_
)
102 result
= this->putq (mb
, tv
);
106 result
= this->service (mb
, iter
++);
108 if (this->output (mb
) < 0)
109 ACE_DEBUG ((LM_DEBUG
,
110 "(%t) output not connected!\n"));
// Service one message block: reads the block's length, logs the
// iteration and length, and writes to ACE_STDOUT via ACE_OS::write
// (presumably the block's contents; the write arguments are not
// visible in this view), followed by another debug log.
117 template <class BARRIER
> int
118 Worker_Task
<BARRIER
>::service (ACE_Message_Block
*mb
,
121 size_t length
= mb
->length ();
// NOTE(review): if `length` (a size_t) is what gets passed for
// "len=%d", the specifier does not match the argument type -- confirm
// against the argument list, which is not visible here.
125 ACE_DEBUG ((LM_DEBUG
,
126 "(%t) in iteration %d len=%d text got:\n",
129 ACE_OS::write (ACE_STDOUT
,
132 ACE_DEBUG ((LM_DEBUG
,
138 // Iterate <n_iterations> time printing off a message and "waiting"
139 // for all other threads to complete this iteration.
// Worker thread entry point. Visible steps per iteration: dequeue a
// block with getq(), read its length, hand it to service (mb, iter),
// log when a quit message is seen, and call barrier_.wait().
141 template <class BARRIER
> int
142 Worker_Task
<BARRIER
>::svc ()
144 // Note that the <ACE_Task::svc_run> method automatically adds us to
145 // the Thread_Manager when the thread begins.
147 // Keep looping, reading a message out of the queue, until we get a
148 // message with a length == 0, which signals us to quit.
150 for (int iter
= 1; ;iter
++)
152 ACE_Message_Block
*mb
= 0;
154 int result
= this->getq (mb
);
// NOTE(review): the format string below contains a single %d, but the
// first argument following it is a string literal. This looks like a
// format/argument mismatch (the literal would be consumed by %d) --
// confirm against the full call, whose remaining arguments are not
// visible here.
158 ACE_ERROR ((LM_ERROR
,
159 "(%t) in iteration %d\n",
160 "error waiting for message in iteration",
165 size_t length
= mb
->length ();
166 this->service (mb
,iter
);
170 ACE_DEBUG ((LM_DEBUG
,
171 "(%t) in iteration %d got quit, exit!\n",
// Rendezvous with the other workers via the barrier.
177 this->barrier_
.wait ();
183 // Note that the <ACE_Task::svc_run> method automatically removes us
184 // from the Thread_Manager when the thread exits.
// Producer: allocates a message block of BUFSIZ bytes and passes it to
// input(), checking for a -1 return. The enclosing loop and the
// allocation-failure handling are not visible in this view; the
// trailing ACE_NOTREACHED marks the final return as unreachable.
189 template <class BARRIER
> int
190 Worker_Task
<BARRIER
>::producer ()
192 // Keep reading stdin, until we reach EOF.
196 // Allocate a new message.
197 ACE_Message_Block
*mb
= 0;
200 ACE_Message_Block (BUFSIZ
),
// input() fills the block and forwards it; -1 is the case tested here.
203 if (this->input (mb
) == -1)
207 ACE_NOTREACHED (return 0);
// Output strategy: hands the message block to put_next() and returns
// that call's result unchanged. put() treats a negative result as
// "output not connected".
210 template <class BARRIER
> int
211 Worker_Task
<BARRIER
>::output (ACE_Message_Block
*mb
)
213 return this->put_next (mb
);
// Input strategy: fills mb and forwards it with put(). When `manual`
// is not defined, the fixed text "kalle" is copied into the block;
// the other visible path prompts the user and reads from ACE_STDIN.
// One visible path sends one small shutdown block per worker (nt_
// iterations) via put(); another puts the normal message mb.
216 template <class BARRIER
> int
217 Worker_Task
<BARRIER
>::input (ACE_Message_Block
*mb
)
219 ACE_Message_Block
*mb1
;
221 #if !defined (manual)
// Simulated input: copy a fixed string to the block's read pointer.
223 char str
[] = "kalle";
224 ACE_OS::strcpy (mb
->rd_ptr (), str
);
226 size_t n
= ACE_OS::strlen (str
);
// NOTE(review): `l` is declared on a line not visible here; the test
// below is true for l == 0 and for every multiple of 100.
232 if (l
== 0 || (l
% 100 == 0))
236 ACE_DEBUG ((LM_DEBUG
,
237 "(%t) press chars and enter to put a new message into task queue ...\n"));
238 n
= ACE_OS::read (ACE_STDIN
,
244 // Send a shutdown message to the waiting threads and exit.
245 // cout << "\nvor loop, dump of task msg queue:\n" << endl;
246 // this->msg_queue ()->dump ();
// One shutdown block is sent per worker thread.
248 for (int i
= 0; i
< nt_
; i
++)
250 ACE_DEBUG ((LM_DEBUG
,
251 "(%t) eof, sending block for thread=%d\n",
255 ACE_Message_Block (2),
259 if (this->put (mb1
) == -1)
260 ACE_ERROR ((LM_ERROR
,
263 #if defined (delay_put)
264 // This sleep helps to shut down correctly -> was an error!
266 #endif /* delay_put */
272 // Send a normal message to the waiting threads and continue
276 if (this->put (mb
) == -1)
277 ACE_ERROR ((LM_ERROR
,
// Program entry point (threaded build). Takes the worker thread count
// from argv[1] when given, otherwise ACE_DEFAULT_THREADS; creates a
// Worker_Task<BARRIER_TYPE> on the singleton ACE_Thread_Manager, runs
// its producer, then waits on the thread manager before logging
// completion.
285 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
287 int n_threads
= argc
> 1 ? ACE_OS::atoi (argv
[1]) : ACE_DEFAULT_THREADS
;
289 ACE_DEBUG ((LM_DEBUG
,
290 "(%t) worker threads running=%d\n",
293 Worker_Task
<BARRIER_TYPE
> worker_task (ACE_Thread_Manager::instance (),
// Feed the workers from the calling thread.
296 worker_task
.producer ();
298 // Wait for all the threads to reach their exit point.
299 ACE_DEBUG ((LM_DEBUG
,
300 "(%t) waiting with thread manager ...\n"));
302 ACE_Thread_Manager::instance ()->wait ();
304 ACE_DEBUG ((LM_DEBUG
,
305 "(%t) done correct!\n"));
// Fallback entry point, presumably compiled when ACE_HAS_THREADS is not
// defined (the #else itself is not visible in this view). It ignores
// its arguments and logs that threads are unsupported.
311 ACE_TMAIN (int, ACE_TCHAR
*[])
313 ACE_ERROR ((LM_ERROR
, "threads not supported on this platform\n"));
316 #endif /* ACE_HAS_THREADS */