1 #include "ace/config-lite.h"
3 #include "ace/OS_NS_string.h"
4 #include "ace/OS_NS_time.h"
6 #include "ace/Containers.h"
8 #include "ace/SString.h"
9 #include "ace/Method_Request.h"
10 #include "ace/Future.h"
11 #include "ace/Activation_Queue.h"
12 #include "ace/Condition_T.h"
13 #include "test_config.h"
15 #if defined (ACE_HAS_THREADS)
// Virtual destructor with an empty body.
22 virtual ~IManager () { }
// Pure virtual: takes a Worker pointer and returns int.  Manager supplies
// the override further down in this file.
24 virtual int return_to_work (Worker
*worker
) = 0;
27 // Listing 2 code/ch16
// Worker: an ACE_Task<ACE_MT_SYNCH> that keeps a pointer to its IManager.
// The visible fragments show it fetching a message block with getq(),
// checking for an MB_HANGUP block, and calling manager_->return_to_work(this).
// NOTE(review): this listing is incomplete (lines are missing between the
// fragments), so the exact control flow between them cannot be confirmed here.
28 class Worker
: public ACE_Task
<ACE_MT_SYNCH
>
// Constructor: stores the supplied manager pointer in manager_.
31 Worker (IManager
*manager
) : manager_(manager
) { }
// mb starts out null and is passed to getq() below.
39 ACE_Message_Block
*mb
= 0;
// A getq() result of -1 is reported through an LM_ERROR log entry using the
// "%p" format with the label "getq".
40 if (this->getq (mb
) == -1)
42 ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")));
// A block whose msg_type() is MB_HANGUP is associated with the
// "Shutting down" log message (the rest of that branch is not visible).
43 if (mb
->msg_type () == ACE_Message_Block::MB_HANGUP
)
46 ACE_TEXT ("(%t) Shutting down\n")));
50 // Process the message.
// Hand this worker back to the manager.
53 this->manager_
->return_to_work (this);
// Accessor returning a const reference to an ACE_Thread_ID
// (presumably thread_id_ -- body not visible; confirm).
59 const ACE_Thread_ID
& thread_id ()
// process_message: copies sizeof(int) bytes from mb->rd_ptr() into msgId and
// emits "Started"/"Finished" log messages that take a %d argument.
65 void process_message (ACE_Message_Block
*mb
)
67 ACE_TRACE ("Worker::process_message");
69 ACE_OS::memcpy (&msgId
, mb
->rd_ptr (), sizeof(int));
73 ACE_TEXT ("(%t) Started processing message %d\n"),
77 ACE_TEXT ("(%t) Finished processing message %d\n"),
// Thread identifier member of this worker.
82 ACE_Thread_ID thread_id_
;
85 // Listing 1 code/ch16
// Manager: an ACE_Task<ACE_MT_SYNCH> that privately implements IManager.
// It keeps a queue of Worker pointers (workers_) protected by workers_lock_,
// with workers_cond_ used to wait for / signal worker availability.
// NOTE(review): this listing is incomplete (lines are missing between the
// fragments); comments below describe only what the visible fragments show.
86 class Manager
: public ACE_Task
<ACE_MT_SYNCH
>, private IManager
// POOL_SIZE bounds the loop in create_worker_pool(); MAX_TIMEOUT is used to
// build the getq() timeout in the fragment below.
89 enum {POOL_SIZE
= 5, MAX_TIMEOUT
= 5};
// Constructor initializer list: shutdown_ starts at 0 and workers_cond_ is
// constructed from workers_lock_.
92 : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_
)
94 ACE_TRACE ("Manager::Manager");
99 ACE_TRACE ("Manager::svc");
101 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%t) Manager started\n")));
// Build the worker pool (see create_worker_pool() below).
104 create_worker_pool ();
108 ACE_Message_Block
*mb
= 0;
// tv is built from MAX_TIMEOUT and then has the current time from
// ACE_OS::time(0) added to it.
// NOTE(review): presumably this makes tv the absolute deadline getq()
// expects -- confirm against the ACE_Task::getq documentation.
109 ACE_Time_Value
tv ((long)MAX_TIMEOUT
);
110 tv
+= ACE_OS::time (0);
112 // Get a message request.
// A negative getq() result takes the branch below (branch body not visible).
113 if (this->getq (mb
, &tv
) < 0)
// Guard named worker_mon on workers_lock_; -1 is the macro's return-value
// argument.
122 ACE_GUARD_RETURN (ACE_Mutex
,
123 worker_mon
, this->workers_lock_
, -1);
// Wait on the condition for as long as the worker queue is empty.
125 while (this->workers_
.is_empty ())
126 workers_cond_
.wait ();
// Take the worker at the head of the queue.
128 this->workers_
.dequeue_head (worker
);
131 // Ask the worker to do the job.
// Declaration only; the definition appears later in this file as
// Manager::thread_id.
140 const ACE_Thread_ID
& thread_id (Worker
*worker
);
// IManager override: with workers_lock_ guarded, logs a message, appends the
// worker to the tail of workers_, and signals workers_cond_.
142 int return_to_work (Worker
*worker
) override
144 ACE_GUARD_RETURN (ACE_Mutex
,
145 worker_mon
, this->workers_lock_
, -1);
146 ACE_DEBUG ((LM_DEBUG
,
147 ACE_TEXT ("(%t) Worker %t returning to work.\n")));
148 this->workers_
.enqueue_tail (worker
);
149 this->workers_cond_
.signal ();
// create_worker_pool: loops POOL_SIZE times, allocating a Worker bound to
// this manager via ACE_NEW_RETURN (with -1 as its return-value argument) and
// appending it to workers_.
155 int create_worker_pool ()
157 ACE_GUARD_RETURN (ACE_Mutex
,
161 for (int i
= 0; i
< POOL_SIZE
; i
++)
164 ACE_NEW_RETURN (worker
, Worker (this), -1);
165 this->workers_
.enqueue_tail (worker
);
// Mutex used in the ACE_GUARD_RETURN calls above.
176 ACE_Mutex workers_lock_
;
// Condition built on workers_lock_; waited on while workers_ is empty and
// signalled from return_to_work().
177 ACE_Condition
<ACE_Mutex
> workers_cond_
;
// Queue of Worker pointers: filled by create_worker_pool() and
// return_to_work(), drained with dequeue_head().
178 ACE_Unbounded_Queue
<Worker
* > workers_
;
184 return (shutdown_
== 1);
// Manager::shut_down: walks workers_ with an iterator.  For each entry the
// visible fragments obtain the Worker pointer, format its thread id into buf,
// log the shutdown attempt, prepare an MB_HANGUP message block, assert that
// the worker's message queue is empty, and log that the worker shut down.
// NOTE(review): the return type, the loop header, and the statements that
// send the hangup and wait for the worker are not visible in this listing.
188 Manager::shut_down ()
190 ACE_TRACE ("Manager::shut_down");
191 ACE_Unbounded_Queue
<Worker
* >::ITERATOR iter
=
192 this->workers_
.begin ();
193 Worker
**worker_ptr
= 0;
// Fetch the current element; worker_ptr is passed to next() and then
// dereferenced to get the Worker pointer.
196 iter
.next (worker_ptr
);
197 Worker
*worker
= (*worker_ptr
);
198 ACE_Thread_ID id
= thread_id (worker
);
// Render the thread id into buf, passing 65 as the size argument.
200 id
.to_string (buf
, 65);
201 ACE_DEBUG ((LM_DEBUG
,
202 ACE_TEXT ("(%t) Attempting shutdown of %C\n"),
205 // Send the hangup message.
206 ACE_Message_Block
*mb
= 0;
210 ACE_Message_Block::MB_HANGUP
),
214 // Wait for the exit.
// The worker's message queue must be empty at this point.
217 ACE_TEST_ASSERT (worker
->msg_queue ()->is_empty ());
218 ACE_DEBUG ((LM_DEBUG
,
219 ACE_TEXT ("(%t) Worker %C shut down.\n"),
// Loop condition: continue while advance() returns non-zero.
223 while (iter
.advance ());
// Manager::thread_id: forwards to worker->thread_id() and returns its result.
// The worker pointer is dereferenced without a null check.
231 Manager::thread_id (Worker
*worker
)
233 return worker
->thread_id ();
// run_main: test entry point; starts the test named
// "Bug_3878_Regression_Test".  When ACE_HAS_THREADS is defined, the visible
// fragments loop three times, create an ACE_Message_Block of sizeof(int)
// bytes, copy the loop index into its write pointer, and finally wait on the
// ACE_Thread_Manager singleton.  Otherwise a "threads are not supported"
// message is emitted.
// NOTE(review): the manager setup, the enqueue calls, and the macro names
// around some fragments are not visible in this listing.
238 run_main (int, ACE_TCHAR
*[])
240 ACE_START_TEST (ACE_TEXT ("Bug_3878_Regression_Test"));
242 #if defined (ACE_HAS_THREADS)
246 // Wait for a moment every time you send a message.
250 ACE_Message_Block
*mb
= 0;
251 for (int i
= 0; i
< 3; i
++)
// Allocate a block sized for one int; -1 is the macro's return-value argument
// (macro name not visible -- presumably ACE_NEW_RETURN; confirm).
254 (mb
, ACE_Message_Block(sizeof(int)), -1);
// Payload is the raw bytes of the loop index i.
256 ACE_OS::memcpy (mb
->wr_ptr (), &i
, sizeof(int));
260 // Add a new work item.
// Wait on the ACE_Thread_Manager singleton.
264 ACE_Thread_Manager::instance ()->wait ();
265 #else /* ACE_HAS_THREADS */
267 ACE_TEXT ("threads are not supported on this platform\n")));