1 #include "ace/config-lite.h"
2 #if defined (ACE_HAS_THREADS)
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_time.h"
7 #include "ace/Unbounded_Queue.h"
9 #include "ace/SString.h"
10 #include "ace/Method_Request.h"
11 #include "ace/Future.h"
12 #include "ace/Activation_Queue.h"
13 #include "ace/Condition_T.h"
15 #define OUTSTANDING_REQUESTS 20
17 // Listing 2 code/ch16
18 class CompletionCallBack
: public ACE_Future_Observer
<ACE_CString
*>
21 virtual void update (const ACE_Future
<ACE_CString
*> & future
)
23 ACE_CString
*result
= 0;
25 // Block for the result.
27 ACE_DEBUG ((LM_INFO
, ACE_TEXT("%C\n"), result
->c_str ()));
32 // Listing 1 code/ch16
33 class LongWork
: public ACE_Method_Request
38 ACE_TRACE ("LongWork::call");
40 ((LM_INFO
, ACE_TEXT ("(%t) Attempting long work task\n")));
44 ACE_OS::strcpy (buf
, "Completed assigned task\n");
47 (msg
, ACE_CString (buf
, ACE_OS::strlen (buf
) + 1), -1);
52 ACE_Future
<ACE_CString
*> &future ()
54 ACE_TRACE ("LongWork::future");
58 void attach (CompletionCallBack
*cb
)
64 ACE_Future
<ACE_CString
*> result_
;
68 class Exit
: public ACE_Method_Request
73 ACE_TRACE ("Exit::call");
83 virtual ~IManager () { }
85 virtual int return_to_work (Worker
*worker
) = 0;
88 // Listing 3 code/ch16
89 class Worker
: public ACE_Task
<ACE_MT_SYNCH
>
92 Worker (IManager
*manager
)
93 : manager_(manager
), queue_ (msg_queue ())
96 int perform (ACE_Method_Request
*req
)
98 ACE_TRACE ("Worker::perform");
99 return this->queue_
.enqueue (req
);
104 thread_id_
= ACE_Thread::self ();
107 ACE_Method_Request
*request
= this->queue_
.dequeue();
111 // Invoke the request
112 int result
= request
->call ();
117 this->manager_
->return_to_work (this);
123 ACE_thread_t
thread_id ();
127 ACE_thread_t thread_id_
;
128 ACE_Activation_Queue queue_
;
132 ACE_thread_t
Worker::thread_id ()
137 // Listing 4 code/ch16
138 class Manager
: public ACE_Task_Base
, private IManager
141 enum {POOL_SIZE
= 5, MAX_TIMEOUT
= 5};
144 : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_
)
146 ACE_TRACE ("Manager");
149 int perform (ACE_Method_Request
*req
)
151 ACE_TRACE ("perform");
152 return this->queue_
.enqueue (req
);
159 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%t) Manager started\n")));
161 // Create pool when you get in the first time.
162 create_worker_pool ();
166 ACE_Time_Value
tv ((long)MAX_TIMEOUT
);
167 tv
+= ACE_OS::time (0);
169 // Get the next message
170 ACE_Method_Request
*request
= this->queue_
.dequeue (&tv
);
178 Worker
*worker
= choose_worker ();
180 // Ask the worker to do the job.
181 worker
->perform (request
);
189 virtual int return_to_work (Worker
*worker
)
192 (ACE_Thread_Mutex
, worker_mon
, this->workers_lock_
, -1);
194 ((LM_DEBUG
, ACE_TEXT ("(%t) Worker returning to work.\n")));
195 this->workers_
.enqueue_tail (worker
);
196 this->workers_cond_
.signal ();
202 Worker
*choose_worker ()
205 (ACE_Thread_Mutex
, worker_mon
, this->workers_lock_
, 0)
207 while (this->workers_
.is_empty ())
208 workers_cond_
.wait ();
211 this->workers_
.dequeue_head (worker
);
215 int create_worker_pool ()
218 (ACE_Thread_Mutex
, worker_mon
, this->workers_lock_
, -1);
219 for (int i
= 0; i
< POOL_SIZE
; i
++)
222 ACE_NEW_RETURN (worker
, Worker (this), -1);
223 this->workers_
.enqueue_tail (worker
);
232 return (shutdown_
== 1);
235 ACE_thread_t
thread_id (Worker
*worker
)
237 return worker
->thread_id ();
242 ACE_Thread_Mutex workers_lock_
;
243 ACE_Condition
<ACE_Thread_Mutex
> workers_cond_
;
244 ACE_Unbounded_Queue
<Worker
* > workers_
;
245 ACE_Activation_Queue queue_
;
250 Manager::shut_down ()
252 ACE_TRACE ("Manager::shut_down");
253 ACE_Unbounded_Queue
<Worker
* >::ITERATOR iter
= this->workers_
.begin ();
254 Worker
**worker_ptr
= 0;
257 iter
.next (worker_ptr
);
258 Worker
*worker
= (*worker_ptr
);
259 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Attempting shutdown of %d\n"),
260 thread_id (worker
)));
263 ACE_NEW_RETURN (req
, Exit(), -1);
265 // Send the hangup message
266 worker
->perform (req
);
268 // Wait for the exit.
271 ACE_DEBUG ((LM_DEBUG
,
272 ACE_TEXT ("(%t) Worker %d shut down.\n"),
273 thread_id (worker
)));
278 while (iter
.advance ());
285 // Listing 5 code/ch16
286 int ACE_TMAIN (int, ACE_TCHAR
*[])
294 // Wait for a few seconds every time you send a message.
295 CompletionCallBack cb
;
296 LongWork workArray
[OUTSTANDING_REQUESTS
];
297 for (int i
= 0; i
< OUTSTANDING_REQUESTS
; i
++)
299 workArray
[i
].attach (&cb
);
301 tp
.perform (&workArray
[i
]);
304 ACE_Thread_Manager::instance ()->wait ();
310 #include "ace/OS_main.h"
311 #include "ace/OS_NS_stdio.h"
313 int ACE_TMAIN (int, ACE_TCHAR
*[])
315 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
319 #endif /* ACE_HAS_THREADS */