1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/worker_pool_posix.h"
10 #include "base/callback.h"
11 #include "base/synchronization/condition_variable.h"
12 #include "base/synchronization/lock.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/threading/platform_thread.h"
15 #include "base/time/time.h"
16 #include "testing/gtest/include/gtest/gtest.h"
20 // Peer class to provide passthrough access to PosixDynamicThreadPool internals.
// Every accessor below forwards directly to a member of the wrapped pool
// (pool_). The tests in this file acquire lock() before reading
// worker_threads() or threads_to_cleanup().
21 class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer
{
// |pool| is the pool whose internals this peer exposes.
// NOTE(review): the member initializer is not visible in this view;
// presumably |pool| is stored in pool_ -- confirm.
23 explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool
* pool
)
// Returns the address of the pool's lock_ member.
26 Lock
* lock() { return &pool_
->lock_
; }
// Returns the address of the pool's pending_tasks_available_cv_ member.
27 ConditionVariable
* pending_tasks_available_cv() {
28 return &pool_
->pending_tasks_available_cv_
;
// Returns the current size of the pool's pending_tasks_ container.
30 size_t num_pending_tasks() const { return pool_
->pending_tasks_
.size(); }
// Returns the pool's num_idle_threads_ value.
31 size_t num_idle_threads() const { return pool_
->num_idle_threads_
; }
// Returns the raw pointer currently held by the pool's num_threads_cv_.
32 ConditionVariable
* num_threads_cv() { return pool_
->num_threads_cv_
.get(); }
// Replaces the pool's num_threads_cv_ with |cv| via reset().
// NOTE(review): presumably the pool takes ownership of |cv| -- confirm
// against the declaration of num_threads_cv_.
33 void set_num_threads_cv(ConditionVariable
* cv
) {
34 pool_
->num_threads_cv_
.reset(cv
);
// Read-only reference to the pool's threads_to_cleanup_ vector.
36 const std::vector
<PlatformThreadHandle
>& threads_to_cleanup() const {
37 return pool_
->threads_to_cleanup_
;
// Read-only reference to the pool's worker_threads_ vector.
39 const std::vector
<PlatformThreadHandle
>& worker_threads() const {
40 return pool_
->worker_threads_
;
// The pool that all accessors above dereference.
44 PosixDynamicThreadPool
* pool_
;
46 DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer
);
// Idle timeout in seconds (60 * 60, i.e. one hour). Tests pass it to
// Initialize() through TimeDelta::FromSeconds().
51 const int64 kDefaultIdleSecondsBeforeExit
= 60 * 60;
53 // IncrementingTask's main purpose is to increment a counter. It also updates a
54 // set of unique thread ids, and signals a ConditionVariable on completion.
55 // Note that since it does not block, there is no way to control the number of
56 // threads used if more than one IncrementingTask is consecutively posted to the
57 // thread pool, since the first one might finish executing before the subsequent
58 // PostTask() calls get invoked.
// NOTE(review): neither a ConditionVariable signal nor the counter increment
// is visible in the body as shown here -- confirm the description above
// against the full function.
59 void IncrementingTask(Lock
* counter_lock
,
61 Lock
* unique_threads_lock
,
62 std::set
<PlatformThreadId
>* unique_threads
) {
// Record the id of the thread running this task in |unique_threads|, with
// |unique_threads_lock| held.
64 AutoLock
locked(*unique_threads_lock
);
65 unique_threads
->insert(PlatformThread::CurrentId());
// Acquire |counter_lock| (presumably guarding the counter update -- confirm).
67 AutoLock
locked(*counter_lock
);
71 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that
72 // allows for waiting at the start of Run() for a WaitableEvent to be signalled.
// This struct bundles the pointers that BlockingIncrementingTask reads; the
// test fixture fills it with addresses of its own members.
73 struct BlockingIncrementingTaskArgs
{
// Forwarded to IncrementingTask as its |unique_threads_lock| argument.
76 Lock
* unique_threads_lock
;
// Set of thread ids that IncrementingTask inserts into.
77 std::set
<PlatformThreadId
>* unique_threads
;
// Held by BlockingIncrementingTask while it increments
// |*num_waiting_to_start|.
78 Lock
* num_waiting_to_start_lock
;
// Incremented once by each BlockingIncrementingTask invocation.
79 int* num_waiting_to_start
;
// Signalled by BlockingIncrementingTask after the increment.
80 ConditionVariable
* num_waiting_to_start_cv
;
// Task body for the "blocking" variant: reports that it has started, then
// forwards to IncrementingTask using the pointers carried in |args|.
84 void BlockingIncrementingTask(const BlockingIncrementingTaskArgs
& args
) {
// Count this task as started, with the start-count lock held.
86 AutoLock
num_waiting_to_start_locked(*args
.num_waiting_to_start_lock
);
87 (*args
.num_waiting_to_start
)++;
// Signal the condition variable; the fixture's WaitForTasksToStart() waits on
// the same object.
89 args
.num_waiting_to_start_cv
->Signal();
// NOTE(review): no wait on a WaitableEvent is visible between the signal
// above and the forwarded call below in this view -- confirm.
91 IncrementingTask(args
.counter_lock
, args
.counter
, args
.unique_threads_lock
,
// Test fixture that holds a PosixDynamicThreadPool, a peer onto its
// internals, and the shared state (counter, thread-id set, start-count
// bookkeeping) that posted tasks update through pointers to these members.
95 class PosixDynamicThreadPoolTest
: public testing::Test
{
// Starts with zero tasks counted as started and binds
// num_waiting_to_start_cv_ to num_waiting_to_start_lock_.
// NOTE(review): start_(true, false) presumably means manual-reset and
// initially unsignalled -- confirm against WaitableEvent's constructor.
97 PosixDynamicThreadPoolTest()
99 num_waiting_to_start_(0),
100 num_waiting_to_start_cv_(&num_waiting_to_start_lock_
),
101 start_(true, false) {}
// Runs after each test and calls Terminate(false) on the pool.
103 void TearDown() override
{
104 // Wake up the idle threads so they can terminate.
106 pool_
->Terminate(false);
// Creates the pool (named "dynamic_pool") with the given idle timeout, wraps
// it in a peer, and installs a new ConditionVariable built on the pool's own
// lock as the pool's num_threads_cv. The Wait* helpers below wait on that
// condition variable.
109 void Initialize(TimeDelta idle_time_before_exit
) {
110 pool_
= new PosixDynamicThreadPool("dynamic_pool", idle_time_before_exit
);
112 new PosixDynamicThreadPool::PosixDynamicThreadPoolPeer(pool_
.get()));
113 peer_
->set_num_threads_cv(new ConditionVariable(peer_
->lock()));
// Blocks until num_waiting_to_start_ is at least |num_tasks|. The count is
// read with num_waiting_to_start_lock_ held and re-checked after every
// wake-up of num_waiting_to_start_cv_.
116 void WaitForTasksToStart(int num_tasks
) {
117 AutoLock
num_waiting_to_start_locked(num_waiting_to_start_lock_
);
118 while (num_waiting_to_start_
< num_tasks
) {
119 num_waiting_to_start_cv_
.Wait();
// Blocks until the pool reports exactly |num_idle_threads| idle threads.
// The check runs under the pool lock and repeats after each wake-up of the
// pool's num_threads_cv.
123 void WaitForIdleThreads(size_t num_idle_threads
) {
124 AutoLock
pool_locked(*peer_
->lock());
125 while (peer_
->num_idle_threads() != num_idle_threads
) {
126 peer_
->num_threads_cv()->Wait();
// Blocks until the pool's worker_threads() vector holds exactly
// |num_living_threads| entries, using the same lock and condition variable
// as WaitForIdleThreads().
130 void WaitForLivingThreads(int num_living_threads
) {
131 AutoLock
pool_locked(*peer_
->lock());
132 while (static_cast<int>(peer_
->worker_threads().size()) !=
133 num_living_threads
) {
134 peer_
->num_threads_cv()->Wait();
// Returns a Closure that runs IncrementingTask against this fixture's
// counter and thread-id set (passed by address).
138 Closure
CreateNewIncrementingTaskCallback() {
139 return Bind(&IncrementingTask
, &counter_lock_
, &counter_
,
140 &unique_threads_lock_
, &unique_threads_
);
// Returns a Closure that runs BlockingIncrementingTask with an args struct
// filled with the addresses of this fixture's members.
143 Closure
CreateNewBlockingIncrementingTaskCallback() {
144 BlockingIncrementingTaskArgs args
= {
145 &counter_lock_
, &counter_
, &unique_threads_lock_
, &unique_threads_
,
146 &num_waiting_to_start_lock_
, &num_waiting_to_start_
,
147 &num_waiting_to_start_cv_
, &start_
149 return Bind(&BlockingIncrementingTask
, args
);
// Pool under test; created by Initialize().
152 scoped_refptr
<PosixDynamicThreadPool
> pool_
;
// Peer onto pool_'s internals; created by Initialize().
153 scoped_ptr
<PosixDynamicThreadPool::PosixDynamicThreadPoolPeer
> peer_
;
// Passed to tasks as the lock for unique_threads_.
156 Lock unique_threads_lock_
;
// Ids of the threads that have run a task.
157 std::set
<PlatformThreadId
> unique_threads_
;
// Held while num_waiting_to_start_ is read or incremented.
158 Lock num_waiting_to_start_lock_
;
// Number of blocking tasks that have reported starting.
159 int num_waiting_to_start_
;
// Signalled by blocking tasks; waited on by WaitForTasksToStart().
160 ConditionVariable num_waiting_to_start_cv_
;
// Event whose address is handed to blocking tasks through their args struct.
161 WaitableEvent start_
;
// Posts a single non-blocking task to a fresh pool and checks that exactly one
// thread ran it and that the counter reached 1.
166 TEST_F(PosixDynamicThreadPoolTest
, Basic
) {
167 Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit
));
// A freshly initialized pool has no idle threads, no recorded thread ids and
// no pending tasks.
169 EXPECT_EQ(0U, peer_
->num_idle_threads());
170 EXPECT_EQ(0U, unique_threads_
.size());
171 EXPECT_EQ(0U, peer_
->num_pending_tasks());
173 // Add one task and wait for it to be completed.
174 pool_
->PostTask(FROM_HERE
, CreateNewIncrementingTaskCallback());
// The wait returns once the pool reports one idle thread.
176 WaitForIdleThreads(1);
178 EXPECT_EQ(1U, unique_threads_
.size()) <<
179 "There should be only one thread allocated for one task.";
180 EXPECT_EQ(1, counter_
);
// Runs one task to leave an idle worker behind, then posts two blocking tasks
// and checks that only two distinct threads were used for all three tasks.
183 TEST_F(PosixDynamicThreadPoolTest
, ReuseIdle
) {
184 Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit
));
186 // Add one task and wait for it to be completed.
187 pool_
->PostTask(FROM_HERE
, CreateNewIncrementingTaskCallback());
189 WaitForIdleThreads(1);
191 // Add another 2 tasks. One should reuse the existing worker thread.
192 pool_
->PostTask(FROM_HERE
, CreateNewBlockingIncrementingTaskCallback());
193 pool_
->PostTask(FROM_HERE
, CreateNewBlockingIncrementingTaskCallback());
// Wait until both blocking tasks have reported that they started.
195 WaitForTasksToStart(2);
// Wait until the pool reports two idle threads.
197 WaitForIdleThreads(2);
// Three tasks ran in total (counter == 3) on two distinct threads.
199 EXPECT_EQ(2U, unique_threads_
.size());
200 EXPECT_EQ(2U, peer_
->num_idle_threads());
201 EXPECT_EQ(3, counter_
);
// Posts two blocking tasks to a fresh pool and checks that they ran on two
// distinct threads, both of which end up idle.
204 TEST_F(PosixDynamicThreadPoolTest
, TwoActiveTasks
) {
205 Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit
));
207 // Add two blocking tasks.
208 pool_
->PostTask(FROM_HERE
, CreateNewBlockingIncrementingTaskCallback());
209 pool_
->PostTask(FROM_HERE
, CreateNewBlockingIncrementingTaskCallback());
211 EXPECT_EQ(0, counter_
) << "Blocking tasks should not have started yet.";
// Wait until both tasks have reported that they started.
213 WaitForTasksToStart(2);
// Wait until the pool reports two idle threads.
215 WaitForIdleThreads(2);
217 EXPECT_EQ(2U, unique_threads_
.size());
218 EXPECT_EQ(2U, peer_
->num_idle_threads()) << "Existing threads are now idle.";
219 EXPECT_EQ(2, counter_
);
// Combines the earlier scenarios: one non-blocking task, then two blocking
// tasks, then waits for every worker to go away and posts a final task that
// has no existing thread to reuse.
222 TEST_F(PosixDynamicThreadPoolTest
, Complex
) {
223 Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit
));
225 // Add one non-blocking task and wait for it to finish.
226 pool_
->PostTask(FROM_HERE
, CreateNewIncrementingTaskCallback());
228 WaitForIdleThreads(1);
230 // Add two blocking tasks, start them simultaneously, and wait for them to finish.
232 pool_
->PostTask(FROM_HERE
, CreateNewBlockingIncrementingTaskCallback());
233 pool_
->PostTask(FROM_HERE
, CreateNewBlockingIncrementingTaskCallback());
235 WaitForTasksToStart(2);
237 WaitForIdleThreads(2);
239 EXPECT_EQ(3, counter_
);
240 EXPECT_EQ(2U, peer_
->num_idle_threads());
241 EXPECT_EQ(2U, unique_threads_
.size());
243 // Wake up all idle threads so they can exit.
// With the pool lock held, signal pending_tasks_available_cv and then wait on
// num_threads_cv, repeating until worker_threads() is empty.
245 AutoLock
locked(*peer_
->lock());
246 while (peer_
->worker_threads().size() > 0) {
247 peer_
->pending_tasks_available_cv()->Signal();
248 peer_
->num_threads_cv()->Wait();
252 // Add another non-blocking task. There are no threads to reuse.
253 pool_
->PostTask(FROM_HERE
, CreateNewIncrementingTaskCallback());
254 WaitForIdleThreads(1);
256 // The POSIX implementation of PlatformThread::CurrentId() uses pthread_self()
257 // which is not guaranteed to be unique after a thread joins. The OS X
258 // implementation of pthread_self() returns the address of the pthread_t, which
259 // is merely a malloc()ed pointer stored in the first TLS slot. When a thread
260 // joins and that structure is freed, the block of memory can be put on the
261 // OS free list, meaning the same address could be reused in a subsequent
262 // allocation. This in fact happens when allocating in a loop as this test does.
265 // Because there are two concurrent threads, there's at least the guarantee
266 // of having two unique thread IDs in the set. But after those two threads are
267 // joined, the next-created thread can get a re-used ID if the allocation of
268 // the pthread_t structure is taken from the free list. Therefore, there can
269 // be either 2 or 3 unique thread IDs in the set at this stage in the test.
270 EXPECT_TRUE(unique_threads_
.size() >= 2 && unique_threads_
.size() <= 3)
271 << "unique_threads_.size() = " << unique_threads_
.size();
// Only the thread that ran the final task is idle; four tasks ran in total.
272 EXPECT_EQ(1U, peer_
->num_idle_threads());
273 EXPECT_EQ(4, counter_
);
// Uses a 1 ms idle timeout so workers exit soon after going idle, then checks
// that an exited worker is recorded in threads_to_cleanup() without a new
// worker thread being created, and that Terminate(true) empties both lists.
276 TEST_F(PosixDynamicThreadPoolTest
, NoNewThreadForCleanup
) {
277 // Let worker threads quit quickly after they are idle.
278 Initialize(TimeDelta::FromMilliseconds(1));
// The loop header below iterates twice (i = 0, 1).
280 for (size_t i
= 0; i
< 2; ++i
) {
281 // This will create a worker thread.
282 pool_
->PostTask(FROM_HERE
, CreateNewBlockingIncrementingTaskCallback());
284 WaitForTasksToStart(1);
// Capture the handle of the single live worker, with the pool lock held, so
// it can be compared with the cleanup list later.
286 PlatformThreadHandle worker
;
288 AutoLock
locked(*peer_
->lock());
289 ASSERT_EQ(1u, peer_
->worker_threads().size());
290 worker
= peer_
->worker_threads()[0];
295 // Wait for the worker thread to quit.
296 WaitForLivingThreads(0);
299 AutoLock
locked(*peer_
->lock());
300 // The thread that just quit is recorded for cleanup. But we don't create
301 // a worker thread just for doing that.
302 ASSERT_EQ(1u, peer_
->threads_to_cleanup().size());
303 EXPECT_TRUE(worker
.is_equal(peer_
->threads_to_cleanup()[0]));
304 EXPECT_TRUE(peer_
->worker_threads().empty());
// NOTE(review): Terminate(true) presumably blocks until the pool has finished
// cleaning up -- confirm against PosixDynamicThreadPool::Terminate.
308 pool_
->Terminate(true);
// After termination neither list may contain any thread handles.
311 AutoLock
locked(*peer_
->lock());
312 EXPECT_TRUE(peer_
->threads_to_cleanup().empty());
313 EXPECT_TRUE(peer_
->worker_threads().empty());
// Posts five bursts of 50 non-blocking tasks to a pool with a 3 ms idle
// timeout, calls Terminate(true), and checks that no worker or cleanup
// handles remain and that the counter stops changing afterwards.
317 TEST_F(PosixDynamicThreadPoolTest
, BlockingTerminate
) {
318 // Let worker threads quit quickly after they are idle.
319 Initialize(TimeDelta::FromMilliseconds(3));
// Burst i is preceded by a sleep of i milliseconds (0 through 4).
321 for (size_t i
= 0; i
< 5; ++i
) {
322 PlatformThread::Sleep(TimeDelta::FromMilliseconds(i
));
323 for (size_t j
= 0; j
< 50; ++j
)
324 pool_
->PostTask(FROM_HERE
, CreateNewIncrementingTaskCallback());
// NOTE(review): Terminate(true) presumably blocks until worker threads have
// exited -- confirm against PosixDynamicThreadPool::Terminate.
327 pool_
->Terminate(true);
// After termination neither list may contain any thread handles.
330 AutoLock
locked(*peer_
->lock());
331 EXPECT_TRUE(peer_
->threads_to_cleanup().empty());
332 EXPECT_TRUE(peer_
->worker_threads().empty());
// Snapshot the counter. EXPECT_GE(a, b) checks a >= b, so these two lines
// only assert upper bounds: at most 5 * 50 tasks ran, on at most 5 * 50
// distinct threads.
335 int counter
= counter_
;
336 EXPECT_GE(5 * 50, counter
);
337 EXPECT_GE(5 * 50u, unique_threads_
.size());
339 // Make sure that no threads are still running and trying to modify |counter_|.
341 PlatformThread::Sleep(TimeDelta::FromMilliseconds(10));
342 EXPECT_EQ(counter
, counter_
);