1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/synchronization/lock.h"
17 #include "base/test/sequenced_task_runner_test_template.h"
18 #include "base/test/sequenced_worker_pool_owner.h"
19 #include "base/test/task_runner_test_template.h"
20 #include "base/test/test_timeouts.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/time/time.h"
23 #include "base/tracked_objects.h"
24 #include "testing/gtest/include/gtest/gtest.h"
30 // Many of these tests have failure modes where they'll hang forever. These
31 // tests should not be flaky, and hanging indicates a type of failure. Do not
32 // mark as flaky if they're hanging, it's likely an actual bug.
36 const size_t kNumWorkerThreads
= 3;
38 // Allows a number of threads to all be blocked on the same event, and
39 // provides a way to unblock a certain number of them.
42 ThreadBlocker() : lock_(), cond_var_(&lock_
), unblock_counter_(0) {}
46 base::AutoLock
lock(lock_
);
47 while (unblock_counter_
== 0)
54 void Unblock(size_t count
) {
56 base::AutoLock
lock(lock_
);
57 DCHECK(unblock_counter_
== 0);
58 unblock_counter_
= count
;
65 base::ConditionVariable cond_var_
;
67 size_t unblock_counter_
;
70 class TestTracker
: public base::RefCountedThreadSafe
<TestTracker
> {
78 // Each of these tasks appends the argument to the complete sequence vector
79 // so calling code can see what order they finished in.
80 void FastTask(int id
) {
84 void SlowTask(int id
) {
85 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
89 void BlockTask(int id
, ThreadBlocker
* blocker
) {
90 // Note that this task has started and signal anybody waiting for that
93 base::AutoLock
lock(lock_
);
102 void PostAdditionalTasks(
103 int id
, SequencedWorkerPool
* pool
,
104 bool expected_return_value
) {
105 Closure fast_task
= base::Bind(&TestTracker::FastTask
, this, 100);
106 EXPECT_EQ(expected_return_value
,
107 pool
->PostWorkerTaskWithShutdownBehavior(
108 FROM_HERE
, fast_task
,
109 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
110 EXPECT_EQ(expected_return_value
,
111 pool
->PostWorkerTaskWithShutdownBehavior(
112 FROM_HERE
, fast_task
,
113 SequencedWorkerPool::SKIP_ON_SHUTDOWN
));
114 pool
->PostWorkerTaskWithShutdownBehavior(
115 FROM_HERE
, fast_task
,
116 SequencedWorkerPool::BLOCK_SHUTDOWN
);
117 SignalWorkerDone(id
);
120 // Waits until the given number of tasks have started executing.
121 void WaitUntilTasksBlocked(size_t count
) {
123 base::AutoLock
lock(lock_
);
124 while (started_events_
< count
)
130 // Blocks the current thread until at least the given number of tasks are in
131 // the completed vector, and then returns a copy.
132 std::vector
<int> WaitUntilTasksComplete(size_t num_tasks
) {
133 std::vector
<int> ret
;
135 base::AutoLock
lock(lock_
);
136 while (complete_sequence_
.size() < num_tasks
)
138 ret
= complete_sequence_
;
144 size_t GetTasksCompletedCount() {
145 base::AutoLock
lock(lock_
);
146 return complete_sequence_
.size();
149 void ClearCompleteSequence() {
150 base::AutoLock
lock(lock_
);
151 complete_sequence_
.clear();
156 friend class base::RefCountedThreadSafe
<TestTracker
>;
159 void SignalWorkerDone(int id
) {
161 base::AutoLock
lock(lock_
);
162 complete_sequence_
.push_back(id
);
167 // Protects the complete_sequence.
170 base::ConditionVariable cond_var_
;
172 // Protected by lock_.
173 std::vector
<int> complete_sequence_
;
175 // Counter of the number of "block" workers that have started.
176 size_t started_events_
;
179 class SequencedWorkerPoolTest
: public testing::Test
{
181 SequencedWorkerPoolTest()
182 : tracker_(new TestTracker
) {
186 void TearDown() override
{ pool()->Shutdown(); }
188 const scoped_refptr
<SequencedWorkerPool
>& pool() {
189 return pool_owner_
->pool();
191 TestTracker
* tracker() { return tracker_
.get(); }
193 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
194 // down, and creates a new instance.
196 pool_owner_
.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads
, "test"));
199 void SetWillWaitForShutdownCallback(const Closure
& callback
) {
200 pool_owner_
->SetWillWaitForShutdownCallback(callback
);
203 // Ensures that the given number of worker threads is created by adding
204 // tasks and waiting until they complete. Worker thread creation is
205 // serialized, can happen on background threads asynchronously, and doesn't
206 // happen any more at shutdown. This means that if a test posts a bunch of
207 // tasks and calls shutdown, fewer workers will be created than the test may
210 // This function ensures that this condition can't happen so tests can make
211 // assumptions about the number of workers active. See the comment in
212 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
215 // It will post tasks to the queue with id -1. It also assumes this is the
216 // first thing called in a test since it will clear the complete_sequence_.
217 void EnsureAllWorkersCreated() {
218 // Create a bunch of threads, all waiting. This will cause that many
219 // workers to be created.
220 ThreadBlocker blocker
;
221 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
222 pool()->PostWorkerTask(FROM_HERE
,
223 base::Bind(&TestTracker::BlockTask
,
224 tracker(), -1, &blocker
));
226 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
228 // Now wake them up and wait until they're done.
229 blocker
.Unblock(kNumWorkerThreads
);
230 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
232 // Clean up the task IDs we added.
233 tracker()->ClearCompleteSequence();
236 int has_work_call_count() const {
237 return pool_owner_
->has_work_call_count();
241 MessageLoop message_loop_
;
242 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
243 const scoped_refptr
<TestTracker
> tracker_
;
246 // Checks that the given number of entries are in the tasks to complete of
247 // the given tracker, and then signals the given event the given number of
248 // times. This is used to wake up blocked background threads before blocking
250 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr
<TestTracker
> tracker
,
251 size_t expected_tasks_to_complete
,
252 ThreadBlocker
* blocker
,
253 size_t threads_to_awake
) {
255 expected_tasks_to_complete
,
256 tracker
->WaitUntilTasksComplete(expected_tasks_to_complete
).size());
258 blocker
->Unblock(threads_to_awake
);
261 class DeletionHelper
: public base::RefCountedThreadSafe
<DeletionHelper
> {
263 explicit DeletionHelper(
264 const scoped_refptr
<base::RefCountedData
<bool> >& deleted_flag
)
265 : deleted_flag_(deleted_flag
) {
269 friend class base::RefCountedThreadSafe
<DeletionHelper
>;
270 virtual ~DeletionHelper() { deleted_flag_
->data
= true; }
272 const scoped_refptr
<base::RefCountedData
<bool> > deleted_flag_
;
273 DISALLOW_COPY_AND_ASSIGN(DeletionHelper
);
276 void HoldPoolReference(const scoped_refptr
<base::SequencedWorkerPool
>& pool
,
277 const scoped_refptr
<DeletionHelper
>& helper
) {
278 ADD_FAILURE() << "Should never run";
281 // Tests that delayed tasks are deleted upon shutdown of the pool.
282 TEST_F(SequencedWorkerPoolTest
, DelayedTaskDuringShutdown
) {
283 // Post something to verify the pool is started up.
284 EXPECT_TRUE(pool()->PostTask(
285 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 1)));
287 scoped_refptr
<base::RefCountedData
<bool> > deleted_flag(
288 new base::RefCountedData
<bool>(false));
290 base::Time
posted_at(base::Time::Now());
291 // Post something that shouldn't run.
292 EXPECT_TRUE(pool()->PostDelayedTask(
294 base::Bind(&HoldPoolReference
,
296 make_scoped_refptr(new DeletionHelper(deleted_flag
))),
297 TestTimeouts::action_timeout()));
299 std::vector
<int> completion_sequence
= tracker()->WaitUntilTasksComplete(1);
300 ASSERT_EQ(1u, completion_sequence
.size());
301 ASSERT_EQ(1, completion_sequence
[0]);
304 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
305 // fully destroyed (and thus shut down).
308 // Verify that we didn't block until the task was due.
309 ASSERT_LT(base::Time::Now() - posted_at
, TestTimeouts::action_timeout());
311 // Verify that the deferred task has not only not run, but has also been
313 ASSERT_TRUE(deleted_flag
->data
);
316 // Tests that same-named tokens have the same ID.
317 TEST_F(SequencedWorkerPoolTest
, NamedTokens
) {
318 const std::string
name1("hello");
319 SequencedWorkerPool::SequenceToken token1
=
320 pool()->GetNamedSequenceToken(name1
);
322 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
324 const std::string
name3("goodbye");
325 SequencedWorkerPool::SequenceToken token3
=
326 pool()->GetNamedSequenceToken(name3
);
328 // All 3 tokens should be different.
329 EXPECT_FALSE(token1
.Equals(token2
));
330 EXPECT_FALSE(token1
.Equals(token3
));
331 EXPECT_FALSE(token2
.Equals(token3
));
333 // Requesting the same name again should give the same value.
334 SequencedWorkerPool::SequenceToken token1again
=
335 pool()->GetNamedSequenceToken(name1
);
336 EXPECT_TRUE(token1
.Equals(token1again
));
338 SequencedWorkerPool::SequenceToken token3again
=
339 pool()->GetNamedSequenceToken(name3
);
340 EXPECT_TRUE(token3
.Equals(token3again
));
343 // Tests that posting a bunch of tasks (many more than the number of worker
344 // threads) runs them all.
345 TEST_F(SequencedWorkerPoolTest
, LotsOfTasks
) {
346 pool()->PostWorkerTask(FROM_HERE
,
347 base::Bind(&TestTracker::SlowTask
, tracker(), 0));
349 const size_t kNumTasks
= 20;
350 for (size_t i
= 1; i
< kNumTasks
; i
++) {
351 pool()->PostWorkerTask(FROM_HERE
,
352 base::Bind(&TestTracker::FastTask
, tracker(), i
));
355 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(kNumTasks
);
356 EXPECT_EQ(kNumTasks
, result
.size());
359 // Tests that posting a bunch of tasks (many more than the number of
360 // worker threads) to two pools simultaneously runs them all twice.
361 // This test is meant to shake out any concurrency issues between
362 // pools (like histograms).
363 TEST_F(SequencedWorkerPoolTest
, LotsOfTasksTwoPools
) {
364 SequencedWorkerPoolOwner
pool1(kNumWorkerThreads
, "test1");
365 SequencedWorkerPoolOwner
pool2(kNumWorkerThreads
, "test2");
367 base::Closure slow_task
= base::Bind(&TestTracker::SlowTask
, tracker(), 0);
368 pool1
.pool()->PostWorkerTask(FROM_HERE
, slow_task
);
369 pool2
.pool()->PostWorkerTask(FROM_HERE
, slow_task
);
371 const size_t kNumTasks
= 20;
372 for (size_t i
= 1; i
< kNumTasks
; i
++) {
373 base::Closure fast_task
=
374 base::Bind(&TestTracker::FastTask
, tracker(), i
);
375 pool1
.pool()->PostWorkerTask(FROM_HERE
, fast_task
);
376 pool2
.pool()->PostWorkerTask(FROM_HERE
, fast_task
);
379 std::vector
<int> result
=
380 tracker()->WaitUntilTasksComplete(2*kNumTasks
);
381 EXPECT_EQ(2 * kNumTasks
, result
.size());
383 pool2
.pool()->Shutdown();
384 pool1
.pool()->Shutdown();
387 // Test that tasks with the same sequence token are executed in order but don't
388 // affect other tasks.
389 TEST_F(SequencedWorkerPoolTest
, Sequence
) {
390 // Fill all the worker threads except one.
391 const size_t kNumBackgroundTasks
= kNumWorkerThreads
- 1;
392 ThreadBlocker background_blocker
;
393 for (size_t i
= 0; i
< kNumBackgroundTasks
; i
++) {
394 pool()->PostWorkerTask(FROM_HERE
,
395 base::Bind(&TestTracker::BlockTask
,
396 tracker(), i
, &background_blocker
));
398 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks
);
400 // Create two tasks with the same sequence token, one that will block on the
401 // event, and one which will just complete quickly when it's run. Since there
402 // is one worker thread free, the first task will start and then block, and
403 // the second task should be waiting.
404 ThreadBlocker blocker
;
405 SequencedWorkerPool::SequenceToken token1
= pool()->GetSequenceToken();
406 pool()->PostSequencedWorkerTask(
408 base::Bind(&TestTracker::BlockTask
, tracker(), 100, &blocker
));
409 pool()->PostSequencedWorkerTask(
411 base::Bind(&TestTracker::FastTask
, tracker(), 101));
412 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
414 // Create another two tasks as above with a different token. These will be
415 // blocked since there are no slots to run.
416 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
417 pool()->PostSequencedWorkerTask(
419 base::Bind(&TestTracker::FastTask
, tracker(), 200));
420 pool()->PostSequencedWorkerTask(
422 base::Bind(&TestTracker::FastTask
, tracker(), 201));
423 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
425 // Let one background task complete. This should then let both tasks of
426 // token2 run to completion in order. The second task of token1 should still
428 background_blocker
.Unblock(1);
429 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(3);
430 ASSERT_EQ(3u, result
.size());
431 EXPECT_EQ(200, result
[1]);
432 EXPECT_EQ(201, result
[2]);
434 // Finish the rest of the background tasks. This should leave some workers
435 // free with the second token1 task still blocked on the first.
436 background_blocker
.Unblock(kNumBackgroundTasks
- 1);
437 EXPECT_EQ(kNumBackgroundTasks
+ 2,
438 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks
+ 2).size());
440 // Allow the first task of token1 to complete. This should run the second.
442 result
= tracker()->WaitUntilTasksComplete(kNumBackgroundTasks
+ 4);
443 ASSERT_EQ(kNumBackgroundTasks
+ 4, result
.size());
444 EXPECT_EQ(100, result
[result
.size() - 2]);
445 EXPECT_EQ(101, result
[result
.size() - 1]);
448 // Tests that any tasks posted after Shutdown are ignored.
449 // Disabled for flakiness. See http://crbug.com/166451.
450 TEST_F(SequencedWorkerPoolTest
, DISABLED_IgnoresAfterShutdown
) {
451 // Start tasks to take all the threads and block them.
452 EnsureAllWorkersCreated();
453 ThreadBlocker blocker
;
454 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
455 pool()->PostWorkerTask(FROM_HERE
,
456 base::Bind(&TestTracker::BlockTask
,
457 tracker(), i
, &blocker
));
459 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
461 SetWillWaitForShutdownCallback(
462 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
463 scoped_refptr
<TestTracker
>(tracker()), 0,
464 &blocker
, kNumWorkerThreads
));
466 // Shutdown the worker pool. This should discard all non-blocking tasks.
467 const int kMaxNewBlockingTasksAfterShutdown
= 100;
468 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown
);
470 int old_has_work_call_count
= has_work_call_count();
472 std::vector
<int> result
=
473 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
475 // The kNumWorkerThreads items should have completed, in no particular order.
476 ASSERT_EQ(kNumWorkerThreads
, result
.size());
477 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
478 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
482 // No further tasks, regardless of shutdown mode, should be allowed.
483 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
485 base::Bind(&TestTracker::FastTask
, tracker(), 100),
486 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
487 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
489 base::Bind(&TestTracker::FastTask
, tracker(), 101),
490 SequencedWorkerPool::SKIP_ON_SHUTDOWN
));
491 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
493 base::Bind(&TestTracker::FastTask
, tracker(), 102),
494 SequencedWorkerPool::BLOCK_SHUTDOWN
));
496 ASSERT_EQ(old_has_work_call_count
, has_work_call_count());
499 TEST_F(SequencedWorkerPoolTest
, AllowsAfterShutdown
) {
500 // Test that <n> new blocking tasks are allowed provided they're posted
501 // by a running task.
502 EnsureAllWorkersCreated();
503 ThreadBlocker blocker
;
505 // Start tasks to take all the threads and block them.
506 const int kNumBlockTasks
= static_cast<int>(kNumWorkerThreads
);
507 for (int i
= 0; i
< kNumBlockTasks
; ++i
) {
508 EXPECT_TRUE(pool()->PostWorkerTask(
510 base::Bind(&TestTracker::BlockTask
, tracker(), i
, &blocker
)));
512 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
514 // Queue up shutdown blocking tasks behind those which will attempt to post
515 // additional tasks when run. PostAdditionalTasks attempts to post 3
516 // new FastTasks, one for each shutdown_behavior.
517 const int kNumQueuedTasks
= static_cast<int>(kNumWorkerThreads
);
518 for (int i
= 0; i
< kNumQueuedTasks
; ++i
) {
519 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
521 base::Bind(&TestTracker::PostAdditionalTasks
, tracker(), i
, pool(),
523 SequencedWorkerPool::BLOCK_SHUTDOWN
));
526 // Setup to open the floodgates from within Shutdown().
527 SetWillWaitForShutdownCallback(
528 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
529 scoped_refptr
<TestTracker
>(tracker()),
530 0, &blocker
, kNumBlockTasks
));
532 // Allow half of the additional blocking tasks through.
533 const int kNumNewBlockingTasksToAllow
= kNumWorkerThreads
/ 2;
534 pool()->Shutdown(kNumNewBlockingTasksToAllow
);
536 // Ensure that the correct number of tasks actually got run.
537 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
538 kNumBlockTasks
+ kNumQueuedTasks
+ kNumNewBlockingTasksToAllow
));
540 // Clean up the task IDs we added and go home.
541 tracker()->ClearCompleteSequence();
544 // Tests that unrun tasks are discarded properly according to their shutdown
546 TEST_F(SequencedWorkerPoolTest
, DiscardOnShutdown
) {
547 // Start tasks to take all the threads and block them.
548 EnsureAllWorkersCreated();
549 ThreadBlocker blocker
;
550 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
551 pool()->PostWorkerTask(FROM_HERE
,
552 base::Bind(&TestTracker::BlockTask
,
553 tracker(), i
, &blocker
));
555 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
557 // Create some tasks with different shutdown modes.
558 pool()->PostWorkerTaskWithShutdownBehavior(
560 base::Bind(&TestTracker::FastTask
, tracker(), 100),
561 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
562 pool()->PostWorkerTaskWithShutdownBehavior(
564 base::Bind(&TestTracker::FastTask
, tracker(), 101),
565 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
566 pool()->PostWorkerTaskWithShutdownBehavior(
568 base::Bind(&TestTracker::FastTask
, tracker(), 102),
569 SequencedWorkerPool::BLOCK_SHUTDOWN
);
571 // Shutdown the worker pool. This should discard all non-blocking tasks.
572 SetWillWaitForShutdownCallback(
573 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
574 scoped_refptr
<TestTracker
>(tracker()), 0,
575 &blocker
, kNumWorkerThreads
));
578 std::vector
<int> result
=
579 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
+ 1);
581 // The kNumWorkerThreads items should have completed, plus the BLOCK_SHUTDOWN
582 // one, in no particular order.
583 ASSERT_EQ(kNumWorkerThreads
+ 1, result
.size());
584 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
585 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
588 EXPECT_TRUE(std::find(result
.begin(), result
.end(), 102) != result
.end());
591 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
592 TEST_F(SequencedWorkerPoolTest
, ContinueOnShutdown
) {
593 scoped_refptr
<TaskRunner
> runner(pool()->GetTaskRunnerWithShutdownBehavior(
594 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
595 scoped_refptr
<SequencedTaskRunner
> sequenced_runner(
596 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
597 pool()->GetSequenceToken(),
598 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
599 EnsureAllWorkersCreated();
600 ThreadBlocker blocker
;
601 pool()->PostWorkerTaskWithShutdownBehavior(
603 base::Bind(&TestTracker::BlockTask
,
604 tracker(), 0, &blocker
),
605 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
608 base::Bind(&TestTracker::BlockTask
,
609 tracker(), 1, &blocker
));
610 sequenced_runner
->PostTask(
612 base::Bind(&TestTracker::BlockTask
,
613 tracker(), 2, &blocker
));
615 tracker()->WaitUntilTasksBlocked(3);
617 // This should not block. If this test hangs, it means it failed.
620 // The task should not have completed yet.
621 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
623 // Posting more tasks should fail.
624 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
625 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0),
626 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
));
627 EXPECT_FALSE(runner
->PostTask(
628 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0)));
629 EXPECT_FALSE(sequenced_runner
->PostTask(
630 FROM_HERE
, base::Bind(&TestTracker::FastTask
, tracker(), 0)));
632 // Continue the background thread and make sure the tasks can complete.
634 std::vector
<int> result
= tracker()->WaitUntilTasksComplete(3);
635 EXPECT_EQ(3u, result
.size());
638 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
639 // until they stop, but tasks not yet started do not.
640 TEST_F(SequencedWorkerPoolTest
, SkipOnShutdown
) {
641 // Start tasks to take all the threads and block them.
642 EnsureAllWorkersCreated();
643 ThreadBlocker blocker
;
645 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
646 // return until these tasks have completed.
647 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
648 pool()->PostWorkerTaskWithShutdownBehavior(
650 base::Bind(&TestTracker::BlockTask
, tracker(), i
, &blocker
),
651 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
653 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads
);
655 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
656 // executed once Shutdown() has been called.
657 pool()->PostWorkerTaskWithShutdownBehavior(
659 base::Bind(&TestTracker::BlockTask
,
660 tracker(), 0, &blocker
),
661 SequencedWorkerPool::SKIP_ON_SHUTDOWN
);
663 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
664 // been started block shutdown.
665 SetWillWaitForShutdownCallback(
666 base::Bind(&EnsureTasksToCompleteCountAndUnblock
,
667 scoped_refptr
<TestTracker
>(tracker()), 0,
668 &blocker
, kNumWorkerThreads
));
670 // No tasks should have completed yet.
671 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
673 // This should not block. If this test hangs, it means it failed.
676 // Shutdown should not return until all of the tasks have completed.
677 std::vector
<int> result
=
678 tracker()->WaitUntilTasksComplete(kNumWorkerThreads
);
680 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
681 // allowed to complete. No additional non-blocking tasks should have been
683 ASSERT_EQ(kNumWorkerThreads
, result
.size());
684 for (size_t i
= 0; i
< kNumWorkerThreads
; i
++) {
685 EXPECT_TRUE(std::find(result
.begin(), result
.end(), static_cast<int>(i
)) !=
690 // Ensure all worker threads are created, and then trigger a spurious
691 // work signal. This shouldn't cause any other work signals to be
692 // triggered. This is a regression test for http://crbug.com/117469.
693 TEST_F(SequencedWorkerPoolTest
, SpuriousWorkSignal
) {
694 EnsureAllWorkersCreated();
695 int old_has_work_call_count
= has_work_call_count();
696 pool()->SignalHasWorkForTesting();
697 // This is inherently racy, but can only produce false positives.
698 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
699 EXPECT_EQ(old_has_work_call_count
+ 1, has_work_call_count());
702 void IsRunningOnCurrentThreadTask(
703 SequencedWorkerPool::SequenceToken test_positive_token
,
704 SequencedWorkerPool::SequenceToken test_negative_token
,
705 SequencedWorkerPool
* pool
,
706 SequencedWorkerPool
* unused_pool
) {
707 EXPECT_TRUE(pool
->RunsTasksOnCurrentThread());
708 EXPECT_TRUE(pool
->IsRunningSequenceOnCurrentThread(test_positive_token
));
709 EXPECT_FALSE(pool
->IsRunningSequenceOnCurrentThread(test_negative_token
));
710 EXPECT_FALSE(unused_pool
->RunsTasksOnCurrentThread());
712 unused_pool
->IsRunningSequenceOnCurrentThread(test_positive_token
));
714 unused_pool
->IsRunningSequenceOnCurrentThread(test_negative_token
));
717 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
718 TEST_F(SequencedWorkerPoolTest
, IsRunningOnCurrentThread
) {
719 SequencedWorkerPool::SequenceToken token1
= pool()->GetSequenceToken();
720 SequencedWorkerPool::SequenceToken token2
= pool()->GetSequenceToken();
721 SequencedWorkerPool::SequenceToken unsequenced_token
;
723 scoped_refptr
<SequencedWorkerPool
> unused_pool
=
724 new SequencedWorkerPool(2, "unused_pool");
726 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
727 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1
));
728 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2
));
729 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token
));
730 EXPECT_FALSE(unused_pool
->RunsTasksOnCurrentThread());
731 EXPECT_FALSE(unused_pool
->IsRunningSequenceOnCurrentThread(token1
));
732 EXPECT_FALSE(unused_pool
->IsRunningSequenceOnCurrentThread(token2
));
734 unused_pool
->IsRunningSequenceOnCurrentThread(unsequenced_token
));
736 pool()->PostSequencedWorkerTask(
738 base::Bind(&IsRunningOnCurrentThreadTask
,
739 token1
, token2
, pool(), unused_pool
));
740 pool()->PostSequencedWorkerTask(
742 base::Bind(&IsRunningOnCurrentThreadTask
,
743 token2
, unsequenced_token
, pool(), unused_pool
));
744 pool()->PostWorkerTask(
746 base::Bind(&IsRunningOnCurrentThreadTask
,
747 unsequenced_token
, token1
, pool(), unused_pool
));
749 unused_pool
->Shutdown();
752 // Verify that FlushForTesting works as intended.
753 TEST_F(SequencedWorkerPoolTest
, FlushForTesting
) {
754 // Should be fine to call on a new instance.
755 pool()->FlushForTesting();
757 // Queue up a bunch of work, including a long delayed task and
758 // a task that produces additional tasks as an artifact.
759 pool()->PostDelayedWorkerTask(
761 base::Bind(&TestTracker::FastTask
, tracker(), 0),
762 TimeDelta::FromMinutes(5));
763 pool()->PostWorkerTask(FROM_HERE
,
764 base::Bind(&TestTracker::SlowTask
, tracker(), 0));
765 const size_t kNumFastTasks
= 20;
766 for (size_t i
= 0; i
< kNumFastTasks
; i
++) {
767 pool()->PostWorkerTask(FROM_HERE
,
768 base::Bind(&TestTracker::FastTask
, tracker(), 0));
770 pool()->PostWorkerTask(
772 base::Bind(&TestTracker::PostAdditionalTasks
, tracker(), 0, pool(),
775 // We expect all except the delayed task to have been run. We verify all
776 // closures have been deleted by looking at the refcount of the
778 EXPECT_FALSE(tracker()->HasOneRef());
779 pool()->FlushForTesting();
780 EXPECT_TRUE(tracker()->HasOneRef());
781 EXPECT_EQ(1 + kNumFastTasks
+ 1 + 3, tracker()->GetTasksCompletedCount());
783 // Should be fine to call on an idle instance with all threads created, and
784 // spamming the method shouldn't deadlock or confuse the class.
785 pool()->FlushForTesting();
786 pool()->FlushForTesting();
788 // Should be fine to call after shutdown too.
790 pool()->FlushForTesting();
793 TEST(SequencedWorkerPoolRefPtrTest
, ShutsDownCleanWithContinueOnShutdown
) {
795 scoped_refptr
<SequencedWorkerPool
> pool(new SequencedWorkerPool(3, "Pool"));
796 scoped_refptr
<SequencedTaskRunner
> task_runner
=
797 pool
->GetSequencedTaskRunnerWithShutdownBehavior(
798 pool
->GetSequenceToken(),
799 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN
);
801 // Upon test exit, should shut down without hanging.
805 class SequencedWorkerPoolTaskRunnerTestDelegate
{
807 SequencedWorkerPoolTaskRunnerTestDelegate() {}
809 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
811 void StartTaskRunner() {
813 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
816 scoped_refptr
<SequencedWorkerPool
> GetTaskRunner() {
817 return pool_owner_
->pool();
820 void StopTaskRunner() {
821 // Make sure all tasks are run before shutting down. Delayed tasks are
822 // not run, they're simply deleted.
823 pool_owner_
->pool()->FlushForTesting();
824 pool_owner_
->pool()->Shutdown();
825 // Don't reset |pool_owner_| here, as the test may still hold a
826 // reference to the pool.
830 MessageLoop message_loop_
;
831 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
834 INSTANTIATE_TYPED_TEST_CASE_P(
835 SequencedWorkerPool
, TaskRunnerTest
,
836 SequencedWorkerPoolTaskRunnerTestDelegate
);
838 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate
{
840 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
842 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
845 void StartTaskRunner() {
847 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
848 task_runner_
= pool_owner_
->pool()->GetTaskRunnerWithShutdownBehavior(
849 SequencedWorkerPool::BLOCK_SHUTDOWN
);
852 scoped_refptr
<TaskRunner
> GetTaskRunner() {
856 void StopTaskRunner() {
857 // Make sure all tasks are run before shutting down. Delayed tasks are
858 // not run, they're simply deleted.
859 pool_owner_
->pool()->FlushForTesting();
860 pool_owner_
->pool()->Shutdown();
861 // Don't reset |pool_owner_| here, as the test may still hold a
862 // reference to the pool.
866 MessageLoop message_loop_
;
867 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
868 scoped_refptr
<TaskRunner
> task_runner_
;
871 INSTANTIATE_TYPED_TEST_CASE_P(
872 SequencedWorkerPoolTaskRunner
, TaskRunnerTest
,
873 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate
);
875 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate
{
877 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
879 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
882 void StartTaskRunner() {
883 pool_owner_
.reset(new SequencedWorkerPoolOwner(
884 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
885 task_runner_
= pool_owner_
->pool()->GetSequencedTaskRunner(
886 pool_owner_
->pool()->GetSequenceToken());
889 scoped_refptr
<SequencedTaskRunner
> GetTaskRunner() {
893 void StopTaskRunner() {
894 // Make sure all tasks are run before shutting down. Delayed tasks are
895 // not run, they're simply deleted.
896 pool_owner_
->pool()->FlushForTesting();
897 pool_owner_
->pool()->Shutdown();
898 // Don't reset |pool_owner_| here, as the test may still hold a
899 // reference to the pool.
903 MessageLoop message_loop_
;
904 scoped_ptr
<SequencedWorkerPoolOwner
> pool_owner_
;
905 scoped_refptr
<SequencedTaskRunner
> task_runner_
;
908 INSTANTIATE_TYPED_TEST_CASE_P(
909 SequencedWorkerPoolSequencedTaskRunner
, TaskRunnerTest
,
910 SequencedWorkerPoolSequencedTaskRunnerTestDelegate
);
912 INSTANTIATE_TYPED_TEST_CASE_P(
913 SequencedWorkerPoolSequencedTaskRunner
, SequencedTaskRunnerTest
,
914 SequencedWorkerPoolSequencedTaskRunnerTestDelegate
);