Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / APG / ThreadPools / ThreadPool.cpp
blob ba3b0d711ec8b03a9e6499687f4afbf4d6c9b865
1 #include "ace/config-lite.h"
2 #if defined (ACE_HAS_THREADS)
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_time.h"
6 #include "ace/Task.h"
7 #include "ace/Containers.h"
8 #include "ace/Synch.h"
9 #include "ace/SString.h"
10 #include "ace/Method_Request.h"
11 #include "ace/Future.h"
12 #include "ace/Activation_Queue.h"
13 #include "ace/Condition_T.h"
// Forward declaration: the interface below only handles Worker pointers.
class Worker;

/**
 * Callback interface through which a Worker hands itself back to the
 * object that dispatches jobs to it.
 */
class IManager
{
public:
  /// Virtual so that an implementation can be destroyed through an
  /// IManager pointer.
  virtual ~IManager ()
  {
  }

  /// Invoked with the worker that has finished its current job.
  /// The meaning of the returned int is defined by the implementation.
  virtual int return_to_work (Worker *worker) = 0;
};
25 // Listing 2 code/ch16
26 class Worker : public ACE_Task<ACE_MT_SYNCH>
28 public:
29 Worker (IManager *manager) : manager_(manager) { }
31 virtual int svc ()
33 ACE_Thread_ID id;
34 thread_id_ = id;
35 while (1)
37 ACE_Message_Block *mb = 0;
38 if (this->getq (mb) == -1)
39 ACE_ERROR_BREAK
40 ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")));
41 if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
43 ACE_DEBUG ((LM_INFO,
44 ACE_TEXT ("(%t) Shutting down\n")));
45 mb->release ();
46 break;
48 // Process the message.
49 process_message (mb);
50 // Return to work.
51 this->manager_->return_to_work (this);
54 return 0;
56 // Listing 2
58 const ACE_Thread_ID& thread_id ()
60 return this->thread_id_;
63 private:
64 void process_message (ACE_Message_Block *mb)
66 ACE_TRACE ("Worker::process_message");
67 int msgId;
68 ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));
69 mb->release ();
71 ACE_DEBUG ((LM_DEBUG,
72 ACE_TEXT ("(%t) Started processing message %d\n"),
73 msgId));
74 ACE_OS::sleep (3);
75 ACE_DEBUG ((LM_DEBUG,
76 ACE_TEXT ("(%t) Finished processing message %d\n"),
77 msgId));
80 IManager *manager_;
81 ACE_Thread_ID thread_id_;
84 // Listing 1 code/ch16
85 class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager
87 public:
88 enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
90 Manager ()
91 : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
93 ACE_TRACE ("Manager::Manager");
96 int svc ()
98 ACE_TRACE ("Manager::svc");
100 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
102 // Create pool.
103 create_worker_pool ();
105 while (!done ())
107 ACE_Message_Block *mb = 0;
108 ACE_Time_Value tv ((long)MAX_TIMEOUT);
109 tv += ACE_OS::time (0);
111 // Get a message request.
112 if (this->getq (mb, &tv) < 0)
114 shut_down ();
115 break;
118 // Choose a worker.
119 Worker *worker = 0;
121 ACE_GUARD_RETURN (ACE_Thread_Mutex,
122 worker_mon, this->workers_lock_, -1);
124 while (this->workers_.is_empty ())
125 workers_cond_.wait ();
127 this->workers_.dequeue_head (worker);
130 // Ask the worker to do the job.
131 worker->putq (mb);
134 return 0;
137 int shut_down ();
139 const ACE_Thread_ID& thread_id (Worker *worker);
141 virtual int return_to_work (Worker *worker)
143 ACE_GUARD_RETURN (ACE_Thread_Mutex,
144 worker_mon, this->workers_lock_, -1);
145 ACE_DEBUG ((LM_DEBUG,
146 ACE_TEXT ("(%t) Worker %t returning to work.\n")));
147 this->workers_.enqueue_tail (worker);
148 this->workers_cond_.signal ();
150 return 0;
153 private:
154 int create_worker_pool ()
156 ACE_GUARD_RETURN (ACE_Thread_Mutex,
157 worker_mon,
158 this->workers_lock_,
159 -1);
160 for (int i = 0; i < POOL_SIZE; i++)
162 Worker *worker = 0;
163 ACE_NEW_RETURN (worker, Worker (this), -1);
164 this->workers_.enqueue_tail (worker);
165 worker->activate ();
168 return 0;
171 int done ();
173 private:
174 int shutdown_;
175 ACE_Thread_Mutex workers_lock_;
176 ACE_Condition<ACE_Thread_Mutex> workers_cond_;
177 ACE_Unbounded_Queue<Worker* > workers_;
179 // Listing 1
181 int Manager::done ()
183 return (shutdown_ == 1);
187 Manager::shut_down ()
189 ACE_TRACE ("Manager::shut_down");
190 ACE_Unbounded_Queue<Worker* >::ITERATOR iter =
191 this->workers_.begin ();
192 Worker **worker_ptr = 0;
195 iter.next (worker_ptr);
196 Worker *worker = (*worker_ptr);
197 ACE_Thread_ID id = thread_id (worker);
198 char buf [65];
199 id.to_string (buf);
200 ACE_DEBUG ((LM_DEBUG,
201 ACE_TEXT ("(%t) Attempting shutdown of %C\n"),
202 buf));
204 // Send the hangup message.
205 ACE_Message_Block *mb = 0;
206 ACE_NEW_RETURN
207 (mb,
208 ACE_Message_Block(0,
209 ACE_Message_Block::MB_HANGUP),
210 -1);
211 worker->putq (mb);
213 // Wait for the exit.
214 worker->wait ();
216 ACE_ASSERT (worker->msg_queue ()->is_empty ());
217 ACE_DEBUG ((LM_DEBUG,
218 ACE_TEXT ("(%t) Worker %C shut down.\n"),
219 buf));
220 delete worker;
222 while (iter.advance ());
224 shutdown_ = 1;
226 return 0;
229 const ACE_Thread_ID&
230 Manager::thread_id (Worker *worker)
232 return worker->thread_id ();
235 int ACE_TMAIN (int, ACE_TCHAR *[])
237 Manager tp;
238 tp.activate ();
240 // Wait for a moment every time you send a message.
241 ACE_Time_Value tv;
242 tv.msec (100);
244 ACE_Message_Block *mb = 0;
245 for (int i = 0; i < 30; i++)
247 ACE_NEW_RETURN
248 (mb, ACE_Message_Block(sizeof(int)), -1);
250 ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
252 ACE_OS::sleep (tv);
254 // Add a new work item.
255 tp.putq (mb);
258 ACE_Thread_Manager::instance ()->wait ();
259 return 0;
262 #else
263 #include "ace/OS_main.h"
264 #include "ace/OS_NS_stdio.h"
266 int ACE_TMAIN (int, ACE_TCHAR *[])
268 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
269 return 0;
272 #endif /* ACE_HAS_THREADS */