1 // This test program illustrates how the <ACE_Task> synchronization
2 // mechanisms work in conjunction with the <ACE_Thread_Manager>. If
3 // the <manual> flag is set, input comes from stdin until the user
4 // enters a return -- otherwise, the input is generated automatically.
5 // All worker threads shut down when they receive a message block of
8 // This code was originally based on a test program written by Karlheinz
9 // Dorn <Karlheinz.Dorn@med.siemens.de>. It was modified to utilize
10 // more ACE features by Doug Schmidt <d.schmidt@vanderbilt.edu>.
12 #include "ace/OS_NS_stdio.h"
13 #include "ace/OS_NS_string.h"
14 #include "ace/OS_NS_unistd.h"
15 #include "ace/OS_main.h"
17 #include "ace/Service_Config.h"
20 #if defined (ACE_HAS_THREADS)
22 // Default number of iterations to run the test; ACE_TMAIN replaces it with argv[2] when that argument is supplied.
23 static int n_iterations
= 100;
25 // Controls whether the input is generated "manually" (read from stdin) or automatically; ACE_TMAIN sets it to 1 when a third argument is present.
26 static int manual
= 0;
28 class Thread_Pool
: public ACE_Task
<ACE_MT_SYNCH
>
31 // Defines a thread pool abstraction based on the <ACE_Task>.
33 Thread_Pool (ACE_Thread_Manager
*thr_mgr
,
35 // Constructor activates <n_threads> in the thread pool.
41 // Iterate <n_iterations> times, printing a message and "waiting"
42 // for all other threads to complete this iteration.
44 virtual int put (ACE_Message_Block
*mb
,
45 ACE_Time_Value
*tv
= 0);
46 // This allows the producer to pass messages to the <Thread_Pool>.
49 //FUZZ: disable check_for_lack_ACE_OS
50 virtual int close (u_long
);
52 //FUZZ: enable check_for_lack_ACE_OS
56 Thread_Pool::close (u_long
)
59 "(%t) worker thread closing down\n"));
63 Thread_Pool::Thread_Pool (ACE_Thread_Manager
*thr_mgr
,
65 : ACE_Task
<ACE_MT_SYNCH
> (thr_mgr
)
67 // Create the pool of worker threads.
68 if (this->activate (THR_NEW_LWP
,
75 Thread_Pool::~Thread_Pool ()
79 // Simply enqueue the Message_Block into the end of the queue.
82 Thread_Pool::put (ACE_Message_Block
*mb
,
85 return this->putq (mb
, tv
);
88 // Iterate <n_iterations> times, printing a message and "waiting"
89 // for all other threads to complete this iteration.
94 // Note that the <ACE_Task::svc_run> method automatically adds us to
95 // the Thread_Manager when the thread begins.
99 // Keep looping, reading a message out of the queue, until we get a
100 // message with a length == 0, which signals us to quit.
104 ACE_Message_Block
*mb
= 0;
106 ACE_DEBUG ((LM_DEBUG
,
107 "(%t) in iteration %d before getq ()\n",
110 if (this->getq (mb
) == -1)
112 ACE_ERROR ((LM_ERROR
,
113 "(%t) in iteration %d, got result -1, exiting\n",
118 size_t length
= mb
->length ();
121 ACE_DEBUG ((LM_DEBUG
,
122 "(%t) in iteration %d, length = %d, text = \"%*s\"\n",
128 // We're responsible for deallocating this.
133 //FUZZ: disable check_for_NULL
134 ACE_DEBUG ((LM_DEBUG
,
135 "(%t) in iteration %d, got NULL message, exiting\n",
137 //FUZZ: enable check_for_NULL
143 // Note that the <ACE_Task::svc_run> method automatically removes us
144 // from the <ACE_Thread_Manager> when the thread exits.
149 producer (Thread_Pool
&thread_pool
)
151 ACE_DEBUG ((LM_DEBUG
,
152 "(%t) producer start, generating data for the <Thread_Pool>\n"));
153 // thread_pool.dump ();
157 // Allocate a new message.
158 ACE_Message_Block
*mb
= 0;
160 ACE_Message_Block (BUFSIZ
));
164 ACE_DEBUG ((LM_DEBUG
,
165 "(%t) enter a new message for the task pool..."));
166 n
= ACE_OS::read (ACE_STDIN
,
172 static int count
= 0;
174 ACE_OS::sprintf (mb
->rd_ptr (),
177 n
= ACE_Utils::truncate_cast
<int> (ACE_OS::strlen (mb
->rd_ptr ()));
179 if (count
== n_iterations
)
180 n
= 1; // Indicate that we need to shut down.
184 if (count
== 0 || (count
% 20 == 0))
190 // Send a normal message to the waiting threads and continue
194 // Pass the message to the Thread_Pool.
195 if (thread_pool
.put (mb
) == -1)
196 ACE_ERROR ((LM_ERROR
,
202 ACE_DEBUG ((LM_DEBUG
,
203 "\n(%t) start loop, dump of task:\n"));
204 // thread_pool.dump ();
206 // Send a shutdown message to the waiting threads and exit.
207 for (size_t i
= thread_pool
.thr_count (); i
> 0; i
--)
209 //FUZZ: disable check_for_NULL
210 ACE_DEBUG ((LM_DEBUG
,
211 "(%t) EOF, enqueueing NULL block for thread = %d\n",
213 //FUZZ: enable check_for_NULL
215 // Enqueue a NULL message to flag each consumer to
217 ACE_Message_Block
*mb
= 0;
220 if (thread_pool
.put (mb
) == -1)
221 ACE_ERROR ((LM_ERROR
,
226 ACE_DEBUG ((LM_DEBUG
,
227 "\n(%t) end loop\n"));
228 // thread_pool.dump ();
235 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
237 int n_threads
= argc
> 1 ? ACE_OS::atoi (argv
[1]) : ACE_DEFAULT_THREADS
;
238 n_iterations
= argc
> 2 ? ACE_OS::atoi (argv
[2]) : n_iterations
;
239 manual
= argc
> 3 ? 1 : 0;
241 ACE_DEBUG ((LM_DEBUG
,
242 "(%t) argc = %d, threads = %d\n",
246 // Create the worker tasks.
247 Thread_Pool
thread_pool (ACE_Thread_Manager::instance (),
250 // Create work for the worker tasks to process in their own threads.
251 producer (thread_pool
);
253 ACE_DEBUG ((LM_DEBUG
,
254 "(%t) waiting for threads to exit in Thread_Pool destructor...\n"));
255 // Wait for all the threads to reach their exit point.
256 if (thread_pool
.wait () == -1)
257 //FUZZ: disable check_for_lack_ACE_OS
258 ACE_ERROR_RETURN ((LM_ERROR
, "(%t) wait() failed\n"),
260 //FUZZ: enable check_for_lack_ACE_OS
262 ACE_DEBUG ((LM_DEBUG
,
263 "(%t) destroying worker tasks and exiting...\n"));
268 ACE_TMAIN (int, ACE_TCHAR
*[])
270 ACE_ERROR ((LM_ERROR
,
271 "threads not supported on this platform\n"));
274 #endif /* ACE_HAS_THREADS */