1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/synchronization/lock.h"
17 #include "base/test/sequenced_task_runner_test_template.h"
18 #include "base/test/sequenced_worker_pool_owner.h"
19 #include "base/test/task_runner_test_template.h"
20 #include "base/test/test_timeouts.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/time/time.h"
23 #include "base/tracked_objects.h"
24 #include "testing/gtest/include/gtest/gtest.h"
30 // Many of these tests have failure modes where they'll hang forever. These
31 // tests should not be flaky, and hanging indicates a type of failure. Do not
32 // mark as flaky if they're hanging, it's likely an actual bug.
36 const size_t kNumWorkerThreads
= 3;
38 // Allows a number of threads to all be blocked on the same event, and
39 // provides a way to unblock a certain number of them.
42 ThreadBlocker() : lock_(), cond_var_(&lock_
), unblock_counter_(0) {}
46 base::AutoLock
lock(lock_
);
47 while (unblock_counter_
== 0)
54 void Unblock(size_t count
) {
56 base::AutoLock
lock(lock_
);
57 DCHECK(unblock_counter_
== 0);
58 unblock_counter_
= count
;
65 base::ConditionVariable cond_var_
;
67 size_t unblock_counter_
;
70 class TestTracker
: public base::RefCountedThreadSafe
<TestTracker
> {
78 // Each of these tasks appends the argument to the complete sequence vector
79 // so calling code can see what order they finished in.
80 void FastTask(int id
) {
84 void SlowTask(int id
) {
85 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
89 void BlockTask(int id
, ThreadBlocker
* blocker
) {
90 // Note that this task has started and signal anybody waiting for that
93 base::AutoLock
lock(lock_
);
102 void PostAdditionalTasks(
103 int id
, SequencedWorkerPool
* pool
,
104 bool expected_return_value
) {
105 Closure fast_task
= base::Bind(&TestTracker::FastTask
, this, 100);
106 EXPECT_EQ(expected_return_value
,
107 pool
->PostWorkerTaskWithShutdownBehavior(
108 FROM_HERE
, fast_task
,
109 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
110 EXPECT_EQ(expected_return_value
,
111 pool
->PostWorkerTaskWithShutdownBehavior(
112 FROM_HERE
, fast_task
,
113 SequencedWorkerPool::SKIP_ON_SHUTDOWN
));
114 pool
->PostWorkerTaskWithShutdownBehavior(
115 FROM_HERE
, fast_task
,
116 SequencedWorkerPool::BLOCK_SHUTDOWN
);
117 SignalWorkerDone(id
);
120 // Waits until the given number of tasks have started executing.
121 void WaitUntilTasksBlocked(size_t count
) {
123 base::AutoLock
lock(lock_
);
124 while (started_events_
< count
)
130 // Blocks the current thread until at least the given number of tasks are in
131 // the completed vector, and then returns a copy.
132 std::vector
<int> WaitUntilTasksComplete(size_t num_tasks
) {
133 std::vector
<int> ret
;
135 base::AutoLock
lock(lock_
);
136 while (complete_sequence_
.size() < num_tasks
)
138 ret
= complete_sequence_
;
144 size_t GetTasksCompletedCount() {
145 base::AutoLock
lock(lock_
);
146 return complete_sequence_
.size();
149 void ClearCompleteSequence() {
150 base::AutoLock
lock(lock_
);
151 complete_sequence_
.clear();
156 friend class base::RefCountedThreadSafe
<TestTracker
>;
159 void SignalWorkerDone(int id
) {
161 base::AutoLock
lock(lock_
);
162 complete_sequence_
.push_back(id
);
167 // Protects the complete_sequence.
170 base::ConditionVariable cond_var_
;
172 // Protected by lock_.
173 std::vector
<int> complete_sequence_
;
175 // Counter of the number of "block" workers that have started.
176 size_t started_events_
;
179 class SequencedWorkerPoolTest
: public testing::Test
{
181 SequencedWorkerPoolTest()
182 : tracker_(new TestTracker
) {
186 virtual ~SequencedWorkerPoolTest() {}
188 virtual void SetUp() OVERRIDE
{}
190 virtual void TearDown() OVERRIDE
{
194 const scoped_refptr
<SequencedWorkerPool
>& pool() {
195 return pool_owner_
->pool();
197 TestTracker
* tracker() { return tracker_
.get(); }
199 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
200 // down, and creates a new instance.
202 pool_owner_
.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads
, "test"));
205 void SetWillWaitForShutdownCallback(const Closure
& callback
) {
206 pool_owner_
->SetWillWaitForShutdownCallback(callback
);
209 // Ensures that the given number of worker threads is created by adding
210 // tasks and waiting until they complete. Worker thread creation is
211 // serialized, can happen on background threads asynchronously, and doesn't
212 // happen any more at shutdown. This means that if a test posts a bunch of
213 // tasks and calls shutdown, fewer workers will be created than the test may
216 // This function ensures that this condition can't happen so tests can make
217 // assumptions about the number of workers active. See the comment in
218 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
221 // It will post tasks to the queue with id -1. It also assumes this is the
222 // first thing called in a test since it will clear the complete_sequence_.
223 void EnsureAllWorkersCreated() {
224 // Create a bunch of threads, all waiting. This will cause that may
225 // workers to be created.
226 ThreadBlocker blocker
;
227 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
228 pool()->PostWorkerTask(FROM_HERE
,
229 base::Bind(&TestTracker::BlockTask
,
230 tracker(), -1, &blocker
));
232 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
234 // Now wake them up and wait until they're done.
235 blocker
.Unblock(kNumWorkerThreads
);
236 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
238 // Clean up the task IDs we added.
239 tracker()->ClearCompleteSequence();
242 int has_work_call_count() const {
243 return pool_owner_
->has_work_call_count();
247 MessageLoop message_loop_
;
248 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
249 const scoped_refptr
<TestTracker
> tracker_
;
252 // Checks that the given number of entries are in the tasks to complete of
253 // the given tracker, and then signals the given event the given number of
254 // times. This is used to wakt up blocked background threads before blocking
256 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr
<TestTracker
> tracker
,
257 size_t expected_tasks_to_complete
,
258 ThreadBlocker
* blocker
,
259 size_t threads_to_awake
) {
261 expected_tasks_to_complete
,
262 tracker
->WaitUntilTasksComplete(expected_tasks_to_complete
).size());
264 blocker
->Unblock(threads_to_awake
);
267 class DeletionHelper
: public base::RefCountedThreadSafe
<DeletionHelper
> {
269 explicit DeletionHelper(
270 const scoped_refptr
<base::RefCountedData
<bool> >& deleted_flag
)
271 : deleted_flag_(deleted_flag
) {
275 friend class base::RefCountedThreadSafe
<DeletionHelper
>;
276 virtual ~DeletionHelper() { deleted_flag_
->data
= true; }
278 const scoped_refptr
<base::RefCountedData
<bool> > deleted_flag_
;
279 DISALLOW_COPY_AND_ASSIGN(DeletionHelper
);
282 void HoldPoolReference(const scoped_refptr
<base::SequencedWorkerPool
>& pool
,
283 const scoped_refptr
<DeletionHelper
>& helper
) {
284 ADD_FAILURE() << "Should never run";
287 // Tests that delayed tasks are deleted upon shutdown of the pool.
288 TEST_F(SequencedWorkerPoolTest
, DelayedTaskDuringShutdown
) {
289 // Post something to verify the pool is started up.
290 EXPECT_TRUE(pool()->PostTask(
291 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 1)));
293 scoped_refptr
<base::RefCountedData
<bool> > deleted_flag(
294 new base::RefCountedData
<bool>(false));
296 base::Time
posted_at(base::Time::Now());
297 // Post something that shouldn't run.
298 EXPECT_TRUE(pool()->PostDelayedTask(
300 base::Bind(&HoldPoolReference
,
302 make_scoped_refptr(new DeletionHelper(deleted_flag
))),
303 TestTimeouts::action_timeout()));
305 std::vector
<int> completion_sequence
= tracker()->WaitUntilTasksComplete(1);
306 ASSERT_EQ(1u, completion_sequence
.size());
307 ASSERT_EQ(1, completion_sequence
[0]);
310 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
311 // fully destroyed (and thus shut down).
314 // Verify that we didn't block until the task was due.
315 ASSERT_LT(base::Time::Now() - posted_at
, TestTimeouts::action_timeout());
317 // Verify that the deferred task has not only not run, but has also been
319 ASSERT_TRUE(deleted_flag
->data
);
322 // Tests that same-named tokens have the same ID.
323 TEST_F(SequencedWorkerPoolTest
, NamedTokens
) {
324 const std::string
name1("hello");
325 SequencedWorkerPool::SequenceToken token1
=
326 pool()->GetNamedSequenceToken(name1
);
328 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
330 const std::string
name3("goodbye");
331 SequencedWorkerPool::SequenceToken token3
=
332 pool()->GetNamedSequenceToken(name3
);
334 // All 3 tokens should be different.
335 EXPECT_FALSE(token1
.Equals(token2
));
336 EXPECT_FALSE(token1
.Equals(token3
));
337 EXPECT_FALSE(token2
.Equals(token3
));
339 // Requesting the same name again should give the same value.
340 SequencedWorkerPool::SequenceToken token1again
=
341 pool()->GetNamedSequenceToken(name1
);
342 EXPECT_TRUE(token1
.Equals(token1again
));
344 SequencedWorkerPool::SequenceToken token3again
=
345 pool()->GetNamedSequenceToken(name3
);
346 EXPECT_TRUE(token3
.Equals(token3again
));
349 // Tests that posting a bunch of tasks (many more than the number of worker
350 // threads) runs them all.
351 TEST_F(SequencedWorkerPoolTest
, LotsOfTasks
) {
352 pool()->PostWorkerTask(FROM_HERE
,
353 base::Bind(&TestTracker::SlowTask
, tracker(), 0));
355 const size_t kNumTasks
= 20;
356 for (size_t i
= 1; i
< kNumTasks
; i
++) {
357 pool()->PostWorkerTask(FROM_HERE
,
358 base::Bind(&TestTracker::FastTask
, tracker(), i
));
361 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(kNumTasks
);
362 EXPECT_EQ(kNumTasks
, result
.size());
365 // Tests that posting a bunch of tasks (many more than the number of
366 // worker threads) to two pools simultaneously runs them all twice.
367 // This test is meant to shake out any concurrency issues between
368 // pools (like histograms).
369 TEST_F(SequencedWorkerPoolTest
, LotsOfTasksTwoPools
) {
370 SequencedWorkerPoolOwner
pool1(kNumWorkerThreads
, "test1");
371 SequencedWorkerPoolOwner
pool2(kNumWorkerThreads
, "test2");
373 base::Closure slow_task
= base::Bind(&TestTracker::SlowTask
, tracker(), 0);
374 pool1
.pool()->PostWorkerTask(FROM_HERE
, slow_task
);
375 pool2
.pool()->PostWorkerTask(FROM_HERE
, slow_task
);
377 const size_t kNumTasks
= 20;
378 for (size_t i
= 1; i
< kNumTasks
; i
++) {
379 base::Closure fast_task
=
380 base::Bind(&TestTracker::FastTask
, tracker(), i
);
381 pool1
.pool()->PostWorkerTask(FROM_HERE
, fast_task
);
382 pool2
.pool()->PostWorkerTask(FROM_HERE
, fast_task
);
385 std::vector
<int> result
=
386 tracker()->WaitUntilTasksComplete(2*kNumTasks
);
387 EXPECT_EQ(2 * kNumTasks
, result
.size());
389 pool2
.pool()->Shutdown();
390 pool1
.pool()->Shutdown();
393 // Test that tasks with the same sequence token are executed in order but don't
394 // affect other tasks.
395 TEST_F(SequencedWorkerPoolTest
, Sequence
) {
396 // Fill all the worker threads except one.
397 const size_t kNumBackgroundTasks
= kNumWorkerThreads
- 1;
398 ThreadBlocker background_blocker
;
399 for (size_t i
= 0; i
< kNumBackgroundTasks
; i
++) {
400 pool()->PostWorkerTask(FROM_HERE
,
401 base::Bind(&TestTracker::BlockTask
,
402 tracker(), i
, &background_blocker
));
404 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks
);
406 // Create two tasks with the same sequence token, one that will block on the
407 // event, and one which will just complete quickly when it's run. Since there
408 // is one worker thread free, the first task will start and then block, and
409 // the second task should be waiting.
410 ThreadBlocker blocker
;
411 SequencedWorkerPool::SequenceToken token1
= pool()->GetSequenceToken();
412 pool()->PostSequencedWorkerTask(
414 base::Bind(&TestTracker::BlockTask
, tracker(), 100, &blocker
));
415 pool()->PostSequencedWorkerTask(
417 base::Bind(&TestTracker::FastTask
, tracker(), 101));
418 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
420 // Create another two tasks as above with a different token. These will be
421 // blocked since there are no slots to run.
422 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
423 pool()->PostSequencedWorkerTask(
425 base::Bind(&TestTracker::FastTask
, tracker(), 200));
426 pool()->PostSequencedWorkerTask(
428 base::Bind(&TestTracker::FastTask
, tracker(), 201));
429 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
431 // Let one background task complete. This should then let both tasks of
432 // token2 run to completion in order. The second task of token1 should still
434 background_blocker
.Unblock(1);
435 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(3);
436 ASSERT_EQ(3u, result
.size());
437 EXPECT_EQ(200, result
[1]);
438 EXPECT_EQ(201, result
[2]);
440 // Finish the rest of the background tasks. This should leave some workers
441 // free with the second token1 task still blocked on the first.
442 background_blocker
.Unblock(kNumBackgroundTasks
- 1);
443 EXPECT_EQ(kNumBackgroundTasks
+ 2,
444 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks
+ 2).size());
446 // Allow the first task of token1 to complete. This should run the second.
448 result
= tracker()->WaitUntilTasksComplete(kNumBackgroundTasks
+ 4);
449 ASSERT_EQ(kNumBackgroundTasks
+ 4, result
.size());
450 EXPECT_EQ(100, result
[result
.size() - 2]);
451 EXPECT_EQ(101, result
[result
.size() - 1]);
454 // Tests that any tasks posted after Shutdown are ignored.
455 // Disabled for flakiness. See http://crbug.com/166451.
456 TEST_F(SequencedWorkerPoolTest
, DISABLED_IgnoresAfterShutdown
) {
457 // Start tasks to take all the threads and block them.
458 EnsureAllWorkersCreated();
459 ThreadBlocker blocker
;
460 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
461 pool()->PostWorkerTask(FROM_HERE
,
462 base::Bind(&TestTracker::BlockTask
,
463 tracker(), i
, &blocker
));
465 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
467 SetWillWaitForShutdownCallback(
468 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
469 scoped_refptr
<TestTracker
>(tracker()), 0,
470 &blocker
, kNumWorkerThreads
));
472 // Shutdown the worker pool. This should discard all non-blocking tasks.
473 const int kMaxNewBlockingTasksAfterShutdown
= 100;
474 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown
);
476 int old_has_work_call_count
= has_work_call_count();
478 std::vector
<int> result
=
479 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
481 // The kNumWorkerThread items should have completed, in no particular order.
482 ASSERT_EQ(kNumWorkerThreads
, result
.size());
483 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
484 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
488 // No further tasks, regardless of shutdown mode, should be allowed.
489 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
491 base::Bind(&TestTracker::FastTask
, tracker(), 100),
492 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
493 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
495 base::Bind(&TestTracker::FastTask
, tracker(), 101),
496 SequencedWorkerPool::SKIP_ON_SHUTDOWN
));
497 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
499 base::Bind(&TestTracker::FastTask
, tracker(), 102),
500 SequencedWorkerPool::BLOCK_SHUTDOWN
));
502 ASSERT_EQ(old_has_work_call_count
, has_work_call_count());
505 TEST_F(SequencedWorkerPoolTest
, AllowsAfterShutdown
) {
506 // Test that <n> new blocking tasks are allowed provided they're posted
507 // by a running tasks.
508 EnsureAllWorkersCreated();
509 ThreadBlocker blocker
;
511 // Start tasks to take all the threads and block them.
512 const int kNumBlockTasks
= static_cast<int>(kNumWorkerThreads
);
513 for (int i
= 0; i
< kNumBlockTasks
; ++i
) {
514 EXPECT_TRUE(pool()->PostWorkerTask(
516 base::Bind(&TestTracker::BlockTask
, tracker(), i
, &blocker
)));
518 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
520 // Queue up shutdown blocking tasks behind those which will attempt to post
521 // additional tasks when run, PostAdditionalTasks attemtps to post 3
522 // new FastTasks, one for each shutdown_behavior.
523 const int kNumQueuedTasks
= static_cast<int>(kNumWorkerThreads
);
524 for (int i
= 0; i
< kNumQueuedTasks
; ++i
) {
525 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
527 base::Bind(&TestTracker::PostAdditionalTasks
, tracker(), i
, pool(),
529 SequencedWorkerPool::BLOCK_SHUTDOWN
));
532 // Setup to open the floodgates from within Shutdown().
533 SetWillWaitForShutdownCallback(
534 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
535 scoped_refptr
<TestTracker
>(tracker()),
536 0, &blocker
, kNumBlockTasks
));
538 // Allow half of the additional blocking tasks thru.
539 const int kNumNewBlockingTasksToAllow
= kNumWorkerThreads
/ 2;
540 pool()->Shutdown(kNumNewBlockingTasksToAllow
);
542 // Ensure that the correct number of tasks actually got run.
543 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
544 kNumBlockTasks
+ kNumQueuedTasks
+ kNumNewBlockingTasksToAllow
));
546 // Clean up the task IDs we added and go home.
547 tracker()->ClearCompleteSequence();
550 // Tests that unrun tasks are discarded properly according to their shutdown
552 TEST_F(SequencedWorkerPoolTest
, DiscardOnShutdown
) {
553 // Start tasks to take all the threads and block them.
554 EnsureAllWorkersCreated();
555 ThreadBlocker blocker
;
556 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
557 pool()->PostWorkerTask(FROM_HERE
,
558 base::Bind(&TestTracker::BlockTask
,
559 tracker(), i
, &blocker
));
561 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
563 // Create some tasks with different shutdown modes.
564 pool()->PostWorkerTaskWithShutdownBehavior(
566 base::Bind(&TestTracker::FastTask
, tracker(), 100),
567 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
568 pool()->PostWorkerTaskWithShutdownBehavior(
570 base::Bind(&TestTracker::FastTask
, tracker(), 101),
571 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
572 pool()->PostWorkerTaskWithShutdownBehavior(
574 base::Bind(&TestTracker::FastTask
, tracker(), 102),
575 SequencedWorkerPool::BLOCK_SHUTDOWN
);
577 // Shutdown the worker pool. This should discard all non-blocking tasks.
578 SetWillWaitForShutdownCallback(
579 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
580 scoped_refptr
<TestTracker
>(tracker()), 0,
581 &blocker
, kNumWorkerThreads
));
584 std::vector
<int> result
=
585 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
+ 1);
587 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
588 // one, in no particular order.
589 ASSERT_EQ(kNumWorkerThreads
+ 1, result
.size());
590 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
591 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
594 EXPECT_TRUE(std::find(result
.begin(), result
.end(), 102) != result
.end());
597 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
598 TEST_F(SequencedWorkerPoolTest
, ContinueOnShutdown
) {
599 scoped_refptr
<TaskRunner
> runner(pool()->GetTaskRunnerWithShutdownBehavior(
600 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
601 scoped_refptr
<SequencedTaskRunner
> sequenced_runner(
602 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
603 pool()->GetSequenceToken(),
604 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
605 EnsureAllWorkersCreated();
606 ThreadBlocker blocker
;
607 pool()->PostWorkerTaskWithShutdownBehavior(
609 base::Bind(&TestTracker::BlockTask
,
610 tracker(), 0, &blocker
),
611 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
614 base::Bind(&TestTracker::BlockTask
,
615 tracker(), 1, &blocker
));
616 sequenced_runner
->PostTask(
618 base::Bind(&TestTracker::BlockTask
,
619 tracker(), 2, &blocker
));
621 tracker()->WaitUntilTasksBlocked(3);
623 // This should not block. If this test hangs, it means it failed.
626 // The task should not have completed yet.
627 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
629 // Posting more tasks should fail.
630 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
631 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0),
632 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
633 EXPECT_FALSE(runner
->PostTask(
634 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0)));
635 EXPECT_FALSE(sequenced_runner
->PostTask(
636 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0)));
638 // Continue the background thread and make sure the tasks can complete.
640 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(3);
641 EXPECT_EQ(3u, result
.size());
644 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
645 // until they stop, but tasks not yet started do not.
646 TEST_F(SequencedWorkerPoolTest
, SkipOnShutdown
) {
647 // Start tasks to take all the threads and block them.
648 EnsureAllWorkersCreated();
649 ThreadBlocker blocker
;
651 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
652 // return until these tasks have completed.
653 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
654 pool()->PostWorkerTaskWithShutdownBehavior(
656 base::Bind(&TestTracker::BlockTask
, tracker(), i
, &blocker
),
657 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
659 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
661 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
662 // executed once Shutdown() has been called.
663 pool()->PostWorkerTaskWithShutdownBehavior(
665 base::Bind(&TestTracker::BlockTask
,
666 tracker(), 0, &blocker
),
667 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
669 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
670 // been started block shutdown.
671 SetWillWaitForShutdownCallback(
672 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
673 scoped_refptr
<TestTracker
>(tracker()), 0,
674 &blocker
, kNumWorkerThreads
));
676 // No tasks should have completed yet.
677 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
679 // This should not block. If this test hangs, it means it failed.
682 // Shutdown should not return until all of the tasks have completed.
683 std::vector
<int> result
=
684 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
686 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
687 // allowed to complete. No additional non-blocking tasks should have been
689 ASSERT_EQ(kNumWorkerThreads
, result
.size());
690 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
691 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
696 // Ensure all worker threads are created, and then trigger a spurious
697 // work signal. This shouldn't cause any other work signals to be
698 // triggered. This is a regression test for http://crbug.com/117469.
699 TEST_F(SequencedWorkerPoolTest
, SpuriousWorkSignal
) {
700 EnsureAllWorkersCreated();
701 int old_has_work_call_count
= has_work_call_count();
702 pool()->SignalHasWorkForTesting();
703 // This is inherently racy, but can only produce false positives.
704 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
705 EXPECT_EQ(old_has_work_call_count
+ 1, has_work_call_count());
708 void IsRunningOnCurrentThreadTask(
709 SequencedWorkerPool::SequenceToken test_positive_token
,
710 SequencedWorkerPool::SequenceToken test_negative_token
,
711 SequencedWorkerPool
* pool
,
712 SequencedWorkerPool
* unused_pool
) {
713 EXPECT_TRUE(pool
->RunsTasksOnCurrentThread());
714 EXPECT_TRUE(pool
->IsRunningSequenceOnCurrentThread(test_positive_token
));
715 EXPECT_FALSE(pool
->IsRunningSequenceOnCurrentThread(test_negative_token
));
716 EXPECT_FALSE(unused_pool
->RunsTasksOnCurrentThread());
718 unused_pool
->IsRunningSequenceOnCurrentThread(test_positive_token
));
720 unused_pool
->IsRunningSequenceOnCurrentThread(test_negative_token
));
723 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
724 TEST_F(SequencedWorkerPoolTest
, IsRunningOnCurrentThread
) {
725 SequencedWorkerPool::SequenceToken token1
= pool()->GetSequenceToken();
726 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
727 SequencedWorkerPool::SequenceToken unsequenced_token
;
729 scoped_refptr
<SequencedWorkerPool
> unused_pool
=
730 new SequencedWorkerPool(2, "unused_pool");
732 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
733 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1
));
734 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2
));
735 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token
));
736 EXPECT_FALSE(unused_pool
->RunsTasksOnCurrentThread());
737 EXPECT_FALSE(unused_pool
->IsRunningSequenceOnCurrentThread(token1
));
738 EXPECT_FALSE(unused_pool
->IsRunningSequenceOnCurrentThread(token2
));
740 unused_pool
->IsRunningSequenceOnCurrentThread(unsequenced_token
));
742 pool()->PostSequencedWorkerTask(
744 base::Bind(&IsRunningOnCurrentThreadTask
,
745 token1
, token2
, pool(), unused_pool
));
746 pool()->PostSequencedWorkerTask(
748 base::Bind(&IsRunningOnCurrentThreadTask
,
749 token2
, unsequenced_token
, pool(), unused_pool
));
750 pool()->PostWorkerTask(
752 base::Bind(&IsRunningOnCurrentThreadTask
,
753 unsequenced_token
, token1
, pool(), unused_pool
));
755 unused_pool
->Shutdown();
758 // Verify that FlushForTesting works as intended.
759 TEST_F(SequencedWorkerPoolTest
, FlushForTesting
) {
760 // Should be fine to call on a new instance.
761 pool()->FlushForTesting();
763 // Queue up a bunch of work, including a long delayed task and
764 // a task that produces additional tasks as an artifact.
765 pool()->PostDelayedWorkerTask(
767 base::Bind(&TestTracker::FastTask
, tracker(), 0),
768 TimeDelta::FromMinutes(5));
769 pool()->PostWorkerTask(FROM_HERE
,
770 base::Bind(&TestTracker::SlowTask
, tracker(), 0));
771 const size_t kNumFastTasks
= 20;
772 for (size_t i
= 0; i
< kNumFastTasks
; i
++) {
773 pool()->PostWorkerTask(FROM_HERE
,
774 base::Bind(&TestTracker::FastTask
, tracker(), 0));
776 pool()->PostWorkerTask(
778 base::Bind(&TestTracker::PostAdditionalTasks
, tracker(), 0, pool(),
781 // We expect all except the delayed task to have been run. We verify all
782 // closures have been deleted by looking at the refcount of the
784 EXPECT_FALSE(tracker()->HasOneRef());
785 pool()->FlushForTesting();
786 EXPECT_TRUE(tracker()->HasOneRef());
787 EXPECT_EQ(1 + kNumFastTasks
+ 1 + 3, tracker()->GetTasksCompletedCount());
789 // Should be fine to call on an idle instance with all threads created, and
790 // spamming the method shouldn't deadlock or confuse the class.
791 pool()->FlushForTesting();
792 pool()->FlushForTesting();
794 // Should be fine to call after shutdown too.
796 pool()->FlushForTesting();
799 TEST(SequencedWorkerPoolRefPtrTest
, ShutsDownCleanWithContinueOnShutdown
) {
801 scoped_refptr
<SequencedWorkerPool
> pool(new SequencedWorkerPool(3, "Pool"));
802 scoped_refptr
<SequencedTaskRunner
> task_runner
=
803 pool
->GetSequencedTaskRunnerWithShutdownBehavior(
804 pool
->GetSequenceToken(),
805 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
807 // Upon test exit, should shut down without hanging.
811 class SequencedWorkerPoolTaskRunnerTestDelegate
{
813 SequencedWorkerPoolTaskRunnerTestDelegate() {}
815 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
817 void StartTaskRunner() {
819 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
822 scoped_refptr
<SequencedWorkerPool
> GetTaskRunner() {
823 return pool_owner_
->pool();
826 void StopTaskRunner() {
827 // Make sure all tasks are run before shutting down. Delayed tasks are
828 // not run, they're simply deleted.
829 pool_owner_
->pool()->FlushForTesting();
830 pool_owner_
->pool()->Shutdown();
831 // Don't reset |pool_owner_| here, as the test may still hold a
832 // reference to the pool.
836 MessageLoop message_loop_
;
837 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
840 INSTANTIATE_TYPED_TEST_CASE_P(
841 SequencedWorkerPool
, TaskRunnerTest
,
842 SequencedWorkerPoolTaskRunnerTestDelegate
);
844 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate
{
846 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
848 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
851 void StartTaskRunner() {
853 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
854 task_runner_
= pool_owner_
->pool()->GetTaskRunnerWithShutdownBehavior(
855 SequencedWorkerPool::BLOCK_SHUTDOWN
);
858 scoped_refptr
<TaskRunner
> GetTaskRunner() {
862 void StopTaskRunner() {
863 // Make sure all tasks are run before shutting down. Delayed tasks are
864 // not run, they're simply deleted.
865 pool_owner_
->pool()->FlushForTesting();
866 pool_owner_
->pool()->Shutdown();
867 // Don't reset |pool_owner_| here, as the test may still hold a
868 // reference to the pool.
872 MessageLoop message_loop_
;
873 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
874 scoped_refptr
<TaskRunner
> task_runner_
;
877 INSTANTIATE_TYPED_TEST_CASE_P(
878 SequencedWorkerPoolTaskRunner
, TaskRunnerTest
,
879 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate
);
881 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate
{
883 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
885 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
888 void StartTaskRunner() {
889 pool_owner_
.reset(new SequencedWorkerPoolOwner(
890 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
891 task_runner_
= pool_owner_
->pool()->GetSequencedTaskRunner(
892 pool_owner_
->pool()->GetSequenceToken());
895 scoped_refptr
<SequencedTaskRunner
> GetTaskRunner() {
899 void StopTaskRunner() {
900 // Make sure all tasks are run before shutting down. Delayed tasks are
901 // not run, they're simply deleted.
902 pool_owner_
->pool()->FlushForTesting();
903 pool_owner_
->pool()->Shutdown();
904 // Don't reset |pool_owner_| here, as the test may still hold a
905 // reference to the pool.
909 MessageLoop message_loop_
;
910 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
911 scoped_refptr
<SequencedTaskRunner
> task_runner_
;
914 INSTANTIATE_TYPED_TEST_CASE_P(
915 SequencedWorkerPoolSequencedTaskRunner
, TaskRunnerTest
,
916 SequencedWorkerPoolSequencedTaskRunnerTestDelegate
);
918 INSTANTIATE_TYPED_TEST_CASE_P(
919 SequencedWorkerPoolSequencedTaskRunner
, SequencedTaskRunnerTest
,
920 SequencedWorkerPoolSequencedTaskRunnerTestDelegate
);