Roll src/third_party/WebKit d9c6159:8139f33 (svn 201974:201975)
[chromium-blink-merge.git] / base / threading / sequenced_worker_pool.cc
blob54a6bc8245d48d2017b921c5a7b8ad91b5146882
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
7 #include <list>
8 #include <map>
9 #include <set>
10 #include <utility>
11 #include <vector>
13 #include "base/atomic_sequence_num.h"
14 #include "base/callback.h"
15 #include "base/compiler_specific.h"
16 #include "base/critical_closure.h"
17 #include "base/lazy_instance.h"
18 #include "base/logging.h"
19 #include "base/memory/linked_ptr.h"
20 #include "base/stl_util.h"
21 #include "base/strings/stringprintf.h"
22 #include "base/synchronization/condition_variable.h"
23 #include "base/synchronization/lock.h"
24 #include "base/thread_task_runner_handle.h"
25 #include "base/threading/platform_thread.h"
26 #include "base/threading/simple_thread.h"
27 #include "base/threading/thread_local.h"
28 #include "base/threading/thread_restrictions.h"
29 #include "base/time/time.h"
30 #include "base/trace_event/trace_event.h"
31 #include "base/tracked_objects.h"
33 #if defined(OS_MACOSX)
34 #include "base/mac/scoped_nsautorelease_pool.h"
35 #elif defined(OS_WIN)
36 #include "base/win/scoped_com_initializer.h"
37 #endif
39 #if !defined(OS_NACL)
40 #include "base/metrics/histogram.h"
41 #endif
43 namespace base {
45 namespace {
47 struct SequencedTask : public TrackingInfo {
48 SequencedTask()
49 : sequence_token_id(0),
50 trace_id(0),
51 sequence_task_number(0),
52 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
54 explicit SequencedTask(const tracked_objects::Location& from_here)
55 : base::TrackingInfo(from_here, TimeTicks()),
56 sequence_token_id(0),
57 trace_id(0),
58 sequence_task_number(0),
59 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
61 ~SequencedTask() {}
63 int sequence_token_id;
64 int trace_id;
65 int64 sequence_task_number;
66 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
67 tracked_objects::Location posted_from;
68 Closure task;
70 // Non-delayed tasks and delayed tasks are managed together by time-to-run
71 // order. We calculate the time by adding the posted time and the given delay.
72 TimeTicks time_to_run;
75 struct SequencedTaskLessThan {
76 public:
77 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
78 if (lhs.time_to_run < rhs.time_to_run)
79 return true;
81 if (lhs.time_to_run > rhs.time_to_run)
82 return false;
84 // If the time happen to match, then we use the sequence number to decide.
85 return lhs.sequence_task_number < rhs.sequence_task_number;
89 // SequencedWorkerPoolTaskRunner ---------------------------------------------
90 // A TaskRunner which posts tasks to a SequencedWorkerPool with a
91 // fixed ShutdownBehavior.
93 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
94 class SequencedWorkerPoolTaskRunner : public TaskRunner {
95 public:
96 SequencedWorkerPoolTaskRunner(
97 const scoped_refptr<SequencedWorkerPool>& pool,
98 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
100 // TaskRunner implementation
101 bool PostDelayedTask(const tracked_objects::Location& from_here,
102 const Closure& task,
103 TimeDelta delay) override;
104 bool RunsTasksOnCurrentThread() const override;
106 private:
107 ~SequencedWorkerPoolTaskRunner() override;
109 const scoped_refptr<SequencedWorkerPool> pool_;
111 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
113 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
116 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
117 const scoped_refptr<SequencedWorkerPool>& pool,
118 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
119 : pool_(pool),
120 shutdown_behavior_(shutdown_behavior) {
123 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
126 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
127 const tracked_objects::Location& from_here,
128 const Closure& task,
129 TimeDelta delay) {
130 if (delay == TimeDelta()) {
131 return pool_->PostWorkerTaskWithShutdownBehavior(
132 from_here, task, shutdown_behavior_);
134 return pool_->PostDelayedWorkerTask(from_here, task, delay);
137 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
138 return pool_->RunsTasksOnCurrentThread();
141 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
142 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
143 // fixed sequence token.
145 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
146 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
147 public:
148 SequencedWorkerPoolSequencedTaskRunner(
149 const scoped_refptr<SequencedWorkerPool>& pool,
150 SequencedWorkerPool::SequenceToken token,
151 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
153 // TaskRunner implementation
154 bool PostDelayedTask(const tracked_objects::Location& from_here,
155 const Closure& task,
156 TimeDelta delay) override;
157 bool RunsTasksOnCurrentThread() const override;
159 // SequencedTaskRunner implementation
160 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
161 const Closure& task,
162 TimeDelta delay) override;
164 private:
165 ~SequencedWorkerPoolSequencedTaskRunner() override;
167 const scoped_refptr<SequencedWorkerPool> pool_;
169 const SequencedWorkerPool::SequenceToken token_;
171 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
173 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
176 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
177 const scoped_refptr<SequencedWorkerPool>& pool,
178 SequencedWorkerPool::SequenceToken token,
179 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
180 : pool_(pool),
181 token_(token),
182 shutdown_behavior_(shutdown_behavior) {
185 SequencedWorkerPoolSequencedTaskRunner::
186 ~SequencedWorkerPoolSequencedTaskRunner() {
189 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
190 const tracked_objects::Location& from_here,
191 const Closure& task,
192 TimeDelta delay) {
193 if (delay == TimeDelta()) {
194 return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
195 token_, from_here, task, shutdown_behavior_);
197 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
200 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
201 return pool_->IsRunningSequenceOnCurrentThread(token_);
204 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
205 const tracked_objects::Location& from_here,
206 const Closure& task,
207 TimeDelta delay) {
208 // There's no way to run nested tasks, so simply forward to
209 // PostDelayedTask.
210 return PostDelayedTask(from_here, task, delay);
213 // Create a process-wide unique ID to represent this task in trace events. This
214 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
215 // with MessageLoop pointers on other processes.
216 uint64 GetTaskTraceID(const SequencedTask& task,
217 void* pool) {
218 return (static_cast<uint64>(task.trace_id) << 32) |
219 static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
// Lazily created, intentionally leaked thread-local slot holding a pointer to
// a SequenceToken. Worker::Run() stores the address of the worker's
// |task_sequence_token_| here so code without a Worker pointer can reach it.
222 base::LazyInstance<base::ThreadLocalPointer<
223 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
224 LAZY_INSTANCE_INITIALIZER;
226 } // namespace
228 // Worker ---------------------------------------------------------------------
230 class SequencedWorkerPool::Worker : public SimpleThread {
231 public:
232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
233 // around as long as we are running.
234 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
235 int thread_number,
236 const std::string& thread_name_prefix);
237 ~Worker() override;
239 // SimpleThread implementation. This actually runs the background thread.
240 void Run() override;
242 // Indicates that a task is about to be run. The parameters provide
243 // additional metainformation about the task being run.
244 void set_running_task_info(SequenceToken token,
245 WorkerShutdown shutdown_behavior) {
246 is_processing_task_ = true;
247 task_sequence_token_ = token;
248 task_shutdown_behavior_ = shutdown_behavior;
251 // Indicates that the task has finished running.
252 void reset_running_task_info() { is_processing_task_ = false; }
254 // Whether the worker is processing a task.
255 bool is_processing_task() { return is_processing_task_; }
257 SequenceToken task_sequence_token() const {
258 DCHECK(is_processing_task_);
259 return task_sequence_token_;
262 WorkerShutdown task_shutdown_behavior() const {
263 DCHECK(is_processing_task_);
264 return task_shutdown_behavior_;
267 private:
268 scoped_refptr<SequencedWorkerPool> worker_pool_;
269 // The sequence token of the task being processed. Only valid when
270 // is_processing_task_ is true.
271 SequenceToken task_sequence_token_;
272 // The shutdown behavior of the task being processed. Only valid when
273 // is_processing_task_ is true.
274 WorkerShutdown task_shutdown_behavior_;
275 // Whether the Worker is processing a task.
276 bool is_processing_task_;
278 DISALLOW_COPY_AND_ASSIGN(Worker);
281 // Inner ----------------------------------------------------------------------
// Holds the pool's state (pending tasks, worker threads, shutdown and
// test-cleanup flags) and the loop each worker thread runs. All mutable
// state is guarded by |lock_|.
283 class SequencedWorkerPool::Inner {
284 public:
285 // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
286 // by it).
287 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
288 const std::string& thread_name_prefix,
289 TestingObserver* observer);
291 ~Inner();
// Returns a fresh, unnamed sequence token.
293 SequenceToken GetSequenceToken();
// Returns the token associated with |name|, creating it on first use.
295 SequenceToken GetNamedSequenceToken(const std::string& name);
297 // This function accepts a name and an ID. If the name is null, the
298 // token ID is used. This allows us to implement the optional name lookup
299 // from a single function without having to enter the lock a separate time.
300 bool PostTask(const std::string* optional_token_name,
301 SequenceToken sequence_token,
302 WorkerShutdown shutdown_behavior,
303 const tracked_objects::Location& from_here,
304 const Closure& task,
305 TimeDelta delay);
// True when the calling thread is one of this pool's worker threads.
307 bool RunsTasksOnCurrentThread() const;
// True when the calling thread is a worker of this pool that is currently
// processing a task whose token equals |sequence_token|.
309 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
311 void CleanupForTesting();
313 void SignalHasWorkForTesting();
315 int GetWorkSignalCountForTesting() const;
317 void Shutdown(int max_blocking_tasks_after_shutdown);
319 bool IsShutdownInProgress();
321 // Runs the worker loop on the background thread.
322 void ThreadLoop(Worker* this_worker);
324 private:
// Result of GetWork(); see the comment on GetWork() below.
325 enum GetWorkStatus {
326 GET_WORK_FOUND,
327 GET_WORK_NOT_FOUND,
328 GET_WORK_WAIT,
// Phases of the test-only cleanup handshake driven by CleanupForTesting()
// and HandleCleanup(). CLEANUP_DONE is the resting state.
331 enum CleanupState {
332 CLEANUP_REQUESTED,
333 CLEANUP_STARTING,
334 CLEANUP_RUNNING,
335 CLEANUP_FINISHING,
336 CLEANUP_DONE,
339 // Called from within the lock, this converts the given token name into a
340 // token ID, creating a new one if necessary.
341 int LockedGetNamedTokenID(const std::string& name);
343 // Called from within the lock, this returns the next sequence task number.
344 int64 LockedGetNextSequenceTaskNumber();
346 // Gets new task. There are 3 cases depending on the return value:
348 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
349 // be run immediately.
350 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
351 // and |task| is not filled in. In this case, the caller should wait until
352 // a task is posted.
353 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
354 // immediately, and |task| is not filled in. Instead, |wait_time| is
355 // filled in with the time to wait until the next task is due. In this
356 // case, the caller should wait for that long.
358 // In any case, the calling code should clear the given
359 // delete_these_outside_lock vector the next time the lock is released.
360 // See the implementation for a more detailed description.
361 GetWorkStatus GetWork(SequencedTask* task,
362 TimeDelta* wait_time,
363 std::vector<Closure>* delete_these_outside_lock);
// Advances the test-only cleanup handshake; must be called with the lock
// held.
365 void HandleCleanup();
367 // Performs init and cleanup around running the given task. WillRun...
368 // returns the value from PrepareToStartAdditionalThreadIfHelpful.
369 // The calling code should call FinishStartingAdditionalThread once the
370 // lock is released if the return value is nonzero.
371 int WillRunWorkerTask(const SequencedTask& task);
372 void DidRunWorkerTask(const SequencedTask& task);
374 // Returns true if there are no threads currently running the given
375 // sequence token.
376 bool IsSequenceTokenRunnable(int sequence_token_id) const;
378 // Checks if all threads are busy and the addition of one more could run an
379 // additional task waiting in the queue. This must be called from within
380 // the lock.
382 // If another thread is helpful, this will mark the thread as being in the
383 // process of starting and returns the index of the new thread which will be
384 // 0 or more. The caller should then call FinishStartingAdditionalThread to
385 // complete initialization once the lock is released.
// NOTE(review): callers in this file treat a return value of 0 as "no new
// thread", so a real thread index presumably has to be nonzero; confirm
// against the implementation.
387 // If another thread is not necessary, returns 0.
389 // See the implementation for more.
390 int PrepareToStartAdditionalThreadIfHelpful();
392 // The second part of thread creation after
393 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
394 // generated. This actually creates the thread and should be called outside
395 // the lock to avoid blocking important work starting a thread in the lock.
396 void FinishStartingAdditionalThread(int thread_number);
398 // Signal |has_work_| and increment |has_work_signal_count_|.
399 void SignalHasWork();
401 // Checks whether there is work left that's blocking shutdown. Must be
402 // called inside the lock.
403 bool CanShutdown() const;
405 SequencedWorkerPool* const worker_pool_;
407 // The last sequence number used. Managed by GetSequenceToken, since this
408 // only does threadsafe increment operations, you do not need to hold the
409 // lock. This is class-static to make SequenceTokens issued by
410 // GetSequenceToken unique across SequencedWorkerPool instances.
411 static base::StaticAtomicSequenceNumber g_last_sequence_number_;
413 // This lock protects |everything in this class|. Do not read or modify
414 // anything without holding this lock. Do not block while holding this
415 // lock.
416 mutable Lock lock_;
418 // Condition variable that is waited on by worker threads until new
419 // tasks are posted or shutdown starts.
420 ConditionVariable has_work_cv_;
422 // Condition variable that is waited on by non-worker threads (in
423 // Shutdown()) until CanShutdown() goes to true.
424 ConditionVariable can_shutdown_cv_;
426 // The maximum number of worker threads we'll create.
427 const size_t max_threads_;
// Prefix handed to each Worker, which uses it to build its thread name.
429 const std::string thread_name_prefix_;
431 // Associates all known sequence token names with their IDs.
432 std::map<std::string, int> named_sequence_tokens_;
434 // Owning pointers to all threads we've created so far, indexed by
435 // ID. Since we lazily create threads, this may be less than
436 // max_threads_ and will be initially empty.
437 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
438 ThreadMap threads_;
440 // Set to true when we're in the process of creating another thread.
441 // See PrepareToStartAdditionalThreadIfHelpful for more.
442 bool thread_being_created_;
444 // Number of threads currently waiting for work.
445 size_t waiting_thread_count_;
447 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
448 // or SKIP_ON_SHUTDOWN flag set.
449 size_t blocking_shutdown_thread_count_;
451 // A set of all pending tasks in time-to-run order. These are tasks that are
452 // either waiting for a thread to run on, waiting for their time to run,
453 // or blocked on a previous task in their sequence. We have to iterate over
454 // the tasks by time-to-run order, so we use the set instead of the
455 // traditional priority_queue.
456 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
457 PendingTaskSet pending_tasks_;
459 // The next sequence number for a new sequenced task.
460 int64 next_sequence_task_number_;
462 // Number of tasks in the pending_tasks_ list that are marked as blocking
463 // shutdown.
464 size_t blocking_shutdown_pending_task_count_;
466 // Lists all sequence tokens currently executing.
467 std::set<int> current_sequences_;
469 // An ID for each posted task to distinguish the task from others in traces.
470 int trace_id_;
472 // Set when Shutdown is called and no further tasks should be
473 // allowed, though we may still be running existing tasks.
474 bool shutdown_called_;
476 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
477 // has been called.
478 int max_blocking_tasks_after_shutdown_;
480 // State used to cleanup for testing, all guarded by lock_.
481 CleanupState cleanup_state_;
482 size_t cleanup_idlers_;
483 ConditionVariable cleanup_cv_;
// Optional test hook; notified before Shutdown() blocks and from the
// destructor. May be null.
485 TestingObserver* const testing_observer_;
487 DISALLOW_COPY_AND_ASSIGN(Inner);
490 // Worker definitions ---------------------------------------------------------
492 SequencedWorkerPool::Worker::Worker(
493 const scoped_refptr<SequencedWorkerPool>& worker_pool,
494 int thread_number,
495 const std::string& prefix)
496 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
497 worker_pool_(worker_pool),
498 task_shutdown_behavior_(BLOCK_SHUTDOWN),
499 is_processing_task_(false) {
500 Start();
503 SequencedWorkerPool::Worker::~Worker() {
506 void SequencedWorkerPool::Worker::Run() {
507 #if defined(OS_WIN)
508 win::ScopedCOMInitializer com_initializer;
509 #endif
511 // Store a pointer to the running sequence in thread local storage for
512 // static function access.
513 g_lazy_tls_ptr.Get().Set(&task_sequence_token_);
515 // Just jump back to the Inner object to run the thread, since it has all the
516 // tracking information and queues. It might be more natural to implement
517 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
518 // having these worker objects at all, but that method lacks the ability to
519 // send thread-specific information easily to the thread loop.
520 worker_pool_->inner_->ThreadLoop(this);
521 // Release our cyclic reference once we're done.
522 worker_pool_ = NULL;
525 // Inner definitions ---------------------------------------------------------
527 SequencedWorkerPool::Inner::Inner(
528 SequencedWorkerPool* worker_pool,
529 size_t max_threads,
530 const std::string& thread_name_prefix,
531 TestingObserver* observer)
532 : worker_pool_(worker_pool),
533 lock_(),
534 has_work_cv_(&lock_),
535 can_shutdown_cv_(&lock_),
536 max_threads_(max_threads),
537 thread_name_prefix_(thread_name_prefix),
538 thread_being_created_(false),
539 waiting_thread_count_(0),
540 blocking_shutdown_thread_count_(0),
541 next_sequence_task_number_(0),
542 blocking_shutdown_pending_task_count_(0),
543 trace_id_(0),
544 shutdown_called_(false),
545 max_blocking_tasks_after_shutdown_(0),
546 cleanup_state_(CLEANUP_DONE),
547 cleanup_idlers_(0),
548 cleanup_cv_(&lock_),
549 testing_observer_(observer) {}
551 SequencedWorkerPool::Inner::~Inner() {
552 // You must call Shutdown() before destroying the pool.
553 DCHECK(shutdown_called_);
555 // Need to explicitly join with the threads before they're destroyed or else
556 // they will be running when our object is half torn down.
557 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
558 it->second->Join();
559 threads_.clear();
561 if (testing_observer_)
562 testing_observer_->OnDestruct();
565 SequencedWorkerPool::SequenceToken
566 SequencedWorkerPool::Inner::GetSequenceToken() {
567 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
568 // is used as a sentinel value in SequenceTokens.
569 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
572 SequencedWorkerPool::SequenceToken
573 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
574 AutoLock lock(lock_);
575 return SequenceToken(LockedGetNamedTokenID(name));
// Queues |task| for execution and returns true, or returns false if the task
// is rejected because shutdown has started.
//
// After Shutdown() has been called a task is only accepted when all of the
// following hold: it is BLOCK_SHUTDOWN, the calling thread is not a pool
// worker that is running a non-BLOCK_SHUTDOWN task, and the budget in
// |max_blocking_tasks_after_shutdown_| is not exhausted (each accepted task
// consumes one unit).
//
// If |optional_token_name| is non-null, the named token's ID replaces the ID
// taken from |sequence_token|. After queuing, either a new worker thread is
// started or the work condition variable is signalled.
578 bool SequencedWorkerPool::Inner::PostTask(
579 const std::string* optional_token_name,
580 SequenceToken sequence_token,
581 WorkerShutdown shutdown_behavior,
582 const tracked_objects::Location& from_here,
583 const Closure& task,
584 TimeDelta delay) {
// A task posted with a non-zero delay must be SKIP_ON_SHUTDOWN.
585 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
586 SequencedTask sequenced(from_here);
587 sequenced.sequence_token_id = sequence_token.id_;
588 sequenced.shutdown_behavior = shutdown_behavior;
589 sequenced.posted_from = from_here;
// BLOCK_SHUTDOWN tasks are wrapped with MakeCriticalClosure; all others are
// stored as given.
590 sequenced.task =
591 shutdown_behavior == BLOCK_SHUTDOWN ?
592 base::MakeCriticalClosure(task) : task;
593 sequenced.time_to_run = TimeTicks::Now() + delay;
// Zero means "no new thread needed"; see the check after the locked section.
595 int create_thread_id = 0;
597 AutoLock lock(lock_);
598 if (shutdown_called_) {
599 // Don't allow a new task to be posted if it doesn't block shutdown.
600 if (shutdown_behavior != BLOCK_SHUTDOWN)
601 return false;
603 // If the current thread is running a task, and that task doesn't block
604 // shutdown, then it shouldn't be allowed to post any more tasks.
605 ThreadMap::const_iterator found =
606 threads_.find(PlatformThread::CurrentId());
607 if (found != threads_.end() && found->second->is_processing_task() &&
608 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
609 return false;
// The post-shutdown budget for new BLOCK_SHUTDOWN tasks is used up.
612 if (max_blocking_tasks_after_shutdown_ <= 0) {
613 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
614 return false;
616 max_blocking_tasks_after_shutdown_ -= 1;
619 // The trace_id is used for identifying the task in about:tracing.
620 sequenced.trace_id = trace_id_++;
622 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
623 "SequencedWorkerPool::Inner::PostTask",
624 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
625 TRACE_EVENT_FLAG_FLOW_OUT);
627 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
629 // Now that we have the lock, apply the named token rules.
630 if (optional_token_name)
631 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
633 pending_tasks_.insert(sequenced);
634 if (shutdown_behavior == BLOCK_SHUTDOWN)
635 blocking_shutdown_pending_task_count_++;
637 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
640 // Actually start the additional thread or signal an existing one now that
641 // we're outside the lock.
642 if (create_thread_id)
643 FinishStartingAdditionalThread(create_thread_id);
644 else
645 SignalHasWork();
647 return true;
650 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
651 AutoLock lock(lock_);
652 return ContainsKey(threads_, PlatformThread::CurrentId());
655 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
656 SequenceToken sequence_token) const {
657 AutoLock lock(lock_);
658 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
659 if (found == threads_.end())
660 return false;
661 return found->second->is_processing_task() &&
662 sequence_token.Equals(found->second->task_sequence_token());
665 // See https://code.google.com/p/chromium/issues/detail?id=168415
666 void SequencedWorkerPool::Inner::CleanupForTesting() {
667 DCHECK(!RunsTasksOnCurrentThread());
668 base::ThreadRestrictions::ScopedAllowWait allow_wait;
669 AutoLock lock(lock_);
670 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
671 if (shutdown_called_)
672 return;
673 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
674 return;
675 cleanup_state_ = CLEANUP_REQUESTED;
676 cleanup_idlers_ = 0;
677 has_work_cv_.Signal();
678 while (cleanup_state_ != CLEANUP_DONE)
679 cleanup_cv_.Wait();
682 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
683 SignalHasWork();
// Marks the pool as shutting down and blocks until CanShutdown() reports
// true.
//
// |max_new_blocking_tasks_after_shutdown| must be >= 0 (DCHECKed) and becomes
// the budget of BLOCK_SHUTDOWN tasks that may still be posted afterwards.
// A second call returns immediately. Must not overlap with
// CleanupForTesting() (CHECKed via the cleanup state).
686 void SequencedWorkerPool::Inner::Shutdown(
687 int max_new_blocking_tasks_after_shutdown) {
688 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
690 AutoLock lock(lock_);
691 // Cleanup and Shutdown should not be called concurrently.
692 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
693 if (shutdown_called_)
694 return;
695 shutdown_called_ = true;
696 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
698 // Tickle the threads. This will wake up a waiting one so it will know that
699 // it can exit, which in turn will wake up any other waiting ones.
700 SignalHasWork();
702 // There are no pending or running tasks blocking shutdown, we're done.
703 if (CanShutdown())
704 return;
707 // If we're here, then something is blocking shutdown. So wait for
708 // CanShutdown() to go to true.
710 if (testing_observer_)
711 testing_observer_->WillWaitForShutdown();
// Timestamp for the shutdown-delay histogram recorded below (not on NaCl).
713 #if !defined(OS_NACL)
714 TimeTicks shutdown_wait_begin = TimeTicks::Now();
715 #endif
718 base::ThreadRestrictions::ScopedAllowWait allow_wait;
719 AutoLock lock(lock_);
// Re-check the predicate after every wakeup of the condition variable.
720 while (!CanShutdown())
721 can_shutdown_cv_.Wait();
723 #if !defined(OS_NACL)
724 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
725 TimeTicks::Now() - shutdown_wait_begin);
726 #endif
729 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
730 AutoLock lock(lock_);
731 return shutdown_called_;
// Body of every worker thread. Registers |this_worker| in |threads_| under
// its thread ID, then loops: service any pending test cleanup, ask GetWork()
// for a task, and either run it or wait on |has_work_cv_|. The loop ends once
// shutdown has been requested and no pending task still blocks shutdown;
// on the way out the next worker and any thread waiting in Shutdown() are
// signalled.
734 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
736 AutoLock lock(lock_);
// This thread was announced via |thread_being_created_|; it now exists, so
// clear the flag and record the worker in the thread map.
737 DCHECK(thread_being_created_);
738 thread_being_created_ = false;
739 std::pair<ThreadMap::iterator, bool> result =
740 threads_.insert(
741 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
742 DCHECK(result.second);
744 while (true) {
745 #if defined(OS_MACOSX)
746 base::mac::ScopedNSAutoreleasePool autorelease_pool;
747 #endif
749 HandleCleanup();
751 // See GetWork for what delete_these_outside_lock is doing.
752 SequencedTask task;
753 TimeDelta wait_time;
754 std::vector<Closure> delete_these_outside_lock;
755 GetWorkStatus status =
756 GetWork(&task, &wait_time, &delete_these_outside_lock);
757 if (status == GET_WORK_FOUND) {
758 TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
759 "SequencedWorkerPool::Inner::ThreadLoop",
760 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
761 TRACE_EVENT_FLAG_FLOW_IN,
762 "src_file", task.posted_from.file_name(),
763 "src_func", task.posted_from.function_name());
// Nonzero means an additional worker thread should be started once the
// lock has been dropped.
764 int new_thread_id = WillRunWorkerTask(task);
766 AutoUnlock unlock(lock_);
767 // There may be more work available, so wake up another
768 // worker thread. (Technically not required, since we
769 // already get a signal for each new task, but it doesn't
770 // hurt.)
771 SignalHasWork();
772 delete_these_outside_lock.clear();
774 // Complete thread creation outside the lock if necessary.
775 if (new_thread_id)
776 FinishStartingAdditionalThread(new_thread_id);
778 this_worker->set_running_task_info(
779 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
// Time the task so its run can be tallied for tracking below.
781 tracked_objects::TaskStopwatch stopwatch;
782 stopwatch.Start();
783 task.task.Run();
784 stopwatch.Stop();
786 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
787 task, stopwatch);
789 // Make sure our task is erased outside the lock for the
790 // same reason we do this with delete_these_outside_lock.
791 // Also, do it before calling reset_running_task_info() so
792 // that sequence-checking from within the task's destructor
793 // still works.
794 task.task = Closure();
796 this_worker->reset_running_task_info();
798 DidRunWorkerTask(task); // Must be done inside the lock.
799 } else if (cleanup_state_ == CLEANUP_RUNNING) {
// This thread is the one performing a test cleanup pass and found nothing
// to run right now.
800 switch (status) {
801 case GET_WORK_WAIT: {
802 AutoUnlock unlock(lock_);
803 delete_these_outside_lock.clear();
805 break;
806 case GET_WORK_NOT_FOUND:
// Nothing is queued at all: move the handshake on to its final phase and
// wake everyone waiting on the cleanup condition variable.
807 CHECK(delete_these_outside_lock.empty());
808 cleanup_state_ = CLEANUP_FINISHING;
809 cleanup_cv_.Broadcast();
810 break;
811 default:
812 NOTREACHED();
814 } else {
815 // When we're terminating and there's no more work, we can
816 // shut down, other workers can complete any pending or new tasks.
817 // We can get additional tasks posted after shutdown_called_ is set
818 // but only worker threads are allowed to post tasks at that time, and
819 // the workers responsible for posting those tasks will be available
820 // to run them. Also, there may be some tasks stuck behind running
821 // ones with the same sequence token, but additional threads won't
822 // help this case.
823 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
824 AutoUnlock unlock(lock_);
825 delete_these_outside_lock.clear();
826 break;
829 // No work was found, but there are tasks that need deletion. The
830 // deletion must happen outside of the lock.
831 if (delete_these_outside_lock.size()) {
832 AutoUnlock unlock(lock_);
833 delete_these_outside_lock.clear();
835 // Since the lock has been released, |status| may no longer be
836 // accurate. It might read GET_WORK_WAIT even if there are tasks
837 // ready to perform work. Jump to the top of the loop to recalculate
838 // |status|.
839 continue;
// Count this thread as idle for the duration of the wait below.
842 waiting_thread_count_++;
844 switch (status) {
845 case GET_WORK_NOT_FOUND:
846 has_work_cv_.Wait();
847 break;
848 case GET_WORK_WAIT:
// A delayed task exists; sleep no longer than the time until it is due.
849 has_work_cv_.TimedWait(wait_time);
850 break;
851 default:
852 NOTREACHED();
854 waiting_thread_count_--;
857 } // Release lock_.
859 // We noticed we should exit. Wake up the next worker so it knows it should
860 // exit as well (because the Shutdown() code only signals once).
861 SignalHasWork();
863 // Possibly unblock shutdown.
864 can_shutdown_cv_.Signal();
// Worker-side half of the test-only cleanup handshake started by
// CleanupForTesting(). Called with |lock_| held at the top of every worker
// loop iteration. What a worker does depends on the state it observes:
//   CLEANUP_DONE      - nothing to do.
//   CLEANUP_REQUESTED - this worker becomes the cleaner: it moves the state
//                       to CLEANUP_STARTING, waits until no thread is being
//                       created and all other workers are idling, then
//                       moves to CLEANUP_RUNNING.
//   CLEANUP_STARTING  - this worker counts itself as an idler and parks until
//                       the state reaches CLEANUP_FINISHING.
//   CLEANUP_FINISHING - waits for the idler count to drop to zero, then sets
//                       CLEANUP_DONE.
// No branch is taken for CLEANUP_RUNNING; the function simply returns.
867 void SequencedWorkerPool::Inner::HandleCleanup() {
868 lock_.AssertAcquired();
869 if (cleanup_state_ == CLEANUP_DONE)
870 return;
871 if (cleanup_state_ == CLEANUP_REQUESTED) {
872 // We win, we get to do the cleanup as soon as the others wise up and idle.
873 cleanup_state_ = CLEANUP_STARTING;
// Keep nudging workers awake until every other registered thread has
// checked in as an idler and no thread is still being created.
874 while (thread_being_created_ ||
875 cleanup_idlers_ != threads_.size() - 1) {
876 has_work_cv_.Signal();
877 cleanup_cv_.Wait();
879 cleanup_state_ = CLEANUP_RUNNING;
880 return;
882 if (cleanup_state_ == CLEANUP_STARTING) {
883 // Another worker thread is cleaning up, we idle here until that's done.
884 ++cleanup_idlers_;
885 cleanup_cv_.Broadcast();
886 while (cleanup_state_ != CLEANUP_FINISHING) {
887 cleanup_cv_.Wait();
889 --cleanup_idlers_;
890 cleanup_cv_.Broadcast();
891 return;
893 if (cleanup_state_ == CLEANUP_FINISHING) {
894 // We wait for all idlers to wake up prior to being DONE.
895 while (cleanup_idlers_ != 0) {
896 cleanup_cv_.Broadcast();
897 cleanup_cv_.Wait();
// The state is re-checked because the lock is released while waiting above.
899 if (cleanup_state_ == CLEANUP_FINISHING) {
900 cleanup_state_ = CLEANUP_DONE;
901 cleanup_cv_.Signal();
903 return;
907 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
908 const std::string& name) {
909 lock_.AssertAcquired();
910 DCHECK(!name.empty());
912 std::map<std::string, int>::const_iterator found =
913 named_sequence_tokens_.find(name);
914 if (found != named_sequence_tokens_.end())
915 return found->second; // Got an existing one.
917 // Create a new one for this name.
918 SequenceToken result = GetSequenceToken();
919 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
920 return result.id_;
923 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
924 lock_.AssertAcquired();
925 // We assume that we never create enough tasks to wrap around.
926 return next_sequence_task_number_++;
929 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
930 SequencedTask* task,
931 TimeDelta* wait_time,
932 std::vector<Closure>* delete_these_outside_lock) {
933 lock_.AssertAcquired();
935 // Find the next task with a sequence token that's not currently in use.
936 // If the token is in use, that means another thread is running something
937 // in that sequence, and we can't run it without going out-of-order.
939 // This algorithm is simple and fair, but inefficient in some cases. For
940 // example, say somebody schedules 1000 slow tasks with the same sequence
941 // number. We'll have to go through all those tasks each time we feel like
942 // there might be work to schedule. If this proves to be a problem, we
943 // should make this more efficient.
945 // One possible enhancement would be to keep a map from sequence ID to a
946 // list of pending but currently blocked SequencedTasks for that ID.
947 // When a worker finishes a task of one sequence token, it can pick up the
948 // next one from that token right away.
950 // This may lead to starvation if there are sufficient numbers of sequences
951 // in use. To alleviate this, we could add an incrementing priority counter
952 // to each SequencedTask. Then maintain a priority_queue of all runnable
953 // tasks, sorted by priority counter. When a sequenced task is completed
954 // we would pop the head element off of that tasks pending list and add it
955 // to the priority queue. Then we would run the first item in the priority
956 // queue.
958 GetWorkStatus status = GET_WORK_NOT_FOUND;
959 int unrunnable_tasks = 0;
960 PendingTaskSet::iterator i = pending_tasks_.begin();
961 // We assume that the loop below doesn't take too long and so we can just do
962 // a single call to TimeTicks::Now().
963 const TimeTicks current_time = TimeTicks::Now();
964 while (i != pending_tasks_.end()) {
965 if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
966 unrunnable_tasks++;
967 ++i;
968 continue;
971 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
972 // We're shutting down and the task we just found isn't blocking
973 // shutdown. Delete it and get more work.
975 // Note that we do not want to delete unrunnable tasks. Deleting a task
976 // can have side effects (like freeing some objects) and deleting a
977 // task that's supposed to run after one that's currently running could
978 // cause an obscure crash.
980 // We really want to delete these tasks outside the lock in case the
981 // closures are holding refs to objects that want to post work from
982 // their destructorss (which would deadlock). The closures are
983 // internally refcounted, so we just need to keep a copy of them alive
984 // until the lock is exited. The calling code can just clear() the
985 // vector they passed to us once the lock is exited to make this
986 // happen.
987 delete_these_outside_lock->push_back(i->task);
988 pending_tasks_.erase(i++);
989 continue;
992 if (i->time_to_run > current_time) {
993 // The time to run has not come yet.
994 *wait_time = i->time_to_run - current_time;
995 status = GET_WORK_WAIT;
996 if (cleanup_state_ == CLEANUP_RUNNING) {
997 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
998 delete_these_outside_lock->push_back(i->task);
999 pending_tasks_.erase(i);
1001 break;
1004 // Found a runnable task.
1005 *task = *i;
1006 pending_tasks_.erase(i);
1007 if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
1008 blocking_shutdown_pending_task_count_--;
1011 status = GET_WORK_FOUND;
1012 break;
1015 return status;
1018 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1019 lock_.AssertAcquired();
1021 // Mark the task's sequence number as in use.
1022 if (task.sequence_token_id)
1023 current_sequences_.insert(task.sequence_token_id);
1025 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1026 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1027 // completes.
1028 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1029 blocking_shutdown_thread_count_++;
1031 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1032 // creates a new thread if there is no free one, there is a race when posting
1033 // tasks that many tasks could have been posted before a thread started
1034 // running them, so only one thread would have been created. So we also check
1035 // whether we should create more threads after removing our task from the
1036 // queue, which also has the nice side effect of creating the workers from
1037 // background threads rather than the main thread of the app.
1039 // If another thread wasn't created, we want to wake up an existing thread
1040 // if there is one waiting to pick up the next task.
1042 // Note that we really need to do this *before* running the task, not
1043 // after. Otherwise, if more than one task is posted, the creation of the
1044 // second thread (since we only create one at a time) will be blocked by
1045 // the execution of the first task, which could be arbitrarily long.
1046 return PrepareToStartAdditionalThreadIfHelpful();
1049 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1050 lock_.AssertAcquired();
1052 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1053 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1054 blocking_shutdown_thread_count_--;
1057 if (task.sequence_token_id)
1058 current_sequences_.erase(task.sequence_token_id);
1061 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1062 int sequence_token_id) const {
1063 lock_.AssertAcquired();
1064 return !sequence_token_id ||
1065 current_sequences_.find(sequence_token_id) ==
1066 current_sequences_.end();
1069 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1070 lock_.AssertAcquired();
1071 // How thread creation works:
1073 // We'de like to avoid creating threads with the lock held. However, we
1074 // need to be sure that we have an accurate accounting of the threads for
1075 // proper Joining and deltion on shutdown.
1077 // We need to figure out if we need another thread with the lock held, which
1078 // is what this function does. It then marks us as in the process of creating
1079 // a thread. When we do shutdown, we wait until the thread_being_created_
1080 // flag is cleared, which ensures that the new thread is properly added to
1081 // all the data structures and we can't leak it. Once shutdown starts, we'll
1082 // refuse to create more threads or they would be leaked.
1084 // Note that this creates a mostly benign race condition on shutdown that
1085 // will cause fewer workers to be created than one would expect. It isn't
1086 // much of an issue in real life, but affects some tests. Since we only spawn
1087 // one worker at a time, the following sequence of events can happen:
1089 // 1. Main thread posts a bunch of unrelated tasks that would normally be
1090 // run on separate threads.
1091 // 2. The first task post causes us to start a worker. Other tasks do not
1092 // cause a worker to start since one is pending.
1093 // 3. Main thread initiates shutdown.
1094 // 4. No more threads are created since the shutdown_called_ flag is set.
1096 // The result is that one may expect that max_threads_ workers to be created
1097 // given the workload, but in reality fewer may be created because the
1098 // sequence of thread creation on the background threads is racing with the
1099 // shutdown call.
1100 if (!shutdown_called_ &&
1101 !thread_being_created_ &&
1102 cleanup_state_ == CLEANUP_DONE &&
1103 threads_.size() < max_threads_ &&
1104 waiting_thread_count_ == 0) {
1105 // We could use an additional thread if there's work to be done.
1106 for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1107 i != pending_tasks_.end(); ++i) {
1108 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1109 // Found a runnable task, mark the thread as being started.
1110 thread_being_created_ = true;
1111 return static_cast<int>(threads_.size() + 1);
1115 return 0;
1118 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1119 int thread_number) {
1120 // Called outside of the lock.
1121 DCHECK_GT(thread_number, 0);
1123 // The worker is assigned to the list when the thread actually starts, which
1124 // will manage the memory of the pointer.
1125 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1128 void SequencedWorkerPool::Inner::SignalHasWork() {
1129 has_work_cv_.Signal();
1130 if (testing_observer_) {
1131 testing_observer_->OnHasWork();
1135 bool SequencedWorkerPool::Inner::CanShutdown() const {
1136 lock_.AssertAcquired();
1137 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1138 return !thread_being_created_ &&
1139 blocking_shutdown_thread_count_ == 0 &&
1140 blocking_shutdown_pending_task_count_ == 0;
1143 base::StaticAtomicSequenceNumber
1144 SequencedWorkerPool::Inner::g_last_sequence_number_;
1146 // SequencedWorkerPool --------------------------------------------------------
1148 // static
1149 SequencedWorkerPool::SequenceToken
1150 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1151 // Don't construct lazy instance on check.
1152 if (g_lazy_tls_ptr == NULL)
1153 return SequenceToken();
1155 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
1156 if (!token)
1157 return SequenceToken();
1158 return *token;
1161 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1162 const std::string& thread_name_prefix)
1163 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1164 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1167 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1168 const std::string& thread_name_prefix,
1169 TestingObserver* observer)
1170 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1171 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1174 SequencedWorkerPool::~SequencedWorkerPool() {}
1176 void SequencedWorkerPool::OnDestruct() const {
1177 // Avoid deleting ourselves on a worker thread (which would
1178 // deadlock).
1179 if (RunsTasksOnCurrentThread()) {
1180 constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1181 } else {
1182 delete this;
1186 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1187 return inner_->GetSequenceToken();
1190 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1191 const std::string& name) {
1192 return inner_->GetNamedSequenceToken(name);
1195 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1196 SequenceToken token) {
1197 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1200 scoped_refptr<SequencedTaskRunner>
1201 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1202 SequenceToken token, WorkerShutdown shutdown_behavior) {
1203 return new SequencedWorkerPoolSequencedTaskRunner(
1204 this, token, shutdown_behavior);
1207 scoped_refptr<TaskRunner>
1208 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1209 WorkerShutdown shutdown_behavior) {
1210 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1213 bool SequencedWorkerPool::PostWorkerTask(
1214 const tracked_objects::Location& from_here,
1215 const Closure& task) {
1216 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1217 from_here, task, TimeDelta());
1220 bool SequencedWorkerPool::PostDelayedWorkerTask(
1221 const tracked_objects::Location& from_here,
1222 const Closure& task,
1223 TimeDelta delay) {
1224 WorkerShutdown shutdown_behavior =
1225 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1226 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1227 from_here, task, delay);
1230 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1231 const tracked_objects::Location& from_here,
1232 const Closure& task,
1233 WorkerShutdown shutdown_behavior) {
1234 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1235 from_here, task, TimeDelta());
1238 bool SequencedWorkerPool::PostSequencedWorkerTask(
1239 SequenceToken sequence_token,
1240 const tracked_objects::Location& from_here,
1241 const Closure& task) {
1242 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1243 from_here, task, TimeDelta());
1246 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1247 SequenceToken sequence_token,
1248 const tracked_objects::Location& from_here,
1249 const Closure& task,
1250 TimeDelta delay) {
1251 WorkerShutdown shutdown_behavior =
1252 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1253 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1254 from_here, task, delay);
1257 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1258 const std::string& token_name,
1259 const tracked_objects::Location& from_here,
1260 const Closure& task) {
1261 DCHECK(!token_name.empty());
1262 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1263 from_here, task, TimeDelta());
1266 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1267 SequenceToken sequence_token,
1268 const tracked_objects::Location& from_here,
1269 const Closure& task,
1270 WorkerShutdown shutdown_behavior) {
1271 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1272 from_here, task, TimeDelta());
1275 bool SequencedWorkerPool::PostDelayedTask(
1276 const tracked_objects::Location& from_here,
1277 const Closure& task,
1278 TimeDelta delay) {
1279 return PostDelayedWorkerTask(from_here, task, delay);
1282 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1283 return inner_->RunsTasksOnCurrentThread();
1286 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1287 SequenceToken sequence_token) const {
1288 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1291 void SequencedWorkerPool::FlushForTesting() {
1292 inner_->CleanupForTesting();
1295 void SequencedWorkerPool::SignalHasWorkForTesting() {
1296 inner_->SignalHasWorkForTesting();
1299 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1300 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1301 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1304 bool SequencedWorkerPool::IsShutdownInProgress() {
1305 return inner_->IsShutdownInProgress();
1308 } // namespace base