Move prefs::kLastPolicyStatisticsUpdate to the policy component.
[chromium-blink-merge.git] / base / threading / sequenced_worker_pool.cc
blob4fc090d11861b59955cdb283e4854f71d389ea31
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
7 #include <list>
8 #include <map>
9 #include <set>
10 #include <utility>
11 #include <vector>
13 #include "base/atomic_sequence_num.h"
14 #include "base/callback.h"
15 #include "base/compiler_specific.h"
16 #include "base/critical_closure.h"
17 #include "base/debug/trace_event.h"
18 #include "base/lazy_instance.h"
19 #include "base/logging.h"
20 #include "base/memory/linked_ptr.h"
21 #include "base/message_loop/message_loop_proxy.h"
22 #include "base/stl_util.h"
23 #include "base/strings/stringprintf.h"
24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/lock.h"
26 #include "base/threading/platform_thread.h"
27 #include "base/threading/simple_thread.h"
28 #include "base/threading/thread_local.h"
29 #include "base/threading/thread_restrictions.h"
30 #include "base/time/time.h"
31 #include "base/tracked_objects.h"
33 #if defined(OS_MACOSX)
34 #include "base/mac/scoped_nsautorelease_pool.h"
35 #endif
37 #if !defined(OS_NACL)
38 #include "base/metrics/histogram.h"
39 #endif
41 namespace base {
43 namespace {
// Record describing one task queued on the pool. It extends TrackingInfo with
// the sequencing, tracing, shutdown and scheduling data that the pool fills in
// when the task is posted.
45 struct SequencedTask : public TrackingInfo {
// Default constructor: zeroes the numeric fields and defaults the shutdown
// behavior to BLOCK_SHUTDOWN.
46 SequencedTask()
47 : sequence_token_id(0),
48 trace_id(0),
49 sequence_task_number(0),
50 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
// Same defaults as above, but also initializes the TrackingInfo base from
// |from_here| and a default-constructed TimeTicks.
52 explicit SequencedTask(const tracked_objects::Location& from_here)
53 : base::TrackingInfo(from_here, TimeTicks()),
54 sequence_token_id(0),
55 trace_id(0),
56 sequence_task_number(0),
57 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
59 ~SequencedTask() {}
// ID of the sequence token this task was posted with.
61 int sequence_token_id;
// Per-task ID used to tell tasks apart in trace events.
62 int trace_id;
// Posting-order number; breaks ties between tasks with equal |time_to_run|.
63 int64 sequence_task_number;
// How the task is treated once the pool is shutting down.
64 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
// Location the task was posted from (reported in trace events).
65 tracked_objects::Location posted_from;
// The closure to run.
66 Closure task;
68 // Non-delayed tasks and delayed tasks are managed together by time-to-run
69 // order. We calculate the time by adding the posted time and the given delay.
70 TimeTicks time_to_run;
// Strict ordering for SequencedTask: earlier |time_to_run| sorts first, and
// tasks with the same run time are ordered by |sequence_task_number|.
73 struct SequencedTaskLessThan {
74 public:
75 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
76 if (lhs.time_to_run < rhs.time_to_run)
77 return true;
79 if (lhs.time_to_run > rhs.time_to_run)
80 return false;
82 // If the times happen to match, then we use the sequence number to decide.
83 return lhs.sequence_task_number < rhs.sequence_task_number;
87 // SequencedWorkerPoolTaskRunner ---------------------------------------------
88 // A TaskRunner which posts tasks to a SequencedWorkerPool with a
89 // fixed ShutdownBehavior.
91 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
92 class SequencedWorkerPoolTaskRunner : public TaskRunner {
93 public:
// |pool| receives every task posted through this runner; |shutdown_behavior|
// is the behavior applied to tasks posted without a delay.
94 SequencedWorkerPoolTaskRunner(
95 const scoped_refptr<SequencedWorkerPool>& pool,
96 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
98 // TaskRunner implementation
99 virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
100 const Closure& task,
101 TimeDelta delay) OVERRIDE;
102 virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
104 private:
// Private destructor; the object is ref-counted (see the note above).
105 virtual ~SequencedWorkerPoolTaskRunner();
// The pool that tasks are forwarded to.
107 const scoped_refptr<SequencedWorkerPool> pool_;
// Shutdown behavior used for non-delayed tasks.
109 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
111 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
// Stores the target pool and the fixed shutdown behavior; does nothing else.
114 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
115 const scoped_refptr<SequencedWorkerPool>& pool,
116 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
117 : pool_(pool),
118 shutdown_behavior_(shutdown_behavior) {
// Nothing to release explicitly; the members clean up after themselves.
121 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
// Posts |task| to the pool and returns the pool's result.
// A zero |delay| posts with this runner's fixed shutdown behavior. A non-zero
// |delay| is forwarded to PostDelayedWorkerTask(), which is not given
// |shutdown_behavior_|.
124 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
125 const tracked_objects::Location& from_here,
126 const Closure& task,
127 TimeDelta delay) {
128 if (delay == TimeDelta()) {
129 return pool_->PostWorkerTaskWithShutdownBehavior(
130 from_here, task, shutdown_behavior_);
132 return pool_->PostDelayedWorkerTask(from_here, task, delay);
// Delegates to the pool's own RunsTasksOnCurrentThread().
135 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
136 return pool_->RunsTasksOnCurrentThread();
139 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
140 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
141 // fixed sequence token.
143 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
144 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
145 public:
// |pool| receives every task, |token| is the sequence token attached to each
// task, and |shutdown_behavior| is applied to tasks posted without a delay.
146 SequencedWorkerPoolSequencedTaskRunner(
147 const scoped_refptr<SequencedWorkerPool>& pool,
148 SequencedWorkerPool::SequenceToken token,
149 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
151 // TaskRunner implementation
152 virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
153 const Closure& task,
154 TimeDelta delay) OVERRIDE;
155 virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
157 // SequencedTaskRunner implementation
158 virtual bool PostNonNestableDelayedTask(
159 const tracked_objects::Location& from_here,
160 const Closure& task,
161 TimeDelta delay) OVERRIDE;
163 private:
// Private destructor; the object is ref-counted (see the note above).
164 virtual ~SequencedWorkerPoolSequencedTaskRunner();
// The pool that tasks are forwarded to.
166 const scoped_refptr<SequencedWorkerPool> pool_;
// Sequence token attached to every task posted through this runner.
168 const SequencedWorkerPool::SequenceToken token_;
// Shutdown behavior used for non-delayed tasks.
170 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
172 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
// Stores the target pool, the sequence token and the fixed shutdown behavior.
175 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
176 const scoped_refptr<SequencedWorkerPool>& pool,
177 SequencedWorkerPool::SequenceToken token,
178 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
179 : pool_(pool),
180 token_(token),
181 shutdown_behavior_(shutdown_behavior) {
// Nothing to release explicitly; the members clean up after themselves.
184 SequencedWorkerPoolSequencedTaskRunner::
185 ~SequencedWorkerPoolSequencedTaskRunner() {
// Posts |task| to the pool under this runner's sequence token and returns the
// pool's result. A zero |delay| posts with the fixed shutdown behavior; a
// non-zero |delay| is forwarded to PostDelayedSequencedWorkerTask(), which is
// not given |shutdown_behavior_|.
188 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
189 const tracked_objects::Location& from_here,
190 const Closure& task,
191 TimeDelta delay) {
192 if (delay == TimeDelta()) {
193 return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
194 token_, from_here, task, shutdown_behavior_);
196 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
// Asks the pool whether the current thread is running this runner's sequence.
199 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
200 return pool_->IsRunningSequenceOnCurrentThread(token_);
// Identical to PostDelayedTask() for this runner; returns its result.
203 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
204 const tracked_objects::Location& from_here,
205 const Closure& task,
206 TimeDelta delay) {
207 // There's no way to run nested tasks, so simply forward to
208 // PostDelayedTask.
209 return PostDelayedTask(from_here, task, delay);
212 // Create a process-wide unique ID to represent this task in trace events. This
213 // will be mangled with a Process ID hash to reduce the likelihood of colliding
214 // with MessageLoop pointers on other processes.
//
// The task's |trace_id| is shifted into the upper 32 bits and the address of
// |pool| is OR-ed into the result.
//
// The address is converted through uintptr_t (unsigned) rather than intptr_t.
// With a signed conversion, a 32-bit address whose top bit is set would be
// sign-extended when widened to 64 bits, setting every upper bit and wiping
// out the |trace_id| half, so all tasks of such a pool would share one ID.
215 uint64 GetTaskTraceID(const SequencedTask& task,
216 void* pool) {
217 return (static_cast<uint64>(task.trace_id) << 32) |
218 static_cast<uint64>(reinterpret_cast<uintptr_t>(pool));
// Lazily created, intentionally leaked thread-local pointer to a
// SequenceToken. Worker::Run() points it at the worker's |running_sequence_|
// so that static functions can find the sequence for the current thread.
221 base::LazyInstance<base::ThreadLocalPointer<
222 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
223 LAZY_INSTANCE_INITIALIZER;
225 } // namespace
227 // Worker ---------------------------------------------------------------------
// One background thread of the pool. It records which sequence token and
// shutdown behavior belong to the task it is currently running.
229 class SequencedWorkerPool::Worker : public SimpleThread {
230 public:
231 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
232 // around as long as we are running.
233 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
234 int thread_number,
235 const std::string& thread_name_prefix);
236 virtual ~Worker();
238 // SimpleThread implementation. This actually runs the background thread.
239 virtual void Run() OVERRIDE;
// Records the sequence token and shutdown behavior of the task this worker
// is about to run (or resets them once the task has finished).
241 void set_running_task_info(SequenceToken token,
242 WorkerShutdown shutdown_behavior) {
243 running_sequence_ = token;
244 running_shutdown_behavior_ = shutdown_behavior;
// Sequence token last recorded by set_running_task_info().
247 SequenceToken running_sequence() const {
248 return running_sequence_;
// Shutdown behavior last recorded by set_running_task_info().
251 WorkerShutdown running_shutdown_behavior() const {
252 return running_shutdown_behavior_;
255 private:
// Cyclic reference to the owning pool; released at the end of Run().
256 scoped_refptr<SequencedWorkerPool> worker_pool_;
257 SequenceToken running_sequence_;
258 WorkerShutdown running_shutdown_behavior_;
260 DISALLOW_COPY_AND_ASSIGN(Worker);
263 // Inner ----------------------------------------------------------------------
// Holds the pool's state: the lock, the pending task set, the worker threads
// and the shutdown/cleanup bookkeeping.
265 class SequencedWorkerPool::Inner {
266 public:
267 // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
268 // by it).
269 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
270 const std::string& thread_name_prefix,
271 TestingObserver* observer);
273 ~Inner();
// Returns a new, unnamed sequence token.
275 SequenceToken GetSequenceToken();
// Returns the token registered for |name|, creating one if necessary.
277 SequenceToken GetNamedSequenceToken(const std::string& name);
279 // This function accepts a name and an ID. If the name is null, the
280 // token ID is used. This allows us to implement the optional name lookup
281 // from a single function without having to enter the lock a separate time.
282 bool PostTask(const std::string* optional_token_name,
283 SequenceToken sequence_token,
284 WorkerShutdown shutdown_behavior,
285 const tracked_objects::Location& from_here,
286 const Closure& task,
287 TimeDelta delay);
// True if the calling thread is one of this pool's worker threads.
289 bool RunsTasksOnCurrentThread() const;
// True if the calling thread is a worker of this pool that is currently
// running a task with |sequence_token|.
291 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
293 void CleanupForTesting();
295 void SignalHasWorkForTesting();
297 int GetWorkSignalCountForTesting() const;
// Marks the pool as shut down and waits until nothing blocks shutdown.
299 void Shutdown(int max_blocking_tasks_after_shutdown);
// True once Shutdown() has been called.
301 bool IsShutdownInProgress();
303 // Runs the worker loop on the background thread.
304 void ThreadLoop(Worker* this_worker);
306 private:
// Result of GetWork(); see the comment on GetWork() below.
307 enum GetWorkStatus {
308 GET_WORK_FOUND,
309 GET_WORK_NOT_FOUND,
310 GET_WORK_WAIT,
// Steps of the CleanupForTesting() handshake; see HandleCleanup().
313 enum CleanupState {
314 CLEANUP_REQUESTED,
315 CLEANUP_STARTING,
316 CLEANUP_RUNNING,
317 CLEANUP_FINISHING,
318 CLEANUP_DONE,
321 // Called from within the lock, this converts the given token name into a
322 // token ID, creating a new one if necessary.
323 int LockedGetNamedTokenID(const std::string& name);
325 // Called from within the lock, this returns the next sequence task number.
326 int64 LockedGetNextSequenceTaskNumber();
328 // Called from within the lock, returns the shutdown behavior of the task
329 // running on the currently executing worker thread. If invoked from a thread
330 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
331 WorkerShutdown LockedCurrentThreadShutdownBehavior() const;
333 // Gets new task. There are 3 cases depending on the return value:
335 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
336 // be run immediately.
337 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
338 // and |task| is not filled in. In this case, the caller should wait until
339 // a task is posted.
340 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
341 // immediately, and |task| is not filled in. Instead, |wait_time| is
342 // filled in with the time to wait until the next task is due. In this
343 // case, the caller should wait for that long.
345 // In any case, the calling code should clear the given
346 // delete_these_outside_lock vector the next time the lock is released.
347 // See the implementation for a more detailed description.
348 GetWorkStatus GetWork(SequencedTask* task,
349 TimeDelta* wait_time,
350 std::vector<Closure>* delete_these_outside_lock);
// Worker-side half of the CleanupForTesting() handshake. Must be called with
// the lock held.
352 void HandleCleanup();
354 // Performs init and cleanup around running the given task. WillRun...
355 // returns the value from PrepareToStartAdditionalThreadIfHelpful.
356 // The calling code should call FinishStartingAdditionalThread once the
357 // lock is released if the return value is nonzero.
358 int WillRunWorkerTask(const SequencedTask& task);
359 void DidRunWorkerTask(const SequencedTask& task);
361 // Returns true if there are no threads currently running the given
362 // sequence token.
363 bool IsSequenceTokenRunnable(int sequence_token_id) const;
365 // Checks if all threads are busy and the addition of one more could run an
366 // additional task waiting in the queue. This must be called from within
367 // the lock.
369 // If another thread is helpful, this will mark the thread as being in the
370 // process of starting and returns the nonzero number of the new thread
371 // (callers treat 0 as "no thread"). The caller should then call
372 // FinishStartingAdditionalThread to complete initialization once the lock
// is released.
374 // If another thread is not necessary, returns 0.
376 // See the implementation for more.
377 int PrepareToStartAdditionalThreadIfHelpful();
379 // The second part of thread creation after
380 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
381 // generated. This actually creates the thread and should be called outside
382 // the lock to avoid blocking important work starting a thread in the lock.
383 void FinishStartingAdditionalThread(int thread_number);
385 // Signal |has_work_| and increment |has_work_signal_count_|.
386 void SignalHasWork();
388 // Checks whether there is work left that's blocking shutdown. Must be
389 // called inside the lock.
390 bool CanShutdown() const;
// The pool that owns this object (raw pointer, see the constructor comment).
392 SequencedWorkerPool* const worker_pool_;
394 // The last sequence number used. Managed by GetSequenceToken, since this
395 // only does threadsafe increment operations, you do not need to hold the
396 // lock. This is class-static to make SequenceTokens issued by
397 // GetSequenceToken unique across SequencedWorkerPool instances.
398 static base::StaticAtomicSequenceNumber g_last_sequence_number_;
400 // This lock protects |everything in this class|. Do not read or modify
401 // anything without holding this lock. Do not block while holding this
402 // lock.
403 mutable Lock lock_;
405 // Condition variable that is waited on by worker threads until new
406 // tasks are posted or shutdown starts.
407 ConditionVariable has_work_cv_;
409 // Condition variable that is waited on by non-worker threads (in
410 // Shutdown()) until CanShutdown() goes to true.
411 ConditionVariable can_shutdown_cv_;
413 // The maximum number of worker threads we'll create.
414 const size_t max_threads_;
// Prefix passed to each Worker for building its thread name.
416 const std::string thread_name_prefix_;
418 // Associates all known sequence token names with their IDs.
419 std::map<std::string, int> named_sequence_tokens_;
421 // Owning pointers to all threads we've created so far, indexed by
422 // ID. Since we lazily create threads, this may be less than
423 // max_threads_ and will be initially empty.
424 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
425 ThreadMap threads_;
427 // Set to true when we're in the process of creating another thread.
428 // See PrepareToStartAdditionalThreadIfHelpful for more.
429 bool thread_being_created_;
431 // Number of threads currently waiting for work.
432 size_t waiting_thread_count_;
434 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
435 // or SKIP_ON_SHUTDOWN flag set.
436 size_t blocking_shutdown_thread_count_;
438 // A set of all pending tasks in time-to-run order. These are tasks that are
439 // either waiting for a thread to run on, waiting for their time to run,
440 // or blocked on a previous task in their sequence. We have to iterate over
441 // the tasks by time-to-run order, so we use the set instead of the
442 // traditional priority_queue.
443 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
444 PendingTaskSet pending_tasks_;
446 // The next sequence number for a new sequenced task.
447 int64 next_sequence_task_number_;
449 // Number of tasks in the pending_tasks_ list that are marked as blocking
450 // shutdown.
451 size_t blocking_shutdown_pending_task_count_;
453 // Lists all sequence tokens currently executing.
454 std::set<int> current_sequences_;
456 // An ID for each posted task to distinguish the task from others in traces.
457 int trace_id_;
459 // Set when Shutdown is called and no further tasks should be
460 // allowed, though we may still be running existing tasks.
461 bool shutdown_called_;
463 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
464 // has been called.
465 int max_blocking_tasks_after_shutdown_;
467 // State used to cleanup for testing, all guarded by lock_.
468 CleanupState cleanup_state_;
469 size_t cleanup_idlers_;
470 ConditionVariable cleanup_cv_;
// Optional observer; notified before Shutdown() waits and on destruction.
// May be NULL.
472 TestingObserver* const testing_observer_;
474 DISALLOW_COPY_AND_ASSIGN(Inner);
477 // Worker definitions ---------------------------------------------------------
// Names the thread "<prefix>Worker<thread_number>", takes a reference to the
// pool, and starts the thread immediately from within the constructor.
479 SequencedWorkerPool::Worker::Worker(
480 const scoped_refptr<SequencedWorkerPool>& worker_pool,
481 int thread_number,
482 const std::string& prefix)
483 : SimpleThread(
484 prefix + StringPrintf("Worker%d", thread_number).c_str()),
485 worker_pool_(worker_pool),
486 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
487 Start();
// Nothing to do here; Inner's destructor joins the worker threads before the
// Worker objects are destroyed.
490 SequencedWorkerPool::Worker::~Worker() {
// Thread entry point: publishes this worker's running sequence through thread
// local storage, runs the pool's worker loop until it returns, then drops the
// reference to the pool.
493 void SequencedWorkerPool::Worker::Run() {
494 // Store a pointer to the running sequence in thread local storage for
495 // static function access.
496 g_lazy_tls_ptr.Get().Set(&running_sequence_);
498 // Just jump back to the Inner object to run the thread, since it has all the
499 // tracking information and queues. It might be more natural to implement
500 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
501 // having these worker objects at all, but that method lacks the ability to
502 // send thread-specific information easily to the thread loop.
503 worker_pool_->inner_->ThreadLoop(this);
504 // Release our cyclic reference once we're done.
505 worker_pool_ = NULL;
508 // Inner definitions ---------------------------------------------------------
// Initializes an idle pool: no threads, no pending tasks, shutdown not
// requested, and the cleanup state machine at CLEANUP_DONE. All three
// condition variables are bound to |lock_|. No threads are created here.
510 SequencedWorkerPool::Inner::Inner(
511 SequencedWorkerPool* worker_pool,
512 size_t max_threads,
513 const std::string& thread_name_prefix,
514 TestingObserver* observer)
515 : worker_pool_(worker_pool),
516 lock_(),
517 has_work_cv_(&lock_),
518 can_shutdown_cv_(&lock_),
519 max_threads_(max_threads),
520 thread_name_prefix_(thread_name_prefix),
521 thread_being_created_(false),
522 waiting_thread_count_(0),
523 blocking_shutdown_thread_count_(0),
524 next_sequence_task_number_(0),
525 blocking_shutdown_pending_task_count_(0),
526 trace_id_(0),
527 shutdown_called_(false),
528 max_blocking_tasks_after_shutdown_(0),
529 cleanup_state_(CLEANUP_DONE),
530 cleanup_idlers_(0),
531 cleanup_cv_(&lock_),
532 testing_observer_(observer) {}
// Joins every worker thread, releases them, and then tells the testing
// observer (if any) that the pool has been destroyed.
534 SequencedWorkerPool::Inner::~Inner() {
535 // You must call Shutdown() before destroying the pool.
536 DCHECK(shutdown_called_);
538 // Need to explicitly join with the threads before they're destroyed or else
539 // they will be running when our object is half torn down.
540 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
541 it->second->Join();
542 threads_.clear();
544 if (testing_observer_)
545 testing_observer_->OnDestruct();
// Returns a fresh token built from the shared atomic counter. Does not take
// |lock_|.
548 SequencedWorkerPool::SequenceToken
549 SequencedWorkerPool::Inner::GetSequenceToken() {
550 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
551 // is used as a sentinel value in SequenceTokens.
552 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
// Takes |lock_| and returns the token for |name|, registering a new one if
// the name has not been seen before.
555 SequencedWorkerPool::SequenceToken
556 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
557 AutoLock lock(lock_);
558 return SequenceToken(LockedGetNamedTokenID(name));
// Queues |task| to run after |delay| and wakes or creates a worker for it.
//
// If |optional_token_name| is non-null, the named token's ID replaces
// |sequence_token| for this task. Delayed tasks must use SKIP_ON_SHUTDOWN
// (DCHECKed).
//
// Returns false without queuing when Shutdown() has been called and any of
// the following holds: the task is not BLOCK_SHUTDOWN, the caller's current
// shutdown behavior is CONTINUE_ON_SHUTDOWN (which includes non-worker
// threads), or the post-shutdown allowance for blocking tasks is used up.
// Returns true otherwise.
561 bool SequencedWorkerPool::Inner::PostTask(
562 const std::string* optional_token_name,
563 SequenceToken sequence_token,
564 WorkerShutdown shutdown_behavior,
565 const tracked_objects::Location& from_here,
566 const Closure& task,
567 TimeDelta delay) {
568 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
// Fill in everything that does not need the lock first.
569 SequencedTask sequenced(from_here);
570 sequenced.sequence_token_id = sequence_token.id_;
571 sequenced.shutdown_behavior = shutdown_behavior;
572 sequenced.posted_from = from_here;
// BLOCK_SHUTDOWN tasks are wrapped with MakeCriticalClosure(); other tasks are
// stored unchanged.
573 sequenced.task =
574 shutdown_behavior == BLOCK_SHUTDOWN ?
575 base::MakeCriticalClosure(task) : task;
576 sequenced.time_to_run = TimeTicks::Now() + delay;
// Stays 0 unless a new thread should be started for this task.
578 int create_thread_id = 0;
580 AutoLock lock(lock_);
581 if (shutdown_called_) {
582 if (shutdown_behavior != BLOCK_SHUTDOWN ||
583 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
584 return false;
586 if (max_blocking_tasks_after_shutdown_ <= 0) {
587 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
588 return false;
// Consume one unit of the post-shutdown allowance.
590 max_blocking_tasks_after_shutdown_ -= 1;
593 // The trace_id is used for identifying the task in about:tracing.
594 sequenced.trace_id = trace_id_++;
596 TRACE_EVENT_FLOW_BEGIN0("task", "SequencedWorkerPool::PostTask",
597 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));
599 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
601 // Now that we have the lock, apply the named token rules.
602 if (optional_token_name)
603 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
605 pending_tasks_.insert(sequenced);
606 if (shutdown_behavior == BLOCK_SHUTDOWN)
607 blocking_shutdown_pending_task_count_++;
609 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
612 // Actually start the additional thread or signal an existing one now that
613 // we're outside the lock.
614 if (create_thread_id)
615 FinishStartingAdditionalThread(create_thread_id);
616 else
617 SignalHasWork();
619 return true;
// Takes |lock_| and reports whether the calling thread's ID is registered in
// |threads_|, i.e. whether the caller is one of this pool's workers.
622 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
623 AutoLock lock(lock_);
624 return ContainsKey(threads_, PlatformThread::CurrentId());
// Takes |lock_|. Returns false for threads that are not workers of this pool;
// otherwise compares |sequence_token| with the token recorded on the calling
// worker.
627 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
628 SequenceToken sequence_token) const {
629 AutoLock lock(lock_);
630 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
631 if (found == threads_.end())
632 return false;
633 return sequence_token.Equals(found->second->running_sequence());
636 // See https://code.google.com/p/chromium/issues/detail?id=168415
// Requests a cleanup pass from the worker threads and blocks until it is done.
// Must not be called from one of this pool's worker threads (DCHECKed).
// Returns immediately if Shutdown() has been called, or if nothing is pending
// and every thread is already waiting for work.
637 void SequencedWorkerPool::Inner::CleanupForTesting() {
638 DCHECK(!RunsTasksOnCurrentThread());
639 base::ThreadRestrictions::ScopedAllowWait allow_wait;
640 AutoLock lock(lock_);
// A cleanup must not already be in flight.
641 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
642 if (shutdown_called_)
643 return;
644 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
645 return;
646 cleanup_state_ = CLEANUP_REQUESTED;
647 cleanup_idlers_ = 0;
// Wake a worker so it notices the request in HandleCleanup().
648 has_work_cv_.Signal();
649 while (cleanup_state_ != CLEANUP_DONE)
650 cleanup_cv_.Wait();
// Test hook that simply calls SignalHasWork().
653 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
654 SignalHasWork();
// Marks the pool as shut down and blocks until CanShutdown() is true.
// |max_new_blocking_tasks_after_shutdown| (must be >= 0) is the number of
// BLOCK_SHUTDOWN tasks that PostTask() will still accept afterwards.
// Calling this again after the first call returns immediately.
657 void SequencedWorkerPool::Inner::Shutdown(
658 int max_new_blocking_tasks_after_shutdown) {
659 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
661 AutoLock lock(lock_);
662 // Cleanup and Shutdown should not be called concurrently.
663 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
664 if (shutdown_called_)
665 return;
666 shutdown_called_ = true;
667 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
669 // Tickle the threads. This will wake up a waiting one so it will know that
670 // it can exit, which in turn will wake up any other waiting ones.
671 SignalHasWork();
673 // There are no pending or running tasks blocking shutdown, we're done.
674 if (CanShutdown())
675 return;
678 // If we're here, then something is blocking shutdown. So wait for
679 // CanShutdown() to go to true.
681 if (testing_observer_)
682 testing_observer_->WillWaitForShutdown();
// Start of the wait, used for the shutdown-delay histogram below.
684 #if !defined(OS_NACL)
685 TimeTicks shutdown_wait_begin = TimeTicks::Now();
686 #endif
689 base::ThreadRestrictions::ScopedAllowWait allow_wait;
690 AutoLock lock(lock_);
691 while (!CanShutdown())
692 can_shutdown_cv_.Wait();
694 #if !defined(OS_NACL)
695 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
696 TimeTicks::Now() - shutdown_wait_begin);
697 #endif
// Takes |lock_| and returns whether Shutdown() has been called.
700 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
701 AutoLock lock(lock_);
702 return shutdown_called_;
// Body of every worker thread. Registers |this_worker| in |threads_|, then
// loops: take part in any pending cleanup, ask GetWork() for a task, and
// either run it (with the lock released) or wait on |has_work_cv_|. The loop
// ends once shutdown has been called and no pending task blocks shutdown;
// the worker then wakes the next worker and signals |can_shutdown_cv_|.
705 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
707 AutoLock lock(lock_);
// The thread that was announced as "being created" is now running.
708 DCHECK(thread_being_created_);
709 thread_being_created_ = false;
// |threads_| takes ownership of the worker, keyed by its thread ID.
710 std::pair<ThreadMap::iterator, bool> result =
711 threads_.insert(
712 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
713 DCHECK(result.second);
715 while (true) {
716 #if defined(OS_MACOSX)
717 base::mac::ScopedNSAutoreleasePool autorelease_pool;
718 #endif
720 HandleCleanup();
722 // See GetWork for what delete_these_outside_lock is doing.
723 SequencedTask task;
724 TimeDelta wait_time;
725 std::vector<Closure> delete_these_outside_lock;
726 GetWorkStatus status =
727 GetWork(&task, &wait_time, &delete_these_outside_lock);
728 if (status == GET_WORK_FOUND) {
729 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask",
730 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
731 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop",
732 "src_file", task.posted_from.file_name(),
733 "src_func", task.posted_from.function_name());
734 int new_thread_id = WillRunWorkerTask(task);
// Everything up to DidRunWorkerTask() below runs with the lock released.
736 AutoUnlock unlock(lock_);
737 // There may be more work available, so wake up another
738 // worker thread. (Technically not required, since we
739 // already get a signal for each new task, but it doesn't
740 // hurt.)
741 SignalHasWork();
742 delete_these_outside_lock.clear();
744 // Complete thread creation outside the lock if necessary.
745 if (new_thread_id)
746 FinishStartingAdditionalThread(new_thread_id);
748 this_worker->set_running_task_info(
749 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
751 tracked_objects::TrackedTime start_time =
752 tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);
754 task.task.Run();
756 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
757 start_time, tracked_objects::ThreadData::NowForEndOfRun());
759 // Make sure our task is erased outside the lock for the
760 // same reason we do this with delete_these_outside_lock.
761 // Also, do it before calling set_running_task_info() so
762 // that sequence-checking from within the task's destructor
763 // still works.
764 task.task = Closure();
766 this_worker->set_running_task_info(
767 SequenceToken(), CONTINUE_ON_SHUTDOWN);
769 DidRunWorkerTask(task); // Must be done inside the lock.
// No runnable task was found and this worker is the one running the cleanup.
770 } else if (cleanup_state_ == CLEANUP_RUNNING) {
771 switch (status) {
// GetWork() handed back a deferred task to delete; release it outside the
// lock and look again.
772 case GET_WORK_WAIT: {
773 AutoUnlock unlock(lock_);
774 delete_these_outside_lock.clear();
776 break;
// Nothing is left to clean up; move the handshake to its final phase.
777 case GET_WORK_NOT_FOUND:
778 CHECK(delete_these_outside_lock.empty());
779 cleanup_state_ = CLEANUP_FINISHING;
780 cleanup_cv_.Broadcast();
781 break;
782 default:
783 NOTREACHED();
785 } else {
786 // When we're terminating and there's no more work, we can
787 // shut down, other workers can complete any pending or new tasks.
788 // We can get additional tasks posted after shutdown_called_ is set
789 // but only worker threads are allowed to post tasks at that time, and
790 // the workers responsible for posting those tasks will be available
791 // to run them. Also, there may be some tasks stuck behind running
792 // ones with the same sequence token, but additional threads won't
793 // help this case.
794 if (shutdown_called_ &&
795 blocking_shutdown_pending_task_count_ == 0)
796 break;
797 waiting_thread_count_++;
// Sleep until signalled, or until the next delayed task is due.
799 switch (status) {
800 case GET_WORK_NOT_FOUND:
801 has_work_cv_.Wait();
802 break;
803 case GET_WORK_WAIT:
804 has_work_cv_.TimedWait(wait_time);
805 break;
806 default:
807 NOTREACHED();
809 waiting_thread_count_--;
812 } // Release lock_.
814 // We noticed we should exit. Wake up the next worker so it knows it should
815 // exit as well (because the Shutdown() code only signals once).
816 SignalHasWork();
818 // Possibly unblock shutdown.
819 can_shutdown_cv_.Signal();
// Worker-side half of the CleanupForTesting() handshake; requires |lock_|.
// The first worker to see CLEANUP_REQUESTED becomes the cleaner: it waits for
// all other workers to idle here, then returns in CLEANUP_RUNNING. Workers
// arriving during CLEANUP_STARTING park until CLEANUP_FINISHING. A worker
// arriving during CLEANUP_FINISHING waits for the idlers to leave and then
// sets CLEANUP_DONE. Does nothing when the state is CLEANUP_DONE.
822 void SequencedWorkerPool::Inner::HandleCleanup() {
823 lock_.AssertAcquired();
824 if (cleanup_state_ == CLEANUP_DONE)
825 return;
826 if (cleanup_state_ == CLEANUP_REQUESTED) {
827 // We win, we get to do the cleanup as soon as the others wise up and idle.
828 cleanup_state_ = CLEANUP_STARTING;
// Wait until no thread is mid-creation and every other worker is idling.
829 while (thread_being_created_ ||
830 cleanup_idlers_ != threads_.size() - 1) {
831 has_work_cv_.Signal();
832 cleanup_cv_.Wait();
834 cleanup_state_ = CLEANUP_RUNNING;
835 return;
837 if (cleanup_state_ == CLEANUP_STARTING) {
838 // Another worker thread is cleaning up, we idle here until that's done.
839 ++cleanup_idlers_;
840 cleanup_cv_.Broadcast();
841 while (cleanup_state_ != CLEANUP_FINISHING) {
842 cleanup_cv_.Wait();
844 --cleanup_idlers_;
845 cleanup_cv_.Broadcast();
846 return;
848 if (cleanup_state_ == CLEANUP_FINISHING) {
849 // We wait for all idlers to wake up prior to being DONE.
850 while (cleanup_idlers_ != 0) {
851 cleanup_cv_.Broadcast();
852 cleanup_cv_.Wait();
// Re-check the state: the lock was released while waiting above.
854 if (cleanup_state_ == CLEANUP_FINISHING) {
855 cleanup_state_ = CLEANUP_DONE;
856 cleanup_cv_.Signal();
858 return;
// Returns the token ID registered for |name|, allocating and registering a
// new one on first use. Requires |lock_|; |name| must not be empty (DCHECKed).
862 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
863 const std::string& name) {
864 lock_.AssertAcquired();
865 DCHECK(!name.empty());
867 std::map<std::string, int>::const_iterator found =
868 named_sequence_tokens_.find(name);
869 if (found != named_sequence_tokens_.end())
870 return found->second; // Got an existing one.
872 // Create a new one for this name.
873 SequenceToken result = GetSequenceToken();
874 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
875 return result.id_;
// Returns the current task number and advances the counter. Requires |lock_|.
878 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
879 lock_.AssertAcquired();
880 // We assume that we never create enough tasks to wrap around.
881 return next_sequence_task_number_++;
// Returns the shutdown behavior recorded on the calling worker thread, or
// CONTINUE_ON_SHUTDOWN when the caller is not one of this pool's workers.
// Requires |lock_|.
884 SequencedWorkerPool::WorkerShutdown
885 SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
886 lock_.AssertAcquired();
887 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
888 if (found == threads_.end())
889 return CONTINUE_ON_SHUTDOWN;
890 return found->second->running_shutdown_behavior();
893 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
894 SequencedTask* task,
895 TimeDelta* wait_time,
896 std::vector<Closure>* delete_these_outside_lock) {
897 lock_.AssertAcquired();
899 #if !defined(OS_NACL)
900 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
901 static_cast<int>(pending_tasks_.size()));
902 #endif
904 // Find the next task with a sequence token that's not currently in use.
905 // If the token is in use, that means another thread is running something
906 // in that sequence, and we can't run it without going out-of-order.
908 // This algorithm is simple and fair, but inefficient in some cases. For
909 // example, say somebody schedules 1000 slow tasks with the same sequence
910 // number. We'll have to go through all those tasks each time we feel like
911 // there might be work to schedule. If this proves to be a problem, we
912 // should make this more efficient.
914 // One possible enhancement would be to keep a map from sequence ID to a
915 // list of pending but currently blocked SequencedTasks for that ID.
916 // When a worker finishes a task of one sequence token, it can pick up the
917 // next one from that token right away.
919 // This may lead to starvation if there are sufficient numbers of sequences
920 // in use. To alleviate this, we could add an incrementing priority counter
921 // to each SequencedTask. Then maintain a priority_queue of all runnable
922 // tasks, sorted by priority counter. When a sequenced task is completed
923 // we would pop the head element off of that tasks pending list and add it
924 // to the priority queue. Then we would run the first item in the priority
925 // queue.
927 GetWorkStatus status = GET_WORK_NOT_FOUND;
928 int unrunnable_tasks = 0;
929 PendingTaskSet::iterator i = pending_tasks_.begin();
930 // We assume that the loop below doesn't take too long and so we can just do
931 // a single call to TimeTicks::Now().
932 const TimeTicks current_time = TimeTicks::Now();
933 while (i != pending_tasks_.end()) {
934 if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
935 unrunnable_tasks++;
936 ++i;
937 continue;
940 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
941 // We're shutting down and the task we just found isn't blocking
942 // shutdown. Delete it and get more work.
944 // Note that we do not want to delete unrunnable tasks. Deleting a task
945 // can have side effects (like freeing some objects) and deleting a
946 // task that's supposed to run after one that's currently running could
947 // cause an obscure crash.
949 // We really want to delete these tasks outside the lock in case the
950 // closures are holding refs to objects that want to post work from
951 // their destructorss (which would deadlock). The closures are
952 // internally refcounted, so we just need to keep a copy of them alive
953 // until the lock is exited. The calling code can just clear() the
954 // vector they passed to us once the lock is exited to make this
955 // happen.
956 delete_these_outside_lock->push_back(i->task);
957 pending_tasks_.erase(i++);
958 continue;
961 if (i->time_to_run > current_time) {
962 // The time to run has not come yet.
963 *wait_time = i->time_to_run - current_time;
964 status = GET_WORK_WAIT;
965 if (cleanup_state_ == CLEANUP_RUNNING) {
966 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
967 delete_these_outside_lock->push_back(i->task);
968 pending_tasks_.erase(i);
970 break;
973 // Found a runnable task.
974 *task = *i;
975 pending_tasks_.erase(i);
976 if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
977 blocking_shutdown_pending_task_count_--;
980 status = GET_WORK_FOUND;
981 break;
984 // Track the number of tasks we had to skip over to see if we should be
985 // making this more efficient. If this number ever becomes large or is
986 // frequently "some", we should consider the optimization above.
987 #if !defined(OS_NACL)
988 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
989 unrunnable_tasks);
990 #endif
991 return status;
994 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
995 lock_.AssertAcquired();
997 // Mark the task's sequence number as in use.
998 if (task.sequence_token_id)
999 current_sequences_.insert(task.sequence_token_id);
1001 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1002 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1003 // completes.
1004 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1005 blocking_shutdown_thread_count_++;
1007 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1008 // creates a new thread if there is no free one, there is a race when posting
1009 // tasks that many tasks could have been posted before a thread started
1010 // running them, so only one thread would have been created. So we also check
1011 // whether we should create more threads after removing our task from the
1012 // queue, which also has the nice side effect of creating the workers from
1013 // background threads rather than the main thread of the app.
1015 // If another thread wasn't created, we want to wake up an existing thread
1016 // if there is one waiting to pick up the next task.
1018 // Note that we really need to do this *before* running the task, not
1019 // after. Otherwise, if more than one task is posted, the creation of the
1020 // second thread (since we only create one at a time) will be blocked by
1021 // the execution of the first task, which could be arbitrarily long.
1022 return PrepareToStartAdditionalThreadIfHelpful();
1025 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1026 lock_.AssertAcquired();
1028 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1029 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1030 blocking_shutdown_thread_count_--;
1033 if (task.sequence_token_id)
1034 current_sequences_.erase(task.sequence_token_id);
1037 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1038 int sequence_token_id) const {
1039 lock_.AssertAcquired();
1040 return !sequence_token_id ||
1041 current_sequences_.find(sequence_token_id) ==
1042 current_sequences_.end();
1045 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1046 lock_.AssertAcquired();
1047 // How thread creation works:
1049 // We'de like to avoid creating threads with the lock held. However, we
1050 // need to be sure that we have an accurate accounting of the threads for
1051 // proper Joining and deltion on shutdown.
1053 // We need to figure out if we need another thread with the lock held, which
1054 // is what this function does. It then marks us as in the process of creating
1055 // a thread. When we do shutdown, we wait until the thread_being_created_
1056 // flag is cleared, which ensures that the new thread is properly added to
1057 // all the data structures and we can't leak it. Once shutdown starts, we'll
1058 // refuse to create more threads or they would be leaked.
1060 // Note that this creates a mostly benign race condition on shutdown that
1061 // will cause fewer workers to be created than one would expect. It isn't
1062 // much of an issue in real life, but affects some tests. Since we only spawn
1063 // one worker at a time, the following sequence of events can happen:
1065 // 1. Main thread posts a bunch of unrelated tasks that would normally be
1066 // run on separate threads.
1067 // 2. The first task post causes us to start a worker. Other tasks do not
1068 // cause a worker to start since one is pending.
1069 // 3. Main thread initiates shutdown.
1070 // 4. No more threads are created since the shutdown_called_ flag is set.
1072 // The result is that one may expect that max_threads_ workers to be created
1073 // given the workload, but in reality fewer may be created because the
1074 // sequence of thread creation on the background threads is racing with the
1075 // shutdown call.
1076 if (!shutdown_called_ &&
1077 !thread_being_created_ &&
1078 cleanup_state_ == CLEANUP_DONE &&
1079 threads_.size() < max_threads_ &&
1080 waiting_thread_count_ == 0) {
1081 // We could use an additional thread if there's work to be done.
1082 for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1083 i != pending_tasks_.end(); ++i) {
1084 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1085 // Found a runnable task, mark the thread as being started.
1086 thread_being_created_ = true;
1087 return static_cast<int>(threads_.size() + 1);
1091 return 0;
1094 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1095 int thread_number) {
1096 // Called outside of the lock.
1097 DCHECK(thread_number > 0);
1099 // The worker is assigned to the list when the thread actually starts, which
1100 // will manage the memory of the pointer.
1101 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1104 void SequencedWorkerPool::Inner::SignalHasWork() {
1105 has_work_cv_.Signal();
1106 if (testing_observer_) {
1107 testing_observer_->OnHasWork();
1111 bool SequencedWorkerPool::Inner::CanShutdown() const {
1112 lock_.AssertAcquired();
1113 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1114 return !thread_being_created_ &&
1115 blocking_shutdown_thread_count_ == 0 &&
1116 blocking_shutdown_pending_task_count_ == 0;
1119 base::StaticAtomicSequenceNumber
1120 SequencedWorkerPool::Inner::g_last_sequence_number_;
1122 // SequencedWorkerPool --------------------------------------------------------
1124 // static
1125 SequencedWorkerPool::SequenceToken
1126 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1127 // Don't construct lazy instance on check.
1128 if (g_lazy_tls_ptr == NULL)
1129 return SequenceToken();
1131 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
1132 if (!token)
1133 return SequenceToken();
1134 return *token;
1137 SequencedWorkerPool::SequencedWorkerPool(
1138 size_t max_threads,
1139 const std::string& thread_name_prefix)
1140 : constructor_message_loop_(MessageLoopProxy::current()),
1141 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1144 SequencedWorkerPool::SequencedWorkerPool(
1145 size_t max_threads,
1146 const std::string& thread_name_prefix,
1147 TestingObserver* observer)
1148 : constructor_message_loop_(MessageLoopProxy::current()),
1149 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1152 SequencedWorkerPool::~SequencedWorkerPool() {}
1154 void SequencedWorkerPool::OnDestruct() const {
1155 DCHECK(constructor_message_loop_.get());
1156 // Avoid deleting ourselves on a worker thread (which would
1157 // deadlock).
1158 if (RunsTasksOnCurrentThread()) {
1159 constructor_message_loop_->DeleteSoon(FROM_HERE, this);
1160 } else {
1161 delete this;
1165 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1166 return inner_->GetSequenceToken();
1169 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1170 const std::string& name) {
1171 return inner_->GetNamedSequenceToken(name);
1174 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1175 SequenceToken token) {
1176 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1179 scoped_refptr<SequencedTaskRunner>
1180 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1181 SequenceToken token, WorkerShutdown shutdown_behavior) {
1182 return new SequencedWorkerPoolSequencedTaskRunner(
1183 this, token, shutdown_behavior);
1186 scoped_refptr<TaskRunner>
1187 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1188 WorkerShutdown shutdown_behavior) {
1189 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1192 bool SequencedWorkerPool::PostWorkerTask(
1193 const tracked_objects::Location& from_here,
1194 const Closure& task) {
1195 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1196 from_here, task, TimeDelta());
1199 bool SequencedWorkerPool::PostDelayedWorkerTask(
1200 const tracked_objects::Location& from_here,
1201 const Closure& task,
1202 TimeDelta delay) {
1203 WorkerShutdown shutdown_behavior =
1204 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1205 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1206 from_here, task, delay);
1209 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1210 const tracked_objects::Location& from_here,
1211 const Closure& task,
1212 WorkerShutdown shutdown_behavior) {
1213 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1214 from_here, task, TimeDelta());
1217 bool SequencedWorkerPool::PostSequencedWorkerTask(
1218 SequenceToken sequence_token,
1219 const tracked_objects::Location& from_here,
1220 const Closure& task) {
1221 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1222 from_here, task, TimeDelta());
1225 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1226 SequenceToken sequence_token,
1227 const tracked_objects::Location& from_here,
1228 const Closure& task,
1229 TimeDelta delay) {
1230 WorkerShutdown shutdown_behavior =
1231 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1232 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1233 from_here, task, delay);
1236 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1237 const std::string& token_name,
1238 const tracked_objects::Location& from_here,
1239 const Closure& task) {
1240 DCHECK(!token_name.empty());
1241 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1242 from_here, task, TimeDelta());
1245 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1246 SequenceToken sequence_token,
1247 const tracked_objects::Location& from_here,
1248 const Closure& task,
1249 WorkerShutdown shutdown_behavior) {
1250 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1251 from_here, task, TimeDelta());
1254 bool SequencedWorkerPool::PostDelayedTask(
1255 const tracked_objects::Location& from_here,
1256 const Closure& task,
1257 TimeDelta delay) {
1258 return PostDelayedWorkerTask(from_here, task, delay);
1261 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1262 return inner_->RunsTasksOnCurrentThread();
1265 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1266 SequenceToken sequence_token) const {
1267 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1270 void SequencedWorkerPool::FlushForTesting() {
1271 inner_->CleanupForTesting();
1274 void SequencedWorkerPool::SignalHasWorkForTesting() {
1275 inner_->SignalHasWorkForTesting();
1278 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1279 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
1280 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1283 bool SequencedWorkerPool::IsShutdownInProgress() {
1284 return inner_->IsShutdownInProgress();
1287 } // namespace base