Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / APG / ThreadPools / Futures.cpp
blob16288868fa663bb53c6fb3467a3e650ffe79b3e9
1 #include "ace/config-lite.h"
2 #if defined (ACE_HAS_THREADS)
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_time.h"
6 #include "ace/Task.h"
7 #include "ace/Unbounded_Queue.h"
8 #include "ace/Synch.h"
9 #include "ace/SString.h"
10 #include "ace/Method_Request.h"
11 #include "ace/Future.h"
12 #include "ace/Activation_Queue.h"
13 #include "ace/Condition_T.h"
15 #define OUTSTANDING_REQUESTS 20
17 // Listing 2 code/ch16
18 class CompletionCallBack: public ACE_Future_Observer<ACE_CString*>
20 public:
21 virtual void update (const ACE_Future<ACE_CString*> & future)
23 ACE_CString *result = 0;
25 // Block for the result.
26 future.get (result);
27 ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ()));
28 delete result;
31 // Listing 2
32 // Listing 1 code/ch16
33 class LongWork : public ACE_Method_Request
35 public:
36 virtual int call ()
38 ACE_TRACE ("LongWork::call");
39 ACE_DEBUG
40 ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n")));
41 ACE_OS::sleep (1);
43 char buf[1024];
44 ACE_OS::strcpy (buf, "Completed assigned task\n");
45 ACE_CString *msg;
46 ACE_NEW_RETURN
47 (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1);
48 result_.set (msg);
49 return 0;
52 ACE_Future<ACE_CString*> &future ()
54 ACE_TRACE ("LongWork::future");
55 return result_;
58 void attach (CompletionCallBack *cb)
60 result_.attach (cb);
63 private:
64 ACE_Future<ACE_CString*> result_;
66 // Listing 1
68 class Exit : public ACE_Method_Request
70 public:
71 virtual int call ()
73 ACE_TRACE ("Exit::call");
74 return -1;
class Worker;

/**
 * Interface through which a Worker hands itself back after it has
 * finished a request.
 */
class IManager
{
public:
  /// Hand @a worker back to the implementer of this interface.
  virtual int return_to_work (Worker *worker) = 0;

  virtual ~IManager ()
  {
  }
};
88 // Listing 3 code/ch16
89 class Worker: public ACE_Task<ACE_MT_SYNCH>
91 public:
92 Worker (IManager *manager)
93 : manager_(manager), queue_ (msg_queue ())
94 { }
96 int perform (ACE_Method_Request *req)
98 ACE_TRACE ("Worker::perform");
99 return this->queue_.enqueue (req);
102 virtual int svc ()
104 thread_id_ = ACE_Thread::self ();
105 while (1)
107 ACE_Method_Request *request = this->queue_.dequeue();
108 if (request == 0)
109 return -1;
111 // Invoke the request
112 int result = request->call ();
113 if (result == -1)
114 break;
116 // Return to work.
117 this->manager_->return_to_work (this);
120 return 0;
123 ACE_thread_t thread_id ();
125 private:
126 IManager *manager_;
127 ACE_thread_t thread_id_;
128 ACE_Activation_Queue queue_;
130 // Listing 3
132 ACE_thread_t Worker::thread_id ()
134 return thread_id_;
137 // Listing 4 code/ch16
138 class Manager : public ACE_Task_Base, private IManager
140 public:
141 enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
143 Manager ()
144 : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
146 ACE_TRACE ("Manager");
149 int perform (ACE_Method_Request *req)
151 ACE_TRACE ("perform");
152 return this->queue_.enqueue (req);
155 int svc ()
157 ACE_TRACE ("svc");
159 ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n")));
161 // Create pool when you get in the first time.
162 create_worker_pool ();
164 while (!done ())
166 ACE_Time_Value tv ((long)MAX_TIMEOUT);
167 tv += ACE_OS::time (0);
169 // Get the next message
170 ACE_Method_Request *request = this->queue_.dequeue (&tv);
171 if (request == 0)
173 shut_down ();
174 break;
177 // Choose a worker.
178 Worker *worker = choose_worker ();
180 // Ask the worker to do the job.
181 worker->perform (request);
184 return 0;
187 int shut_down ();
189 virtual int return_to_work (Worker *worker)
191 ACE_GUARD_RETURN
192 (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
193 ACE_DEBUG
194 ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n")));
195 this->workers_.enqueue_tail (worker);
196 this->workers_cond_.signal ();
198 return 0;
201 private:
202 Worker *choose_worker ()
204 ACE_GUARD_RETURN
205 (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0)
207 while (this->workers_.is_empty ())
208 workers_cond_.wait ();
210 Worker *worker = 0;
211 this->workers_.dequeue_head (worker);
212 return worker;
215 int create_worker_pool ()
217 ACE_GUARD_RETURN
218 (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1);
219 for (int i = 0; i < POOL_SIZE; i++)
221 Worker *worker;
222 ACE_NEW_RETURN (worker, Worker (this), -1);
223 this->workers_.enqueue_tail (worker);
224 worker->activate ();
227 return 0;
230 int done ()
232 return (shutdown_ == 1);
235 ACE_thread_t thread_id (Worker *worker)
237 return worker->thread_id ();
240 private:
241 int shutdown_;
242 ACE_Thread_Mutex workers_lock_;
243 ACE_Condition<ACE_Thread_Mutex> workers_cond_;
244 ACE_Unbounded_Queue<Worker* > workers_;
245 ACE_Activation_Queue queue_;
247 // Listing 4
250 Manager::shut_down ()
252 ACE_TRACE ("Manager::shut_down");
253 ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin ();
254 Worker **worker_ptr = 0;
257 iter.next (worker_ptr);
258 Worker *worker = (*worker_ptr);
259 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"),
260 thread_id (worker)));
262 Exit *req;
263 ACE_NEW_RETURN (req, Exit(), -1);
265 // Send the hangup message
266 worker->perform (req);
268 // Wait for the exit.
269 worker->wait ();
271 ACE_DEBUG ((LM_DEBUG,
272 ACE_TEXT ("(%t) Worker %d shut down.\n"),
273 thread_id (worker)));
275 delete req;
276 delete worker;
278 while (iter.advance ());
280 shutdown_ = 1;
282 return 0;
285 // Listing 5 code/ch16
286 int ACE_TMAIN (int, ACE_TCHAR *[])
288 Manager tp;
289 tp.activate ();
291 ACE_Time_Value tv;
292 tv.msec (100);
294 // Wait for a few seconds every time you send a message.
295 CompletionCallBack cb;
296 LongWork workArray[OUTSTANDING_REQUESTS];
297 for (int i = 0; i < OUTSTANDING_REQUESTS; i++)
299 workArray[i].attach (&cb);
300 ACE_OS::sleep (tv);
301 tp.perform (&workArray[i]);
304 ACE_Thread_Manager::instance ()->wait ();
305 return 0;
307 // Listing 5
309 #else
310 #include "ace/OS_main.h"
311 #include "ace/OS_NS_stdio.h"
313 int ACE_TMAIN (int, ACE_TCHAR *[])
315 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
316 return 0;
319 #endif /* ACE_HAS_THREADS */