1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/synchronization/condition_variable.h"
15 #include "base/synchronization/lock.h"
16 #include "base/test/sequenced_task_runner_test_template.h"
17 #include "base/test/sequenced_worker_pool_owner.h"
18 #include "base/test/task_runner_test_template.h"
19 #include "base/test/test_timeouts.h"
20 #include "base/threading/platform_thread.h"
21 #include "base/time/time.h"
22 #include "base/tracked_objects.h"
23 #include "testing/gtest/include/gtest/gtest.h"
29 // Many of these tests have failure modes where they'll hang forever. These
30 // tests should not be flaky, and hanging indicates a type of failure. Do not
31 // mark as flaky if they're hanging, it's likely an actual bug.
// Number of worker threads given to the pool that the test fixture creates;
// tests also use it to size the batches of blocking tasks they post.
35 const size_t kNumWorkerThreads
= 3;
37 // Allows a number of threads to all be blocked on the same event, and
38 // provides a way to unblock a certain number of them.
// The state is a counter (unblock_counter_) guarded by lock_, together with a
// condition variable that is constructed on that same lock.
// The counter starts at zero.
41 ThreadBlocker() : lock_(), cond_var_(&lock_
), unblock_counter_(0) {}
// With lock_ held, keep looping for as long as the counter is zero.
45 base::AutoLock
lock(lock_
);
46 while (unblock_counter_
== 0)
// Stores |count| into the counter under lock_. The DCHECK requires the
// counter to be zero on entry.
53 void Unblock(size_t count
) {
55 base::AutoLock
lock(lock_
);
56 DCHECK_EQ(unblock_counter_
, 0u);
57 unblock_counter_
= count
;
64 base::ConditionVariable cond_var_
;
66 size_t unblock_counter_
;
// Ref-counted helper that keeps a reference to a SequencedWorkerPool and calls
// RunsTasksOnCurrentThread() on it from its destructor. The shutdown tests
// bind instances of it into tasks so that destroying those tasks exercises
// that call.
69 class DestructionDeadlockChecker
70 : public base::RefCountedThreadSafe
<DestructionDeadlockChecker
> {
72 DestructionDeadlockChecker(const scoped_refptr
<SequencedWorkerPool
>& pool
)
76 virtual ~DestructionDeadlockChecker() {
77 // This method should not deadlock.
78 pool_
->RunsTasksOnCurrentThread();
// The pool that the destructor queries.
82 scoped_refptr
<SequencedWorkerPool
> pool_
;
83 friend class base::RefCountedThreadSafe
<DestructionDeadlockChecker
>;
// Ref-counted recorder shared between the test thread and the tasks running
// on the pool. Tasks report completion by appending their id to
// complete_sequence_ (see SignalWorkerDone), and the number of "block" tasks
// that have started is kept in started_events_, so tests can wait on either.
86 class TestTracker
: public base::RefCountedThreadSafe
<TestTracker
> {
94 // Each of these tasks appends the argument to the complete sequence vector
95 // so calling code can see what order they finished in.
96 void FastTask(int id
) {
// Sleeps for one second and then records |id| as done.
100 void SlowTask(int id
) {
101 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
102 SignalWorkerDone(id
);
// Takes a ThreadBlocker in addition to the id; |id| is recorded as done at
// the end of the task.
105 void BlockTask(int id
, ThreadBlocker
* blocker
) {
106 // Note that this task has started and signal anybody waiting for that
// to happen.
109 base::AutoLock
lock(lock_
);
115 SignalWorkerDone(id
);
// Posts the same FastTask (id 100) to |pool| three times, once per shutdown
// behavior. The CONTINUE_ON_SHUTDOWN and SKIP_ON_SHUTDOWN posts are expected
// to return |expected_return_value|; the result of the BLOCK_SHUTDOWN post is
// not checked. Finally records |id| as done.
118 void PostAdditionalTasks(
119 int id
, SequencedWorkerPool
* pool
,
120 bool expected_return_value
) {
121 Closure fast_task
= base::Bind(&TestTracker::FastTask
, this, 100);
122 EXPECT_EQ(expected_return_value
,
123 pool
->PostWorkerTaskWithShutdownBehavior(
124 FROM_HERE
, fast_task
,
125 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
126 EXPECT_EQ(expected_return_value
,
127 pool
->PostWorkerTaskWithShutdownBehavior(
128 FROM_HERE
, fast_task
,
129 SequencedWorkerPool::SKIP_ON_SHUTDOWN
));
130 pool
->PostWorkerTaskWithShutdownBehavior(
131 FROM_HERE
, fast_task
,
132 SequencedWorkerPool::BLOCK_SHUTDOWN
);
133 SignalWorkerDone(id
);
136 // This task posts itself back onto the SequencedWorkerPool before it
137 // finishes running. Each instance of the task maintains a strong reference
138 // to a DestructionDeadlockChecker. The DestructionDeadlockChecker is only
139 // destroyed when the task is destroyed without being run, which only happens
140 // during destruction of the SequencedWorkerPool.
// The repost is made with SKIP_ON_SHUTDOWN.
141 void PostRepostingTask(
142 const scoped_refptr
<SequencedWorkerPool
>& pool
,
143 const scoped_refptr
<DestructionDeadlockChecker
>& checker
) {
144 Closure reposting_task
=
145 base::Bind(&TestTracker::PostRepostingTask
, this, pool
, checker
);
146 pool
->PostWorkerTaskWithShutdownBehavior(
147 FROM_HERE
, reposting_task
, SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
150 // This task reposts itself back onto the SequencedWorkerPool before it
// finishes running; the repost is sequenced on |token| and uses
// BLOCK_SHUTDOWN.
152 void PostRepostingBlockingTask(
153 const scoped_refptr
<SequencedWorkerPool
>& pool
,
154 const SequencedWorkerPool::SequenceToken
& token
) {
155 Closure reposting_task
=
156 base::Bind(&TestTracker::PostRepostingBlockingTask
, this, pool
, token
);
157 pool
->PostSequencedWorkerTaskWithShutdownBehavior(token
,
158 FROM_HERE
, reposting_task
, SequencedWorkerPool::BLOCK_SHUTDOWN
);
// Posts one BLOCK_SHUTDOWN FastTask (id 0) to |pool| and then calls
// blocker->Unblock(threads_to_wake).
161 void PostBlockingTaskThenUnblockThreads(
162 const scoped_refptr
<SequencedWorkerPool
>& pool
,
163 ThreadBlocker
* blocker
,
164 size_t threads_to_wake
) {
165 Closure arbitrary_task
= base::Bind(&TestTracker::FastTask
, this, 0);
166 pool
->PostWorkerTaskWithShutdownBehavior(
167 FROM_HERE
, arbitrary_task
, SequencedWorkerPool::BLOCK_SHUTDOWN
);
168 blocker
->Unblock(threads_to_wake
);
171 // Waits until the given number of tasks have started executing.
// The loop condition compares started_events_ against |count| under lock_.
172 void WaitUntilTasksBlocked(size_t count
) {
174 base::AutoLock
lock(lock_
);
175 while (started_events_
< count
)
181 // Blocks the current thread until at least the given number of tasks are in
182 // the completed vector, and then returns a copy.
183 std::vector
<int> WaitUntilTasksComplete(size_t num_tasks
) {
184 std::vector
<int> ret
;
186 base::AutoLock
lock(lock_
);
187 while (complete_sequence_
.size() < num_tasks
)
189 ret
= complete_sequence_
;
// Returns the current size of the completed vector, read under lock_.
195 size_t GetTasksCompletedCount() {
196 base::AutoLock
lock(lock_
);
197 return complete_sequence_
.size();
// Empties the completed vector under lock_.
200 void ClearCompleteSequence() {
201 base::AutoLock
lock(lock_
);
202 complete_sequence_
.clear();
207 friend class base::RefCountedThreadSafe
<TestTracker
>;
// Appends |id| to the completed vector under lock_.
210 void SignalWorkerDone(int id
) {
212 base::AutoLock
lock(lock_
);
213 complete_sequence_
.push_back(id
);
218 // Protects the complete_sequence.
221 base::ConditionVariable cond_var_
;
223 // Protected by lock_.
224 std::vector
<int> complete_sequence_
;
226 // Counter of the number of "block" workers that have started.
227 size_t started_events_
;
// Test fixture. Owns a message loop, a SequencedWorkerPoolOwner created with
// kNumWorkerThreads threads and the name "test", and a TestTracker shared by
// the tasks that the tests post.
230 class SequencedWorkerPoolTest
: public testing::Test
{
232 SequencedWorkerPoolTest()
233 : tracker_(new TestTracker
) {
// Every test ends by calling Shutdown() on the current pool.
237 void TearDown() override
{ pool()->Shutdown(); }
// Accessor for the pool held by pool_owner_.
239 const scoped_refptr
<SequencedWorkerPool
>& pool() {
240 return pool_owner_
->pool();
// Raw pointer to the shared tracker; the fixture keeps the owning reference.
242 TestTracker
* tracker() { return tracker_
.get(); }
244 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
245 // down, and creates a new instance.
247 pool_owner_
.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads
, "test"));
// Forwards |callback| to the pool owner.
250 void SetWillWaitForShutdownCallback(const Closure
& callback
) {
251 pool_owner_
->SetWillWaitForShutdownCallback(callback
);
254 // Ensures that the given number of worker threads is created by adding
255 // tasks and waiting until they complete. Worker thread creation is
256 // serialized, can happen on background threads asynchronously, and doesn't
257 // happen any more at shutdown. This means that if a test posts a bunch of
258 // tasks and calls shutdown, fewer workers will be created than the test may
261 // This function ensures that this condition can't happen so tests can make
262 // assumptions about the number of workers active. See the comment in
263 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
266 // It will post tasks to the queue with id -1. It also assumes this is the
267 // first thing called in a test since it will clear the complete_sequence_.
268 void EnsureAllWorkersCreated() {
269 // Create a bunch of threads, all waiting. This will cause that many
270 // workers to be created.
271 ThreadBlocker blocker
;
272 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
273 pool()->PostWorkerTask(FROM_HERE
,
274 base::Bind(&TestTracker::BlockTask
,
275 tracker(), -1, &blocker
));
277 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
279 // Now wake them up and wait until they're done.
280 blocker
.Unblock(kNumWorkerThreads
);
281 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
283 // Clean up the task IDs we added.
284 tracker()->ClearCompleteSequence();
// Forwards to the pool owner's has_work_call_count().
287 int has_work_call_count() const {
288 return pool_owner_
->has_work_call_count();
292 MessageLoop message_loop_
;
293 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
294 const scoped_refptr
<TestTracker
> tracker_
;
297 // Checks that the given number of entries are in the tasks to complete of
298 // the given tracker, and then signals the given event the given number of
299 // times. This is used to wake up blocked background threads before blocking
// Concretely: waits on |tracker| for |expected_tasks_to_complete| completions
// and then calls blocker->Unblock(threads_to_awake). The tests bind this
// function as the "will wait for shutdown" callback.
301 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr
<TestTracker
> tracker
,
302 size_t expected_tasks_to_complete
,
303 ThreadBlocker
* blocker
,
304 size_t threads_to_awake
) {
306 expected_tasks_to_complete
,
307 tracker
->WaitUntilTasksComplete(expected_tasks_to_complete
).size());
309 blocker
->Unblock(threads_to_awake
);
// Ref-counted helper whose destructor sets deleted_flag_->data to true, so a
// test holding the same RefCountedData<bool> can observe that the helper was
// destroyed.
312 class DeletionHelper
: public base::RefCountedThreadSafe
<DeletionHelper
> {
314 explicit DeletionHelper(
315 const scoped_refptr
<base::RefCountedData
<bool> >& deleted_flag
)
316 : deleted_flag_(deleted_flag
) {
320 friend class base::RefCountedThreadSafe
<DeletionHelper
>;
// Destruction is the only event this class reports.
321 virtual ~DeletionHelper() { deleted_flag_
->data
= true; }
323 const scoped_refptr
<base::RefCountedData
<bool> > deleted_flag_
;
324 DISALLOW_COPY_AND_ASSIGN(DeletionHelper
);
// Task that must never execute: if it does run, it records a test failure.
// The visible body does not use |pool| or |helper|.
327 void HoldPoolReference(const scoped_refptr
<base::SequencedWorkerPool
>& pool
,
328 const scoped_refptr
<DeletionHelper
>& helper
) {
329 ADD_FAILURE() << "Should never run";
332 // Tests that delayed tasks are deleted upon shutdown of the pool.
// A HoldPoolReference task bound to a DeletionHelper is posted with
// TestTimeouts::action_timeout() as its delay. The test then asserts that
// less than that timeout has elapsed since posting and that the helper's
// deleted flag has been set.
333 TEST_F(SequencedWorkerPoolTest
, DelayedTaskDuringShutdown
) {
334 // Post something to verify the pool is started up.
335 EXPECT_TRUE(pool()->PostTask(
336 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 1)));
338 scoped_refptr
<base::RefCountedData
<bool> > deleted_flag(
339 new base::RefCountedData
<bool>(false));
341 base::Time
posted_at(base::Time::Now());
342 // Post something that shouldn't run.
343 EXPECT_TRUE(pool()->PostDelayedTask(
345 base::Bind(&HoldPoolReference
,
347 make_scoped_refptr(new DeletionHelper(deleted_flag
))),
348 TestTimeouts::action_timeout()));
350 std::vector
<int> completion_sequence
= tracker()->WaitUntilTasksComplete(1);
351 ASSERT_EQ(1u, completion_sequence
.size());
352 ASSERT_EQ(1, completion_sequence
[0]);
355 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
356 // fully destroyed (and thus shut down).
359 // Verify that we didn't block until the task was due.
360 ASSERT_LT(base::Time::Now() - posted_at
, TestTimeouts::action_timeout());
362 // Verify that the deferred task has not only not run, but has also been
// destroyed (the DeletionHelper destructor sets the flag).
364 ASSERT_TRUE(deleted_flag
->data
);
367 // Tests that same-named tokens have the same ID.
// Uses two named tokens ("hello" and "goodbye") and one unnamed token: all
// three must differ, while asking again for either name must yield a token
// that Equals() the first one obtained for that name.
368 TEST_F(SequencedWorkerPoolTest
, NamedTokens
) {
369 const std::string
name1("hello");
370 SequencedWorkerPool::SequenceToken token1
=
371 pool()->GetNamedSequenceToken(name1
);
373 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
375 const std::string
name3("goodbye");
376 SequencedWorkerPool::SequenceToken token3
=
377 pool()->GetNamedSequenceToken(name3
);
379 // All 3 tokens should be different.
380 EXPECT_FALSE(token1
.Equals(token2
));
381 EXPECT_FALSE(token1
.Equals(token3
));
382 EXPECT_FALSE(token2
.Equals(token3
));
384 // Requesting the same name again should give the same value.
385 SequencedWorkerPool::SequenceToken token1again
=
386 pool()->GetNamedSequenceToken(name1
);
387 EXPECT_TRUE(token1
.Equals(token1again
));
389 SequencedWorkerPool::SequenceToken token3again
=
390 pool()->GetNamedSequenceToken(name3
);
391 EXPECT_TRUE(token3
.Equals(token3again
));
394 // Tests that posting a bunch of tasks (many more than the number of worker
395 // threads) runs them all.
// One SlowTask (id 0) is posted first, followed by FastTasks with ids
// 1..kNumTasks-1; the test then waits for kNumTasks completions in total.
396 TEST_F(SequencedWorkerPoolTest
, LotsOfTasks
) {
397 pool()->PostWorkerTask(FROM_HERE
,
398 base::Bind(&TestTracker::SlowTask
, tracker(), 0));
400 const size_t kNumTasks
= 20;
401 for (size_t i
= 1; i
< kNumTasks
; i
++) {
402 pool()->PostWorkerTask(FROM_HERE
,
403 base::Bind(&TestTracker::FastTask
, tracker(), i
));
406 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(kNumTasks
);
407 EXPECT_EQ(kNumTasks
, result
.size());
410 // Tests that posting a bunch of tasks (many more than the number of
411 // worker threads) to two pools simultaneously runs them all twice.
412 // This test is meant to shake out any concurrency issues between
413 // pools (like histograms).
// Both locally-owned pools report to the fixture's single tracker, which is
// why 2 * kNumTasks completions are awaited before the pools are shut down.
414 TEST_F(SequencedWorkerPoolTest
, LotsOfTasksTwoPools
) {
415 SequencedWorkerPoolOwner
pool1(kNumWorkerThreads
, "test1");
416 SequencedWorkerPoolOwner
pool2(kNumWorkerThreads
, "test2");
418 base::Closure slow_task
= base::Bind(&TestTracker::SlowTask
, tracker(), 0);
419 pool1
.pool()->PostWorkerTask(FROM_HERE
, slow_task
);
420 pool2
.pool()->PostWorkerTask(FROM_HERE
, slow_task
);
422 const size_t kNumTasks
= 20;
423 for (size_t i
= 1; i
< kNumTasks
; i
++) {
424 base::Closure fast_task
=
425 base::Bind(&TestTracker::FastTask
, tracker(), i
);
426 pool1
.pool()->PostWorkerTask(FROM_HERE
, fast_task
);
427 pool2
.pool()->PostWorkerTask(FROM_HERE
, fast_task
);
430 std::vector
<int> result
=
431 tracker()->WaitUntilTasksComplete(2*kNumTasks
);
432 EXPECT_EQ(2 * kNumTasks
, result
.size());
434 pool2
.pool()->Shutdown();
435 pool1
.pool()->Shutdown();
438 // Test that tasks with the same sequence token are executed in order but don't
439 // affect other tasks.
// Task ids used below: 0..kNumBackgroundTasks-1 are the background blockers,
// 100 and 101 are posted under token1, and 200 and 201 under token2.
440 TEST_F(SequencedWorkerPoolTest
, Sequence
) {
441 // Fill all the worker threads except one.
442 const size_t kNumBackgroundTasks
= kNumWorkerThreads
- 1;
443 ThreadBlocker background_blocker
;
444 for (size_t i
= 0; i
< kNumBackgroundTasks
; i
++) {
445 pool()->PostWorkerTask(FROM_HERE
,
446 base::Bind(&TestTracker::BlockTask
,
447 tracker(), i
, &background_blocker
));
449 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks
);
451 // Create two tasks with the same sequence token, one that will block on the
452 // event, and one which will just complete quickly when it's run. Since there
453 // is one worker thread free, the first task will start and then block, and
454 // the second task should be waiting.
455 ThreadBlocker blocker
;
456 SequencedWorkerPool::SequenceToken token1
= pool()->GetSequenceToken();
457 pool()->PostSequencedWorkerTask(
459 base::Bind(&TestTracker::BlockTask
, tracker(), 100, &blocker
));
460 pool()->PostSequencedWorkerTask(
462 base::Bind(&TestTracker::FastTask
, tracker(), 101));
463 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
465 // Create another two tasks as above with a different token. These will be
466 // blocked since there are no slots to run.
467 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
468 pool()->PostSequencedWorkerTask(
470 base::Bind(&TestTracker::FastTask
, tracker(), 200));
471 pool()->PostSequencedWorkerTask(
473 base::Bind(&TestTracker::FastTask
, tracker(), 201));
474 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
476 // Let one background task complete. This should then let both tasks of
477 // token2 run to completion in order. The second task of token1 should still
// be waiting at this point.
479 background_blocker
.Unblock(1);
480 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(3);
481 ASSERT_EQ(3u, result
.size());
482 EXPECT_EQ(200, result
[1]);
483 EXPECT_EQ(201, result
[2]);
485 // Finish the rest of the background tasks. This should leave some workers
486 // free with the second token1 task still blocked on the first.
487 background_blocker
.Unblock(kNumBackgroundTasks
- 1);
488 EXPECT_EQ(kNumBackgroundTasks
+ 2,
489 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks
+ 2).size());
491 // Allow the first task of token1 to complete. This should run the second.
493 result
= tracker()->WaitUntilTasksComplete(kNumBackgroundTasks
+ 4);
494 ASSERT_EQ(kNumBackgroundTasks
+ 4, result
.size());
495 EXPECT_EQ(100, result
[result
.size() - 2]);
496 EXPECT_EQ(101, result
[result
.size() - 1]);
499 // Tests that any tasks posted after Shutdown are ignored.
500 // Disabled for flakiness. See http://crbug.com/166451.
// Besides expecting every post after Shutdown() to return false, the test
// asserts that has_work_call_count() is unchanged by those rejected posts.
501 TEST_F(SequencedWorkerPoolTest
, DISABLED_IgnoresAfterShutdown
) {
502 // Start tasks to take all the threads and block them.
503 EnsureAllWorkersCreated();
504 ThreadBlocker blocker
;
505 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
506 pool()->PostWorkerTask(FROM_HERE
,
507 base::Bind(&TestTracker::BlockTask
,
508 tracker(), i
, &blocker
));
510 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
512 SetWillWaitForShutdownCallback(
513 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
514 scoped_refptr
<TestTracker
>(tracker()), 0,
515 &blocker
, kNumWorkerThreads
));
517 // Shutdown the worker pool. This should discard all non-blocking tasks.
518 const int kMaxNewBlockingTasksAfterShutdown
= 100;
519 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown
);
521 int old_has_work_call_count
= has_work_call_count();
523 std::vector
<int> result
=
524 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
526 // The kNumWorkerThreads items should have completed, in no particular order.
527 ASSERT_EQ(kNumWorkerThreads
, result
.size());
528 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
529 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
533 // No further tasks, regardless of shutdown mode, should be allowed.
534 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
536 base::Bind(&TestTracker::FastTask
, tracker(), 100),
537 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
538 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
540 base::Bind(&TestTracker::FastTask
, tracker(), 101),
541 SequencedWorkerPool::SKIP_ON_SHUTDOWN
));
542 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
544 base::Bind(&TestTracker::FastTask
, tracker(), 102),
545 SequencedWorkerPool::BLOCK_SHUTDOWN
));
547 ASSERT_EQ(old_has_work_call_count
, has_work_call_count());
// Checks how many tasks run in total when Shutdown() is called with a limit
// of kNumWorkerThreads / 2 new blocking tasks while queued tasks try to post
// more work from inside the pool.
550 TEST_F(SequencedWorkerPoolTest
, AllowsAfterShutdown
) {
551 // Test that <n> new blocking tasks are allowed provided they're posted
552 // by a running task.
553 EnsureAllWorkersCreated();
554 ThreadBlocker blocker
;
556 // Start tasks to take all the threads and block them.
557 const int kNumBlockTasks
= static_cast<int>(kNumWorkerThreads
);
558 for (int i
= 0; i
< kNumBlockTasks
; ++i
) {
559 EXPECT_TRUE(pool()->PostWorkerTask(
561 base::Bind(&TestTracker::BlockTask
, tracker(), i
, &blocker
)));
563 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
565 // Queue up shutdown blocking tasks behind those which will attempt to post
566 // additional tasks when run, PostAdditionalTasks attempts to post 3
567 // new FastTasks, one for each shutdown_behavior.
568 const int kNumQueuedTasks
= static_cast<int>(kNumWorkerThreads
);
569 for (int i
= 0; i
< kNumQueuedTasks
; ++i
) {
570 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
572 base::Bind(&TestTracker::PostAdditionalTasks
, tracker(), i
, pool(),
574 SequencedWorkerPool::BLOCK_SHUTDOWN
));
577 // Setup to open the floodgates from within Shutdown().
578 SetWillWaitForShutdownCallback(
579 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
580 scoped_refptr
<TestTracker
>(tracker()),
581 0, &blocker
, kNumBlockTasks
));
583 // Allow half of the additional blocking tasks thru.
584 const int kNumNewBlockingTasksToAllow
= kNumWorkerThreads
/ 2;
585 pool()->Shutdown(kNumNewBlockingTasksToAllow
);
587 // Ensure that the correct number of tasks actually got run.
588 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
589 kNumBlockTasks
+ kNumQueuedTasks
+ kNumNewBlockingTasksToAllow
));
591 // Clean up the task IDs we added and go home.
592 tracker()->ClearCompleteSequence();
595 // Tests that blocking tasks can still be posted during shutdown, as long as
596 // the task is not being posted within the context of a running task.
// The callback installed below is PostBlockingTaskThenUnblockThreads, which
// posts one BLOCK_SHUTDOWN task and then unblocks workers; the test therefore
// waits for kNumWorkerThreads + 1 completions.
597 TEST_F(SequencedWorkerPoolTest
,
598 AllowsBlockingTasksDuringShutdownOutsideOfRunningTask
) {
599 EnsureAllWorkersCreated();
600 ThreadBlocker blocker
;
602 // Start tasks to take all the threads and block them.
603 const int kNumBlockTasks
= static_cast<int>(kNumWorkerThreads
);
604 for (int i
= 0; i
< kNumBlockTasks
; ++i
) {
605 EXPECT_TRUE(pool()->PostWorkerTask(
607 base::Bind(&TestTracker::BlockTask
, tracker(), i
, &blocker
)));
609 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
611 // Setup to open the floodgates from within Shutdown().
612 SetWillWaitForShutdownCallback(
613 base::Bind(&TestTracker::PostBlockingTaskThenUnblockThreads
,
614 scoped_refptr
<TestTracker
>(tracker()), pool(), &blocker
,
616 pool()->Shutdown(kNumWorkerThreads
+ 1);
618 // Ensure that the correct number of tasks actually got run.
619 tracker()->WaitUntilTasksComplete(static_cast<size_t>(kNumWorkerThreads
+ 1));
620 tracker()->ClearCompleteSequence();
623 // Tests that unrun tasks are discarded properly according to their shutdown
// mode.
// Three FastTasks are queued behind the blocked workers, one per shutdown
// behavior (ids 100, 101, 102). The final checks expect the blocked tasks
// plus id 102, the BLOCK_SHUTDOWN one, to be present in the result.
625 TEST_F(SequencedWorkerPoolTest
, DiscardOnShutdown
) {
626 // Start tasks to take all the threads and block them.
627 EnsureAllWorkersCreated();
628 ThreadBlocker blocker
;
629 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
630 pool()->PostWorkerTask(FROM_HERE
,
631 base::Bind(&TestTracker::BlockTask
,
632 tracker(), i
, &blocker
));
634 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
636 // Create some tasks with different shutdown modes.
637 pool()->PostWorkerTaskWithShutdownBehavior(
639 base::Bind(&TestTracker::FastTask
, tracker(), 100),
640 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
641 pool()->PostWorkerTaskWithShutdownBehavior(
643 base::Bind(&TestTracker::FastTask
, tracker(), 101),
644 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
645 pool()->PostWorkerTaskWithShutdownBehavior(
647 base::Bind(&TestTracker::FastTask
, tracker(), 102),
648 SequencedWorkerPool::BLOCK_SHUTDOWN
);
650 // Shutdown the worker pool. This should discard all non-blocking tasks.
651 SetWillWaitForShutdownCallback(
652 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
653 scoped_refptr
<TestTracker
>(tracker()), 0,
654 &blocker
, kNumWorkerThreads
));
657 std::vector
<int> result
=
658 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
+ 1);
660 // The kNumWorkerThreads items should have completed, plus the BLOCK_SHUTDOWN
661 // one, in no particular order.
662 ASSERT_EQ(kNumWorkerThreads
+ 1, result
.size());
663 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
664 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
667 EXPECT_TRUE(std::find(result
.begin(), result
.end(), 102) != result
.end());
670 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
// Three BlockTasks (ids 0, 1 and 2) are started before shutdown. Afterwards,
// further posts through the pool, |runner| and |sequenced_runner| are all
// expected to fail, and the three blocked tasks are expected to complete.
671 TEST_F(SequencedWorkerPoolTest
, ContinueOnShutdown
) {
672 scoped_refptr
<TaskRunner
> runner(pool()->GetTaskRunnerWithShutdownBehavior(
673 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
674 scoped_refptr
<SequencedTaskRunner
> sequenced_runner(
675 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
676 pool()->GetSequenceToken(),
677 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
678 EnsureAllWorkersCreated();
679 ThreadBlocker blocker
;
680 pool()->PostWorkerTaskWithShutdownBehavior(
682 base::Bind(&TestTracker::BlockTask
,
683 tracker(), 0, &blocker
),
684 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
687 base::Bind(&TestTracker::BlockTask
,
688 tracker(), 1, &blocker
));
689 sequenced_runner
->PostTask(
691 base::Bind(&TestTracker::BlockTask
,
692 tracker(), 2, &blocker
));
694 tracker()->WaitUntilTasksBlocked(3);
696 // This should not block. If this test hangs, it means it failed.
699 // The task should not have completed yet.
700 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
702 // Posting more tasks should fail.
703 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
704 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0),
705 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
706 EXPECT_FALSE(runner
->PostTask(
707 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0)));
708 EXPECT_FALSE(sequenced_runner
->PostTask(
709 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0)));
711 // Continue the background thread and make sure the tasks can complete.
713 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(3);
714 EXPECT_EQ(3u, result
.size());
717 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
718 // until they stop, but tasks not yet started do not.
// kNumWorkerThreads SKIP_ON_SHUTDOWN BlockTasks are started first, then one
// more SKIP_ON_SHUTDOWN BlockTask is queued behind them. The final assertion
// expects exactly kNumWorkerThreads completions.
719 TEST_F(SequencedWorkerPoolTest
, SkipOnShutdown
) {
720 // Start tasks to take all the threads and block them.
721 EnsureAllWorkersCreated();
722 ThreadBlocker blocker
;
724 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
725 // return until these tasks have completed.
726 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
727 pool()->PostWorkerTaskWithShutdownBehavior(
729 base::Bind(&TestTracker::BlockTask
, tracker(), i
, &blocker
),
730 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
732 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
734 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
735 // executed once Shutdown() has been called.
736 pool()->PostWorkerTaskWithShutdownBehavior(
738 base::Bind(&TestTracker::BlockTask
,
739 tracker(), 0, &blocker
),
740 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
742 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
743 // been started block shutdown.
744 SetWillWaitForShutdownCallback(
745 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
746 scoped_refptr
<TestTracker
>(tracker()), 0,
747 &blocker
, kNumWorkerThreads
));
749 // No tasks should have completed yet.
750 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
752 // This should not block. If this test hangs, it means it failed.
755 // Shutdown should not return until all of the tasks have completed.
756 std::vector
<int> result
=
757 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
759 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
760 // allowed to complete. No additional non-blocking tasks should have been
// started.
762 ASSERT_EQ(kNumWorkerThreads
, result
.size());
763 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
764 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
769 // Ensure all worker threads are created, and then trigger a spurious
770 // work signal. This shouldn't cause any other work signals to be
771 // triggered. This is a regression test for http://crbug.com/117469.
// After a 100 ms sleep, the has-work call count must have grown by exactly
// one relative to the value sampled before SignalHasWorkForTesting().
772 TEST_F(SequencedWorkerPoolTest
, SpuriousWorkSignal
) {
773 EnsureAllWorkersCreated();
774 int old_has_work_call_count
= has_work_call_count();
775 pool()->SignalHasWorkForTesting();
776 // This is inherently racy, but can only produce false positives.
777 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
778 EXPECT_EQ(old_has_work_call_count
+ 1, has_work_call_count());
// Task body used by the IsRunningOnCurrentThread test. It expects |pool| to
// report that it runs tasks on the current thread and that it is running
// |test_positive_token| but not |test_negative_token| here. It expects
// |unused_pool| to report that it does not run tasks on the current thread,
// and it also queries |unused_pool| for both tokens.
781 void IsRunningOnCurrentThreadTask(
782 SequencedWorkerPool::SequenceToken test_positive_token
,
783 SequencedWorkerPool::SequenceToken test_negative_token
,
784 SequencedWorkerPool
* pool
,
785 SequencedWorkerPool
* unused_pool
) {
786 EXPECT_TRUE(pool
->RunsTasksOnCurrentThread());
787 EXPECT_TRUE(pool
->IsRunningSequenceOnCurrentThread(test_positive_token
));
788 EXPECT_FALSE(pool
->IsRunningSequenceOnCurrentThread(test_negative_token
));
789 EXPECT_FALSE(unused_pool
->RunsTasksOnCurrentThread());
791 unused_pool
->IsRunningSequenceOnCurrentThread(test_positive_token
));
793 unused_pool
->IsRunningSequenceOnCurrentThread(test_negative_token
));
796 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
// First runs the queries on the test thread itself, where both the fixture
// pool and a separate two-thread "unused_pool" are expected to answer false.
// Then posts IsRunningOnCurrentThreadTask three times with different
// positive/negative token pairs, and finally shuts |unused_pool| down.
797 TEST_F(SequencedWorkerPoolTest
, IsRunningOnCurrentThread
) {
798 SequencedWorkerPool::SequenceToken token1
= pool()->GetSequenceToken();
799 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
800 SequencedWorkerPool::SequenceToken unsequenced_token
;
802 scoped_refptr
<SequencedWorkerPool
> unused_pool
=
803 new SequencedWorkerPool(2, "unused_pool");
805 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
806 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1
));
807 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2
));
808 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token
));
809 EXPECT_FALSE(unused_pool
->RunsTasksOnCurrentThread());
810 EXPECT_FALSE(unused_pool
->IsRunningSequenceOnCurrentThread(token1
));
811 EXPECT_FALSE(unused_pool
->IsRunningSequenceOnCurrentThread(token2
));
813 unused_pool
->IsRunningSequenceOnCurrentThread(unsequenced_token
));
815 pool()->PostSequencedWorkerTask(
817 base::Bind(&IsRunningOnCurrentThreadTask
,
818 token1
, token2
, pool(), unused_pool
));
819 pool()->PostSequencedWorkerTask(
821 base::Bind(&IsRunningOnCurrentThreadTask
,
822 token2
, unsequenced_token
, pool(), unused_pool
));
823 pool()->PostWorkerTask(
825 base::Bind(&IsRunningOnCurrentThreadTask
,
826 unsequenced_token
, token1
, pool(), unused_pool
));
828 unused_pool
->Shutdown();
831 // Checks that tasks are destroyed in the right context during shutdown. If a
832 // task is destroyed while SequencedWorkerPool's global lock is held,
833 // SequencedWorkerPool might deadlock.
// Posts four self-reposting tasks, each of which is bound to its own
// DestructionDeadlockChecker.
834 TEST_F(SequencedWorkerPoolTest
, AvoidsDeadlockOnShutdown
) {
835 for (int i
= 0; i
< 4; ++i
) {
836 scoped_refptr
<DestructionDeadlockChecker
> checker(
837 new DestructionDeadlockChecker(pool()));
838 tracker()->PostRepostingTask(pool(), checker
);
841 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
842 // which in turn should not deadlock in their destructors.
846 // Similar to the test AvoidsDeadlockOnShutdown, but there are now also
847 // sequenced, blocking tasks in the queue during shutdown.
// Each of the four iterations posts a self-reposting task bound to a new
// DestructionDeadlockChecker, and also a self-reposting blocking task on the
// sequence token named "name".
848 TEST_F(SequencedWorkerPoolTest
,
849 AvoidsDeadlockOnShutdownWithSequencedBlockingTasks
) {
850 const std::string
sequence_token_name("name");
851 for (int i
= 0; i
< 4; ++i
) {
852 scoped_refptr
<DestructionDeadlockChecker
> checker(
853 new DestructionDeadlockChecker(pool()));
854 tracker()->PostRepostingTask(pool(), checker
);
856 SequencedWorkerPool::SequenceToken token1
=
857 pool()->GetNamedSequenceToken(sequence_token_name
);
858 tracker()->PostRepostingBlockingTask(pool(), token1
);
861 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
862 // which in turn should not deadlock in their destructors.
866 // Verify that FlushForTesting works as intended.
// The expected completion count asserted below is
// 1 (SlowTask) + kNumFastTasks (FastTasks) + 1 (PostAdditionalTasks) + 3 (the
// FastTasks that PostAdditionalTasks posts); the task delayed by five minutes
// is not included.
867 TEST_F(SequencedWorkerPoolTest
, FlushForTesting
) {
868 // Should be fine to call on a new instance.
869 pool()->FlushForTesting();
871 // Queue up a bunch of work, including a long delayed task and
872 // a task that produces additional tasks as an artifact.
873 pool()->PostDelayedWorkerTask(
875 base::Bind(&TestTracker::FastTask
, tracker(), 0),
876 TimeDelta::FromMinutes(5));
877 pool()->PostWorkerTask(FROM_HERE
,
878 base::Bind(&TestTracker::SlowTask
, tracker(), 0));
879 const size_t kNumFastTasks
= 20;
880 for (size_t i
= 0; i
< kNumFastTasks
; i
++) {
881 pool()->PostWorkerTask(FROM_HERE
,
882 base::Bind(&TestTracker::FastTask
, tracker(), 0));
884 pool()->PostWorkerTask(
886 base::Bind(&TestTracker::PostAdditionalTasks
, tracker(), 0, pool(),
889 // We expect all except the delayed task to have been run. We verify all
890 // closures have been deleted by looking at the refcount of the
// tracker.
892 EXPECT_FALSE(tracker()->HasOneRef());
893 pool()->FlushForTesting();
894 EXPECT_TRUE(tracker()->HasOneRef());
895 EXPECT_EQ(1 + kNumFastTasks
+ 1 + 3, tracker()->GetTasksCompletedCount());
897 // Should be fine to call on an idle instance with all threads created, and
898 // spamming the method shouldn't deadlock or confuse the class.
899 pool()->FlushForTesting();
900 pool()->FlushForTesting();
902 // Should be fine to call after shutdown too.
904 pool()->FlushForTesting();
// Creates a three-thread pool directly (no fixture) and obtains a
// CONTINUE_ON_SHUTDOWN sequenced task runner from it.
907 TEST(SequencedWorkerPoolRefPtrTest
, ShutsDownCleanWithContinueOnShutdown
) {
909 scoped_refptr
<SequencedWorkerPool
> pool(new SequencedWorkerPool(3, "Pool"));
910 scoped_refptr
<SequencedTaskRunner
> task_runner
=
911 pool
->GetSequencedTaskRunnerWithShutdownBehavior(
912 pool
->GetSequenceToken(),
913 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
915 // Upon test exit, should shut down without hanging.
// Delegate for the typed task-runner test suites instantiated below. It owns
// a SequencedWorkerPoolOwner created with 10 threads and hands out the pool
// itself from GetTaskRunner().
919 class SequencedWorkerPoolTaskRunnerTestDelegate
{
921 SequencedWorkerPoolTaskRunnerTestDelegate() {}
923 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
// Creates the pool owner.
925 void StartTaskRunner() {
927 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
930 scoped_refptr
<SequencedWorkerPool
> GetTaskRunner() {
931 return pool_owner_
->pool();
// Flushes the pool and then shuts it down.
934 void StopTaskRunner() {
935 // Make sure all tasks are run before shutting down. Delayed tasks are
936 // not run, they're simply deleted.
937 pool_owner_
->pool()->FlushForTesting();
938 pool_owner_
->pool()->Shutdown();
939 // Don't reset |pool_owner_| here, as the test may still hold a
940 // reference to the pool.
944 MessageLoop message_loop_
;
945 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
// Instantiate the TaskRunnerTest and TaskRunnerAffinityTest typed suites with
// the delegate that exposes the pool itself, under the "SequencedWorkerPool"
// prefix.
948 INSTANTIATE_TYPED_TEST_CASE_P(
949 SequencedWorkerPool
, TaskRunnerTest
,
950 SequencedWorkerPoolTaskRunnerTestDelegate
);
951 INSTANTIATE_TYPED_TEST_CASE_P(SequencedWorkerPool
, TaskRunnerAffinityTest
,
952 SequencedWorkerPoolTaskRunnerTestDelegate
);
// Delegate for the typed task-runner test suites instantiated below. It owns
// a 10-thread SequencedWorkerPoolOwner and keeps, in task_runner_, a
// TaskRunner obtained from the pool via
// GetTaskRunnerWithShutdownBehavior(BLOCK_SHUTDOWN).
954 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate
{
956 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
958 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
// Creates the pool owner and derives task_runner_ from its pool.
961 void StartTaskRunner() {
963 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
964 task_runner_
= pool_owner_
->pool()->GetTaskRunnerWithShutdownBehavior(
965 SequencedWorkerPool::BLOCK_SHUTDOWN
);
968 scoped_refptr
<TaskRunner
> GetTaskRunner() {
// Flushes the pool and then shuts it down.
972 void StopTaskRunner() {
973 // Make sure all tasks are run before shutting down. Delayed tasks are
974 // not run, they're simply deleted.
975 pool_owner_
->pool()->FlushForTesting();
976 pool_owner_
->pool()->Shutdown();
977 // Don't reset |pool_owner_| here, as the test may still hold a
978 // reference to the pool.
982 MessageLoop message_loop_
;
983 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
984 scoped_refptr
<TaskRunner
> task_runner_
;
// Instantiate the TaskRunnerTest and TaskRunnerAffinityTest typed suites with
// the BLOCK_SHUTDOWN task-runner delegate, under the
// "SequencedWorkerPoolTaskRunner" prefix.
987 INSTANTIATE_TYPED_TEST_CASE_P(
988 SequencedWorkerPoolTaskRunner
, TaskRunnerTest
,
989 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate
);
990 INSTANTIATE_TYPED_TEST_CASE_P(
991 SequencedWorkerPoolTaskRunner
, TaskRunnerAffinityTest
,
992 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate
);
// Delegate for the typed task-runner test suites instantiated below. It owns
// a 10-thread SequencedWorkerPoolOwner and returns, from GetTaskRunner(), a
// SequencedTaskRunner bound to a sequence token freshly obtained from the
// pool.
994 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate
{
996 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
998 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
// Creates the pool owner and derives task_runner_ from its pool.
1001 void StartTaskRunner() {
1002 pool_owner_
.reset(new SequencedWorkerPoolOwner(
1003 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
1004 task_runner_
= pool_owner_
->pool()->GetSequencedTaskRunner(
1005 pool_owner_
->pool()->GetSequenceToken());
1008 scoped_refptr
<SequencedTaskRunner
> GetTaskRunner() {
1009 return task_runner_
;
// Flushes the pool and then shuts it down.
1012 void StopTaskRunner() {
1013 // Make sure all tasks are run before shutting down. Delayed tasks are
1014 // not run, they're simply deleted.
1015 pool_owner_
->pool()->FlushForTesting();
1016 pool_owner_
->pool()->Shutdown();
1017 // Don't reset |pool_owner_| here, as the test may still hold a
1018 // reference to the pool.
1022 MessageLoop message_loop_
;
1023 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
1024 scoped_refptr
<SequencedTaskRunner
> task_runner_
;
// Instantiate the TaskRunnerTest, TaskRunnerAffinityTest and
// SequencedTaskRunnerTest typed suites with the sequenced-task-runner
// delegate, under the "SequencedWorkerPoolSequencedTaskRunner" prefix.
1027 INSTANTIATE_TYPED_TEST_CASE_P(
1028 SequencedWorkerPoolSequencedTaskRunner
, TaskRunnerTest
,
1029 SequencedWorkerPoolSequencedTaskRunnerTestDelegate
);
1030 INSTANTIATE_TYPED_TEST_CASE_P(
1031 SequencedWorkerPoolSequencedTaskRunner
, TaskRunnerAffinityTest
,
1032 SequencedWorkerPoolSequencedTaskRunnerTestDelegate
);
1034 INSTANTIATE_TYPED_TEST_CASE_P(
1035 SequencedWorkerPoolSequencedTaskRunner
, SequencedTaskRunnerTest
,
1036 SequencedWorkerPoolSequencedTaskRunnerTestDelegate
);