// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/threading/sequenced_worker_pool.h"

#include <map>
#include <set>
#include <utility>
#include <vector>

#include "base/atomic_sequence_num.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
#include "base/debug/trace_event.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/linked_ptr.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/stl_util.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/tracked_objects.h"

#if defined(OS_MACOSX)
#include "base/mac/scoped_nsautorelease_pool.h"
#endif

#if !defined(OS_NACL)
#include "base/metrics/histogram.h"
#endif

namespace base {

namespace {

struct SequencedTask : public TrackingInfo {
  SequencedTask()
      : sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  explicit SequencedTask(const tracked_objects::Location& from_here)
      : base::TrackingInfo(from_here, TimeTicks()),
        sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  int sequence_token_id;
  int trace_id;
  int64 sequence_task_number;
  SequencedWorkerPool::WorkerShutdown shutdown_behavior;
  tracked_objects::Location posted_from;
  Closure task;

  // Non-delayed tasks and delayed tasks are managed together by time-to-run
  // order. We calculate the time by adding the posted time and the given
  // delay.
  TimeTicks time_to_run;
};

struct SequencedTaskLessThan {
 public:
  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
    if (lhs.time_to_run < rhs.time_to_run)
      return true;
    if (lhs.time_to_run > rhs.time_to_run)
      return false;

    // If the times happen to match, then we use the sequence number to decide.
    return lhs.sequence_task_number < rhs.sequence_task_number;
  }
};

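// Illustrative example of the resulting order (not code from this file): two
// tasks posted without a delay get the same time_to_run, so the one posted
// first runs first because it received the smaller sequence_task_number. A
// task posted with a 10 ms delay sorts after any task whose time_to_run is
// earlier, regardless of posting order.
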
// SequencedWorkerPoolTaskRunner ---------------------------------------------
// A TaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed ShutdownBehavior.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
class SequencedWorkerPoolTaskRunner : public TaskRunner {
 public:
  SequencedWorkerPoolTaskRunner(
      const scoped_refptr<SequencedWorkerPool>& pool,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
                               const Closure& task,
                               TimeDelta delay) OVERRIDE;
  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;

 private:
  virtual ~SequencedWorkerPoolTaskRunner();

  const scoped_refptr<SequencedWorkerPool> pool_;

  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
};

SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
    const scoped_refptr<SequencedWorkerPool>& pool,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(pool),
      shutdown_behavior_(shutdown_behavior) {
}

SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
}

bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  if (delay == TimeDelta()) {
    return pool_->PostWorkerTaskWithShutdownBehavior(
        from_here, task, shutdown_behavior_);
  }
  return pool_->PostDelayedWorkerTask(from_here, task, delay);
}

bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
  return pool_->RunsTasksOnCurrentThread();
}

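// Illustrative usage sketch (hypothetical caller code; |DoBackgroundWork| is a
// placeholder). Clients normally reach this class through
// SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior() rather than
// constructing it directly:
//
//   scoped_refptr<base::SequencedWorkerPool> pool(
//       new base::SequencedWorkerPool(4, "Example"));
//   scoped_refptr<base::TaskRunner> runner =
//       pool->GetTaskRunnerWithShutdownBehavior(
//           base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
//   runner->PostTask(FROM_HERE, base::Bind(&DoBackgroundWork));
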
// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed sequence token.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
 public:
  SequencedWorkerPoolSequencedTaskRunner(
      const scoped_refptr<SequencedWorkerPool>& pool,
      SequencedWorkerPool::SequenceToken token,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
                               const Closure& task,
                               TimeDelta delay) OVERRIDE;
  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;

  // SequencedTaskRunner implementation
  virtual bool PostNonNestableDelayedTask(
      const tracked_objects::Location& from_here,
      const Closure& task,
      TimeDelta delay) OVERRIDE;

 private:
  virtual ~SequencedWorkerPoolSequencedTaskRunner();

  const scoped_refptr<SequencedWorkerPool> pool_;

  const SequencedWorkerPool::SequenceToken token_;

  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
};

SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
    const scoped_refptr<SequencedWorkerPool>& pool,
    SequencedWorkerPool::SequenceToken token,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(pool),
      token_(token),
      shutdown_behavior_(shutdown_behavior) {
}

SequencedWorkerPoolSequencedTaskRunner::
~SequencedWorkerPoolSequencedTaskRunner() {
}

bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  if (delay == TimeDelta()) {
    return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
        token_, from_here, task, shutdown_behavior_);
  }
  return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
}

bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
  return pool_->IsRunningSequenceOnCurrentThread(token_);
}

bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  // There's no way to run nested tasks, so simply forward to PostDelayedTask.
  return PostDelayedTask(from_here, task, delay);
}

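// Illustrative usage sketch (hypothetical caller code; |SaveStateToDisk| is a
// placeholder). Tasks posted through a sequenced task runner with the same
// token run one at a time in posting order, though not necessarily on the
// same thread:
//
//   base::SequencedWorkerPool::SequenceToken token =
//       pool->GetNamedSequenceToken("MyStorage");
//   scoped_refptr<base::SequencedTaskRunner> runner =
//       pool->GetSequencedTaskRunner(token);
//   runner->PostTask(FROM_HERE, base::Bind(&SaveStateToDisk));
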
// Create a process-wide unique ID to represent this task in trace events. This
// will be mangled with a Process ID hash to reduce the likelihood of colliding
// with MessageLoop pointers on other processes.
uint64 GetTaskTraceID(const SequencedTask& task,
                      void* pool) {
  return (static_cast<uint64>(task.trace_id) << 32) |
         static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
}

base::LazyInstance<base::ThreadLocalPointer<
    SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
    LAZY_INSTANCE_INITIALIZER;

}  // namespace

// Worker ---------------------------------------------------------------------

class SequencedWorkerPool::Worker : public SimpleThread {
 public:
  // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
  // around as long as we are running.
  Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
         int thread_number,
         const std::string& thread_name_prefix);
  virtual ~Worker();

  // SimpleThread implementation. This actually runs the background thread.
  virtual void Run() OVERRIDE;

  void set_running_task_info(SequenceToken token,
                             WorkerShutdown shutdown_behavior) {
    running_sequence_ = token;
    running_shutdown_behavior_ = shutdown_behavior;
  }

  SequenceToken running_sequence() const {
    return running_sequence_;
  }

  WorkerShutdown running_shutdown_behavior() const {
    return running_shutdown_behavior_;
  }

 private:
  scoped_refptr<SequencedWorkerPool> worker_pool_;
  SequenceToken running_sequence_;
  WorkerShutdown running_shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(Worker);
};

// Inner ----------------------------------------------------------------------

class SequencedWorkerPool::Inner {
 public:
  // Take a raw pointer to |worker| to avoid cycles (since we're owned
  // by it).
  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
        const std::string& thread_name_prefix,
        TestingObserver* observer);

  ~Inner();

  SequenceToken GetSequenceToken();

  SequenceToken GetNamedSequenceToken(const std::string& name);

  // This function accepts a name and an ID. If the name is null, the
  // token ID is used. This allows us to implement the optional name lookup
  // from a single function without having to enter the lock a separate time.
  bool PostTask(const std::string* optional_token_name,
                SequenceToken sequence_token,
                WorkerShutdown shutdown_behavior,
                const tracked_objects::Location& from_here,
                const Closure& task,
                TimeDelta delay);

  bool RunsTasksOnCurrentThread() const;

  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;

  void CleanupForTesting();

  void SignalHasWorkForTesting();

  int GetWorkSignalCountForTesting() const;

  void Shutdown(int max_blocking_tasks_after_shutdown);

  bool IsShutdownInProgress();

  // Runs the worker loop on the background thread.
  void ThreadLoop(Worker* this_worker);

 private:
  enum GetWorkStatus {
    GET_WORK_FOUND,
    GET_WORK_NOT_FOUND,
    GET_WORK_WAIT,
  };

  enum CleanupState {
    CLEANUP_REQUESTED,
    CLEANUP_STARTING,
    CLEANUP_RUNNING,
    CLEANUP_FINISHING,
    CLEANUP_DONE,
  };

  // Called from within the lock, this converts the given token name into a
  // token ID, creating a new one if necessary.
  int LockedGetNamedTokenID(const std::string& name);

  // Called from within the lock, this returns the next sequence task number.
  int64 LockedGetNextSequenceTaskNumber();

  // Called from within the lock, returns the shutdown behavior of the task
  // running on the currently executing worker thread. If invoked from a thread
  // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
  WorkerShutdown LockedCurrentThreadShutdownBehavior() const;

  // Gets new task. There are 3 cases depending on the return value:
  //
  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
  //    be run immediately.
  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
  //    and |task| is not filled in. In this case, the caller should wait until
  //    a new task is posted.
  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
  //    immediately, and |task| is not filled in. Likewise, |wait_time| is
  //    filled in with the time to wait until the next task is ready to run.
  //    In this case, the caller should wait that long.
  //
  // In any case, the calling code should clear the given
  // delete_these_outside_lock vector the next time the lock is released.
  // See the implementation for a more detailed description.
  GetWorkStatus GetWork(SequencedTask* task,
                        TimeDelta* wait_time,
                        std::vector<Closure>* delete_these_outside_lock);

  void HandleCleanup();

  // Performs init and cleanup around running the given task. WillRun...
  // returns the value from PrepareToStartAdditionalThreadIfNecessary.
  // The calling code should call FinishStartingAdditionalThread once the
  // lock is released if the return value is nonzero.
  int WillRunWorkerTask(const SequencedTask& task);
  void DidRunWorkerTask(const SequencedTask& task);

  // Returns true if there are no threads currently running the given
  // sequence token.
  bool IsSequenceTokenRunnable(int sequence_token_id) const;

  // Checks if all threads are busy and the addition of one more could run an
  // additional task waiting in the queue. This must be called from within
  // the lock.
  //
  // If another thread is helpful, this will mark the thread as being in the
  // process of starting and returns the index of the new thread which will be
  // 0 or more. The caller should then call FinishStartingAdditionalThread to
  // complete initialization once the lock is released.
  //
  // If another thread is not necessary, returns 0.
  //
  // See the implementation for more.
  int PrepareToStartAdditionalThreadIfHelpful();

  // The second part of thread creation after
  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
  // generated. This actually creates the thread and should be called outside
  // the lock to avoid blocking important work starting a thread in the lock.
  void FinishStartingAdditionalThread(int thread_number);

  // Signal |has_work_| and increment |has_work_signal_count_|.
  void SignalHasWork();

  // Checks whether there is work left that's blocking shutdown. Must be
  // called inside the lock.
  bool CanShutdown() const;

  SequencedWorkerPool* const worker_pool_;

  // The last sequence number used. Managed by GetSequenceToken, since this
  // only does threadsafe increment operations, you do not need to hold the
  // lock. This is class-static to make SequenceTokens issued by
  // GetSequenceToken unique across SequencedWorkerPool instances.
  static base::StaticAtomicSequenceNumber g_last_sequence_number_;

  // This lock protects |everything in this class|. Do not read or modify
  // anything without holding this lock. Do not block while holding this
  // lock.
  mutable Lock lock_;

  // Condition variable that is waited on by worker threads until new
  // tasks are posted or shutdown starts.
  ConditionVariable has_work_cv_;

  // Condition variable that is waited on by non-worker threads (in
  // Shutdown()) until CanShutdown() goes to true.
  ConditionVariable can_shutdown_cv_;

  // The maximum number of worker threads we'll create.
  const size_t max_threads_;

  const std::string thread_name_prefix_;

  // Associates all known sequence token names with their IDs.
  std::map<std::string, int> named_sequence_tokens_;

  // Owning pointers to all threads we've created so far, indexed by
  // ID. Since we lazily create threads, this may be less than
  // max_threads_ and will be initially empty.
  typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
  ThreadMap threads_;

  // Set to true when we're in the process of creating another thread.
  // See PrepareToStartAdditionalThreadIfHelpful for more.
  bool thread_being_created_;

  // Number of threads currently waiting for work.
  size_t waiting_thread_count_;

  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
  // or SKIP_ON_SHUTDOWN flag set.
  size_t blocking_shutdown_thread_count_;

  // A set of all pending tasks in time-to-run order. These are tasks that are
  // either waiting for a thread to run on, waiting for their time to run,
  // or blocked on a previous task in their sequence. We have to iterate over
  // the tasks by time-to-run order, so we use the set instead of the
  // traditional priority_queue.
  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
  PendingTaskSet pending_tasks_;

  // The next sequence number for a new sequenced task.
  int64 next_sequence_task_number_;

  // Number of tasks in the pending_tasks_ list that are marked as blocking
  // shutdown.
  size_t blocking_shutdown_pending_task_count_;

  // Lists all sequence tokens currently executing.
  std::set<int> current_sequences_;

  // An ID for each posted task to distinguish the task from others in traces.
  int trace_id_;

  // Set when Shutdown is called and no further tasks should be
  // allowed, though we may still be running existing tasks.
  bool shutdown_called_;

  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
  // is called.
  int max_blocking_tasks_after_shutdown_;

  // State used to cleanup for testing, all guarded by lock_.
  CleanupState cleanup_state_;
  size_t cleanup_idlers_;
  ConditionVariable cleanup_cv_;

  TestingObserver* const testing_observer_;

  DISALLOW_COPY_AND_ASSIGN(Inner);
};

// Worker definitions ---------------------------------------------------------

SequencedWorkerPool::Worker::Worker(
    const scoped_refptr<SequencedWorkerPool>& worker_pool,
    int thread_number,
    const std::string& prefix)
    : SimpleThread(
          prefix + StringPrintf("Worker%d", thread_number).c_str()),
      worker_pool_(worker_pool),
      running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
  Start();
}

SequencedWorkerPool::Worker::~Worker() {
}

void SequencedWorkerPool::Worker::Run() {
  // Store a pointer to the running sequence in thread local storage for
  // static function access.
  g_lazy_tls_ptr.Get().Set(&running_sequence_);

  // Just jump back to the Inner object to run the thread, since it has all the
  // tracking information and queues. It might be more natural to implement
  // using DelegateSimpleThread and have Inner implement the Delegate to avoid
  // having these worker objects at all, but that method lacks the ability to
  // send thread-specific information easily to the thread loop.
  worker_pool_->inner_->ThreadLoop(this);
  // Release our cyclic reference once we're done.
  worker_pool_ = NULL;
}

// Inner definitions ---------------------------------------------------------

SequencedWorkerPool::Inner::Inner(
    SequencedWorkerPool* worker_pool,
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : worker_pool_(worker_pool),
      lock_(),
      has_work_cv_(&lock_),
      can_shutdown_cv_(&lock_),
      max_threads_(max_threads),
      thread_name_prefix_(thread_name_prefix),
      thread_being_created_(false),
      waiting_thread_count_(0),
      blocking_shutdown_thread_count_(0),
      next_sequence_task_number_(0),
      blocking_shutdown_pending_task_count_(0),
      trace_id_(0),
      shutdown_called_(false),
      max_blocking_tasks_after_shutdown_(0),
      cleanup_state_(CLEANUP_DONE),
      cleanup_idlers_(0),
      cleanup_cv_(&lock_),
      testing_observer_(observer) {}

SequencedWorkerPool::Inner::~Inner() {
  // You must call Shutdown() before destroying the pool.
  DCHECK(shutdown_called_);

  // Need to explicitly join with the threads before they're destroyed or else
  // they will be running when our object is half torn down.
  for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
    it->second->Join();
  threads_.clear();

  if (testing_observer_)
    testing_observer_->OnDestruct();
}

SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetSequenceToken() {
  // Need to add one because StaticAtomicSequenceNumber starts at zero, which
  // is used as a sentinel value in SequenceTokens.
  return SequenceToken(g_last_sequence_number_.GetNext() + 1);
}

SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
  AutoLock lock(lock_);
  return SequenceToken(LockedGetNamedTokenID(name));
}

bool SequencedWorkerPool::Inner::PostTask(
    const std::string* optional_token_name,
    SequenceToken sequence_token,
    WorkerShutdown shutdown_behavior,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
  SequencedTask sequenced(from_here);
  sequenced.sequence_token_id = sequence_token.id_;
  sequenced.shutdown_behavior = shutdown_behavior;
  sequenced.posted_from = from_here;
  sequenced.task =
      shutdown_behavior == BLOCK_SHUTDOWN ?
      base::MakeCriticalClosure(task) : task;
  sequenced.time_to_run = TimeTicks::Now() + delay;

  int create_thread_id = 0;
  {
    AutoLock lock(lock_);
    if (shutdown_called_) {
      if (shutdown_behavior != BLOCK_SHUTDOWN ||
          LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
        return false;
      }
      if (max_blocking_tasks_after_shutdown_ <= 0) {
        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
        return false;
      }
      max_blocking_tasks_after_shutdown_ -= 1;
    }

    // The trace_id is used for identifying the task in about:tracing.
    sequenced.trace_id = trace_id_++;

    TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
        "SequencedWorkerPool::PostTask",
        TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));

    sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();

    // Now that we have the lock, apply the named token rules.
    if (optional_token_name)
      sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);

    pending_tasks_.insert(sequenced);
    if (shutdown_behavior == BLOCK_SHUTDOWN)
      blocking_shutdown_pending_task_count_++;

    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
  }

  // Actually start the additional thread or signal an existing one now that
  // we're outside the lock.
  if (create_thread_id)
    FinishStartingAdditionalThread(create_thread_id);
  else
    SignalHasWork();

  return true;
}

bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
  AutoLock lock(lock_);
  return ContainsKey(threads_, PlatformThread::CurrentId());
}

bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  AutoLock lock(lock_);
  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
  if (found == threads_.end())
    return false;
  return sequence_token.Equals(found->second->running_sequence());
}

// See https://code.google.com/p/chromium/issues/detail?id=168415
void SequencedWorkerPool::Inner::CleanupForTesting() {
  DCHECK(!RunsTasksOnCurrentThread());
  base::ThreadRestrictions::ScopedAllowWait allow_wait;
  AutoLock lock(lock_);
  CHECK_EQ(CLEANUP_DONE, cleanup_state_);
  if (shutdown_called_)
    return;
  if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
    return;
  cleanup_state_ = CLEANUP_REQUESTED;
  cleanup_idlers_ = 0;
  has_work_cv_.Signal();
  while (cleanup_state_ != CLEANUP_DONE)
    cleanup_cv_.Wait();
}

void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
  SignalHasWork();
}

void SequencedWorkerPool::Inner::Shutdown(
    int max_new_blocking_tasks_after_shutdown) {
  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
  {
    AutoLock lock(lock_);
    // Cleanup and Shutdown should not be called concurrently.
    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
    if (shutdown_called_)
      return;
    shutdown_called_ = true;
    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;

    // Tickle the threads. This will wake up a waiting one so it will know that
    // it can exit, which in turn will wake up any other waiting ones.
    SignalHasWork();

    // There are no pending or running tasks blocking shutdown, we're done.
    if (CanShutdown())
      return;
  }

  // If we're here, then something is blocking shutdown. So wait for
  // CanShutdown() to go to true.

  if (testing_observer_)
    testing_observer_->WillWaitForShutdown();

#if !defined(OS_NACL)
  TimeTicks shutdown_wait_begin = TimeTicks::Now();
#endif

  {
    base::ThreadRestrictions::ScopedAllowWait allow_wait;
    AutoLock lock(lock_);
    while (!CanShutdown())
      can_shutdown_cv_.Wait();
  }
#if !defined(OS_NACL)
  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
                      TimeTicks::Now() - shutdown_wait_begin);
#endif
}

bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
  AutoLock lock(lock_);
  return shutdown_called_;
}

void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    std::pair<ThreadMap::iterator, bool> result =
        threads_.insert(
            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      HandleCleanup();

      // See GetWork for what delete_these_outside_lock is doing.
      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
            "SequencedWorkerPool::PostTask",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
        TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop",
                     "src_file", task.posted_from.file_name(),
                     "src_func", task.posted_from.function_name());
        int new_thread_id = WillRunWorkerTask(task);
        {
          AutoUnlock unlock(lock_);
          // There may be more work available, so wake up another
          // worker thread. (Technically not required, since we
          // already get a signal for each new task, but it doesn't
          // hurt.)
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Complete thread creation outside the lock if necessary.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TrackedTime start_time =
              tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);

          task.task.Run();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
              start_time, tracked_objects::ThreadData::NowForEndOfRun());

          // Make sure our task is erased outside the lock for the
          // same reason we do this with delete_these_outside_lock.
          // Also, do it before calling set_running_task_info() so
          // that sequence-checking from within the task's destructor
          // still works.
          task.task = Closure();

          this_worker->set_running_task_info(
              SequenceToken(), CONTINUE_ON_SHUTDOWN);
        }
        DidRunWorkerTask(task);  // Must be done inside the lock.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        switch (status) {
          case GET_WORK_WAIT: {
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // When we're terminating and there's no more work, we can
        // shut down, other workers can complete any pending or new tasks.
        // We can get additional tasks posted after shutdown_called_ is set
        // but only worker threads are allowed to post tasks at that time, and
        // the workers responsible for posting those tasks will be available
        // to run them. Also, there may be some tasks stuck behind running
        // ones with the same sequence token, but additional threads won't
        // help this case.
        if (shutdown_called_ &&
            blocking_shutdown_pending_task_count_ == 0)
          break;
        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Release lock_.

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  SignalHasWork();

  // Possibly unblock shutdown.
  can_shutdown_cv_.Signal();
}

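// Summary of the cleanup-for-testing state machine implemented below: a caller
// of CleanupForTesting() moves the state from CLEANUP_DONE to
// CLEANUP_REQUESTED; the first worker to observe that moves it to
// CLEANUP_STARTING and waits for every other worker to idle, then proceeds
// with CLEANUP_RUNNING while deferred tasks are dropped, and finally passes
// through CLEANUP_FINISHING back to CLEANUP_DONE once all idle workers have
// woken up again.
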
void SequencedWorkerPool::Inner::HandleCleanup() {
  lock_.AssertAcquired();
  if (cleanup_state_ == CLEANUP_DONE)
    return;
  if (cleanup_state_ == CLEANUP_REQUESTED) {
    // We win, we get to do the cleanup as soon as the others wise up and idle.
    cleanup_state_ = CLEANUP_STARTING;
    while (thread_being_created_ ||
           cleanup_idlers_ != threads_.size() - 1) {
      has_work_cv_.Signal();
      cleanup_cv_.Wait();
    }
    cleanup_state_ = CLEANUP_RUNNING;
    return;
  }
  if (cleanup_state_ == CLEANUP_STARTING) {
    // Another worker thread is cleaning up, we idle here until that's done.
    ++cleanup_idlers_;
    cleanup_cv_.Broadcast();
    while (cleanup_state_ != CLEANUP_FINISHING) {
      cleanup_cv_.Wait();
    }
    --cleanup_idlers_;
    cleanup_cv_.Broadcast();
    return;
  }
  if (cleanup_state_ == CLEANUP_FINISHING) {
    // We wait for all idlers to wake up prior to being DONE.
    while (cleanup_idlers_ != 0) {
      cleanup_cv_.Broadcast();
      cleanup_cv_.Wait();
    }
    if (cleanup_state_ == CLEANUP_FINISHING) {
      cleanup_state_ = CLEANUP_DONE;
      cleanup_cv_.Signal();
    }
    return;
  }
}

int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
    const std::string& name) {
  lock_.AssertAcquired();
  DCHECK(!name.empty());

  std::map<std::string, int>::const_iterator found =
      named_sequence_tokens_.find(name);
  if (found != named_sequence_tokens_.end())
    return found->second;  // Got an existing one.

  // Create a new one for this name.
  SequenceToken result = GetSequenceToken();
  named_sequence_tokens_.insert(std::make_pair(name, result.id_));
  return result.id_;
}

int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
  lock_.AssertAcquired();
  // We assume that we never create enough tasks to wrap around.
  return next_sequence_task_number_++;
}

SequencedWorkerPool::WorkerShutdown
SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
  lock_.AssertAcquired();
  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
  if (found == threads_.end())
    return CONTINUE_ON_SHUTDOWN;
  return found->second->running_shutdown_behavior();
}

SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
    SequencedTask* task,
    TimeDelta* wait_time,
    std::vector<Closure>* delete_these_outside_lock) {
  lock_.AssertAcquired();

#if !defined(OS_NACL)
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
                           static_cast<int>(pending_tasks_.size()));
#endif

  // Find the next task with a sequence token that's not currently in use.
  // If the token is in use, that means another thread is running something
  // in that sequence, and we can't run it without going out-of-order.
  //
  // This algorithm is simple and fair, but inefficient in some cases. For
  // example, say somebody schedules 1000 slow tasks with the same sequence
  // number. We'll have to go through all those tasks each time we feel like
  // there might be work to schedule. If this proves to be a problem, we
  // should make this more efficient.
  //
  // One possible enhancement would be to keep a map from sequence ID to a
  // list of pending but currently blocked SequencedTasks for that ID.
  // When a worker finishes a task of one sequence token, it can pick up the
  // next one from that token right away.
  //
  // This may lead to starvation if there are sufficient numbers of sequences
  // in use. To alleviate this, we could add an incrementing priority counter
  // to each SequencedTask. Then maintain a priority_queue of all runnable
  // tasks, sorted by priority counter. When a sequenced task is completed
  // we would pop the head element off of that task's pending list and add it
  // to the priority queue. Then we would run the first item in the priority
  // queue.

  GetWorkStatus status = GET_WORK_NOT_FOUND;
  int unrunnable_tasks = 0;
  PendingTaskSet::iterator i = pending_tasks_.begin();
  // We assume that the loop below doesn't take too long and so we can just do
  // a single call to TimeTicks::Now().
  const TimeTicks current_time = TimeTicks::Now();
  while (i != pending_tasks_.end()) {
    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
      unrunnable_tasks++;
      ++i;
      continue;
    }

    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
      // We're shutting down and the task we just found isn't blocking
      // shutdown. Delete it and get more work.
      //
      // Note that we do not want to delete unrunnable tasks. Deleting a task
      // can have side effects (like freeing some objects) and deleting a
      // task that's supposed to run after one that's currently running could
      // cause an obscure crash.
      //
      // We really want to delete these tasks outside the lock in case the
      // closures are holding refs to objects that want to post work from
      // their destructors (which would deadlock). The closures are
      // internally refcounted, so we just need to keep a copy of them alive
      // until the lock is exited. The calling code can just clear() the
      // vector they passed to us once the lock is exited to make this
      // happen.
      delete_these_outside_lock->push_back(i->task);
      pending_tasks_.erase(i++);
      continue;
    }

    if (i->time_to_run > current_time) {
      // The time to run has not come yet.
      *wait_time = i->time_to_run - current_time;
      status = GET_WORK_WAIT;
      if (cleanup_state_ == CLEANUP_RUNNING) {
        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
        delete_these_outside_lock->push_back(i->task);
        pending_tasks_.erase(i);
      }
      break;
    }

    // Found a runnable task.
    *task = *i;
    pending_tasks_.erase(i);
    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
      blocking_shutdown_pending_task_count_--;
    }

    status = GET_WORK_FOUND;
    break;
  }

  // Track the number of tasks we had to skip over to see if we should be
  // making this more efficient. If this number ever becomes large or is
  // frequently "some", we should consider the optimization above.
#if !defined(OS_NACL)
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
                           unrunnable_tasks);
#endif
  return status;
}

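// A hypothetical shape for the enhancement sketched in the comment inside
// GetWork() above; it is not implemented here and the member names are made
// up:
//
//   // Pending tasks blocked behind a running task in the same sequence,
//   // keyed by sequence token ID and kept in posting order.
//   std::map<int, std::list<SequencedTask> > blocked_tasks_;
//   // Runnable tasks ordered by an incrementing priority counter so that no
//   // sequence is starved.
//   std::priority_queue<SequencedTask> runnable_tasks_;
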
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  // Mark the task's sequence number as in use.
  if (task.sequence_token_id)
    current_sequences_.insert(task.sequence_token_id);

  // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
  // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
  // is done.
  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
    blocking_shutdown_thread_count_++;

  // We just picked up a task. Since StartAdditionalThreadIfHelpful only
  // creates a new thread if there is no free one, there is a race when posting
  // tasks that many tasks could have been posted before a thread started
  // running them, so only one thread would have been created. So we also check
  // whether we should create more threads after removing our task from the
  // queue, which also has the nice side effect of creating the workers from
  // background threads rather than the main thread of the app.
  //
  // If another thread wasn't created, we want to wake up an existing thread
  // if there is one waiting to pick up the next task.
  //
  // Note that we really need to do this *before* running the task, not
  // after. Otherwise, if more than one task is posted, the creation of the
  // second thread (since we only create one at a time) will be blocked by
  // the execution of the first task, which could be arbitrarily long.
  return PrepareToStartAdditionalThreadIfHelpful();
}

void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
  lock_.AssertAcquired();

  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
    blocking_shutdown_thread_count_--;
  }

  if (task.sequence_token_id)
    current_sequences_.erase(task.sequence_token_id);
}

bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
    int sequence_token_id) const {
  lock_.AssertAcquired();
  return !sequence_token_id ||
      current_sequences_.find(sequence_token_id) ==
          current_sequences_.end();
}

int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
  lock_.AssertAcquired();
  // How thread creation works:
  //
  // We'd like to avoid creating threads with the lock held. However, we
  // need to be sure that we have an accurate accounting of the threads for
  // proper Joining and deletion on shutdown.
  //
  // We need to figure out if we need another thread with the lock held, which
  // is what this function does. It then marks us as in the process of creating
  // a thread. When we do shutdown, we wait until the thread_being_created_
  // flag is cleared, which ensures that the new thread is properly added to
  // all the data structures and we can't leak it. Once shutdown starts, we'll
  // refuse to create more threads or they would be leaked.
  //
  // Note that this creates a mostly benign race condition on shutdown that
  // will cause fewer workers to be created than one would expect. It isn't
  // much of an issue in real life, but affects some tests. Since we only spawn
  // one worker at a time, the following sequence of events can happen:
  //
  //  1. Main thread posts a bunch of unrelated tasks that would normally be
  //     run on separate threads.
  //  2. The first task post causes us to start a worker. Other tasks do not
  //     cause a worker to start since one is pending.
  //  3. Main thread initiates shutdown.
  //  4. No more threads are created since the shutdown_called_ flag is set.
  //
  // The result is that one may expect max_threads_ workers to be created
  // given the workload, but in reality fewer may be created because the
  // sequence of thread creation on the background threads is racing with the
  // shutdown call.
  if (!shutdown_called_ &&
      !thread_being_created_ &&
      cleanup_state_ == CLEANUP_DONE &&
      threads_.size() < max_threads_ &&
      waiting_thread_count_ == 0) {
    // We could use an additional thread if there's work to be done.
    for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
         i != pending_tasks_.end(); ++i) {
      if (IsSequenceTokenRunnable(i->sequence_token_id)) {
        // Found a runnable task, mark the thread as being started.
        thread_being_created_ = true;
        return static_cast<int>(threads_.size() + 1);
      }
    }
  }
  return 0;
}

void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
    int thread_number) {
  // Called outside of the lock.
  DCHECK(thread_number > 0);

  // The worker is assigned to the list when the thread actually starts, which
  // will manage the memory of the pointer.
  new Worker(worker_pool_, thread_number, thread_name_prefix_);
}

void SequencedWorkerPool::Inner::SignalHasWork() {
  has_work_cv_.Signal();
  if (testing_observer_) {
    testing_observer_->OnHasWork();
  }
}

bool SequencedWorkerPool::Inner::CanShutdown() const {
  lock_.AssertAcquired();
  // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
  return !thread_being_created_ &&
      blocking_shutdown_thread_count_ == 0 &&
      blocking_shutdown_pending_task_count_ == 0;
}

base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;

// SequencedWorkerPool --------------------------------------------------------

// static
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
  // Don't construct lazy instance on check.
  if (g_lazy_tls_ptr == NULL)
    return SequenceToken();

  SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
  if (!token)
    return SequenceToken();
  return *token;
}

SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
}

SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
}

SequencedWorkerPool::~SequencedWorkerPool() {}

void SequencedWorkerPool::OnDestruct() const {
  DCHECK(constructor_message_loop_.get());
  // Avoid deleting ourselves on a worker thread (which would
  // deadlock).
  if (RunsTasksOnCurrentThread()) {
    constructor_message_loop_->DeleteSoon(FROM_HERE, this);
  } else {
    delete this;
  }
}

SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
  return inner_->GetSequenceToken();
}

SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
    const std::string& name) {
  return inner_->GetNamedSequenceToken(name);
}

scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
    SequenceToken token) {
  return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
}

scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
    SequenceToken token, WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolSequencedTaskRunner(
      this, token, shutdown_behavior);
}

scoped_refptr<TaskRunner>
SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
    WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
}

bool SequencedWorkerPool::PostWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

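// Note: Inner::PostTask() DCHECKs that a nonzero delay is only combined with
// SKIP_ON_SHUTDOWN, so the delayed-task entry points below choose
// BLOCK_SHUTDOWN only for a zero delay and SKIP_ON_SHUTDOWN otherwise.
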
bool SequencedWorkerPool::PostDelayedWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior =
      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, delay);
}

bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  WorkerShutdown shutdown_behavior =
      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, delay);
}

bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
    const std::string& token_name,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  DCHECK(!token_name.empty());
  return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, TimeDelta());
}

bool SequencedWorkerPool::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  return PostDelayedWorkerTask(from_here, task, delay);
}

bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
  return inner_->RunsTasksOnCurrentThread();
}

bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
}

void SequencedWorkerPool::FlushForTesting() {
  inner_->CleanupForTesting();
}

void SequencedWorkerPool::SignalHasWorkForTesting() {
  inner_->SignalHasWorkForTesting();
}

void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
  DCHECK(constructor_message_loop_->BelongsToCurrentThread());
  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}

bool SequencedWorkerPool::IsShutdownInProgress() {
  return inner_->IsShutdownInProgress();
}

}  // namespace base