1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
13 #include "base/atomic_sequence_num.h"
14 #include "base/callback.h"
15 #include "base/compiler_specific.h"
16 #include "base/critical_closure.h"
17 #include "base/lazy_instance.h"
18 #include "base/logging.h"
19 #include "base/memory/linked_ptr.h"
20 #include "base/stl_util.h"
21 #include "base/strings/stringprintf.h"
22 #include "base/synchronization/condition_variable.h"
23 #include "base/synchronization/lock.h"
24 #include "base/thread_task_runner_handle.h"
25 #include "base/threading/platform_thread.h"
26 #include "base/threading/simple_thread.h"
27 #include "base/threading/thread_local.h"
28 #include "base/threading/thread_restrictions.h"
29 #include "base/time/time.h"
30 #include "base/trace_event/trace_event.h"
31 #include "base/tracked_objects.h"
33 #if defined(OS_MACOSX)
34 #include "base/mac/scoped_nsautorelease_pool.h"
36 #include "base/win/scoped_com_initializer.h"
40 #include "base/metrics/histogram.h"
47 struct SequencedTask
: public TrackingInfo
{
49 : sequence_token_id(0),
51 sequence_task_number(0),
52 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN
) {}
54 explicit SequencedTask(const tracked_objects::Location
& from_here
)
55 : base::TrackingInfo(from_here
, TimeTicks()),
58 sequence_task_number(0),
59 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN
) {}
63 int sequence_token_id
;
65 int64 sequence_task_number
;
66 SequencedWorkerPool::WorkerShutdown shutdown_behavior
;
67 tracked_objects::Location posted_from
;
70 // Non-delayed tasks and delayed tasks are managed together by time-to-run
71 // order. We calculate the time by adding the posted time and the given delay.
72 TimeTicks time_to_run
;
// Ordering functor for SequencedTask, used as the comparator of the
// PendingTaskSet std::set. Tasks are compared by |time_to_run| first; when the
// two times are equal the result is decided by |sequence_task_number|.
// NOTE(review): the statements guarded by the two |time_to_run| checks are not
// visible in this extract; presumably they return true and false respectively
// -- confirm against the full file.
75 struct SequencedTaskLessThan
{
77 bool operator()(const SequencedTask
& lhs
, const SequencedTask
& rhs
) const {
78 if (lhs
.time_to_run
< rhs
.time_to_run
)
81 if (lhs
.time_to_run
> rhs
.time_to_run
)
84 // If the times happen to match, then we use the sequence number to decide.
85 return lhs
.sequence_task_number
< rhs
.sequence_task_number
;
89 // SequencedWorkerPoolTaskRunner ---------------------------------------------
90 // A TaskRunner which posts tasks to a SequencedWorkerPool with a
91 // fixed ShutdownBehavior.
93 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
94 class SequencedWorkerPoolTaskRunner
: public TaskRunner
{
96 SequencedWorkerPoolTaskRunner(
97 const scoped_refptr
<SequencedWorkerPool
>& pool
,
98 SequencedWorkerPool::WorkerShutdown shutdown_behavior
);
100 // TaskRunner implementation
101 bool PostDelayedTask(const tracked_objects::Location
& from_here
,
103 TimeDelta delay
) override
;
104 bool RunsTasksOnCurrentThread() const override
;
107 ~SequencedWorkerPoolTaskRunner() override
;
109 const scoped_refptr
<SequencedWorkerPool
> pool_
;
111 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_
;
113 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner
);
116 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
117 const scoped_refptr
<SequencedWorkerPool
>& pool
,
118 SequencedWorkerPool::WorkerShutdown shutdown_behavior
)
120 shutdown_behavior_(shutdown_behavior
) {
123 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
126 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
127 const tracked_objects::Location
& from_here
,
130 if (delay
== TimeDelta()) {
131 return pool_
->PostWorkerTaskWithShutdownBehavior(
132 from_here
, task
, shutdown_behavior_
);
134 return pool_
->PostDelayedWorkerTask(from_here
, task
, delay
);
137 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
138 return pool_
->RunsTasksOnCurrentThread();
141 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
142 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
143 // fixed sequence token.
145 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
146 class SequencedWorkerPoolSequencedTaskRunner
: public SequencedTaskRunner
{
148 SequencedWorkerPoolSequencedTaskRunner(
149 const scoped_refptr
<SequencedWorkerPool
>& pool
,
150 SequencedWorkerPool::SequenceToken token
,
151 SequencedWorkerPool::WorkerShutdown shutdown_behavior
);
153 // TaskRunner implementation
154 bool PostDelayedTask(const tracked_objects::Location
& from_here
,
156 TimeDelta delay
) override
;
157 bool RunsTasksOnCurrentThread() const override
;
159 // SequencedTaskRunner implementation
160 bool PostNonNestableDelayedTask(const tracked_objects::Location
& from_here
,
162 TimeDelta delay
) override
;
165 ~SequencedWorkerPoolSequencedTaskRunner() override
;
167 const scoped_refptr
<SequencedWorkerPool
> pool_
;
169 const SequencedWorkerPool::SequenceToken token_
;
171 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_
;
173 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner
);
176 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
177 const scoped_refptr
<SequencedWorkerPool
>& pool
,
178 SequencedWorkerPool::SequenceToken token
,
179 SequencedWorkerPool::WorkerShutdown shutdown_behavior
)
182 shutdown_behavior_(shutdown_behavior
) {
185 SequencedWorkerPoolSequencedTaskRunner::
186 ~SequencedWorkerPoolSequencedTaskRunner() {
189 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
190 const tracked_objects::Location
& from_here
,
193 if (delay
== TimeDelta()) {
194 return pool_
->PostSequencedWorkerTaskWithShutdownBehavior(
195 token_
, from_here
, task
, shutdown_behavior_
);
197 return pool_
->PostDelayedSequencedWorkerTask(token_
, from_here
, task
, delay
);
200 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
201 return pool_
->IsRunningSequenceOnCurrentThread(token_
);
204 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
205 const tracked_objects::Location
& from_here
,
208 // There's no way to run nested tasks, so simply forward to
210 return PostDelayedTask(from_here
, task
, delay
);
213 // Create a process-wide unique ID to represent this task in trace events. This
214 // will be mangled with a Process ID hash to reduce the likelihood of colliding
215 // with MessageLoop pointers on other processes.
// The returned value combines two parts: |task.trace_id| shifted left by 32
// bits, OR-ed with the integer value of the |pool| pointer.
216 uint64
GetTaskTraceID(const SequencedTask
& task
,
218 return (static_cast<uint64
>(task
.trace_id
) << 32) |
219 static_cast<uint64
>(reinterpret_cast<intptr_t>(pool
));
222 base::LazyInstance
<base::ThreadLocalPointer
<
223 SequencedWorkerPool::SequenceToken
> >::Leaky g_lazy_tls_ptr
=
224 LAZY_INSTANCE_INITIALIZER
;
228 // Worker ---------------------------------------------------------------------
230 class SequencedWorkerPool::Worker
: public SimpleThread
{
232 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
233 // around as long as we are running.
234 Worker(const scoped_refptr
<SequencedWorkerPool
>& worker_pool
,
236 const std::string
& thread_name_prefix
);
239 // SimpleThread implementation. This actually runs the background thread.
242 // Indicates that a task is about to be run. The parameters provide
243 // additional metainformation about the task being run.
244 void set_running_task_info(SequenceToken token
,
245 WorkerShutdown shutdown_behavior
) {
246 is_processing_task_
= true;
247 task_sequence_token_
= token
;
248 task_shutdown_behavior_
= shutdown_behavior
;
251 // Indicates that the task has finished running.
252 void reset_running_task_info() { is_processing_task_
= false; }
254 // Whether the worker is processing a task.
255 bool is_processing_task() { return is_processing_task_
; }
257 SequenceToken
task_sequence_token() const {
258 DCHECK(is_processing_task_
);
259 return task_sequence_token_
;
262 WorkerShutdown
task_shutdown_behavior() const {
263 DCHECK(is_processing_task_
);
264 return task_shutdown_behavior_
;
268 scoped_refptr
<SequencedWorkerPool
> worker_pool_
;
269 // The sequence token of the task being processed. Only valid when
270 // is_processing_task_ is true.
271 SequenceToken task_sequence_token_
;
272 // The shutdown behavior of the task being processed. Only valid when
273 // is_processing_task_ is true.
274 WorkerShutdown task_shutdown_behavior_
;
275 // Whether the Worker is processing a task.
276 bool is_processing_task_
;
278 DISALLOW_COPY_AND_ASSIGN(Worker
);
281 // Inner ----------------------------------------------------------------------
283 class SequencedWorkerPool::Inner
{
285 // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
287 Inner(SequencedWorkerPool
* worker_pool
, size_t max_threads
,
288 const std::string
& thread_name_prefix
,
289 TestingObserver
* observer
);
293 SequenceToken
GetSequenceToken();
295 SequenceToken
GetNamedSequenceToken(const std::string
& name
);
297 // This function accepts a name and an ID. If the name is null, the
298 // token ID is used. This allows us to implement the optional name lookup
299 // from a single function without having to enter the lock a separate time.
300 bool PostTask(const std::string
* optional_token_name
,
301 SequenceToken sequence_token
,
302 WorkerShutdown shutdown_behavior
,
303 const tracked_objects::Location
& from_here
,
307 bool RunsTasksOnCurrentThread() const;
309 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token
) const;
311 void CleanupForTesting();
313 void SignalHasWorkForTesting();
315 int GetWorkSignalCountForTesting() const;
317 void Shutdown(int max_blocking_tasks_after_shutdown
);
319 bool IsShutdownInProgress();
321 // Runs the worker loop on the background thread.
322 void ThreadLoop(Worker
* this_worker
);
339 // Called from within the lock, this converts the given token name into a
340 // token ID, creating a new one if necessary.
341 int LockedGetNamedTokenID(const std::string
& name
);
343 // Called from within the lock, this returns the next sequence task number.
344 int64
LockedGetNextSequenceTaskNumber();
346 // Gets new task. There are 3 cases depending on the return value:
348 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
349 // be run immediately.
350 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
351 // and |task| is not filled in. In this case, the caller should wait until
353 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
354 // immediately, and |task| is not filled in. Instead, |wait_time| is
355 // filled in with the time to wait until the next task is due to run. In
356 // this case, the caller should wait for that amount of time.
358 // In any case, the calling code should clear the given
359 // delete_these_outside_lock vector the next time the lock is released.
360 // See the implementation for a more detailed description.
361 GetWorkStatus
GetWork(SequencedTask
* task
,
362 TimeDelta
* wait_time
,
363 std::vector
<Closure
>* delete_these_outside_lock
);
365 void HandleCleanup();
367 // Performs init and cleanup around running the given task. WillRun...
368 // returns the value from PrepareToStartAdditionalThreadIfHelpful.
369 // The calling code should call FinishStartingAdditionalThread once the
370 // lock is released if the return value is nonzero.
371 int WillRunWorkerTask(const SequencedTask
& task
);
372 void DidRunWorkerTask(const SequencedTask
& task
);
374 // Returns true if there are no threads currently running the given
376 bool IsSequenceTokenRunnable(int sequence_token_id
) const;
378 // Checks if all threads are busy and the addition of one more could run an
379 // additional task waiting in the queue. This must be called from within
382 // If another thread is helpful, this will mark the thread as being in the
383 // process of starting and returns the index of the new thread which will be
384 // 1 or more. The caller should then call FinishStartingAdditionalThread to
385 // complete initialization once the lock is released.
387 // If another thread is not necessary, returns 0.
389 // See the implementation for more.
390 int PrepareToStartAdditionalThreadIfHelpful();
392 // The second part of thread creation after
393 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
394 // generated. This actually creates the thread and should be called outside
395 // the lock to avoid blocking important work starting a thread in the lock.
396 void FinishStartingAdditionalThread(int thread_number
);
398 // Signal |has_work_| and increment |has_work_signal_count_|.
399 void SignalHasWork();
401 // Checks whether there is work left that's blocking shutdown. Must be
402 // called inside the lock.
403 bool CanShutdown() const;
405 SequencedWorkerPool
* const worker_pool_
;
407 // The last sequence number used. Managed by GetSequenceToken, since this
408 // only does threadsafe increment operations, you do not need to hold the
409 // lock. This is class-static to make SequenceTokens issued by
410 // GetSequenceToken unique across SequencedWorkerPool instances.
411 static base::StaticAtomicSequenceNumber g_last_sequence_number_
;
413 // This lock protects |everything in this class|. Do not read or modify
414 // anything without holding this lock. Do not block while holding this
418 // Condition variable that is waited on by worker threads until new
419 // tasks are posted or shutdown starts.
420 ConditionVariable has_work_cv_
;
422 // Condition variable that is waited on by non-worker threads (in
423 // Shutdown()) until CanShutdown() goes to true.
424 ConditionVariable can_shutdown_cv_
;
426 // The maximum number of worker threads we'll create.
427 const size_t max_threads_
;
429 const std::string thread_name_prefix_
;
431 // Associates all known sequence token names with their IDs.
432 std::map
<std::string
, int> named_sequence_tokens_
;
434 // Owning pointers to all threads we've created so far, indexed by
435 // ID. Since we lazily create threads, this may be less than
436 // max_threads_ and will be initially empty.
437 typedef std::map
<PlatformThreadId
, linked_ptr
<Worker
> > ThreadMap
;
440 // Set to true when we're in the process of creating another thread.
441 // See PrepareToStartAdditionalThreadIfHelpful for more.
442 bool thread_being_created_
;
444 // Number of threads currently waiting for work.
445 size_t waiting_thread_count_
;
447 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
448 // or SKIP_ON_SHUTDOWN flag set.
449 size_t blocking_shutdown_thread_count_
;
451 // A set of all pending tasks in time-to-run order. These are tasks that are
452 // either waiting for a thread to run on, waiting for their time to run,
453 // or blocked on a previous task in their sequence. We have to iterate over
454 // the tasks by time-to-run order, so we use the set instead of the
455 // traditional priority_queue.
456 typedef std::set
<SequencedTask
, SequencedTaskLessThan
> PendingTaskSet
;
457 PendingTaskSet pending_tasks_
;
459 // The next sequence number for a new sequenced task.
460 int64 next_sequence_task_number_
;
462 // Number of tasks in the pending_tasks_ list that are marked as blocking
464 size_t blocking_shutdown_pending_task_count_
;
466 // Lists all sequence tokens currently executing.
467 std::set
<int> current_sequences_
;
469 // An ID for each posted task to distinguish the task from others in traces.
472 // Set when Shutdown is called and no further tasks should be
473 // allowed, though we may still be running existing tasks.
474 bool shutdown_called_
;
476 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
478 int max_blocking_tasks_after_shutdown_
;
480 // State used to cleanup for testing, all guarded by lock_.
481 CleanupState cleanup_state_
;
482 size_t cleanup_idlers_
;
483 ConditionVariable cleanup_cv_
;
485 TestingObserver
* const testing_observer_
;
487 DISALLOW_COPY_AND_ASSIGN(Inner
);
490 // Worker definitions ---------------------------------------------------------
492 SequencedWorkerPool::Worker::Worker(
493 const scoped_refptr
<SequencedWorkerPool
>& worker_pool
,
495 const std::string
& prefix
)
496 : SimpleThread(prefix
+ StringPrintf("Worker%d", thread_number
)),
497 worker_pool_(worker_pool
),
498 task_shutdown_behavior_(BLOCK_SHUTDOWN
),
499 is_processing_task_(false) {
503 SequencedWorkerPool::Worker::~Worker() {
506 void SequencedWorkerPool::Worker::Run() {
508 win::ScopedCOMInitializer com_initializer
;
511 // Store a pointer to the running sequence in thread local storage for
512 // static function access.
513 g_lazy_tls_ptr
.Get().Set(&task_sequence_token_
);
515 // Just jump back to the Inner object to run the thread, since it has all the
516 // tracking information and queues. It might be more natural to implement
517 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
518 // having these worker objects at all, but that method lacks the ability to
519 // send thread-specific information easily to the thread loop.
520 worker_pool_
->inner_
->ThreadLoop(this);
521 // Release our cyclic reference once we're done.
525 // Inner definitions ---------------------------------------------------------
527 SequencedWorkerPool::Inner::Inner(
528 SequencedWorkerPool
* worker_pool
,
530 const std::string
& thread_name_prefix
,
531 TestingObserver
* observer
)
532 : worker_pool_(worker_pool
),
534 has_work_cv_(&lock_
),
535 can_shutdown_cv_(&lock_
),
536 max_threads_(max_threads
),
537 thread_name_prefix_(thread_name_prefix
),
538 thread_being_created_(false),
539 waiting_thread_count_(0),
540 blocking_shutdown_thread_count_(0),
541 next_sequence_task_number_(0),
542 blocking_shutdown_pending_task_count_(0),
544 shutdown_called_(false),
545 max_blocking_tasks_after_shutdown_(0),
546 cleanup_state_(CLEANUP_DONE
),
549 testing_observer_(observer
) {}
551 SequencedWorkerPool::Inner::~Inner() {
552 // You must call Shutdown() before destroying the pool.
553 DCHECK(shutdown_called_
);
555 // Need to explicitly join with the threads before they're destroyed or else
556 // they will be running when our object is half torn down.
557 for (ThreadMap::iterator it
= threads_
.begin(); it
!= threads_
.end(); ++it
)
561 if (testing_observer_
)
562 testing_observer_
->OnDestruct();
565 SequencedWorkerPool::SequenceToken
566 SequencedWorkerPool::Inner::GetSequenceToken() {
567 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
568 // is used as a sentinel value in SequenceTokens.
569 return SequenceToken(g_last_sequence_number_
.GetNext() + 1);
572 SequencedWorkerPool::SequenceToken
573 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string
& name
) {
574 AutoLock
lock(lock_
);
575 return SequenceToken(LockedGetNamedTokenID(name
));
578 bool SequencedWorkerPool::Inner::PostTask(
579 const std::string
* optional_token_name
,
580 SequenceToken sequence_token
,
581 WorkerShutdown shutdown_behavior
,
582 const tracked_objects::Location
& from_here
,
585 DCHECK(delay
== TimeDelta() || shutdown_behavior
== SKIP_ON_SHUTDOWN
);
586 SequencedTask
sequenced(from_here
);
587 sequenced
.sequence_token_id
= sequence_token
.id_
;
588 sequenced
.shutdown_behavior
= shutdown_behavior
;
589 sequenced
.posted_from
= from_here
;
591 shutdown_behavior
== BLOCK_SHUTDOWN
?
592 base::MakeCriticalClosure(task
) : task
;
593 sequenced
.time_to_run
= TimeTicks::Now() + delay
;
595 int create_thread_id
= 0;
597 AutoLock
lock(lock_
);
598 if (shutdown_called_
) {
599 // Don't allow a new task to be posted if it doesn't block shutdown.
600 if (shutdown_behavior
!= BLOCK_SHUTDOWN
)
603 // If the current thread is running a task, and that task doesn't block
604 // shutdown, then it shouldn't be allowed to post any more tasks.
605 ThreadMap::const_iterator found
=
606 threads_
.find(PlatformThread::CurrentId());
607 if (found
!= threads_
.end() && found
->second
->is_processing_task() &&
608 found
->second
->task_shutdown_behavior() != BLOCK_SHUTDOWN
) {
612 if (max_blocking_tasks_after_shutdown_
<= 0) {
613 DLOG(WARNING
) << "BLOCK_SHUTDOWN task disallowed";
616 max_blocking_tasks_after_shutdown_
-= 1;
619 // The trace_id is used for identifying the task in about:tracing.
620 sequenced
.trace_id
= trace_id_
++;
622 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
623 "SequencedWorkerPool::PostTask",
624 TRACE_ID_MANGLE(GetTaskTraceID(sequenced
, static_cast<void*>(this))));
626 sequenced
.sequence_task_number
= LockedGetNextSequenceTaskNumber();
628 // Now that we have the lock, apply the named token rules.
629 if (optional_token_name
)
630 sequenced
.sequence_token_id
= LockedGetNamedTokenID(*optional_token_name
);
632 pending_tasks_
.insert(sequenced
);
633 if (shutdown_behavior
== BLOCK_SHUTDOWN
)
634 blocking_shutdown_pending_task_count_
++;
636 create_thread_id
= PrepareToStartAdditionalThreadIfHelpful();
639 // Actually start the additional thread or signal an existing one now that
640 // we're outside the lock.
641 if (create_thread_id
)
642 FinishStartingAdditionalThread(create_thread_id
);
649 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
650 AutoLock
lock(lock_
);
651 return ContainsKey(threads_
, PlatformThread::CurrentId());
654 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
655 SequenceToken sequence_token
) const {
656 AutoLock
lock(lock_
);
657 ThreadMap::const_iterator found
= threads_
.find(PlatformThread::CurrentId());
658 if (found
== threads_
.end())
660 return found
->second
->is_processing_task() &&
661 sequence_token
.Equals(found
->second
->task_sequence_token());
664 // See https://code.google.com/p/chromium/issues/detail?id=168415
665 void SequencedWorkerPool::Inner::CleanupForTesting() {
666 DCHECK(!RunsTasksOnCurrentThread());
667 base::ThreadRestrictions::ScopedAllowWait allow_wait
;
668 AutoLock
lock(lock_
);
669 CHECK_EQ(CLEANUP_DONE
, cleanup_state_
);
670 if (shutdown_called_
)
672 if (pending_tasks_
.empty() && waiting_thread_count_
== threads_
.size())
674 cleanup_state_
= CLEANUP_REQUESTED
;
676 has_work_cv_
.Signal();
677 while (cleanup_state_
!= CLEANUP_DONE
)
681 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
685 void SequencedWorkerPool::Inner::Shutdown(
686 int max_new_blocking_tasks_after_shutdown
) {
687 DCHECK_GE(max_new_blocking_tasks_after_shutdown
, 0);
689 AutoLock
lock(lock_
);
690 // Cleanup and Shutdown should not be called concurrently.
691 CHECK_EQ(CLEANUP_DONE
, cleanup_state_
);
692 if (shutdown_called_
)
694 shutdown_called_
= true;
695 max_blocking_tasks_after_shutdown_
= max_new_blocking_tasks_after_shutdown
;
697 // Tickle the threads. This will wake up a waiting one so it will know that
698 // it can exit, which in turn will wake up any other waiting ones.
701 // There are no pending or running tasks blocking shutdown, we're done.
706 // If we're here, then something is blocking shutdown. So wait for
707 // CanShutdown() to go to true.
709 if (testing_observer_
)
710 testing_observer_
->WillWaitForShutdown();
712 #if !defined(OS_NACL)
713 TimeTicks shutdown_wait_begin
= TimeTicks::Now();
717 base::ThreadRestrictions::ScopedAllowWait allow_wait
;
718 AutoLock
lock(lock_
);
719 while (!CanShutdown())
720 can_shutdown_cv_
.Wait();
722 #if !defined(OS_NACL)
723 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
724 TimeTicks::Now() - shutdown_wait_begin
);
728 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
729 AutoLock
lock(lock_
);
730 return shutdown_called_
;
733 void SequencedWorkerPool::Inner::ThreadLoop(Worker
* this_worker
) {
735 AutoLock
lock(lock_
);
736 DCHECK(thread_being_created_
);
737 thread_being_created_
= false;
738 std::pair
<ThreadMap::iterator
, bool> result
=
740 std::make_pair(this_worker
->tid(), make_linked_ptr(this_worker
)));
741 DCHECK(result
.second
);
744 #if defined(OS_MACOSX)
745 base::mac::ScopedNSAutoreleasePool autorelease_pool
;
750 // See GetWork for what delete_these_outside_lock is doing.
753 std::vector
<Closure
> delete_these_outside_lock
;
754 GetWorkStatus status
=
755 GetWork(&task
, &wait_time
, &delete_these_outside_lock
);
756 if (status
== GET_WORK_FOUND
) {
757 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
758 "SequencedWorkerPool::PostTask",
759 TRACE_ID_MANGLE(GetTaskTraceID(task
, static_cast<void*>(this))));
760 TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop",
761 "src_file", task
.posted_from
.file_name(),
762 "src_func", task
.posted_from
.function_name());
763 int new_thread_id
= WillRunWorkerTask(task
);
765 AutoUnlock
unlock(lock_
);
766 // There may be more work available, so wake up another
767 // worker thread. (Technically not required, since we
768 // already get a signal for each new task, but it doesn't
771 delete_these_outside_lock
.clear();
773 // Complete thread creation outside the lock if necessary.
775 FinishStartingAdditionalThread(new_thread_id
);
777 this_worker
->set_running_task_info(
778 SequenceToken(task
.sequence_token_id
), task
.shutdown_behavior
);
780 tracked_objects::TaskStopwatch stopwatch
;
785 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
788 // Make sure our task is erased outside the lock for the
789 // same reason we do this with delete_these_outside_lock.
790 // Also, do it before calling reset_running_task_info() so
791 // that sequence-checking from within the task's destructor
793 task
.task
= Closure();
795 this_worker
->reset_running_task_info();
797 DidRunWorkerTask(task
); // Must be done inside the lock.
798 } else if (cleanup_state_
== CLEANUP_RUNNING
) {
800 case GET_WORK_WAIT
: {
801 AutoUnlock
unlock(lock_
);
802 delete_these_outside_lock
.clear();
805 case GET_WORK_NOT_FOUND
:
806 CHECK(delete_these_outside_lock
.empty());
807 cleanup_state_
= CLEANUP_FINISHING
;
808 cleanup_cv_
.Broadcast();
814 // When we're terminating and there's no more work, we can
815 // shut down, other workers can complete any pending or new tasks.
816 // We can get additional tasks posted after shutdown_called_ is set
817 // but only worker threads are allowed to post tasks at that time, and
818 // the workers responsible for posting those tasks will be available
819 // to run them. Also, there may be some tasks stuck behind running
820 // ones with the same sequence token, but additional threads won't
822 if (shutdown_called_
&& blocking_shutdown_pending_task_count_
== 0) {
823 AutoUnlock
unlock(lock_
);
824 delete_these_outside_lock
.clear();
828 // No work was found, but there are tasks that need deletion. The
829 // deletion must happen outside of the lock.
830 if (delete_these_outside_lock
.size()) {
831 AutoUnlock
unlock(lock_
);
832 delete_these_outside_lock
.clear();
834 // Since the lock has been released, |status| may no longer be
835 // accurate. It might read GET_WORK_WAIT even if there are tasks
836 // ready to perform work. Jump to the top of the loop to recalculate
841 waiting_thread_count_
++;
844 case GET_WORK_NOT_FOUND
:
848 has_work_cv_
.TimedWait(wait_time
);
853 waiting_thread_count_
--;
858 // We noticed we should exit. Wake up the next worker so it knows it should
859 // exit as well (because the Shutdown() code only signals once).
862 // Possibly unblock shutdown.
863 can_shutdown_cv_
.Signal();
866 void SequencedWorkerPool::Inner::HandleCleanup() {
867 lock_
.AssertAcquired();
868 if (cleanup_state_
== CLEANUP_DONE
)
870 if (cleanup_state_
== CLEANUP_REQUESTED
) {
871 // We win, we get to do the cleanup as soon as the others wise up and idle.
872 cleanup_state_
= CLEANUP_STARTING
;
873 while (thread_being_created_
||
874 cleanup_idlers_
!= threads_
.size() - 1) {
875 has_work_cv_
.Signal();
878 cleanup_state_
= CLEANUP_RUNNING
;
881 if (cleanup_state_
== CLEANUP_STARTING
) {
882 // Another worker thread is cleaning up, we idle here until that's done.
884 cleanup_cv_
.Broadcast();
885 while (cleanup_state_
!= CLEANUP_FINISHING
) {
889 cleanup_cv_
.Broadcast();
892 if (cleanup_state_
== CLEANUP_FINISHING
) {
893 // We wait for all idlers to wake up prior to being DONE.
894 while (cleanup_idlers_
!= 0) {
895 cleanup_cv_
.Broadcast();
898 if (cleanup_state_
== CLEANUP_FINISHING
) {
899 cleanup_state_
= CLEANUP_DONE
;
900 cleanup_cv_
.Signal();
// Maps a non-empty sequence token |name| to its integer ID. Must be called with
// |lock_| held. If |name| is already present in |named_sequence_tokens_| the
// stored ID is returned; otherwise a fresh token is obtained from
// GetSequenceToken() and recorded under |name|.
// NOTE(review): the statement that returns the newly created ID is not visible
// in this extract; presumably it returns |result.id_| -- confirm.
906 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
907 const std::string
& name
) {
908 lock_
.AssertAcquired();
909 DCHECK(!name
.empty());
911 std::map
<std::string
, int>::const_iterator found
=
912 named_sequence_tokens_
.find(name
);
913 if (found
!= named_sequence_tokens_
.end())
914 return found
->second
; // Got an existing one.
916 // Create a new one for this name.
917 SequenceToken result
= GetSequenceToken();
918 named_sequence_tokens_
.insert(std::make_pair(name
, result
.id_
));
922 int64
SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
923 lock_
.AssertAcquired();
924 // We assume that we never create enough tasks to wrap around.
925 return next_sequence_task_number_
++;
928 SequencedWorkerPool::Inner::GetWorkStatus
SequencedWorkerPool::Inner::GetWork(
930 TimeDelta
* wait_time
,
931 std::vector
<Closure
>* delete_these_outside_lock
) {
932 lock_
.AssertAcquired();
934 // Find the next task with a sequence token that's not currently in use.
935 // If the token is in use, that means another thread is running something
936 // in that sequence, and we can't run it without going out-of-order.
938 // This algorithm is simple and fair, but inefficient in some cases. For
939 // example, say somebody schedules 1000 slow tasks with the same sequence
940 // number. We'll have to go through all those tasks each time we feel like
941 // there might be work to schedule. If this proves to be a problem, we
942 // should make this more efficient.
944 // One possible enhancement would be to keep a map from sequence ID to a
945 // list of pending but currently blocked SequencedTasks for that ID.
946 // When a worker finishes a task of one sequence token, it can pick up the
947 // next one from that token right away.
949 // This may lead to starvation if there are sufficient numbers of sequences
950 // in use. To alleviate this, we could add an incrementing priority counter
951 // to each SequencedTask. Then maintain a priority_queue of all runnable
952 // tasks, sorted by priority counter. When a sequenced task is completed
953 // we would pop the head element off of that task's pending list and add it
954 // to the priority queue. Then we would run the first item in the priority
957 GetWorkStatus status
= GET_WORK_NOT_FOUND
;
958 int unrunnable_tasks
= 0;
959 PendingTaskSet::iterator i
= pending_tasks_
.begin();
960 // We assume that the loop below doesn't take too long and so we can just do
961 // a single call to TimeTicks::Now().
962 const TimeTicks current_time
= TimeTicks::Now();
963 while (i
!= pending_tasks_
.end()) {
964 if (!IsSequenceTokenRunnable(i
->sequence_token_id
)) {
970 if (shutdown_called_
&& i
->shutdown_behavior
!= BLOCK_SHUTDOWN
) {
971 // We're shutting down and the task we just found isn't blocking
972 // shutdown. Delete it and get more work.
974 // Note that we do not want to delete unrunnable tasks. Deleting a task
975 // can have side effects (like freeing some objects) and deleting a
976 // task that's supposed to run after one that's currently running could
977 // cause an obscure crash.
979 // We really want to delete these tasks outside the lock in case the
980 // closures are holding refs to objects that want to post work from
981 // their destructors (which would deadlock). The closures are
982 // internally refcounted, so we just need to keep a copy of them alive
983 // until the lock is exited. The calling code can just clear() the
984 // vector they passed to us once the lock is exited to make this
986 delete_these_outside_lock
->push_back(i
->task
);
987 pending_tasks_
.erase(i
++);
991 if (i
->time_to_run
> current_time
) {
992 // The time to run has not come yet.
993 *wait_time
= i
->time_to_run
- current_time
;
994 status
= GET_WORK_WAIT
;
995 if (cleanup_state_
== CLEANUP_RUNNING
) {
996 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
997 delete_these_outside_lock
->push_back(i
->task
);
998 pending_tasks_
.erase(i
);
1003 // Found a runnable task.
1005 pending_tasks_
.erase(i
);
1006 if (task
->shutdown_behavior
== BLOCK_SHUTDOWN
) {
1007 blocking_shutdown_pending_task_count_
--;
1010 status
= GET_WORK_FOUND
;
1017 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask
& task
) {
1018 lock_
.AssertAcquired();
1020 // Mark the task's sequence number as in use.
1021 if (task
.sequence_token_id
)
1022 current_sequences_
.insert(task
.sequence_token_id
);
1024 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1025 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1027 if (task
.shutdown_behavior
!= CONTINUE_ON_SHUTDOWN
)
1028 blocking_shutdown_thread_count_
++;
1030 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1031 // creates a new thread if there is no free one, there is a race when posting
1032 // tasks that many tasks could have been posted before a thread started
1033 // running them, so only one thread would have been created. So we also check
1034 // whether we should create more threads after removing our task from the
1035 // queue, which also has the nice side effect of creating the workers from
1036 // background threads rather than the main thread of the app.
1038 // If another thread wasn't created, we want to wake up an existing thread
1039 // if there is one waiting to pick up the next task.
1041 // Note that we really need to do this *before* running the task, not
1042 // after. Otherwise, if more than one task is posted, the creation of the
1043 // second thread (since we only create one at a time) will be blocked by
1044 // the execution of the first task, which could be arbitrarily long.
1045 return PrepareToStartAdditionalThreadIfHelpful();
1048 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask
& task
) {
1049 lock_
.AssertAcquired();
1051 if (task
.shutdown_behavior
!= CONTINUE_ON_SHUTDOWN
) {
1052 DCHECK_GT(blocking_shutdown_thread_count_
, 0u);
1053 blocking_shutdown_thread_count_
--;
1056 if (task
.sequence_token_id
)
1057 current_sequences_
.erase(task
.sequence_token_id
);
1060 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1061 int sequence_token_id
) const {
1062 lock_
.AssertAcquired();
1063 return !sequence_token_id
||
1064 current_sequences_
.find(sequence_token_id
) ==
1065 current_sequences_
.end();
1068 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1069 lock_
.AssertAcquired();
1070 // How thread creation works:
1072 // We'de like to avoid creating threads with the lock held. However, we
1073 // need to be sure that we have an accurate accounting of the threads for
1074 // proper Joining and deltion on shutdown.
1076 // We need to figure out if we need another thread with the lock held, which
1077 // is what this function does. It then marks us as in the process of creating
1078 // a thread. When we do shutdown, we wait until the thread_being_created_
1079 // flag is cleared, which ensures that the new thread is properly added to
1080 // all the data structures and we can't leak it. Once shutdown starts, we'll
1081 // refuse to create more threads or they would be leaked.
1083 // Note that this creates a mostly benign race condition on shutdown that
1084 // will cause fewer workers to be created than one would expect. It isn't
1085 // much of an issue in real life, but affects some tests. Since we only spawn
1086 // one worker at a time, the following sequence of events can happen:
1088 // 1. Main thread posts a bunch of unrelated tasks that would normally be
1089 // run on separate threads.
1090 // 2. The first task post causes us to start a worker. Other tasks do not
1091 // cause a worker to start since one is pending.
1092 // 3. Main thread initiates shutdown.
1093 // 4. No more threads are created since the shutdown_called_ flag is set.
1095 // The result is that one may expect that max_threads_ workers to be created
1096 // given the workload, but in reality fewer may be created because the
1097 // sequence of thread creation on the background threads is racing with the
1099 if (!shutdown_called_
&&
1100 !thread_being_created_
&&
1101 cleanup_state_
== CLEANUP_DONE
&&
1102 threads_
.size() < max_threads_
&&
1103 waiting_thread_count_
== 0) {
1104 // We could use an additional thread if there's work to be done.
1105 for (PendingTaskSet::const_iterator i
= pending_tasks_
.begin();
1106 i
!= pending_tasks_
.end(); ++i
) {
1107 if (IsSequenceTokenRunnable(i
->sequence_token_id
)) {
1108 // Found a runnable task, mark the thread as being started.
1109 thread_being_created_
= true;
1110 return static_cast<int>(threads_
.size() + 1);
1117 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1118 int thread_number
) {
1119 // Called outside of the lock.
1120 DCHECK_GT(thread_number
, 0);
1122 // The worker is assigned to the list when the thread actually starts, which
1123 // will manage the memory of the pointer.
1124 new Worker(worker_pool_
, thread_number
, thread_name_prefix_
);
1127 void SequencedWorkerPool::Inner::SignalHasWork() {
1128 has_work_cv_
.Signal();
1129 if (testing_observer_
) {
1130 testing_observer_
->OnHasWork();
1134 bool SequencedWorkerPool::Inner::CanShutdown() const {
1135 lock_
.AssertAcquired();
1136 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1137 return !thread_being_created_
&&
1138 blocking_shutdown_thread_count_
== 0 &&
1139 blocking_shutdown_pending_task_count_
== 0;
1142 base::StaticAtomicSequenceNumber
1143 SequencedWorkerPool::Inner::g_last_sequence_number_
;
1145 // SequencedWorkerPool --------------------------------------------------------
1148 SequencedWorkerPool::SequenceToken
1149 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1150 // Don't construct lazy instance on check.
1151 if (g_lazy_tls_ptr
== NULL
)
1152 return SequenceToken();
1154 SequencedWorkerPool::SequenceToken
* token
= g_lazy_tls_ptr
.Get().Get();
1156 return SequenceToken();
1160 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads
,
1161 const std::string
& thread_name_prefix
)
1162 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1163 inner_(new Inner(this, max_threads
, thread_name_prefix
, NULL
)) {
1166 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads
,
1167 const std::string
& thread_name_prefix
,
1168 TestingObserver
* observer
)
1169 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1170 inner_(new Inner(this, max_threads
, thread_name_prefix
, observer
)) {
1173 SequencedWorkerPool::~SequencedWorkerPool() {}
1175 void SequencedWorkerPool::OnDestruct() const {
1176 // Avoid deleting ourselves on a worker thread (which would
1178 if (RunsTasksOnCurrentThread()) {
1179 constructor_task_runner_
->DeleteSoon(FROM_HERE
, this);
1185 SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetSequenceToken() {
1186 return inner_
->GetSequenceToken();
1189 SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetNamedSequenceToken(
1190 const std::string
& name
) {
1191 return inner_
->GetNamedSequenceToken(name
);
1194 scoped_refptr
<SequencedTaskRunner
> SequencedWorkerPool::GetSequencedTaskRunner(
1195 SequenceToken token
) {
1196 return GetSequencedTaskRunnerWithShutdownBehavior(token
, BLOCK_SHUTDOWN
);
1199 scoped_refptr
<SequencedTaskRunner
>
1200 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1201 SequenceToken token
, WorkerShutdown shutdown_behavior
) {
1202 return new SequencedWorkerPoolSequencedTaskRunner(
1203 this, token
, shutdown_behavior
);
1206 scoped_refptr
<TaskRunner
>
1207 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1208 WorkerShutdown shutdown_behavior
) {
1209 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior
);
1212 bool SequencedWorkerPool::PostWorkerTask(
1213 const tracked_objects::Location
& from_here
,
1214 const Closure
& task
) {
1215 return inner_
->PostTask(NULL
, SequenceToken(), BLOCK_SHUTDOWN
,
1216 from_here
, task
, TimeDelta());
1219 bool SequencedWorkerPool::PostDelayedWorkerTask(
1220 const tracked_objects::Location
& from_here
,
1221 const Closure
& task
,
1223 WorkerShutdown shutdown_behavior
=
1224 delay
== TimeDelta() ? BLOCK_SHUTDOWN
: SKIP_ON_SHUTDOWN
;
1225 return inner_
->PostTask(NULL
, SequenceToken(), shutdown_behavior
,
1226 from_here
, task
, delay
);
1229 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1230 const tracked_objects::Location
& from_here
,
1231 const Closure
& task
,
1232 WorkerShutdown shutdown_behavior
) {
1233 return inner_
->PostTask(NULL
, SequenceToken(), shutdown_behavior
,
1234 from_here
, task
, TimeDelta());
1237 bool SequencedWorkerPool::PostSequencedWorkerTask(
1238 SequenceToken sequence_token
,
1239 const tracked_objects::Location
& from_here
,
1240 const Closure
& task
) {
1241 return inner_
->PostTask(NULL
, sequence_token
, BLOCK_SHUTDOWN
,
1242 from_here
, task
, TimeDelta());
1245 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1246 SequenceToken sequence_token
,
1247 const tracked_objects::Location
& from_here
,
1248 const Closure
& task
,
1250 WorkerShutdown shutdown_behavior
=
1251 delay
== TimeDelta() ? BLOCK_SHUTDOWN
: SKIP_ON_SHUTDOWN
;
1252 return inner_
->PostTask(NULL
, sequence_token
, shutdown_behavior
,
1253 from_here
, task
, delay
);
1256 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1257 const std::string
& token_name
,
1258 const tracked_objects::Location
& from_here
,
1259 const Closure
& task
) {
1260 DCHECK(!token_name
.empty());
1261 return inner_
->PostTask(&token_name
, SequenceToken(), BLOCK_SHUTDOWN
,
1262 from_here
, task
, TimeDelta());
1265 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1266 SequenceToken sequence_token
,
1267 const tracked_objects::Location
& from_here
,
1268 const Closure
& task
,
1269 WorkerShutdown shutdown_behavior
) {
1270 return inner_
->PostTask(NULL
, sequence_token
, shutdown_behavior
,
1271 from_here
, task
, TimeDelta());
1274 bool SequencedWorkerPool::PostDelayedTask(
1275 const tracked_objects::Location
& from_here
,
1276 const Closure
& task
,
1278 return PostDelayedWorkerTask(from_here
, task
, delay
);
1281 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1282 return inner_
->RunsTasksOnCurrentThread();
1285 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1286 SequenceToken sequence_token
) const {
1287 return inner_
->IsRunningSequenceOnCurrentThread(sequence_token
);
1290 void SequencedWorkerPool::FlushForTesting() {
1291 inner_
->CleanupForTesting();
1294 void SequencedWorkerPool::SignalHasWorkForTesting() {
1295 inner_
->SignalHasWorkForTesting();
1298 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown
) {
1299 DCHECK(constructor_task_runner_
->BelongsToCurrentThread());
1300 inner_
->Shutdown(max_new_blocking_tasks_after_shutdown
);
1303 bool SequencedWorkerPool::IsShutdownInProgress() {
1304 return inner_
->IsShutdownInProgress();