1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "cc/base/worker_pool.h"
10 #include "base/debug/trace_event.h"
11 #include "base/stringprintf.h"
12 #include "base/synchronization/condition_variable.h"
13 #include "base/threading/simple_thread.h"
15 #if defined(OS_ANDROID)
16 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
17 #include <sys/resource.h>
// Concrete internal::WorkerPoolTask that pairs a WorkerPool::Callback
// (|task_|) with a reply closure; the reply is forwarded to the base class.
// NOTE(review): parts of this class are not visible in this excerpt.
24 class WorkerPoolTaskImpl
: public internal::WorkerPoolTask
{
// |reply| is handed to the internal::WorkerPoolTask base constructor.
// Presumably |task| initializes |task_|; that initializer is not visible
// here - confirm.
26 WorkerPoolTaskImpl(const WorkerPool::Callback
& task
,
27 const base::Closure
& reply
)
28 : internal::WorkerPoolTask(reply
),
// Tasks of this type are never reported as cheap.
31 virtual bool IsCheap() OVERRIDE
{ return false; }
// NOTE(review): body not visible in this excerpt.
33 virtual void Run() OVERRIDE
{
// |thread_index| is the index of the worker thread loop running the task.
// NOTE(review): body not visible in this excerpt.
37 virtual void RunOnThread(unsigned thread_index
) OVERRIDE
{
// The callback wrapped by this task.
42 WorkerPool::Callback task_
;
// Constructs a task, keeping a copy of |reply| in |reply_|.
49 WorkerPoolTask::WorkerPoolTask(const base::Closure
& reply
) : reply_(reply
) {
// Destructor. NOTE(review): body not visible in this excerpt.
52 WorkerPoolTask::~WorkerPoolTask() {
// Completion hook for a finished task. Presumably runs |reply_|; the body is
// not visible in this excerpt - confirm.
55 void WorkerPoolTask::DidComplete() {
59 } // namespace internal
61 // Internal to the worker pool. Any data or logic that needs to be
62 // shared between threads lives in this class. All members are guarded
// by |lock_|, except |worker_pool_on_origin_thread_|.
// Thread-shared state of WorkerPool. Implements
// base::DelegateSimpleThread::Delegate so that Run() can serve as the loop
// of a worker thread, and owns the pending and completed task queues.
// NOTE(review): some declarations of this class are not visible in this
// excerpt.
64 class WorkerPool::Inner
: public base::DelegateSimpleThread::Delegate
{
// |worker_pool| is kept as |worker_pool_on_origin_thread_| and
// |need_on_task_completed_callback| as |need_on_task_completed_callback_|.
// |thread_name_prefix| is presumably used to name the worker threads -
// confirm against the definition.
66 Inner(WorkerPool
* worker_pool
,
68 const std::string
& thread_name_prefix
,
69 bool need_on_task_completed_callback
);
// Appends |task| to the pending tasks queue. |signal_workers| presumably
// controls whether a worker thread is woken up - confirm against the
// definition.
74 void PostTask(scoped_ptr
<internal::WorkerPoolTask
> task
, bool signal_workers
);
76 // Appends all completed tasks to worker pool's completed tasks queue
77 // and returns true if idle.
78 bool CollectCompletedTasks();
80 // Runs cheap tasks on caller thread until |time_limit| is reached
81 // and returns true if idle.
82 bool RunCheapTasksUntilTimeLimit(base::TimeTicks time_limit
);
85 // Appends all completed tasks to |completed_tasks|. Lock must
86 // already be acquired before calling this function.
// Returns true if idle, i.e. no task is running and none is pending.
87 bool AppendCompletedTasksWithLockAcquired(
88 ScopedPtrDeque
<internal::WorkerPoolTask
>* completed_tasks
);
90 // Schedule an OnTaskCompletedOnOriginThread callback if not already
91 // pending. Lock must already be acquired before calling this function.
92 void ScheduleOnTaskCompletedWithLockAcquired();
93 void OnTaskCompletedOnOriginThread();
95 // Schedule an OnIdleOnOriginThread callback if not already pending.
96 // Lock must already be acquired before calling this function.
97 void ScheduleOnIdleWithLockAcquired();
98 void OnIdleOnOriginThread();
100 // Overridden from base::DelegateSimpleThread:
101 virtual void Run() OVERRIDE
;
103 // Pointer to worker pool. Can only be used on origin thread.
104 // Not guarded by |lock_|.
105 WorkerPool
* worker_pool_on_origin_thread_
;
107 // This lock protects all members of this class except
108 // |worker_pool_on_origin_thread_|. Do not read or modify anything
109 // without holding this lock. Do not block while holding this lock.
110 mutable base::Lock lock_
;
112 // Condition variable that is waited on by worker threads until new
113 // tasks are posted or shutdown starts.
114 base::ConditionVariable has_pending_tasks_cv_
;
116 // Target message loop used for posting callbacks.
117 scoped_refptr
<base::MessageLoopProxy
> origin_loop_
;
// Produces the weak pointers bound into the callbacks below, so that
// invalidating them cancels callbacks that are still pending.
119 base::WeakPtrFactory
<Inner
> weak_ptr_factory_
;
121 // Set to true when worker pool requires a callback for each
// completed task.
123 bool need_on_task_completed_callback_
;
// Closure bound to OnTaskCompletedOnOriginThread(); posted to |origin_loop_|.
125 const base::Closure on_task_completed_callback_
;
126 // Set when an OnTaskCompletedOnOriginThread() callback is pending.
127 bool on_task_completed_pending_
;
// Closure bound to OnIdleOnOriginThread(); posted to |origin_loop_|.
129 const base::Closure on_idle_callback_
;
130 // Set when an OnIdleOnOriginThread() callback is pending.
131 bool on_idle_pending_
;
133 // Provides each running thread loop with a unique index. First thread
// loop to start receives index 0.
135 unsigned next_thread_index_
;
137 // Number of tasks currently running.
138 unsigned running_task_count_
;
140 // Set during shutdown. Tells workers to exit when no more tasks
// are pending. NOTE(review): the member this comment documents is not
// visible in this excerpt.
144 typedef ScopedPtrDeque
<internal::WorkerPoolTask
> TaskDeque
;
// Tasks waiting to be run.
145 TaskDeque pending_tasks_
;
// Tasks that finished running on a worker thread and have not yet been moved
// to the worker pool's completed tasks queue.
146 TaskDeque completed_tasks_
;
// Worker threads created by the constructor.
148 ScopedPtrDeque
<base::DelegateSimpleThread
> workers_
;
150 DISALLOW_COPY_AND_ASSIGN(Inner
);
// Initializes the shared state and creates the worker threads.
// The message loop current at construction time becomes |origin_loop_|, and
// both origin-thread callbacks are bound through weak pointers so they can be
// cancelled by invalidating |weak_ptr_factory_|.
// NOTE(review): the |num_threads| parameter and parts of the thread creation
// code are not visible in this excerpt.
153 WorkerPool::Inner::Inner(WorkerPool
* worker_pool
,
155 const std::string
& thread_name_prefix
,
156 bool need_on_task_completed_callback
)
157 : worker_pool_on_origin_thread_(worker_pool
),
159 has_pending_tasks_cv_(&lock_
),
160 origin_loop_(base::MessageLoopProxy::current()),
161 weak_ptr_factory_(this),
162 need_on_task_completed_callback_(need_on_task_completed_callback
),
163 on_task_completed_callback_(
164 base::Bind(&WorkerPool::Inner::OnTaskCompletedOnOriginThread
,
165 weak_ptr_factory_
.GetWeakPtr())),
166 on_task_completed_pending_(false),
167 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread
,
168 weak_ptr_factory_
.GetWeakPtr())),
169 on_idle_pending_(false),
170 next_thread_index_(0),
171 running_task_count_(0),
// |workers_| is populated while holding |lock_|.
173 base::AutoLock
lock(lock_
);
// Keep creating worker threads until |workers_| holds |num_threads| of them.
175 while (workers_
.size() < num_threads
) {
// The value workers_.size() + 1 is passed to a formatting call that is not
// visible here, so the formatted number counts from 1.
176 scoped_ptr
<base::DelegateSimpleThread
> worker
= make_scoped_ptr(
177 new base::DelegateSimpleThread(
182 static_cast<unsigned long>(workers_
.size() + 1)).c_str()));
184 workers_
.push_back(worker
.Pass());
// Cancels pending origin-thread callbacks and checks (in DCHECK builds) that
// no task is pending, completed-but-uncollected, or still running.
188 WorkerPool::Inner::~Inner() {
189 base::AutoLock
lock(lock_
);
193 // Cancel all pending callbacks.
194 weak_ptr_factory_
.InvalidateWeakPtrs();
// All queues must have been drained before destruction.
196 DCHECK_EQ(0u, pending_tasks_
.size());
197 DCHECK_EQ(0u, completed_tasks_
.size());
198 DCHECK_EQ(0u, running_task_count_
);
// Starts shutdown: signals the condition variable once under |lock_| and then
// removes the worker threads from |workers_| one at a time.
// NOTE(review): the statements that set the shutdown flag and that handle
// each removed worker (presumably joining it) are not visible in this
// excerpt - confirm.
201 void WorkerPool::Inner::Shutdown() {
203 base::AutoLock
lock(lock_
);
208 // Wake up a worker so it knows it should exit. This will cause all workers
209 // to exit as each will wake up another worker before exiting.
210 has_pending_tasks_cv_
.Signal();
// Take ownership of each worker in turn until |workers_| is empty.
213 while (workers_
.size()) {
214 scoped_ptr
<base::DelegateSimpleThread
> worker
= workers_
.take_front();
// Appends |task| to |pending_tasks_| under |lock_| and signals the condition
// variable that worker threads wait on.
// NOTE(review): the condition guarding the signal (presumably
// |signal_workers|) is not visible in this excerpt - confirm.
219 void WorkerPool::Inner::PostTask(scoped_ptr
<internal::WorkerPoolTask
> task
,
220 bool signal_workers
) {
221 base::AutoLock
lock(lock_
);
// Ownership of |task| moves into the pending queue.
223 pending_tasks_
.push_back(task
.Pass());
225 // There is more work available, so wake up worker thread.
227 has_pending_tasks_cv_
.Signal();
// Moves every task in |completed_tasks_| to the owning worker pool's
// completed tasks queue while holding |lock_|.
// Returns true if idle, i.e. no task is running and none is pending.
230 bool WorkerPool::Inner::CollectCompletedTasks() {
231 base::AutoLock
lock(lock_
);
233 return AppendCompletedTasksWithLockAcquired(
234 &worker_pool_on_origin_thread_
->completed_tasks_
);
// Runs pending tasks that report IsCheap() on the calling thread for as long
// as the current time is before |time_limit|. Finished cheap tasks are pushed
// straight onto the worker pool's completed tasks queue.
// Returns the result of AppendCompletedTasksWithLockAcquired(), i.e. true if
// no task is running and none is pending.
// NOTE(review): several statements (loop exits, the call that runs the task,
// closing braces) are not visible in this excerpt.
237 bool WorkerPool::Inner::RunCheapTasksUntilTimeLimit(
238 base::TimeTicks time_limit
) {
239 base::AutoLock
lock(lock_
);
// The clock is re-read before each iteration.
241 while (base::TimeTicks::Now() < time_limit
) {
242 scoped_ptr
<internal::WorkerPoolTask
> task
;
244 // Find next cheap task.
245 for (TaskDeque::iterator iter
= pending_tasks_
.begin();
246 iter
!= pending_tasks_
.end(); ++iter
) {
247 if ((*iter
)->IsCheap()) {
// Remove the cheap task from the pending queue and take ownership of it.
248 task
= pending_tasks_
.take(iter
);
254 // Schedule an idle callback if requested and not pending.
255 if (!running_task_count_
&& pending_tasks_
.empty())
256 ScheduleOnIdleWithLockAcquired();
258 // Exit when no more cheap tasks are pending.
262 // Increment |running_task_count_| before starting to run task.
263 running_task_count_
++;
// |lock_| is released for the lifetime of |unlock|.
// NOTE(review): the scope of |unlock| and the statement that runs the task
// are not visible here.
266 base::AutoUnlock
unlock(lock_
);
270 // Append tasks directly to worker pool's completed tasks queue.
271 worker_pool_on_origin_thread_
->completed_tasks_
.push_back(task
.Pass());
272 if (need_on_task_completed_callback_
)
273 worker_pool_on_origin_thread_
->OnTaskCompleted();
276 // Decrement |running_task_count_| now that we are done running task.
277 running_task_count_
--;
// Tasks are still pending, so signal a worker thread.
280 if (!pending_tasks_
.empty())
281 has_pending_tasks_cv_
.Signal();
283 // Append any other completed tasks before releasing lock.
284 return AppendCompletedTasksWithLockAcquired(
285 &worker_pool_on_origin_thread_
->completed_tasks_
);
// Moves every task from |completed_tasks_| to the back of |completed_tasks|,
// preserving order. |lock_| must already be held by the caller.
// Returns true when no task is running and |pending_tasks_| is empty.
288 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
289 ScopedPtrDeque
<internal::WorkerPoolTask
>* completed_tasks
) {
290 lock_
.AssertAcquired();
// Drain the internal queue front to back.
292 while (completed_tasks_
.size())
293 completed_tasks
->push_back(completed_tasks_
.take_front().Pass());
295 return !running_task_count_
&& pending_tasks_
.empty();
// Posts |on_task_completed_callback_| to |origin_loop_| and marks it pending.
// |lock_| must already be held by the caller.
// NOTE(review): the statement controlled by the |if| below (presumably an
// early return when a callback is already pending or was not requested) is
// not visible in this excerpt - confirm.
298 void WorkerPool::Inner::ScheduleOnTaskCompletedWithLockAcquired() {
299 lock_
.AssertAcquired();
301 if (on_task_completed_pending_
|| !need_on_task_completed_callback_
)
303 origin_loop_
->PostTask(FROM_HERE
, on_task_completed_callback_
);
304 on_task_completed_pending_
= true;
// Target of |on_task_completed_callback_|. Clears the pending flag, moves
// completed tasks to the worker pool's completed tasks queue and notifies the
// worker pool through OnTaskCompleted().
// NOTE(review): scoping of |lock| relative to the OnTaskCompleted() call is
// not visible in this excerpt.
307 void WorkerPool::Inner::OnTaskCompletedOnOriginThread() {
309 base::AutoLock
lock(lock_
);
// A callback must have been scheduled for this function to run.
311 DCHECK(on_task_completed_pending_
);
312 on_task_completed_pending_
= false;
314 AppendCompletedTasksWithLockAcquired(
315 &worker_pool_on_origin_thread_
->completed_tasks_
);
318 worker_pool_on_origin_thread_
->OnTaskCompleted();
// Posts |on_idle_callback_| to |origin_loop_| and marks it pending.
// |lock_| must already be held by the caller.
// NOTE(review): the statement controlled by the |if| below (presumably an
// early return when a callback is already pending) is not visible in this
// excerpt - confirm.
321 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
322 lock_
.AssertAcquired();
324 if (on_idle_pending_
)
326 origin_loop_
->PostTask(FROM_HERE
, on_idle_callback_
);
327 on_idle_pending_
= true;
// Target of |on_idle_callback_|. Clears the pending flag and, when the pool
// is still idle, moves completed tasks to the worker pool's completed tasks
// queue and notifies the worker pool through OnIdle().
// NOTE(review): the early-return statement and the scoping of |lock| are not
// visible in this excerpt.
330 void WorkerPool::Inner::OnIdleOnOriginThread() {
332 base::AutoLock
lock(lock_
);
// A callback must have been scheduled for this function to run.
334 DCHECK(on_idle_pending_
);
335 on_idle_pending_
= false;
337 // Early out if no longer idle.
338 if (running_task_count_
|| !pending_tasks_
.empty())
341 AppendCompletedTasksWithLockAcquired(
342 &worker_pool_on_origin_thread_
->completed_tasks_
);
345 worker_pool_on_origin_thread_
->OnIdle();
// Worker thread loop. Takes tasks from the front of |pending_tasks_|, runs
// them with |lock_| released, pushes them onto |completed_tasks_| and
// schedules the task-completed callback. When no task is pending it schedules
// the idle callback (if nothing is running) and waits on
// |has_pending_tasks_cv_|.
// NOTE(review): the enclosing loop, the shutdown check, the matching #endif
// and closing braces are not visible in this excerpt.
348 void WorkerPool::Inner::Run() {
349 #if defined(OS_ANDROID)
350 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
351 int nice_value
= 10; // Idle priority.
352 setpriority(PRIO_PROCESS
, base::PlatformThread::CurrentId(), nice_value
);
355 base::AutoLock
lock(lock_
);
357 // Get a unique thread index.
// NOTE(review): |thread_index| is a signed int while |next_thread_index_| and
// the RunOnThread() parameter are unsigned; consider declaring it unsigned.
358 int thread_index
= next_thread_index_
++;
361 if (pending_tasks_
.empty()) {
362 // Exit when shutdown is set and no more tasks are pending.
366 // Schedule an idle callback if requested and not pending.
367 if (!running_task_count_
)
368 ScheduleOnIdleWithLockAcquired();
370 // Wait for new pending tasks.
371 has_pending_tasks_cv_
.Wait();
// Take ownership of the oldest pending task.
376 scoped_ptr
<internal::WorkerPoolTask
> task
= pending_tasks_
.take_front();
378 // Increment |running_task_count_| before starting to run task.
379 running_task_count_
++;
381 // There may be more work available, so wake up another
// worker thread.
383 has_pending_tasks_cv_
.Signal();
// |lock_| is released for the lifetime of |unlock|, which covers running the
// task. NOTE(review): the exact scope is not visible here.
386 base::AutoUnlock
unlock(lock_
);
388 task
->RunOnThread(thread_index
);
// Hand the finished task over to the completed queue.
391 completed_tasks_
.push_back(task
.Pass());
393 // Decrement |running_task_count_| now that we are done running task.
394 running_task_count_
--;
396 // Schedule a task completed callback if requested and not pending.
397 ScheduleOnTaskCompletedWithLockAcquired();
400 // We noticed we should exit. Wake up the next worker so it knows it should
401 // exit as well (because the Shutdown() code only signals once).
402 has_pending_tasks_cv_
.Signal();
// Constructs the worker pool on the origin thread: records the current
// message loop as |origin_loop_|, binds RunCheapTasks() through a weak
// pointer, and creates |inner_|.
// |check_for_completed_tasks_delay| is the delay used for the periodic
// completed-task check; a zero delay instead requests an OnTaskCompleted()
// callback from |inner_|.
// NOTE(review): the |num_threads| parameter, the |client_| initializer and
// most arguments passed to the Inner constructor are not visible in this
// excerpt.
406 WorkerPool::WorkerPool(WorkerPoolClient
* client
,
408 base::TimeDelta check_for_completed_tasks_delay
,
409 const std::string
& thread_name_prefix
)
411 origin_loop_(base::MessageLoopProxy::current()),
412 weak_ptr_factory_(this),
413 check_for_completed_tasks_delay_(check_for_completed_tasks_delay
),
414 check_for_completed_tasks_pending_(false),
415 run_cheap_tasks_callback_(
416 base::Bind(&WorkerPool::RunCheapTasks
,
417 weak_ptr_factory_
.GetWeakPtr())),
418 run_cheap_tasks_pending_(false),
419 inner_(make_scoped_ptr(
424 // Request OnTaskCompleted() callback when check
425 // for completed tasks delay is 0.
426 check_for_completed_tasks_delay
== base::TimeDelta()))) {
// Cancels callbacks bound through |weak_ptr_factory_| and checks (in DCHECK
// builds) that every completed task has already been dispatched.
// NOTE(review): other statements of the destructor are not visible in this
// excerpt.
429 WorkerPool::~WorkerPool() {
432 // Cancel all pending callbacks.
433 weak_ptr_factory_
.InvalidateWeakPtrs();
435 DCHECK_EQ(0u, completed_tasks_
.size());
// Dispatches completion callbacks for all tasks in |completed_tasks_|.
// NOTE(review): the statement preceding this call (presumably shutting down
// |inner_|) is not visible in this excerpt - confirm.
438 void WorkerPool::Shutdown() {
440 DispatchCompletionCallbacks();
// Wraps |task| and |reply| in a WorkerPoolTaskImpl and posts it through
// PostTask().
// NOTE(review): the first constructor argument (presumably |task|) is not
// visible in this excerpt.
443 void WorkerPool::PostTaskAndReply(
444 const Callback
& task
, const base::Closure
& reply
) {
445 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
447 reply
)).PassAs
<internal::WorkerPoolTask
>());
// Stores the new time limit for running cheap tasks and schedules a
// RunCheapTasks() callback if one is not already pending.
450 void WorkerPool::SetRunCheapTasksTimeLimit(
451 base::TimeTicks run_cheap_tasks_time_limit
) {
452 run_cheap_tasks_time_limit_
= run_cheap_tasks_time_limit
;
453 ScheduleRunCheapTasks();
// Called by Inner::OnIdleOnOriginThread() once the pool is idle; dispatches
// completion callbacks for the collected tasks.
// NOTE(review): a statement between the trace event and the dispatch is not
// visible in this excerpt.
456 void WorkerPool::OnIdle() {
457 TRACE_EVENT0("cc", "WorkerPool::OnIdle");
459 DispatchCompletionCallbacks();
// Called by Inner when a task has completed and a per-task callback was
// requested; dispatches completion callbacks for the collected tasks.
// NOTE(review): a statement between the trace event and the dispatch is not
// visible in this excerpt.
462 void WorkerPool::OnTaskCompleted() {
463 TRACE_EVENT0("cc", "WorkerPool::OnTaskCompleted");
465 DispatchCompletionCallbacks();
// Posts a delayed CheckForCompletedTasks() callback to |origin_loop_|,
// records the time at which it is due in |check_for_completed_tasks_time_|
// and marks the check as pending.
// NOTE(review): the statement controlled by the |if| below (presumably an
// early return when a check is already pending or the delay is zero) and the
// first PostDelayedTask() argument are not visible in this excerpt - confirm.
468 void WorkerPool::ScheduleCheckForCompletedTasks() {
469 if (check_for_completed_tasks_pending_
||
470 check_for_completed_tasks_delay_
== base::TimeDelta())
// Rebind the cancelable callback through a weak pointer to this pool.
472 check_for_completed_tasks_callback_
.Reset(
473 base::Bind(&WorkerPool::CheckForCompletedTasks
,
474 weak_ptr_factory_
.GetWeakPtr()));
475 check_for_completed_tasks_time_
= base::TimeTicks::Now() +
476 check_for_completed_tasks_delay_
;
477 origin_loop_
->PostDelayedTask(
479 check_for_completed_tasks_callback_
.callback(),
480 check_for_completed_tasks_delay_
);
481 check_for_completed_tasks_pending_
= true;
// Delayed callback scheduled by ScheduleCheckForCompletedTasks(). Clears the
// pending flag, collects completed tasks from |inner_|, reschedules itself
// while the pool is not idle, and dispatches completion callbacks.
484 void WorkerPool::CheckForCompletedTasks() {
485 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
486 DCHECK(check_for_completed_tasks_pending_
);
487 check_for_completed_tasks_pending_
= false;
489 // Schedule another check for completed tasks if not idle.
490 if (!inner_
->CollectCompletedTasks())
491 ScheduleCheckForCompletedTasks();
493 DispatchCompletionCallbacks();
// Cancels the pending CheckForCompletedTasks() callback and clears the
// pending flag.
// NOTE(review): the statement controlled by the |if| below (presumably an
// early return when no check is pending) is not visible in this excerpt -
// confirm.
496 void WorkerPool::CancelCheckForCompletedTasks() {
497 if (!check_for_completed_tasks_pending_
)
500 check_for_completed_tasks_callback_
.Cancel();
501 check_for_completed_tasks_pending_
= false;
// Drains |completed_tasks_| front to back and then tells |client_| that
// dispatching has finished.
// NOTE(review): the statement controlled by the |if| below (presumably an
// early return when nothing has completed) and the per-task call inside the
// loop (presumably DidComplete()) are not visible in this excerpt - confirm.
504 void WorkerPool::DispatchCompletionCallbacks() {
505 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
507 if (completed_tasks_
.empty())
510 while (completed_tasks_
.size()) {
// Take ownership of the oldest completed task.
511 scoped_ptr
<internal::WorkerPoolTask
> task
= completed_tasks_
.take_front();
515 client_
->DidFinishDispatchingWorkerPoolCompletionCallbacks();
// Posts |task| to |inner_|. Cheap tasks are posted without waking workers and
// a RunCheapTasks() callback is scheduled for them; all other tasks are
// posted with |signal_workers| set. A check for completed tasks is scheduled
// in both cases.
518 void WorkerPool::PostTask(scoped_ptr
<internal::WorkerPoolTask
> task
) {
519 bool signal_workers
= true;
520 if (task
->IsCheap()) {
521 // To make cheap tasks more likely to run on the origin thread, don't wake
522 // workers when posting them.
523 signal_workers
= false;
524 ScheduleRunCheapTasks();
527 // Schedule check for completed tasks if not pending.
528 ScheduleCheckForCompletedTasks();
// Ownership of |task| moves to |inner_|.
530 inner_
->PostTask(task
.Pass(), signal_workers
);
// Posts |run_cheap_tasks_callback_| to |origin_loop_| and marks it pending.
// NOTE(review): the statement controlled by the |if| below (presumably an
// early return when a callback is already pending) is not visible in this
// excerpt - confirm.
533 void WorkerPool::ScheduleRunCheapTasks() {
534 if (run_cheap_tasks_pending_
)
536 origin_loop_
->PostTask(FROM_HERE
, run_cheap_tasks_callback_
);
537 run_cheap_tasks_pending_
= true;
// Target of |run_cheap_tasks_callback_|. Runs cheap tasks on the calling
// thread until the earlier of |run_cheap_tasks_time_limit_| and (when set)
// |check_for_completed_tasks_time_|, then performs the completed-task check
// inline: cancels the delayed check, dispatches completion callbacks and
// schedules a new check.
// NOTE(review): several control-flow statements (returns, the |is_idle| test
// guarding the final reschedule, closing braces) are not visible in this
// excerpt.
540 void WorkerPool::RunCheapTasks() {
541 TRACE_EVENT0("cc", "WorkerPool::RunCheapTasks");
542 DCHECK(run_cheap_tasks_pending_
);
543 run_cheap_tasks_pending_
= false;
// Start from the configured limit for running cheap tasks.
546 base::TimeTicks time_limit
= run_cheap_tasks_time_limit_
;
// Do not run past the time at which the completed-task check is due.
548 if (!check_for_completed_tasks_time_
.is_null())
549 time_limit
= std::min(time_limit
, check_for_completed_tasks_time_
);
551 bool is_idle
= inner_
->RunCheapTasksUntilTimeLimit(time_limit
);
553 base::TimeTicks now
= base::TimeTicks::Now();
// The configured cheap-task time limit has been reached.
554 if (now
>= run_cheap_tasks_time_limit_
) {
555 TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks out of time",
556 TRACE_EVENT_SCOPE_THREAD
);
560 // We must be out of cheap tasks if this happens.
561 if (!check_for_completed_tasks_pending_
||
562 now
< check_for_completed_tasks_time_
)
565 TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks check time",
566 TRACE_EVENT_SCOPE_THREAD
);
// Perform the check now instead of waiting for the delayed callback.
567 CancelCheckForCompletedTasks();
568 DispatchCompletionCallbacks();
569 // Schedule another check for completed tasks if not idle.
571 ScheduleCheckForCompletedTasks();