1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/worker_pool_posix.h"
8 #include "base/callback.h"
9 #include "base/lazy_instance.h"
10 #include "base/logging.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/threading/platform_thread.h"
14 #include "base/threading/thread_local.h"
15 #include "base/threading/worker_pool.h"
16 #include "base/trace_event/trace_event.h"
17 #include "base/tracked_objects.h"
19 using tracked_objects::TrackedTime
;
25 LazyInstance
<ThreadLocalBoolean
>::Leaky g_worker_pool_running_on_this_thread
=
26 LAZY_INSTANCE_INITIALIZER
;
28 const int64 kIdleSecondsBeforeExit
= 10 * 60;
30 class WorkerPoolImpl
{
35 void PostTask(const tracked_objects::Location
& from_here
,
39 void ShutDownCleanly();
42 scoped_refptr
<PosixDynamicThreadPool
> pool_
;
45 WorkerPoolImpl::WorkerPoolImpl()
46 : pool_(new PosixDynamicThreadPool(
48 TimeDelta::FromSeconds(kIdleSecondsBeforeExit
))) {
51 WorkerPoolImpl::~WorkerPoolImpl() {
52 pool_
->Terminate(false);
55 void WorkerPoolImpl::PostTask(const tracked_objects::Location
& from_here
,
58 pool_
->PostTask(from_here
, task
);
61 void WorkerPoolImpl::ShutDownCleanly() {
62 pool_
->Terminate(true);
65 LazyInstance
<WorkerPoolImpl
> g_lazy_worker_pool
= LAZY_INSTANCE_INITIALIZER
;
67 class WorkerThread
: public PlatformThread::Delegate
{
69 WorkerThread(const std::string
& name_prefix
, PosixDynamicThreadPool
* pool
)
70 : name_prefix_(name_prefix
), pool_(pool
) {}
72 void ThreadMain() override
;
75 const std::string name_prefix_
;
76 scoped_refptr
<PosixDynamicThreadPool
> pool_
;
78 DISALLOW_COPY_AND_ASSIGN(WorkerThread
);
81 void WorkerThread::ThreadMain() {
82 g_worker_pool_running_on_this_thread
.Get().Set(true);
83 const std::string name
=
84 StringPrintf("%s/%d", name_prefix_
.c_str(), PlatformThread::CurrentId());
85 // Note |name.c_str()| must remain valid for for the whole life of the thread.
86 PlatformThread::SetName(name
);
89 PendingTask pending_task
= pool_
->WaitForTask();
90 if (pending_task
.task
.is_null())
92 TRACE_EVENT2("toplevel", "WorkerThread::ThreadMain::Run",
93 "src_file", pending_task
.posted_from
.file_name(),
94 "src_func", pending_task
.posted_from
.function_name());
96 tracked_objects::TaskStopwatch stopwatch
;
98 pending_task
.task
.Run();
101 tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking(
102 pending_task
.birth_tally
, pending_task
.time_posted
, stopwatch
);
105 pool_
->NotifyWorkerIsGoingAway(PlatformThread::CurrentHandle());
112 bool WorkerPool::PostTask(const tracked_objects::Location
& from_here
,
115 g_lazy_worker_pool
.Pointer()->PostTask(from_here
, task
, task_is_slow
);
120 bool WorkerPool::RunsTasksOnCurrentThread() {
121 return g_worker_pool_running_on_this_thread
.Get().Get();
125 void WorkerPool::ShutDownCleanly() {
126 g_lazy_worker_pool
.Pointer()->ShutDownCleanly();
129 PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string
& name_prefix
,
130 TimeDelta idle_time_before_exit
)
131 : name_prefix_(name_prefix
),
132 idle_time_before_exit_(idle_time_before_exit
),
133 pending_tasks_available_cv_(&lock_
),
134 num_idle_threads_(0),
135 has_pending_cleanup_task_(false),
139 PosixDynamicThreadPool::~PosixDynamicThreadPool() {
140 while (!pending_tasks_
.empty())
141 pending_tasks_
.pop();
144 void PosixDynamicThreadPool::Terminate(bool blocking
) {
145 std::vector
<PlatformThreadHandle
> threads_to_cleanup
;
146 std::vector
<PlatformThreadHandle
> worker_threads
;
148 AutoLock
locked(lock_
);
153 threads_to_cleanup
.swap(threads_to_cleanup_
);
154 worker_threads
.swap(worker_threads_
);
156 pending_tasks_available_cv_
.Broadcast();
159 for (const auto& item
: threads_to_cleanup
)
160 PlatformThread::Join(item
);
162 for (const auto& item
: worker_threads
)
163 PlatformThread::Join(item
);
165 // No need to take the lock. No one else should be accessing these members.
166 DCHECK_EQ(0u, num_idle_threads_
);
167 // The following members should not have new elements added after
168 // |terminated_| is set to true.
169 DCHECK(threads_to_cleanup_
.empty());
170 DCHECK(worker_threads_
.empty());
174 void PosixDynamicThreadPool::PostTask(
175 const tracked_objects::Location
& from_here
,
176 const Closure
& task
) {
177 PendingTask
pending_task(from_here
, task
);
178 AutoLock
locked(lock_
);
179 AddTaskNoLock(&pending_task
);
182 PendingTask
PosixDynamicThreadPool::WaitForTask() {
183 AutoLock
locked(lock_
);
186 return PendingTask(FROM_HERE
, Closure());
188 if (pending_tasks_
.empty()) { // No work available, wait for work.
191 num_threads_cv_
->Broadcast();
192 pending_tasks_available_cv_
.TimedWait(idle_time_before_exit_
);
195 num_threads_cv_
->Broadcast();
196 if (pending_tasks_
.empty()) {
197 // We waited for work, but there's still no work. Return an empty task to
198 // signal the thread to terminate.
199 return PendingTask(FROM_HERE
, Closure());
203 PendingTask pending_task
= pending_tasks_
.front();
204 pending_tasks_
.pop();
208 void PosixDynamicThreadPool::NotifyWorkerIsGoingAway(
209 PlatformThreadHandle worker
) {
210 AutoLock
locked(lock_
);
214 auto new_end
= std::remove_if(worker_threads_
.begin(), worker_threads_
.end(),
215 [worker
](PlatformThreadHandle handle
) {
216 return handle
.is_equal(worker
);
218 DCHECK_EQ(1, worker_threads_
.end() - new_end
);
219 worker_threads_
.erase(new_end
, worker_threads_
.end());
221 threads_to_cleanup_
.push_back(worker
);
224 num_threads_cv_
->Broadcast();
226 if (!has_pending_cleanup_task_
) {
227 has_pending_cleanup_task_
= true;
228 PendingTask
pending_task(
230 base::Bind(&PosixDynamicThreadPool::CleanUpThreads
, Unretained(this)));
231 AddTaskNoLock(&pending_task
);
235 void PosixDynamicThreadPool::AddTaskNoLock(PendingTask
* pending_task
) {
236 lock_
.AssertAcquired();
240 << "This thread pool is already terminated. Do not post new tasks.";
244 pending_tasks_
.push(*pending_task
);
245 pending_task
->task
.Reset();
247 // We have enough worker threads.
248 if (num_idle_threads_
>=
249 pending_tasks_
.size() - (has_pending_cleanup_task_
? 1 : 0)) {
250 pending_tasks_available_cv_
.Signal();
252 // The new PlatformThread will take ownership of the WorkerThread object,
253 // which will delete itself on exit.
254 WorkerThread
* worker
= new WorkerThread(name_prefix_
, this);
255 PlatformThreadHandle handle
;
256 PlatformThread::Create(0, worker
, &handle
);
257 worker_threads_
.push_back(handle
);
260 num_threads_cv_
->Broadcast();
264 void PosixDynamicThreadPool::CleanUpThreads() {
265 std::vector
<PlatformThreadHandle
> threads_to_cleanup
;
267 AutoLock
locked(lock_
);
268 DCHECK(has_pending_cleanup_task_
);
269 has_pending_cleanup_task_
= false;
270 threads_to_cleanup
.swap(threads_to_cleanup_
);
272 for (const auto& item
: threads_to_cleanup
)
273 PlatformThread::Join(item
);