Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Thread_Pool_Test.cpp
blob6e4d0a061bf9f6cbd78572960a91f75a7f075bc8
2 //=============================================================================
3 /**
4 * @file Thread_Pool_Test.cpp
6 * This test program illustrates how the <ACE_Task>
7 * synchronization mechanisms work in conjunction with the
8 * <ACE_Thread_Manager>. If the <manual> flag is set input comes
9 * from stdin until the user enters a return -- otherwise, the
10 * input is generated automatically. All worker threads shutdown
11 * when (1) they receive a message block of length 0 or (2) the
12 * queue is deactivated.
14 * @author Karlheinz Dorn <Karlheinz.Dorn@med.siemens.de>
15 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
16 * @author and Prashant Jain <pjain@cs.wustl.edu>
18 //=============================================================================
21 #include "test_config.h"
22 #include "ace/Task.h"
24 #if defined (ACE_HAS_THREADS)
25 #include "ace/Lock_Adapter_T.h"
26 #include "ace/OS_NS_stdio.h"
27 #include "ace/OS_NS_string.h"
28 #include "ace/OS_NS_unistd.h"
30 // Number of iterations to run the test.
31 static size_t n_iterations = 100;
33 // Controls whether the input is generated "manually" or automatically.
34 static int manual = 0;
36 /**
37 * @class Thread_Pool
39 * @brief Defines a thread pool abstraction based on the <ACE_Task>.
41 class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
43 public:
44 /// Create the thread pool containing <n_threads>.
45 Thread_Pool (int n_threads);
47 /// Destructor...
48 ~Thread_Pool (void);
50 /**
51 * Activate the task's thread pool, produce the messages that are
52 * consumed by the threads in the thread pool, and demonstate how to
53 * shutdown using the <ACE_Message_Queue::deactivate> method.
55 int test_queue_deactivation_shutdown (void);
57 /**
58 * Activate the task's thread pool, produce the messages that are,
59 * produce the messages that are consumed by the threads in the
60 * thread pool, and demonstrate how to shutdown by enqueueing
61 * "empty" messages into the queue.
63 int test_empty_message_shutdown (void);
65 /// Iterate <n_iterations> time printing off a message and "waiting"
66 /// for all other threads to complete this iteration.
67 virtual int svc (void);
69 /// Allows the producer to pass messages to the <Thread_Pool>.
70 virtual int put (ACE_Message_Block *mb,
71 ACE_Time_Value *tv = 0);
73 private:
74 //FUZZ: disable check_for_lack_ACE_OS
75 /// Spawn the threads in the pool.
76 virtual int open (void * = 0);
77 //FUZZ: enable check_for_lack_ACE_OS
79 /// Close hook.
80 virtual int close (u_long);
82 /// Serialize access to <ACE_Message_Block> reference count, which
83 /// will be decremented by multiple threads.
84 ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_;
86 /// Number of threads to spawn.
87 int n_threads_;
90 Thread_Pool::~Thread_Pool (void)
94 int
95 Thread_Pool::close (u_long)
97 ACE_DEBUG ((LM_DEBUG,
98 ACE_TEXT ("(%t) worker thread closing down\n")));
99 return 0;
102 Thread_Pool::Thread_Pool (int n_threads)
103 : n_threads_ (n_threads)
107 // Simply enqueue the Message_Block into the end of the queue.
110 Thread_Pool::put (ACE_Message_Block *mb,
111 ACE_Time_Value *tv)
113 return this->putq (mb, tv);
116 // Iterate <n_iterations> printing off a message and "waiting" for all
117 // other threads to complete this iteration.
120 Thread_Pool::svc (void)
122 // Keep looping, reading a message out of the queue, until we get a
123 // message with a length == 0, which signals us to quit.
125 for (int count = 1; ; count++)
127 ACE_Message_Block *mb = 0;
129 int result = this->getq (mb);
131 ACE_TEST_ASSERT (result != -1 || errno == ESHUTDOWN);
133 if (result == -1 && errno == ESHUTDOWN)
135 // The queue has been deactivated, so let's bail out.
136 ACE_DEBUG ((LM_DEBUG,
137 ACE_TEXT ("(%t) in iteration %d, queue len = %d, ")
138 ACE_TEXT ("queue deactivated, exiting\n"),
139 count,
140 this->msg_queue ()->message_count ()));
142 break;
145 size_t length = mb->length ();
147 if (length > 0)
148 ACE_DEBUG ((LM_DEBUG,
149 ACE_TEXT ("(%t) in iteration %d, queue len = %d, ")
150 ACE_TEXT ("length = %d, text = \"%*s\"\n"),
151 count,
152 this->msg_queue ()->message_count (),
153 length,
154 length - 1,
155 mb->rd_ptr ()));
157 // We're responsible for deallocating this.
158 mb->release ();
160 if (length == 0)
162 ACE_DEBUG ((LM_DEBUG,
163 ACE_TEXT ("(%t) in iteration %d, queue len = %d, ")
164 ACE_TEXT ("got \"empty\" message, exiting\n"),
165 count,
166 this->msg_queue ()->message_count ()));
167 break;
171 // Note that the <ACE_Task::svc_run> method automatically removes us
172 // from the <ACE_Thread_Manager> when the thread exits.
173 return 0;
177 Thread_Pool::open (void *)
179 ACE_DEBUG ((LM_DEBUG,
180 ACE_TEXT ("(%t) producer start, dumping the Thread_Pool\n")));
181 this->dump ();
183 // Create a pool of worker threads.
184 if (this->activate (THR_NEW_LWP,
185 this->n_threads_) == -1)
186 ACE_ERROR_RETURN ((LM_ERROR,
187 ACE_TEXT ("%p\n"),
188 ACE_TEXT ("activate failed")),
189 -1);
190 return 0;
193 // Activate the task's thread pool, produce the messages that are
194 // consumed by the threads in the thread pool, and demonstate how to
195 // shutdown using the <ACE_Message_Queue::deactivate> method.
198 Thread_Pool::test_queue_deactivation_shutdown (void)
200 if (this->open () == -1)
201 return -1;
203 ACE_Message_Block *mb = 0;
205 // Run the main loop that generates messages and enqueues them into
206 // the pool of threads managed by <ACE_Task>.
208 for (size_t count = 0;
210 count++)
212 ssize_t n = 0;
214 // Allocate a new message.
215 ACE_NEW_RETURN (mb,
216 ACE_Message_Block (BUFSIZ,
217 ACE_Message_Block::MB_DATA,
221 &this->lock_adapter_),
222 -1);
224 if (manual)
226 #if !defined (ACE_HAS_WINCE)
227 ACE_DEBUG ((LM_DEBUG,
228 ACE_TEXT ("(%t) enter a new message for ")
229 ACE_TEXT ("the task pool...")));
230 n = ACE_OS::read (ACE_STDIN,
231 mb->wr_ptr (),
232 mb->size ());
233 #endif // ACE_HAS_WINCE
235 else
237 static size_t count = 0;
239 ACE_OS::snprintf (reinterpret_cast<ACE_TCHAR *> (mb->wr_ptr ()), BUFSIZ,
240 ACE_SIZE_T_FORMAT_SPECIFIER,
241 count);
242 n = ACE_OS::strlen (mb->rd_ptr ());
244 if (count == n_iterations)
245 n = 1; // Indicate that we need to shut down.
246 else
247 count++;
249 if (count == 0 || (count % 20 == 0))
250 ACE_OS::sleep (1);
253 if (n > 1)
255 // Send a normal message to the waiting threads and continue
256 // producing.
257 mb->wr_ptr (n * sizeof (ACE_TCHAR));
259 // Pass the message to the Thread_Pool.
260 if (this->put (mb) == -1)
261 ACE_ERROR ((LM_ERROR,
262 ACE_TEXT (" (%t) %p\n"),
263 ACE_TEXT ("put")));
265 else
267 // Release the <Message_Block> since we're shutting down and
268 // don't need it anymore.
270 mb->release ();
271 // Deactivate the message queue and return.
272 ACE_DEBUG ((LM_DEBUG,
273 ACE_TEXT ("\n(%t) deactivating queue for %d threads, ")
274 ACE_TEXT ("dump of task:\n"),
275 this->thr_count ()));
276 this->dump ();
278 // Deactivate the queue.
279 return this->msg_queue ()->deactivate ();
284 // Activate the task's thread pool, produce the messages that are,
285 // produce the messages that are consumed by the threads in the thread
286 // pool, and demonstrate how to shutdown by enqueueing "empty"
287 // messages into the queue.
290 Thread_Pool::test_empty_message_shutdown (void)
292 if (this->open () == -1)
293 return -1;
295 ACE_Message_Block *mb = 0;
297 // Run the main loop that generates messages and enqueues them into
298 // the pool of threads managed by <ACE_Task>.
300 for (size_t count = 0;
302 count++)
304 ssize_t n = 0;
306 // Allocate a new message.
307 ACE_NEW_RETURN (mb,
308 ACE_Message_Block (BUFSIZ,
309 ACE_Message_Block::MB_DATA,
313 &this->lock_adapter_),
314 -1);
316 if (manual)
318 #if !defined (ACE_HAS_WINCE)
319 ACE_DEBUG ((LM_DEBUG,
320 ACE_TEXT ("(%t) enter a new message for ")
321 ACE_TEXT ("the task pool...")));
322 n = ACE_OS::read (ACE_STDIN,
323 mb->wr_ptr (),
324 mb->size ());
325 #endif // ACE_HAS_WINCE
327 else
329 static size_t count = 0;
331 ACE_OS::snprintf (reinterpret_cast<ACE_TCHAR *> (mb->wr_ptr ()), BUFSIZ,
332 ACE_SIZE_T_FORMAT_SPECIFIER,
333 count);
334 n = ACE_OS::strlen (mb->rd_ptr ());
336 if (count == n_iterations)
337 n = 1; // Indicate that we need to shut down.
338 else
339 count++;
341 if (count == 0 || (count % 20 == 0))
342 ACE_OS::sleep (1);
345 if (n > 1)
347 // Send a normal message to the waiting threads and continue
348 // producing.
349 mb->wr_ptr (n * sizeof (ACE_TCHAR));
351 // Pass the message to the Thread_Pool.
352 if (this->put (mb) == -1)
353 ACE_ERROR ((LM_ERROR,
354 ACE_TEXT (" (%t) %p\n"),
355 ACE_TEXT ("put")));
357 else
359 // Send a shutdown message to the waiting threads and return.
360 ACE_DEBUG ((LM_DEBUG,
361 ACE_TEXT ("\n(%t) sending shutdown message to %d threads, ")
362 ACE_TEXT ("dump of task:\n"),
363 this->thr_count ()));
364 this->dump ();
366 size_t i = 0;
368 // Enqueue an empty message to flag each consumer thread to
369 // inform it to shutdown.
370 for (i = this->thr_count ();
371 i > 0;
372 i--)
374 ACE_DEBUG ((LM_DEBUG,
375 ACE_TEXT ("(%t) end of input, ")
376 ACE_TEXT ("enqueueing \"empty\" message %d\n"),
377 i));
379 // Note the use of reference counting to avoid copying
380 // the message contents.
381 ACE_Message_Block *dup = mb->duplicate ();
383 if (this->put (dup) == -1)
384 ACE_ERROR ((LM_ERROR,
385 ACE_TEXT (" (%t) %p\n"),
386 ACE_TEXT ("put")));
389 mb->release ();
391 ACE_DEBUG ((LM_DEBUG,
392 ACE_TEXT ("\n(%t) end loop, dump of task:\n")));
393 this->dump ();
395 return 0;
400 #endif /* ACE_HAS_THREADS */
403 run_main (int, ACE_TCHAR *[])
405 ACE_START_TEST (ACE_TEXT ("Thread_Pool_Test"));
407 #if defined (ACE_HAS_THREADS)
408 int n_threads = ACE_MAX_THREADS;
410 // Create the worker tasks.
411 Thread_Pool thread_pool (n_threads);
413 ACE_DEBUG ((LM_DEBUG,
414 ACE_TEXT ("(%t) running test with %d threads\n"),
415 n_threads));
417 ACE_DEBUG ((LM_DEBUG,
418 ACE_TEXT ("(%t) starting empty message shutdown test\n")));
420 // Activate the task's thread pool, produce the messages that are,
421 // produce the messages that are consumed by the threads in the
422 // thread pool, and demonstrate how to shutdown by enqueueing
423 // "empty" messages into the queue.
424 if (thread_pool.test_empty_message_shutdown () == -1)
425 return 1;
427 ACE_DEBUG ((LM_DEBUG,
428 ACE_TEXT ("(%t) waiting for worker tasks to finish...\n")));
429 // Wait for all the threads to reach their exit point, at which
430 // point the barrier in the destructor of the <ACE_Task> portion of
431 // <Thread_Pool> will return.
432 if (thread_pool.wait () == -1)
433 return 1;
435 ACE_DEBUG ((LM_DEBUG,
436 ACE_TEXT ("(%t) starting queue deactivation shutdown test\n")));
438 // Activate the task's thread pool, produce the messages that are
439 // consumed by the threads in the thread pool, and demonstate how to
440 // shutdown using the <ACE_Message_Queue::deactivate> method.
441 if (thread_pool.test_queue_deactivation_shutdown () == -1)
442 return 1;
444 ACE_DEBUG ((LM_DEBUG,
445 ACE_TEXT ("(%t) waiting for worker tasks to finish...\n")));
446 // Wait for all the threads to reach their exit point, at which
447 // point the barrier in the destructor of the <ACE_Task> portion of
448 // <Thread_Pool> will return.
449 if (thread_pool.wait () == -1)
450 return 1;
452 ACE_DEBUG ((LM_DEBUG,
453 ACE_TEXT ("(%t) all worker tasks destroyed, exiting test...\n")));
454 #else
455 ACE_ERROR ((LM_INFO,
456 ACE_TEXT ("threads not supported on this platform\n")));
457 #endif /* ACE_HAS_THREADS */
458 ACE_END_TEST;
459 return 0;