Add signalSyncPoint to the WebGraphicsContext3D command buffer impls.
[chromium-blink-merge.git] / cc / base / worker_pool.cc
blob0097ad998878a5d12b05d9af3eab4c7b69ea868e
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "cc/base/worker_pool.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/debug/trace_event.h"
11 #include "base/stringprintf.h"
12 #include "base/synchronization/condition_variable.h"
13 #include "base/threading/simple_thread.h"
15 #if defined(OS_ANDROID)
16 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
17 #include <sys/resource.h>
18 #endif
20 namespace cc {
22 namespace {
24 class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
25 public:
26 WorkerPoolTaskImpl(const WorkerPool::Callback& task,
27 const base::Closure& reply)
28 : internal::WorkerPoolTask(reply),
29 task_(task) {}
31 virtual bool IsCheap() OVERRIDE { return false; }
33 virtual void Run() OVERRIDE {
34 task_.Run();
37 virtual void RunOnThread(unsigned thread_index) OVERRIDE {
38 task_.Run();
41 private:
42 WorkerPool::Callback task_;
45 } // namespace
47 namespace internal {
49 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
52 WorkerPoolTask::~WorkerPoolTask() {
55 void WorkerPoolTask::DidComplete() {
56 reply_.Run();
59 } // namespace internal
61 // Internal to the worker pool. Any data or logic that needs to be
62 // shared between threads lives in this class. All members are guarded
63 // by |lock_|.
64 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
65 public:
66 Inner(WorkerPool* worker_pool,
67 size_t num_threads,
68 const std::string& thread_name_prefix,
69 bool need_on_task_completed_callback);
70 virtual ~Inner();
72 void Shutdown();
74 void PostTask(scoped_ptr<internal::WorkerPoolTask> task, bool signal_workers);
76 // Appends all completed tasks to worker pool's completed tasks queue
77 // and returns true if idle.
78 bool CollectCompletedTasks();
80 // Runs cheap tasks on caller thread until |time_limit| is reached
81 // and returns true if idle.
82 bool RunCheapTasksUntilTimeLimit(base::TimeTicks time_limit);
84 private:
85 // Appends all completed tasks to |completed_tasks|. Lock must
86 // already be acquired before calling this function.
87 bool AppendCompletedTasksWithLockAcquired(
88 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
90 // Schedule a OnTaskCompletedOnOriginThread callback if not already
91 // pending. Lock must already be acquired before calling this function.
92 void ScheduleOnTaskCompletedWithLockAcquired();
93 void OnTaskCompletedOnOriginThread();
95 // Schedule an OnIdleOnOriginThread callback if not already pending.
96 // Lock must already be acquired before calling this function.
97 void ScheduleOnIdleWithLockAcquired();
98 void OnIdleOnOriginThread();
100 // Overridden from base::DelegateSimpleThread:
101 virtual void Run() OVERRIDE;
103 // Pointer to worker pool. Can only be used on origin thread.
104 // Not guarded by |lock_|.
105 WorkerPool* worker_pool_on_origin_thread_;
107 // This lock protects all members of this class except
108 // |worker_pool_on_origin_thread_|. Do not read or modify anything
109 // without holding this lock. Do not block while holding this lock.
110 mutable base::Lock lock_;
112 // Condition variable that is waited on by worker threads until new
113 // tasks are posted or shutdown starts.
114 base::ConditionVariable has_pending_tasks_cv_;
116 // Target message loop used for posting callbacks.
117 scoped_refptr<base::MessageLoopProxy> origin_loop_;
119 base::WeakPtrFactory<Inner> weak_ptr_factory_;
121 // Set to true when worker pool requires a callback for each
122 // completed task.
123 bool need_on_task_completed_callback_;
125 const base::Closure on_task_completed_callback_;
126 // Set when a OnTaskCompletedOnOriginThread() callback is pending.
127 bool on_task_completed_pending_;
129 const base::Closure on_idle_callback_;
130 // Set when a OnIdleOnOriginThread() callback is pending.
131 bool on_idle_pending_;
133 // Provides each running thread loop with a unique index. First thread
134 // loop index is 0.
135 unsigned next_thread_index_;
137 // Number of tasks currently running.
138 unsigned running_task_count_;
140 // Set during shutdown. Tells workers to exit when no more tasks
141 // are pending.
142 bool shutdown_;
144 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
145 TaskDeque pending_tasks_;
146 TaskDeque completed_tasks_;
148 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
150 DISALLOW_COPY_AND_ASSIGN(Inner);
153 WorkerPool::Inner::Inner(WorkerPool* worker_pool,
154 size_t num_threads,
155 const std::string& thread_name_prefix,
156 bool need_on_task_completed_callback)
157 : worker_pool_on_origin_thread_(worker_pool),
158 lock_(),
159 has_pending_tasks_cv_(&lock_),
160 origin_loop_(base::MessageLoopProxy::current()),
161 weak_ptr_factory_(this),
162 need_on_task_completed_callback_(need_on_task_completed_callback),
163 on_task_completed_callback_(
164 base::Bind(&WorkerPool::Inner::OnTaskCompletedOnOriginThread,
165 weak_ptr_factory_.GetWeakPtr())),
166 on_task_completed_pending_(false),
167 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
168 weak_ptr_factory_.GetWeakPtr())),
169 on_idle_pending_(false),
170 next_thread_index_(0),
171 running_task_count_(0),
172 shutdown_(false) {
173 base::AutoLock lock(lock_);
175 while (workers_.size() < num_threads) {
176 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
177 new base::DelegateSimpleThread(
178 this,
179 thread_name_prefix +
180 base::StringPrintf(
181 "Worker%lu",
182 static_cast<unsigned long>(workers_.size() + 1)).c_str()));
183 worker->Start();
184 workers_.push_back(worker.Pass());
188 WorkerPool::Inner::~Inner() {
189 base::AutoLock lock(lock_);
191 DCHECK(shutdown_);
193 // Cancel all pending callbacks.
194 weak_ptr_factory_.InvalidateWeakPtrs();
196 DCHECK_EQ(0u, pending_tasks_.size());
197 DCHECK_EQ(0u, completed_tasks_.size());
198 DCHECK_EQ(0u, running_task_count_);
201 void WorkerPool::Inner::Shutdown() {
203 base::AutoLock lock(lock_);
205 DCHECK(!shutdown_);
206 shutdown_ = true;
208 // Wake up a worker so it knows it should exit. This will cause all workers
209 // to exit as each will wake up another worker before exiting.
210 has_pending_tasks_cv_.Signal();
213 while (workers_.size()) {
214 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
215 worker->Join();
219 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task,
220 bool signal_workers) {
221 base::AutoLock lock(lock_);
223 pending_tasks_.push_back(task.Pass());
225 // There is more work available, so wake up worker thread.
226 if (signal_workers)
227 has_pending_tasks_cv_.Signal();
230 bool WorkerPool::Inner::CollectCompletedTasks() {
231 base::AutoLock lock(lock_);
233 return AppendCompletedTasksWithLockAcquired(
234 &worker_pool_on_origin_thread_->completed_tasks_);
237 bool WorkerPool::Inner::RunCheapTasksUntilTimeLimit(
238 base::TimeTicks time_limit) {
239 base::AutoLock lock(lock_);
241 while (base::TimeTicks::Now() < time_limit) {
242 scoped_ptr<internal::WorkerPoolTask> task;
244 // Find next cheap task.
245 for (TaskDeque::iterator iter = pending_tasks_.begin();
246 iter != pending_tasks_.end(); ++iter) {
247 if ((*iter)->IsCheap()) {
248 task = pending_tasks_.take(iter);
249 break;
253 if (!task) {
254 // Schedule an idle callback if requested and not pending.
255 if (!running_task_count_ && pending_tasks_.empty())
256 ScheduleOnIdleWithLockAcquired();
258 // Exit when no more cheap tasks are pending.
259 break;
262 // Increment |running_task_count_| before starting to run task.
263 running_task_count_++;
266 base::AutoUnlock unlock(lock_);
268 task->Run();
270 // Append tasks directly to worker pool's completed tasks queue.
271 worker_pool_on_origin_thread_->completed_tasks_.push_back(task.Pass());
272 if (need_on_task_completed_callback_)
273 worker_pool_on_origin_thread_->OnTaskCompleted();
276 // Decrement |running_task_count_| now that we are done running task.
277 running_task_count_--;
280 if (!pending_tasks_.empty())
281 has_pending_tasks_cv_.Signal();
283 // Append any other completed tasks before releasing lock.
284 return AppendCompletedTasksWithLockAcquired(
285 &worker_pool_on_origin_thread_->completed_tasks_);
288 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
289 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
290 lock_.AssertAcquired();
292 while (completed_tasks_.size())
293 completed_tasks->push_back(completed_tasks_.take_front().Pass());
295 return !running_task_count_ && pending_tasks_.empty();
298 void WorkerPool::Inner::ScheduleOnTaskCompletedWithLockAcquired() {
299 lock_.AssertAcquired();
301 if (on_task_completed_pending_ || !need_on_task_completed_callback_)
302 return;
303 origin_loop_->PostTask(FROM_HERE, on_task_completed_callback_);
304 on_task_completed_pending_ = true;
307 void WorkerPool::Inner::OnTaskCompletedOnOriginThread() {
309 base::AutoLock lock(lock_);
311 DCHECK(on_task_completed_pending_);
312 on_task_completed_pending_ = false;
314 AppendCompletedTasksWithLockAcquired(
315 &worker_pool_on_origin_thread_->completed_tasks_);
318 worker_pool_on_origin_thread_->OnTaskCompleted();
321 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
322 lock_.AssertAcquired();
324 if (on_idle_pending_)
325 return;
326 origin_loop_->PostTask(FROM_HERE, on_idle_callback_);
327 on_idle_pending_ = true;
330 void WorkerPool::Inner::OnIdleOnOriginThread() {
332 base::AutoLock lock(lock_);
334 DCHECK(on_idle_pending_);
335 on_idle_pending_ = false;
337 // Early out if no longer idle.
338 if (running_task_count_ || !pending_tasks_.empty())
339 return;
341 AppendCompletedTasksWithLockAcquired(
342 &worker_pool_on_origin_thread_->completed_tasks_);
345 worker_pool_on_origin_thread_->OnIdle();
348 void WorkerPool::Inner::Run() {
349 #if defined(OS_ANDROID)
350 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
351 int nice_value = 10; // Idle priority.
352 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value);
353 #endif
355 base::AutoLock lock(lock_);
357 // Get a unique thread index.
358 int thread_index = next_thread_index_++;
360 while (true) {
361 if (pending_tasks_.empty()) {
362 // Exit when shutdown is set and no more tasks are pending.
363 if (shutdown_)
364 break;
366 // Schedule an idle callback if requested and not pending.
367 if (!running_task_count_)
368 ScheduleOnIdleWithLockAcquired();
370 // Wait for new pending tasks.
371 has_pending_tasks_cv_.Wait();
372 continue;
375 // Get next task.
376 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
378 // Increment |running_task_count_| before starting to run task.
379 running_task_count_++;
381 // There may be more work available, so wake up another
382 // worker thread.
383 has_pending_tasks_cv_.Signal();
386 base::AutoUnlock unlock(lock_);
388 task->RunOnThread(thread_index);
391 completed_tasks_.push_back(task.Pass());
393 // Decrement |running_task_count_| now that we are done running task.
394 running_task_count_--;
396 // Schedule a task completed callback if requested and not pending.
397 ScheduleOnTaskCompletedWithLockAcquired();
400 // We noticed we should exit. Wake up the next worker so it knows it should
401 // exit as well (because the Shutdown() code only signals once).
402 has_pending_tasks_cv_.Signal();
406 WorkerPool::WorkerPool(WorkerPoolClient* client,
407 size_t num_threads,
408 base::TimeDelta check_for_completed_tasks_delay,
409 const std::string& thread_name_prefix)
410 : client_(client),
411 origin_loop_(base::MessageLoopProxy::current()),
412 weak_ptr_factory_(this),
413 check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
414 check_for_completed_tasks_pending_(false),
415 run_cheap_tasks_callback_(
416 base::Bind(&WorkerPool::RunCheapTasks,
417 weak_ptr_factory_.GetWeakPtr())),
418 run_cheap_tasks_pending_(false),
419 inner_(make_scoped_ptr(
420 new Inner(
421 this,
422 num_threads,
423 thread_name_prefix,
424 // Request OnTaskCompleted() callback when check
425 // for completed tasks delay is 0.
426 check_for_completed_tasks_delay == base::TimeDelta()))) {
429 WorkerPool::~WorkerPool() {
430 Shutdown();
432 // Cancel all pending callbacks.
433 weak_ptr_factory_.InvalidateWeakPtrs();
435 DCHECK_EQ(0u, completed_tasks_.size());
438 void WorkerPool::Shutdown() {
439 inner_->Shutdown();
440 DispatchCompletionCallbacks();
443 void WorkerPool::PostTaskAndReply(
444 const Callback& task, const base::Closure& reply) {
445 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
446 task,
447 reply)).PassAs<internal::WorkerPoolTask>());
450 void WorkerPool::SetRunCheapTasksTimeLimit(
451 base::TimeTicks run_cheap_tasks_time_limit) {
452 run_cheap_tasks_time_limit_ = run_cheap_tasks_time_limit;
453 ScheduleRunCheapTasks();
456 void WorkerPool::OnIdle() {
457 TRACE_EVENT0("cc", "WorkerPool::OnIdle");
459 DispatchCompletionCallbacks();
462 void WorkerPool::OnTaskCompleted() {
463 TRACE_EVENT0("cc", "WorkerPool::OnTaskCompleted");
465 DispatchCompletionCallbacks();
468 void WorkerPool::ScheduleCheckForCompletedTasks() {
469 if (check_for_completed_tasks_pending_ ||
470 check_for_completed_tasks_delay_ == base::TimeDelta())
471 return;
472 check_for_completed_tasks_callback_.Reset(
473 base::Bind(&WorkerPool::CheckForCompletedTasks,
474 weak_ptr_factory_.GetWeakPtr()));
475 check_for_completed_tasks_time_ = base::TimeTicks::Now() +
476 check_for_completed_tasks_delay_;
477 origin_loop_->PostDelayedTask(
478 FROM_HERE,
479 check_for_completed_tasks_callback_.callback(),
480 check_for_completed_tasks_delay_);
481 check_for_completed_tasks_pending_ = true;
484 void WorkerPool::CheckForCompletedTasks() {
485 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
486 DCHECK(check_for_completed_tasks_pending_);
487 check_for_completed_tasks_pending_ = false;
489 // Schedule another check for completed tasks if not idle.
490 if (!inner_->CollectCompletedTasks())
491 ScheduleCheckForCompletedTasks();
493 DispatchCompletionCallbacks();
496 void WorkerPool::CancelCheckForCompletedTasks() {
497 if (!check_for_completed_tasks_pending_)
498 return;
500 check_for_completed_tasks_callback_.Cancel();
501 check_for_completed_tasks_pending_ = false;
504 void WorkerPool::DispatchCompletionCallbacks() {
505 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
507 if (completed_tasks_.empty())
508 return;
510 while (completed_tasks_.size()) {
511 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
512 task->DidComplete();
515 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
518 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
519 bool signal_workers = true;
520 if (task->IsCheap()) {
521 // To make cheap tasks more likely to run on the origin thread, don't wake
522 // workers when posting them.
523 signal_workers = false;
524 ScheduleRunCheapTasks();
527 // Schedule check for completed tasks if not pending.
528 ScheduleCheckForCompletedTasks();
530 inner_->PostTask(task.Pass(), signal_workers);
533 void WorkerPool::ScheduleRunCheapTasks() {
534 if (run_cheap_tasks_pending_)
535 return;
536 origin_loop_->PostTask(FROM_HERE, run_cheap_tasks_callback_);
537 run_cheap_tasks_pending_ = true;
540 void WorkerPool::RunCheapTasks() {
541 TRACE_EVENT0("cc", "WorkerPool::RunCheapTasks");
542 DCHECK(run_cheap_tasks_pending_);
543 run_cheap_tasks_pending_ = false;
545 while (true) {
546 base::TimeTicks time_limit = run_cheap_tasks_time_limit_;
548 if (!check_for_completed_tasks_time_.is_null())
549 time_limit = std::min(time_limit, check_for_completed_tasks_time_);
551 bool is_idle = inner_->RunCheapTasksUntilTimeLimit(time_limit);
553 base::TimeTicks now = base::TimeTicks::Now();
554 if (now >= run_cheap_tasks_time_limit_) {
555 TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks out of time",
556 TRACE_EVENT_SCOPE_THREAD);
557 break;
560 // We must be out of cheap tasks if this happens.
561 if (!check_for_completed_tasks_pending_ ||
562 now < check_for_completed_tasks_time_)
563 break;
565 TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks check time",
566 TRACE_EVENT_SCOPE_THREAD);
567 CancelCheckForCompletedTasks();
568 DispatchCompletionCallbacks();
569 // Schedule another check for completed tasks if not idle.
570 if (!is_idle)
571 ScheduleCheckForCompletedTasks();
575 } // namespace cc