1 #include "ace/config-lite.h"
2 #if defined (ACE_HAS_THREADS)
4 #include "ace/OS_NS_string.h"
5 #include "ace/OS_NS_time.h"
7 #include "ace/Containers.h"
9 #include "ace/SString.h"
10 #include "ace/Method_Request.h"
11 #include "ace/Future.h"
12 #include "ace/Activation_Queue.h"
13 #include "ace/Condition_T.h"
20 virtual ~IManager () { }
22 virtual int return_to_work (Worker
*worker
) = 0;
25 // Listing 2 code/ch16
26 class Worker
: public ACE_Task
<ACE_MT_SYNCH
>
29 Worker (IManager
*manager
) : manager_(manager
) { }
37 ACE_Message_Block
*mb
= 0;
38 if (this->getq (mb
) == -1)
40 ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")));
41 if (mb
->msg_type () == ACE_Message_Block::MB_HANGUP
)
44 ACE_TEXT ("(%t) Shutting down\n")));
48 // Process the message.
51 this->manager_
->return_to_work (this);
58 const ACE_Thread_ID
& thread_id ()
60 return this->thread_id_
;
64 void process_message (ACE_Message_Block
*mb
)
66 ACE_TRACE ("Worker::process_message");
68 ACE_OS::memcpy (&msgId
, mb
->rd_ptr (), sizeof(int));
72 ACE_TEXT ("(%t) Started processing message %d\n"),
76 ACE_TEXT ("(%t) Finished processing message %d\n"),
81 ACE_Thread_ID thread_id_
;
84 // Listing 1 code/ch16
85 class Manager
: public ACE_Task
<ACE_MT_SYNCH
>, private IManager
88 enum {POOL_SIZE
= 5, MAX_TIMEOUT
= 5};
91 : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_
)
93 ACE_TRACE ("Manager::Manager");
98 ACE_TRACE ("Manager::svc");
100 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%t) Manager started\n")));
103 create_worker_pool ();
107 ACE_Message_Block
*mb
= 0;
108 ACE_Time_Value
tv ((long)MAX_TIMEOUT
);
109 tv
+= ACE_OS::time (0);
111 // Get a message request.
112 if (this->getq (mb
, &tv
) < 0)
121 ACE_GUARD_RETURN (ACE_Thread_Mutex
,
122 worker_mon
, this->workers_lock_
, -1);
124 while (this->workers_
.is_empty ())
125 workers_cond_
.wait ();
127 this->workers_
.dequeue_head (worker
);
130 // Ask the worker to do the job.
139 const ACE_Thread_ID
& thread_id (Worker
*worker
);
141 virtual int return_to_work (Worker
*worker
)
143 ACE_GUARD_RETURN (ACE_Thread_Mutex
,
144 worker_mon
, this->workers_lock_
, -1);
145 ACE_DEBUG ((LM_DEBUG
,
146 ACE_TEXT ("(%t) Worker %t returning to work.\n")));
147 this->workers_
.enqueue_tail (worker
);
148 this->workers_cond_
.signal ();
154 int create_worker_pool ()
156 ACE_GUARD_RETURN (ACE_Thread_Mutex
,
160 for (int i
= 0; i
< POOL_SIZE
; i
++)
163 ACE_NEW_RETURN (worker
, Worker (this), -1);
164 this->workers_
.enqueue_tail (worker
);
175 ACE_Thread_Mutex workers_lock_
;
176 ACE_Condition
<ACE_Thread_Mutex
> workers_cond_
;
177 ACE_Unbounded_Queue
<Worker
* > workers_
;
183 return (shutdown_
== 1);
187 Manager::shut_down ()
189 ACE_TRACE ("Manager::shut_down");
190 ACE_Unbounded_Queue
<Worker
* >::ITERATOR iter
=
191 this->workers_
.begin ();
192 Worker
**worker_ptr
= 0;
195 iter
.next (worker_ptr
);
196 Worker
*worker
= (*worker_ptr
);
197 ACE_Thread_ID id
= thread_id (worker
);
200 ACE_DEBUG ((LM_DEBUG
,
201 ACE_TEXT ("(%t) Attempting shutdown of %C\n"),
204 // Send the hangup message.
205 ACE_Message_Block
*mb
= 0;
209 ACE_Message_Block::MB_HANGUP
),
213 // Wait for the exit.
216 ACE_ASSERT (worker
->msg_queue ()->is_empty ());
217 ACE_DEBUG ((LM_DEBUG
,
218 ACE_TEXT ("(%t) Worker %C shut down.\n"),
222 while (iter
.advance ());
230 Manager::thread_id (Worker
*worker
)
232 return worker
->thread_id ();
235 int ACE_TMAIN (int, ACE_TCHAR
*[])
240 // Wait for a moment every time you send a message.
244 ACE_Message_Block
*mb
= 0;
245 for (int i
= 0; i
< 30; i
++)
248 (mb
, ACE_Message_Block(sizeof(int)), -1);
250 ACE_OS::memcpy (mb
->wr_ptr (), &i
, sizeof(int));
254 // Add a new work item.
258 ACE_Thread_Manager::instance ()->wait ();
263 #include "ace/OS_main.h"
264 #include "ace/OS_NS_stdio.h"
266 int ACE_TMAIN (int, ACE_TCHAR
*[])
268 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
272 #endif /* ACE_HAS_THREADS */