Update .DEPS.git
[chromium-blink-merge.git] / base / threading / sequenced_worker_pool.cc
blob56f908b8a60e2099646331e3664c840cd5dc0fcd
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
7 #include <list>
8 #include <map>
9 #include <set>
10 #include <utility>
11 #include <vector>
13 #include "base/atomicops.h"
14 #include "base/callback.h"
15 #include "base/compiler_specific.h"
16 #include "base/critical_closure.h"
17 #include "base/debug/trace_event.h"
18 #include "base/logging.h"
19 #include "base/memory/linked_ptr.h"
20 #include "base/message_loop_proxy.h"
21 #include "base/metrics/histogram.h"
22 #include "base/stl_util.h"
23 #include "base/stringprintf.h"
24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/lock.h"
26 #include "base/threading/platform_thread.h"
27 #include "base/threading/simple_thread.h"
28 #include "base/threading/thread_restrictions.h"
29 #include "base/time.h"
30 #include "base/tracked_objects.h"
32 #if defined(OS_MACOSX)
33 #include "base/mac/scoped_nsautorelease_pool.h"
34 #endif
36 namespace base {
38 namespace {
40 struct SequencedTask : public TrackingInfo {
41 SequencedTask()
42 : sequence_token_id(0),
43 trace_id(0),
44 sequence_task_number(0),
45 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
47 explicit SequencedTask(const tracked_objects::Location& from_here)
48 : base::TrackingInfo(from_here, TimeTicks()),
49 sequence_token_id(0),
50 trace_id(0),
51 sequence_task_number(0),
52 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
54 ~SequencedTask() {}
56 int sequence_token_id;
57 int trace_id;
58 int64 sequence_task_number;
59 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
60 tracked_objects::Location posted_from;
61 Closure task;
63 // Non-delayed tasks and delayed tasks are managed together by time-to-run
64 // order. We calculate the time by adding the posted time and the given delay.
65 TimeTicks time_to_run;
68 struct SequencedTaskLessThan {
69 public:
70 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
71 if (lhs.time_to_run < rhs.time_to_run)
72 return true;
74 if (lhs.time_to_run > rhs.time_to_run)
75 return false;
77 // If the time happen to match, then we use the sequence number to decide.
78 return lhs.sequence_task_number < rhs.sequence_task_number;
82 // SequencedWorkerPoolTaskRunner ---------------------------------------------
83 // A TaskRunner which posts tasks to a SequencedWorkerPool with a
84 // fixed ShutdownBehavior.
86 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
87 class SequencedWorkerPoolTaskRunner : public TaskRunner {
88 public:
89 SequencedWorkerPoolTaskRunner(
90 const scoped_refptr<SequencedWorkerPool>& pool,
91 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
93 // TaskRunner implementation
94 virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
95 const Closure& task,
96 TimeDelta delay) OVERRIDE;
97 virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
99 private:
100 virtual ~SequencedWorkerPoolTaskRunner();
102 const scoped_refptr<SequencedWorkerPool> pool_;
104 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
106 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
109 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
110 const scoped_refptr<SequencedWorkerPool>& pool,
111 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
112 : pool_(pool),
113 shutdown_behavior_(shutdown_behavior) {
116 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
119 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
120 const tracked_objects::Location& from_here,
121 const Closure& task,
122 TimeDelta delay) {
123 if (delay == TimeDelta()) {
124 return pool_->PostWorkerTaskWithShutdownBehavior(
125 from_here, task, shutdown_behavior_);
127 return pool_->PostDelayedWorkerTask(from_here, task, delay);
130 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
131 return pool_->RunsTasksOnCurrentThread();
134 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
135 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
136 // fixed sequence token.
138 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
139 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
140 public:
141 SequencedWorkerPoolSequencedTaskRunner(
142 const scoped_refptr<SequencedWorkerPool>& pool,
143 SequencedWorkerPool::SequenceToken token,
144 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
146 // TaskRunner implementation
147 virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
148 const Closure& task,
149 TimeDelta delay) OVERRIDE;
150 virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
152 // SequencedTaskRunner implementation
153 virtual bool PostNonNestableDelayedTask(
154 const tracked_objects::Location& from_here,
155 const Closure& task,
156 TimeDelta delay) OVERRIDE;
158 private:
159 virtual ~SequencedWorkerPoolSequencedTaskRunner();
161 const scoped_refptr<SequencedWorkerPool> pool_;
163 const SequencedWorkerPool::SequenceToken token_;
165 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
167 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
170 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
171 const scoped_refptr<SequencedWorkerPool>& pool,
172 SequencedWorkerPool::SequenceToken token,
173 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
174 : pool_(pool),
175 token_(token),
176 shutdown_behavior_(shutdown_behavior) {
179 SequencedWorkerPoolSequencedTaskRunner::
180 ~SequencedWorkerPoolSequencedTaskRunner() {
183 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
184 const tracked_objects::Location& from_here,
185 const Closure& task,
186 TimeDelta delay) {
187 if (delay == TimeDelta()) {
188 return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
189 token_, from_here, task, shutdown_behavior_);
191 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
194 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
195 return pool_->IsRunningSequenceOnCurrentThread(token_);
198 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
199 const tracked_objects::Location& from_here,
200 const Closure& task,
201 TimeDelta delay) {
202 // There's no way to run nested tasks, so simply forward to
203 // PostDelayedTask.
204 return PostDelayedTask(from_here, task, delay);
207 // Create a process-wide unique ID to represent this task in trace events. This
208 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
209 // with MessageLoop pointers on other processes.
210 uint64 GetTaskTraceID(const SequencedTask& task,
211 void* pool) {
212 return (static_cast<uint64>(task.trace_id) << 32) |
213 static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
216 } // namespace
218 // Worker ---------------------------------------------------------------------
220 class SequencedWorkerPool::Worker : public SimpleThread {
221 public:
222 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
223 // around as long as we are running.
224 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
225 int thread_number,
226 const std::string& thread_name_prefix);
227 virtual ~Worker();
229 // SimpleThread implementation. This actually runs the background thread.
230 virtual void Run() OVERRIDE;
232 void set_running_sequence(SequenceToken token) {
233 running_sequence_ = token;
236 SequenceToken running_sequence() const {
237 return running_sequence_;
240 private:
241 scoped_refptr<SequencedWorkerPool> worker_pool_;
242 SequenceToken running_sequence_;
244 DISALLOW_COPY_AND_ASSIGN(Worker);
247 // Inner ----------------------------------------------------------------------
249 class SequencedWorkerPool::Inner {
250 public:
251 // Take a raw pointer to |worker| to avoid cycles (since we're owned
252 // by it).
253 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
254 const std::string& thread_name_prefix,
255 TestingObserver* observer);
257 ~Inner();
259 SequenceToken GetSequenceToken();
261 SequenceToken GetNamedSequenceToken(const std::string& name);
263 // This function accepts a name and an ID. If the name is null, the
264 // token ID is used. This allows us to implement the optional name lookup
265 // from a single function without having to enter the lock a separate time.
266 bool PostTask(const std::string* optional_token_name,
267 SequenceToken sequence_token,
268 WorkerShutdown shutdown_behavior,
269 const tracked_objects::Location& from_here,
270 const Closure& task,
271 TimeDelta delay);
273 bool RunsTasksOnCurrentThread() const;
275 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
277 void FlushForTesting();
279 void SignalHasWorkForTesting();
281 int GetWorkSignalCountForTesting() const;
283 void Shutdown();
285 // Runs the worker loop on the background thread.
286 void ThreadLoop(Worker* this_worker);
288 private:
289 enum GetWorkStatus {
290 GET_WORK_FOUND,
291 GET_WORK_NOT_FOUND,
292 GET_WORK_WAIT,
295 // Returns whether there are no more pending tasks and all threads
296 // are idle. Must be called under lock.
297 bool IsIdle() const;
299 // Called from within the lock, this converts the given token name into a
300 // token ID, creating a new one if necessary.
301 int LockedGetNamedTokenID(const std::string& name);
303 // Called from within the lock, this returns the next sequence task number.
304 int64 LockedGetNextSequenceTaskNumber();
306 // Gets new task. There are 3 cases depending on the return value:
308 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
309 // be run immediately.
310 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
311 // and |task| is not filled in. In this case, the caller should wait until
312 // a task is posted.
313 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
314 // immediately, and |task| is not filled in. Likewise, |wait_time| is
315 // filled in the time to wait until the next task to run. In this case, the
316 // caller should wait the time.
318 // In any case, the calling code should clear the given
319 // delete_these_outside_lock vector the next time the lock is released.
320 // See the implementation for a more detailed description.
321 GetWorkStatus GetWork(SequencedTask* task,
322 TimeDelta* wait_time,
323 std::vector<Closure>* delete_these_outside_lock);
325 // Peforms init and cleanup around running the given task. WillRun...
326 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
327 // The calling code should call FinishStartingAdditionalThread once the
328 // lock is released if the return values is nonzero.
329 int WillRunWorkerTask(const SequencedTask& task);
330 void DidRunWorkerTask(const SequencedTask& task);
332 // Returns true if there are no threads currently running the given
333 // sequence token.
334 bool IsSequenceTokenRunnable(int sequence_token_id) const;
336 // Checks if all threads are busy and the addition of one more could run an
337 // additional task waiting in the queue. This must be called from within
338 // the lock.
340 // If another thread is helpful, this will mark the thread as being in the
341 // process of starting and returns the index of the new thread which will be
342 // 0 or more. The caller should then call FinishStartingAdditionalThread to
343 // complete initialization once the lock is released.
345 // If another thread is not necessary, returne 0;
347 // See the implementedion for more.
348 int PrepareToStartAdditionalThreadIfHelpful();
350 // The second part of thread creation after
351 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
352 // generated. This actually creates the thread and should be called outside
353 // the lock to avoid blocking important work starting a thread in the lock.
354 void FinishStartingAdditionalThread(int thread_number);
356 // Signal |has_work_| and increment |has_work_signal_count_|.
357 void SignalHasWork();
359 // Checks whether there is work left that's blocking shutdown. Must be
360 // called inside the lock.
361 bool CanShutdown() const;
363 SequencedWorkerPool* const worker_pool_;
365 // The last sequence number used. Managed by GetSequenceToken, since this
366 // only does threadsafe increment operations, you do not need to hold the
367 // lock.
368 volatile subtle::Atomic32 last_sequence_number_;
370 // This lock protects |everything in this class|. Do not read or modify
371 // anything without holding this lock. Do not block while holding this
372 // lock.
373 mutable Lock lock_;
375 // Condition variable that is waited on by worker threads until new
376 // tasks are posted or shutdown starts.
377 ConditionVariable has_work_cv_;
379 // Condition variable that is waited on by non-worker threads (in
380 // FlushForTesting()) until IsIdle() goes to true.
381 ConditionVariable is_idle_cv_;
383 // Condition variable that is waited on by non-worker threads (in
384 // Shutdown()) until CanShutdown() goes to true.
385 ConditionVariable can_shutdown_cv_;
387 // The maximum number of worker threads we'll create.
388 const size_t max_threads_;
390 const std::string thread_name_prefix_;
392 // Associates all known sequence token names with their IDs.
393 std::map<std::string, int> named_sequence_tokens_;
395 // Owning pointers to all threads we've created so far, indexed by
396 // ID. Since we lazily create threads, this may be less than
397 // max_threads_ and will be initially empty.
398 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
399 ThreadMap threads_;
401 // Set to true when we're in the process of creating another thread.
402 // See PrepareToStartAdditionalThreadIfHelpful for more.
403 bool thread_being_created_;
405 // Number of threads currently waiting for work.
406 size_t waiting_thread_count_;
408 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
409 // or SKIP_ON_SHUTDOWN flag set.
410 size_t blocking_shutdown_thread_count_;
412 // A set of all pending tasks in time-to-run order. These are tasks that are
413 // either waiting for a thread to run on, waiting for their time to run,
414 // or blocked on a previous task in their sequence. We have to iterate over
415 // the tasks by time-to-run order, so we use the set instead of the
416 // traditional priority_queue.
417 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
418 PendingTaskSet pending_tasks_;
420 // The next sequence number for a new sequenced task.
421 int64 next_sequence_task_number_;
423 // Number of tasks in the pending_tasks_ list that are marked as blocking
424 // shutdown.
425 size_t blocking_shutdown_pending_task_count_;
427 // Lists all sequence tokens currently executing.
428 std::set<int> current_sequences_;
430 // An ID for each posted task to distinguish the task from others in traces.
431 int trace_id_;
433 // Set when Shutdown is called and no further tasks should be
434 // allowed, though we may still be running existing tasks.
435 bool shutdown_called_;
437 TestingObserver* const testing_observer_;
439 DISALLOW_COPY_AND_ASSIGN(Inner);
442 // Worker definitions ---------------------------------------------------------
444 SequencedWorkerPool::Worker::Worker(
445 const scoped_refptr<SequencedWorkerPool>& worker_pool,
446 int thread_number,
447 const std::string& prefix)
448 : SimpleThread(
449 prefix + StringPrintf("Worker%d", thread_number).c_str()),
450 worker_pool_(worker_pool) {
451 Start();
454 SequencedWorkerPool::Worker::~Worker() {
457 void SequencedWorkerPool::Worker::Run() {
458 // Just jump back to the Inner object to run the thread, since it has all the
459 // tracking information and queues. It might be more natural to implement
460 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
461 // having these worker objects at all, but that method lacks the ability to
462 // send thread-specific information easily to the thread loop.
463 worker_pool_->inner_->ThreadLoop(this);
464 // Release our cyclic reference once we're done.
465 worker_pool_ = NULL;
468 // Inner definitions ---------------------------------------------------------
470 SequencedWorkerPool::Inner::Inner(
471 SequencedWorkerPool* worker_pool,
472 size_t max_threads,
473 const std::string& thread_name_prefix,
474 TestingObserver* observer)
475 : worker_pool_(worker_pool),
476 last_sequence_number_(0),
477 lock_(),
478 has_work_cv_(&lock_),
479 is_idle_cv_(&lock_),
480 can_shutdown_cv_(&lock_),
481 max_threads_(max_threads),
482 thread_name_prefix_(thread_name_prefix),
483 thread_being_created_(false),
484 waiting_thread_count_(0),
485 blocking_shutdown_thread_count_(0),
486 next_sequence_task_number_(0),
487 blocking_shutdown_pending_task_count_(0),
488 trace_id_(0),
489 shutdown_called_(false),
490 testing_observer_(observer) {}
492 SequencedWorkerPool::Inner::~Inner() {
493 // You must call Shutdown() before destroying the pool.
494 DCHECK(shutdown_called_);
496 // Need to explicitly join with the threads before they're destroyed or else
497 // they will be running when our object is half torn down.
498 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
499 it->second->Join();
500 threads_.clear();
502 if (testing_observer_)
503 testing_observer_->OnDestruct();
506 SequencedWorkerPool::SequenceToken
507 SequencedWorkerPool::Inner::GetSequenceToken() {
508 subtle::Atomic32 result =
509 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
510 return SequenceToken(static_cast<int>(result));
513 SequencedWorkerPool::SequenceToken
514 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
515 AutoLock lock(lock_);
516 return SequenceToken(LockedGetNamedTokenID(name));
519 bool SequencedWorkerPool::Inner::PostTask(
520 const std::string* optional_token_name,
521 SequenceToken sequence_token,
522 WorkerShutdown shutdown_behavior,
523 const tracked_objects::Location& from_here,
524 const Closure& task,
525 TimeDelta delay) {
526 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
527 SequencedTask sequenced(from_here);
528 sequenced.sequence_token_id = sequence_token.id_;
529 sequenced.shutdown_behavior = shutdown_behavior;
530 sequenced.posted_from = from_here;
531 sequenced.task =
532 shutdown_behavior == BLOCK_SHUTDOWN ?
533 base::MakeCriticalClosure(task) : task;
534 sequenced.time_to_run = TimeTicks::Now() + delay;
536 int create_thread_id = 0;
538 AutoLock lock(lock_);
539 if (shutdown_called_)
540 return false;
542 // The trace_id is used for identifying the task in about:tracing.
543 sequenced.trace_id = trace_id_++;
545 TRACE_EVENT_FLOW_BEGIN0("task", "SequencedWorkerPool::PostTask",
546 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));
548 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
550 // Now that we have the lock, apply the named token rules.
551 if (optional_token_name)
552 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
554 pending_tasks_.insert(sequenced);
555 if (shutdown_behavior == BLOCK_SHUTDOWN)
556 blocking_shutdown_pending_task_count_++;
558 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
561 // Actually start the additional thread or signal an existing one now that
562 // we're outside the lock.
563 if (create_thread_id)
564 FinishStartingAdditionalThread(create_thread_id);
565 else
566 SignalHasWork();
568 return true;
571 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
572 AutoLock lock(lock_);
573 return ContainsKey(threads_, PlatformThread::CurrentId());
576 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
577 SequenceToken sequence_token) const {
578 AutoLock lock(lock_);
579 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
580 if (found == threads_.end())
581 return false;
582 return found->second->running_sequence().Equals(sequence_token);
585 void SequencedWorkerPool::Inner::FlushForTesting() {
586 AutoLock lock(lock_);
587 while (!IsIdle())
588 is_idle_cv_.Wait();
591 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
592 SignalHasWork();
595 void SequencedWorkerPool::Inner::Shutdown() {
596 // Mark us as terminated and go through and drop all tasks that aren't
597 // required to run on shutdown. Since no new tasks will get posted once the
598 // terminated flag is set, this ensures that all remaining tasks are required
599 // for shutdown whenever the termianted_ flag is set.
601 AutoLock lock(lock_);
603 if (shutdown_called_)
604 return;
605 shutdown_called_ = true;
607 // Tickle the threads. This will wake up a waiting one so it will know that
608 // it can exit, which in turn will wake up any other waiting ones.
609 SignalHasWork();
611 // There are no pending or running tasks blocking shutdown, we're done.
612 if (CanShutdown())
613 return;
616 // If we're here, then something is blocking shutdown. So wait for
617 // CanShutdown() to go to true.
619 if (testing_observer_)
620 testing_observer_->WillWaitForShutdown();
622 TimeTicks shutdown_wait_begin = TimeTicks::Now();
625 base::ThreadRestrictions::ScopedAllowWait allow_wait;
626 AutoLock lock(lock_);
627 while (!CanShutdown())
628 can_shutdown_cv_.Wait();
630 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
631 TimeTicks::Now() - shutdown_wait_begin);
634 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
636 AutoLock lock(lock_);
637 DCHECK(thread_being_created_);
638 thread_being_created_ = false;
639 std::pair<ThreadMap::iterator, bool> result =
640 threads_.insert(
641 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
642 DCHECK(result.second);
644 while (true) {
645 #if defined(OS_MACOSX)
646 base::mac::ScopedNSAutoreleasePool autorelease_pool;
647 #endif
649 // See GetWork for what delete_these_outside_lock is doing.
650 SequencedTask task;
651 TimeDelta wait_time;
652 std::vector<Closure> delete_these_outside_lock;
653 GetWorkStatus status =
654 GetWork(&task, &wait_time, &delete_these_outside_lock);
655 if (status == GET_WORK_FOUND) {
656 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask",
657 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
658 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop",
659 "src_file", task.posted_from.file_name(),
660 "src_func", task.posted_from.function_name());
661 int new_thread_id = WillRunWorkerTask(task);
663 AutoUnlock unlock(lock_);
664 // There may be more work available, so wake up another
665 // worker thread. (Technically not required, since we
666 // already get a signal for each new task, but it doesn't
667 // hurt.)
668 SignalHasWork();
669 delete_these_outside_lock.clear();
671 // Complete thread creation outside the lock if necessary.
672 if (new_thread_id)
673 FinishStartingAdditionalThread(new_thread_id);
675 this_worker->set_running_sequence(
676 SequenceToken(task.sequence_token_id));
678 tracked_objects::TrackedTime start_time =
679 tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);
681 task.task.Run();
683 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
684 start_time, tracked_objects::ThreadData::NowForEndOfRun());
686 this_worker->set_running_sequence(SequenceToken());
688 // Make sure our task is erased outside the lock for the same reason
689 // we do this with delete_these_oustide_lock.
690 task.task = Closure();
692 DidRunWorkerTask(task); // Must be done inside the lock.
693 } else {
694 // When we're terminating and there's no more work, we can
695 // shut down. You can't get more tasks posted once
696 // shutdown_called_ is set. There may be some tasks stuck
697 // behind running ones with the same sequence token, but
698 // additional threads won't help this case.
699 if (shutdown_called_ &&
700 blocking_shutdown_pending_task_count_ == 0)
701 break;
702 waiting_thread_count_++;
703 // This is the only time that IsIdle() can go to true.
704 if (IsIdle())
705 is_idle_cv_.Signal();
707 switch (status) {
708 case GET_WORK_NOT_FOUND:
709 has_work_cv_.Wait();
710 break;
711 case GET_WORK_WAIT:
712 has_work_cv_.TimedWait(wait_time);
713 break;
714 default:
715 NOTREACHED();
717 waiting_thread_count_--;
720 } // Release lock_.
722 // We noticed we should exit. Wake up the next worker so it knows it should
723 // exit as well (because the Shutdown() code only signals once).
724 SignalHasWork();
726 // Possibly unblock shutdown.
727 can_shutdown_cv_.Signal();
730 bool SequencedWorkerPool::Inner::IsIdle() const {
731 lock_.AssertAcquired();
732 return pending_tasks_.empty() && waiting_thread_count_ == threads_.size();
735 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
736 const std::string& name) {
737 lock_.AssertAcquired();
738 DCHECK(!name.empty());
740 std::map<std::string, int>::const_iterator found =
741 named_sequence_tokens_.find(name);
742 if (found != named_sequence_tokens_.end())
743 return found->second; // Got an existing one.
745 // Create a new one for this name.
746 SequenceToken result = GetSequenceToken();
747 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
748 return result.id_;
751 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
752 lock_.AssertAcquired();
753 // We assume that we never create enough tasks to wrap around.
754 return next_sequence_task_number_++;
757 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
758 SequencedTask* task,
759 TimeDelta* wait_time,
760 std::vector<Closure>* delete_these_outside_lock) {
761 lock_.AssertAcquired();
763 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
764 static_cast<int>(pending_tasks_.size()));
766 // Find the next task with a sequence token that's not currently in use.
767 // If the token is in use, that means another thread is running something
768 // in that sequence, and we can't run it without going out-of-order.
770 // This algorithm is simple and fair, but inefficient in some cases. For
771 // example, say somebody schedules 1000 slow tasks with the same sequence
772 // number. We'll have to go through all those tasks each time we feel like
773 // there might be work to schedule. If this proves to be a problem, we
774 // should make this more efficient.
776 // One possible enhancement would be to keep a map from sequence ID to a
777 // list of pending but currently blocked SequencedTasks for that ID.
778 // When a worker finishes a task of one sequence token, it can pick up the
779 // next one from that token right away.
781 // This may lead to starvation if there are sufficient numbers of sequences
782 // in use. To alleviate this, we could add an incrementing priority counter
783 // to each SequencedTask. Then maintain a priority_queue of all runnable
784 // tasks, sorted by priority counter. When a sequenced task is completed
785 // we would pop the head element off of that tasks pending list and add it
786 // to the priority queue. Then we would run the first item in the priority
787 // queue.
789 GetWorkStatus status = GET_WORK_NOT_FOUND;
790 int unrunnable_tasks = 0;
791 PendingTaskSet::iterator i = pending_tasks_.begin();
792 // We assume that the loop below doesn't take too long and so we can just do
793 // a single call to TimeTicks::Now().
794 const TimeTicks current_time = TimeTicks::Now();
795 while (i != pending_tasks_.end()) {
796 if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
797 unrunnable_tasks++;
798 ++i;
799 continue;
802 if (i->time_to_run > current_time) {
803 // The time to run has not come yet.
804 *wait_time = i->time_to_run - current_time;
805 status = GET_WORK_WAIT;
806 break;
809 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
810 // We're shutting down and the task we just found isn't blocking
811 // shutdown. Delete it and get more work.
813 // Note that we do not want to delete unrunnable tasks. Deleting a task
814 // can have side effects (like freeing some objects) and deleting a
815 // task that's supposed to run after one that's currently running could
816 // cause an obscure crash.
818 // We really want to delete these tasks outside the lock in case the
819 // closures are holding refs to objects that want to post work from
820 // their destructorss (which would deadlock). The closures are
821 // internally refcounted, so we just need to keep a copy of them alive
822 // until the lock is exited. The calling code can just clear() the
823 // vector they passed to us once the lock is exited to make this
824 // happen.
825 delete_these_outside_lock->push_back(i->task);
826 pending_tasks_.erase(i++);
827 } else {
828 // Found a runnable task.
829 *task = *i;
830 pending_tasks_.erase(i);
831 if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
832 blocking_shutdown_pending_task_count_--;
835 status = GET_WORK_FOUND;
836 break;
840 // Track the number of tasks we had to skip over to see if we should be
841 // making this more efficient. If this number ever becomes large or is
842 // frequently "some", we should consider the optimization above.
843 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
844 unrunnable_tasks);
845 return status;
848 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
849 lock_.AssertAcquired();
851 // Mark the task's sequence number as in use.
852 if (task.sequence_token_id)
853 current_sequences_.insert(task.sequence_token_id);
855 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
856 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
857 // completes.
858 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
859 blocking_shutdown_thread_count_++;
861 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
862 // creates a new thread if there is no free one, there is a race when posting
863 // tasks that many tasks could have been posted before a thread started
864 // running them, so only one thread would have been created. So we also check
865 // whether we should create more threads after removing our task from the
866 // queue, which also has the nice side effect of creating the workers from
867 // background threads rather than the main thread of the app.
869 // If another thread wasn't created, we want to wake up an existing thread
870 // if there is one waiting to pick up the next task.
872 // Note that we really need to do this *before* running the task, not
873 // after. Otherwise, if more than one task is posted, the creation of the
874 // second thread (since we only create one at a time) will be blocked by
875 // the execution of the first task, which could be arbitrarily long.
876 return PrepareToStartAdditionalThreadIfHelpful();
879 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
880 lock_.AssertAcquired();
882 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
883 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
884 blocking_shutdown_thread_count_--;
887 if (task.sequence_token_id)
888 current_sequences_.erase(task.sequence_token_id);
891 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
892 int sequence_token_id) const {
893 lock_.AssertAcquired();
894 return !sequence_token_id ||
895 current_sequences_.find(sequence_token_id) ==
896 current_sequences_.end();
899 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
900 lock_.AssertAcquired();
901 // How thread creation works:
903 // We'de like to avoid creating threads with the lock held. However, we
904 // need to be sure that we have an accurate accounting of the threads for
905 // proper Joining and deltion on shutdown.
907 // We need to figure out if we need another thread with the lock held, which
908 // is what this function does. It then marks us as in the process of creating
909 // a thread. When we do shutdown, we wait until the thread_being_created_
910 // flag is cleared, which ensures that the new thread is properly added to
911 // all the data structures and we can't leak it. Once shutdown starts, we'll
912 // refuse to create more threads or they would be leaked.
914 // Note that this creates a mostly benign race condition on shutdown that
915 // will cause fewer workers to be created than one would expect. It isn't
916 // much of an issue in real life, but affects some tests. Since we only spawn
917 // one worker at a time, the following sequence of events can happen:
919 // 1. Main thread posts a bunch of unrelated tasks that would normally be
920 // run on separate threads.
921 // 2. The first task post causes us to start a worker. Other tasks do not
922 // cause a worker to start since one is pending.
923 // 3. Main thread initiates shutdown.
924 // 4. No more threads are created since the shutdown_called_ flag is set.
926 // The result is that one may expect that max_threads_ workers to be created
927 // given the workload, but in reality fewer may be created because the
928 // sequence of thread creation on the background threads is racing with the
929 // shutdown call.
930 if (!shutdown_called_ &&
931 !thread_being_created_ &&
932 threads_.size() < max_threads_ &&
933 waiting_thread_count_ == 0) {
934 // We could use an additional thread if there's work to be done.
935 for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
936 i != pending_tasks_.end(); ++i) {
937 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
938 // Found a runnable task, mark the thread as being started.
939 thread_being_created_ = true;
940 return static_cast<int>(threads_.size() + 1);
944 return 0;
947 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
948 int thread_number) {
949 // Called outside of the lock.
950 DCHECK(thread_number > 0);
952 // The worker is assigned to the list when the thread actually starts, which
953 // will manage the memory of the pointer.
954 new Worker(worker_pool_, thread_number, thread_name_prefix_);
957 void SequencedWorkerPool::Inner::SignalHasWork() {
958 has_work_cv_.Signal();
959 if (testing_observer_) {
960 testing_observer_->OnHasWork();
964 bool SequencedWorkerPool::Inner::CanShutdown() const {
965 lock_.AssertAcquired();
966 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
967 return !thread_being_created_ &&
968 blocking_shutdown_thread_count_ == 0 &&
969 blocking_shutdown_pending_task_count_ == 0;
972 // SequencedWorkerPool --------------------------------------------------------
974 SequencedWorkerPool::SequencedWorkerPool(
975 size_t max_threads,
976 const std::string& thread_name_prefix)
977 : constructor_message_loop_(MessageLoopProxy::current()),
978 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this),
979 max_threads, thread_name_prefix, NULL)) {
982 SequencedWorkerPool::SequencedWorkerPool(
983 size_t max_threads,
984 const std::string& thread_name_prefix,
985 TestingObserver* observer)
986 : constructor_message_loop_(MessageLoopProxy::current()),
987 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this),
988 max_threads, thread_name_prefix, observer)) {
991 SequencedWorkerPool::~SequencedWorkerPool() {}
993 void SequencedWorkerPool::OnDestruct() const {
994 DCHECK(constructor_message_loop_.get());
995 // Avoid deleting ourselves on a worker thread (which would
996 // deadlock).
997 if (RunsTasksOnCurrentThread()) {
998 constructor_message_loop_->DeleteSoon(FROM_HERE, this);
999 } else {
1000 delete this;
1004 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1005 return inner_->GetSequenceToken();
1008 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1009 const std::string& name) {
1010 return inner_->GetNamedSequenceToken(name);
1013 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1014 SequenceToken token) {
1015 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1018 scoped_refptr<SequencedTaskRunner>
1019 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1020 SequenceToken token, WorkerShutdown shutdown_behavior) {
1021 return new SequencedWorkerPoolSequencedTaskRunner(
1022 this, token, shutdown_behavior);
1025 scoped_refptr<TaskRunner>
1026 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1027 WorkerShutdown shutdown_behavior) {
1028 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1031 bool SequencedWorkerPool::PostWorkerTask(
1032 const tracked_objects::Location& from_here,
1033 const Closure& task) {
1034 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1035 from_here, task, TimeDelta());
1038 bool SequencedWorkerPool::PostDelayedWorkerTask(
1039 const tracked_objects::Location& from_here,
1040 const Closure& task,
1041 TimeDelta delay) {
1042 WorkerShutdown shutdown_behavior =
1043 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1044 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1045 from_here, task, delay);
1048 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1049 const tracked_objects::Location& from_here,
1050 const Closure& task,
1051 WorkerShutdown shutdown_behavior) {
1052 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1053 from_here, task, TimeDelta());
1056 bool SequencedWorkerPool::PostSequencedWorkerTask(
1057 SequenceToken sequence_token,
1058 const tracked_objects::Location& from_here,
1059 const Closure& task) {
1060 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1061 from_here, task, TimeDelta());
1064 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1065 SequenceToken sequence_token,
1066 const tracked_objects::Location& from_here,
1067 const Closure& task,
1068 TimeDelta delay) {
1069 WorkerShutdown shutdown_behavior =
1070 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1071 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1072 from_here, task, delay);
1075 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1076 const std::string& token_name,
1077 const tracked_objects::Location& from_here,
1078 const Closure& task) {
1079 DCHECK(!token_name.empty());
1080 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1081 from_here, task, TimeDelta());
1084 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1085 SequenceToken sequence_token,
1086 const tracked_objects::Location& from_here,
1087 const Closure& task,
1088 WorkerShutdown shutdown_behavior) {
1089 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1090 from_here, task, TimeDelta());
1093 bool SequencedWorkerPool::PostDelayedTask(
1094 const tracked_objects::Location& from_here,
1095 const Closure& task,
1096 TimeDelta delay) {
1097 return PostDelayedWorkerTask(from_here, task, delay);
1100 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1101 return inner_->RunsTasksOnCurrentThread();
1104 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1105 SequenceToken sequence_token) const {
1106 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1109 void SequencedWorkerPool::FlushForTesting() {
1110 inner_->FlushForTesting();
1113 void SequencedWorkerPool::SignalHasWorkForTesting() {
1114 inner_->SignalHasWorkForTesting();
1117 void SequencedWorkerPool::Shutdown() {
1118 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
1119 inner_->Shutdown();
1122 } // namespace base