2 //=============================================================================
4 * @file Thread_Pool_Test.cpp
6 * This test program illustrates how the <ACE_Task>
7 * synchronization mechanisms work in conjunction with the
8 * <ACE_Thread_Manager>. If the <manual> flag is set input comes
9 * from stdin until the user enters a return -- otherwise, the
10 * input is generated automatically. All worker threads shutdown
11 * when (1) they receive a message block of length 0 or (2) the
12 * queue is deactivated.
14 * @author Karlheinz Dorn <Karlheinz.Dorn@med.siemens.de>
15 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
16 * @author and Prashant Jain <pjain@cs.wustl.edu>
18 //=============================================================================
21 #include "test_config.h"
24 #if defined (ACE_HAS_THREADS)
25 #include "ace/Lock_Adapter_T.h"
26 #include "ace/OS_NS_stdio.h"
27 #include "ace/OS_NS_string.h"
28 #include "ace/OS_NS_unistd.h"
30 // Number of iterations to run the test.
31 static size_t n_iterations
= 100;
33 // Controls whether the input is generated "manually" or automatically.
34 static int manual
= 0;
39 * @brief Defines a thread pool abstraction based on the <ACE_Task>.
41 class Thread_Pool
: public ACE_Task
<ACE_MT_SYNCH
>
44 /// Create the thread pool containing <n_threads>.
45 Thread_Pool (int n_threads
);
48 ~Thread_Pool () override
;
51 * Activate the task's thread pool, produce the messages that are
52 * consumed by the threads in the thread pool, and demonstate how to
53 * shutdown using the <ACE_Message_Queue::deactivate> method.
55 int test_queue_deactivation_shutdown ();
58 * Activate the task's thread pool, produce the messages that are,
59 * produce the messages that are consumed by the threads in the
60 * thread pool, and demonstrate how to shutdown by enqueueing
61 * "empty" messages into the queue.
63 int test_empty_message_shutdown ();
65 /// Iterate <n_iterations> time printing off a message and "waiting"
66 /// for all other threads to complete this iteration.
69 /// Allows the producer to pass messages to the <Thread_Pool>.
70 int put (ACE_Message_Block
*mb
,
71 ACE_Time_Value
*tv
= 0) override
;
74 //FUZZ: disable check_for_lack_ACE_OS
75 /// Spawn the threads in the pool.
76 int open (void * = 0) override
;
77 //FUZZ: enable check_for_lack_ACE_OS
80 int close (u_long
) override
;
82 /// Serialize access to <ACE_Message_Block> reference count, which
83 /// will be decremented by multiple threads.
84 ACE_Lock_Adapter
<ACE_Thread_Mutex
> lock_adapter_
;
86 /// Number of threads to spawn.
90 Thread_Pool::~Thread_Pool ()
95 Thread_Pool::close (u_long
)
98 ACE_TEXT ("(%t) worker thread closing down\n")));
102 Thread_Pool::Thread_Pool (int n_threads
)
103 : n_threads_ (n_threads
)
107 // Simply enqueue the Message_Block into the end of the queue.
110 Thread_Pool::put (ACE_Message_Block
*mb
,
113 return this->putq (mb
, tv
);
116 // Iterate <n_iterations> printing off a message and "waiting" for all
117 // other threads to complete this iteration.
122 // Keep looping, reading a message out of the queue, until we get a
123 // message with a length == 0, which signals us to quit.
125 for (int count
= 1; ; count
++)
127 ACE_Message_Block
*mb
= 0;
129 int result
= this->getq (mb
);
131 ACE_TEST_ASSERT (result
!= -1 || errno
== ESHUTDOWN
);
133 if (result
== -1 && errno
== ESHUTDOWN
)
135 // The queue has been deactivated, so let's bail out.
136 ACE_DEBUG ((LM_DEBUG
,
137 ACE_TEXT ("(%t) in iteration %d, queue len = %d, ")
138 ACE_TEXT ("queue deactivated, exiting\n"),
140 this->msg_queue ()->message_count ()));
145 size_t length
= mb
->length ();
148 ACE_DEBUG ((LM_DEBUG
,
149 ACE_TEXT ("(%t) in iteration %d, queue len = %d, ")
150 ACE_TEXT ("length = %d, text = \"%*s\"\n"),
152 this->msg_queue ()->message_count (),
157 // We're responsible for deallocating this.
162 ACE_DEBUG ((LM_DEBUG
,
163 ACE_TEXT ("(%t) in iteration %d, queue len = %d, ")
164 ACE_TEXT ("got \"empty\" message, exiting\n"),
166 this->msg_queue ()->message_count ()));
171 // Note that the <ACE_Task::svc_run> method automatically removes us
172 // from the <ACE_Thread_Manager> when the thread exits.
177 Thread_Pool::open (void *)
179 ACE_DEBUG ((LM_DEBUG
,
180 ACE_TEXT ("(%t) producer start, dumping the Thread_Pool\n")));
183 // Create a pool of worker threads.
184 if (this->activate (THR_NEW_LWP
,
185 this->n_threads_
) == -1)
186 ACE_ERROR_RETURN ((LM_ERROR
,
188 ACE_TEXT ("activate failed")),
193 // Activate the task's thread pool, produce the messages that are
194 // consumed by the threads in the thread pool, and demonstate how to
195 // shutdown using the <ACE_Message_Queue::deactivate> method.
198 Thread_Pool::test_queue_deactivation_shutdown ()
200 if (this->open () == -1)
203 ACE_Message_Block
*mb
= 0;
205 // Run the main loop that generates messages and enqueues them into
206 // the pool of threads managed by <ACE_Task>.
208 for (size_t count
= 0;
214 // Allocate a new message.
216 ACE_Message_Block (BUFSIZ
,
217 ACE_Message_Block::MB_DATA
,
221 &this->lock_adapter_
),
226 ACE_DEBUG ((LM_DEBUG
,
227 ACE_TEXT ("(%t) enter a new message for ")
228 ACE_TEXT ("the task pool...")));
229 n
= ACE_OS::read (ACE_STDIN
,
235 static size_t count
= 0;
237 ACE_OS::snprintf (reinterpret_cast<ACE_TCHAR
*> (mb
->wr_ptr ()), BUFSIZ
,
238 ACE_SIZE_T_FORMAT_SPECIFIER
,
240 n
= ACE_OS::strlen (mb
->rd_ptr ());
242 if (count
== n_iterations
)
243 n
= 1; // Indicate that we need to shut down.
247 if (count
== 0 || (count
% 20 == 0))
253 // Send a normal message to the waiting threads and continue
255 mb
->wr_ptr (n
* sizeof (ACE_TCHAR
));
257 // Pass the message to the Thread_Pool.
258 if (this->put (mb
) == -1)
259 ACE_ERROR ((LM_ERROR
,
260 ACE_TEXT (" (%t) %p\n"),
265 // Release the <Message_Block> since we're shutting down and
266 // don't need it anymore.
269 // Deactivate the message queue and return.
270 ACE_DEBUG ((LM_DEBUG
,
271 ACE_TEXT ("\n(%t) deactivating queue for %d threads, ")
272 ACE_TEXT ("dump of task:\n"),
273 this->thr_count ()));
276 // Deactivate the queue.
277 return this->msg_queue ()->deactivate ();
282 // Activate the task's thread pool, produce the messages that are,
283 // produce the messages that are consumed by the threads in the thread
284 // pool, and demonstrate how to shutdown by enqueueing "empty"
285 // messages into the queue.
288 Thread_Pool::test_empty_message_shutdown ()
290 if (this->open () == -1)
293 ACE_Message_Block
*mb
= 0;
295 // Run the main loop that generates messages and enqueues them into
296 // the pool of threads managed by <ACE_Task>.
298 for (size_t count
= 0;
304 // Allocate a new message.
306 ACE_Message_Block (BUFSIZ
,
307 ACE_Message_Block::MB_DATA
,
311 &this->lock_adapter_
),
316 ACE_DEBUG ((LM_DEBUG
,
317 ACE_TEXT ("(%t) enter a new message for ")
318 ACE_TEXT ("the task pool...")));
319 n
= ACE_OS::read (ACE_STDIN
,
325 static size_t count
= 0;
327 ACE_OS::snprintf (reinterpret_cast<ACE_TCHAR
*> (mb
->wr_ptr ()), BUFSIZ
,
328 ACE_SIZE_T_FORMAT_SPECIFIER
,
330 n
= ACE_OS::strlen (mb
->rd_ptr ());
332 if (count
== n_iterations
)
333 n
= 1; // Indicate that we need to shut down.
337 if (count
== 0 || (count
% 20 == 0))
343 // Send a normal message to the waiting threads and continue
345 mb
->wr_ptr (n
* sizeof (ACE_TCHAR
));
347 // Pass the message to the Thread_Pool.
348 if (this->put (mb
) == -1)
349 ACE_ERROR ((LM_ERROR
,
350 ACE_TEXT (" (%t) %p\n"),
355 // Send a shutdown message to the waiting threads and return.
356 ACE_DEBUG ((LM_DEBUG
,
357 ACE_TEXT ("\n(%t) sending shutdown message to %d threads, ")
358 ACE_TEXT ("dump of task:\n"),
359 this->thr_count ()));
364 // Enqueue an empty message to flag each consumer thread to
365 // inform it to shutdown.
366 for (i
= this->thr_count ();
370 ACE_DEBUG ((LM_DEBUG
,
371 ACE_TEXT ("(%t) end of input, ")
372 ACE_TEXT ("enqueueing \"empty\" message %d\n"),
375 // Note the use of reference counting to avoid copying
376 // the message contents.
377 ACE_Message_Block
*dup
= mb
->duplicate ();
379 if (this->put (dup
) == -1)
380 ACE_ERROR ((LM_ERROR
,
381 ACE_TEXT (" (%t) %p\n"),
387 ACE_DEBUG ((LM_DEBUG
,
388 ACE_TEXT ("\n(%t) end loop, dump of task:\n")));
396 #endif /* ACE_HAS_THREADS */
399 run_main (int, ACE_TCHAR
*[])
401 ACE_START_TEST (ACE_TEXT ("Thread_Pool_Test"));
403 #if defined (ACE_HAS_THREADS)
404 int n_threads
= ACE_MAX_THREADS
;
406 // Create the worker tasks.
407 Thread_Pool
thread_pool (n_threads
);
409 ACE_DEBUG ((LM_DEBUG
,
410 ACE_TEXT ("(%t) running test with %d threads\n"),
413 ACE_DEBUG ((LM_DEBUG
,
414 ACE_TEXT ("(%t) starting empty message shutdown test\n")));
416 // Activate the task's thread pool, produce the messages that are,
417 // produce the messages that are consumed by the threads in the
418 // thread pool, and demonstrate how to shutdown by enqueueing
419 // "empty" messages into the queue.
420 if (thread_pool
.test_empty_message_shutdown () == -1)
423 ACE_DEBUG ((LM_DEBUG
,
424 ACE_TEXT ("(%t) waiting for worker tasks to finish...\n")));
425 // Wait for all the threads to reach their exit point, at which
426 // point the barrier in the destructor of the <ACE_Task> portion of
427 // <Thread_Pool> will return.
428 if (thread_pool
.wait () == -1)
431 ACE_DEBUG ((LM_DEBUG
,
432 ACE_TEXT ("(%t) starting queue deactivation shutdown test\n")));
434 // Activate the task's thread pool, produce the messages that are
435 // consumed by the threads in the thread pool, and demonstate how to
436 // shutdown using the <ACE_Message_Queue::deactivate> method.
437 if (thread_pool
.test_queue_deactivation_shutdown () == -1)
440 ACE_DEBUG ((LM_DEBUG
,
441 ACE_TEXT ("(%t) waiting for worker tasks to finish...\n")));
442 // Wait for all the threads to reach their exit point, at which
443 // point the barrier in the destructor of the <ACE_Task> portion of
444 // <Thread_Pool> will return.
445 if (thread_pool
.wait () == -1)
448 ACE_DEBUG ((LM_DEBUG
,
449 ACE_TEXT ("(%t) all worker tasks destroyed, exiting test...\n")));
452 ACE_TEXT ("threads not supported on this platform\n")));
453 #endif /* ACE_HAS_THREADS */