Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / Threads / thread_pool.cpp
blob9ea0488c33038ee8e852f15749962ea0727578cc
1 // This test program illustrates how the <ACE_Task> synchronization
2 // mechanisms work in conjunction with the <ACE_Thread_Manager>. If
3 // the <manual> flag is set input comes from stdin until the user
4 // enters a return -- otherwise, the input is generated automatically.
5 // All worker threads shutdown when they receive a message block of
6 // length 0.
7 //
8 // This code is original based on a test program written by Karlheinz
9 // Dorn <Karlheinz.Dorn@med.siemens.de>. It was modified to utilize
10 // more ACE features by Doug Schmidt <d.schmidt@vanderbilt.edu>.
12 #include "ace/OS_NS_stdio.h"
13 #include "ace/OS_NS_string.h"
14 #include "ace/OS_NS_unistd.h"
15 #include "ace/OS_main.h"
16 #include "ace/Task.h"
17 #include "ace/Service_Config.h"
20 #if defined (ACE_HAS_THREADS)
22 // Default number of iterations to run the test.
23 static int n_iterations = 100;
25 // Controls whether the input is generated "manually" or automatically.
26 static int manual = 0;
28 class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
30 // = TITLE
31 // Defines a thread pool abstraction based on the <ACE_Task>.
32 public:
33 Thread_Pool (ACE_Thread_Manager *thr_mgr,
34 int n_threads);
35 // Constructor activates <n_threads> in the thread pool.
37 ~Thread_Pool ();
38 // Destructor...
40 virtual int svc ();
41 // Iterate <n_iterations> time printing off a message and "waiting"
42 // for all other threads to complete this iteration.
44 virtual int put (ACE_Message_Block *mb,
45 ACE_Time_Value *tv = 0);
46 // This allows the producer to pass messages to the <Thread_Pool>.
48 private:
49 //FUZZ: disable check_for_lack_ACE_OS
50 virtual int close (u_long);
51 // Close hook.
52 //FUZZ: enable check_for_lack_ACE_OS
55 int
56 Thread_Pool::close (u_long)
58 ACE_DEBUG ((LM_DEBUG,
59 "(%t) worker thread closing down\n"));
60 return 0;
63 Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr,
64 int n_threads)
65 : ACE_Task<ACE_MT_SYNCH> (thr_mgr)
67 // Create the pool of worker threads.
68 if (this->activate (THR_NEW_LWP,
69 n_threads) == -1)
70 ACE_ERROR ((LM_ERROR,
71 "%p\n",
72 "activate failed"));
75 Thread_Pool::~Thread_Pool ()
79 // Simply enqueue the Message_Block into the end of the queue.
81 int
82 Thread_Pool::put (ACE_Message_Block *mb,
83 ACE_Time_Value *tv)
85 return this->putq (mb, tv);
88 // Iterate <n_iterations> time printing off a message and "waiting"
89 // for all other threads to complete this iteration.
91 int
92 Thread_Pool::svc ()
94 // Note that the <ACE_Task::svc_run> method automatically adds us to
95 // the Thread_Manager when the thread begins.
97 int count = 1;
99 // Keep looping, reading a message out of the queue, until we get a
100 // message with a length == 0, which signals us to quit.
102 for (;; count++)
104 ACE_Message_Block *mb = 0;
106 ACE_DEBUG ((LM_DEBUG,
107 "(%t) in iteration %d before getq ()\n",
108 count));
110 if (this->getq (mb) == -1)
112 ACE_ERROR ((LM_ERROR,
113 "(%t) in iteration %d, got result -1, exiting\n",
114 count));
115 break;
118 size_t length = mb->length ();
120 if (length > 0)
121 ACE_DEBUG ((LM_DEBUG,
122 "(%t) in iteration %d, length = %d, text = \"%*s\"\n",
123 count,
124 length,
125 length - 1,
126 mb->rd_ptr ()));
128 // We're responsible for deallocating this.
129 mb->release ();
131 if (length == 0)
133 //FUZZ: disable check_for_NULL
134 ACE_DEBUG ((LM_DEBUG,
135 "(%t) in iteration %d, got NULL message, exiting\n",
136 count));
137 //FUZZ: enable check_for_NULL
139 break;
143 // Note that the <ACE_Task::svc_run> method automatically removes us
144 // from the <ACE_Thread_Manager> when the thread exits.
145 return 0;
148 static void
149 producer (Thread_Pool &thread_pool)
151 ACE_DEBUG ((LM_DEBUG,
152 "(%t) producer start, generating data for the <Thread_Pool>\n"));
153 // thread_pool.dump ();
155 for (int n; ;)
157 // Allocate a new message.
158 ACE_Message_Block *mb = 0;
159 ACE_NEW (mb,
160 ACE_Message_Block (BUFSIZ));
162 if (manual)
164 ACE_DEBUG ((LM_DEBUG,
165 "(%t) enter a new message for the task pool..."));
166 n = ACE_OS::read (ACE_STDIN,
167 mb->rd_ptr (),
168 mb->size ());
170 else
172 static int count = 0;
174 ACE_OS::sprintf (mb->rd_ptr (),
175 "%d\n",
176 count);
177 n = ACE_Utils::truncate_cast<int> (ACE_OS::strlen (mb->rd_ptr ()));
179 if (count == n_iterations)
180 n = 1; // Indicate that we need to shut down.
181 else
182 count++;
184 if (count == 0 || (count % 20 == 0))
185 ACE_OS::sleep (1);
188 if (n > 1)
190 // Send a normal message to the waiting threads and continue
191 // producing.
192 mb->wr_ptr (n);
194 // Pass the message to the Thread_Pool.
195 if (thread_pool.put (mb) == -1)
196 ACE_ERROR ((LM_ERROR,
197 " (%t) %p\n",
198 "put"));
200 else
202 ACE_DEBUG ((LM_DEBUG,
203 "\n(%t) start loop, dump of task:\n"));
204 // thread_pool.dump ();
206 // Send a shutdown message to the waiting threads and exit.
207 for (size_t i = thread_pool.thr_count (); i > 0; i--)
209 //FUZZ: disable check_for_NULL
210 ACE_DEBUG ((LM_DEBUG,
211 "(%t) EOF, enqueueing NULL block for thread = %d\n",
212 i));
213 //FUZZ: enable check_for_NULL
215 // Enqueue a NULL message to flag each consumer to
216 // shutdown.
217 ACE_Message_Block *mb = 0;
218 ACE_NEW (mb,
219 ACE_Message_Block);
220 if (thread_pool.put (mb) == -1)
221 ACE_ERROR ((LM_ERROR,
222 " (%t) %p\n",
223 "put"));
226 ACE_DEBUG ((LM_DEBUG,
227 "\n(%t) end loop\n"));
228 // thread_pool.dump ();
229 break;
235 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
237 int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS;
238 n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : n_iterations;
239 manual = argc > 3 ? 1 : 0;
241 ACE_DEBUG ((LM_DEBUG,
242 "(%t) argc = %d, threads = %d\n",
243 argc,
244 n_threads));
246 // Create the worker tasks.
247 Thread_Pool thread_pool (ACE_Thread_Manager::instance (),
248 n_threads);
250 // Create work for the worker tasks to process in their own threads.
251 producer (thread_pool);
253 ACE_DEBUG ((LM_DEBUG,
254 "(%t) waiting for threads to exit in Thread_Pool destructor...\n"));
255 // Wait for all the threads to reach their exit point.
256 if (thread_pool.wait () == -1)
257 //FUZZ: disable check_for_lack_ACE_OS
258 ACE_ERROR_RETURN ((LM_ERROR, "(%t) wait() failed\n"),
260 //FUZZ: enable check_for_lack_ACE_OS
262 ACE_DEBUG ((LM_DEBUG,
263 "(%t) destroying worker tasks and exiting...\n"));
264 return 0;
266 #else
268 ACE_TMAIN (int, ACE_TCHAR *[])
270 ACE_ERROR ((LM_ERROR,
271 "threads not supported on this platform\n"));
272 return 0;
274 #endif /* ACE_HAS_THREADS */