Remove unused parameter.
[chromium-blink-merge.git] / base / threading / sequenced_worker_pool_unittest.cc
blobc12156e3c9774134b797e435b264fced4240170c
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/synchronization/lock.h"
17 #include "base/test/sequenced_task_runner_test_template.h"
18 #include "base/test/sequenced_worker_pool_owner.h"
19 #include "base/test/task_runner_test_template.h"
20 #include "base/test/test_timeouts.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/time/time.h"
23 #include "base/tracked_objects.h"
24 #include "testing/gtest/include/gtest/gtest.h"
26 namespace base {
28 // IMPORTANT NOTE:
30 // Many of these tests have failure modes where they'll hang forever. These
31 // tests should not be flaky, and hanging indicates a type of failure. Do not
32 // mark as flaky if they're hanging, it's likely an actual bug.
34 namespace {
// Thread count passed to the SequencedWorkerPoolOwner instances that the
// tests in this file create.
36 const size_t kNumWorkerThreads = 3;
38 // Allows a number of threads to all be blocked on the same event, and
39 // provides a way to unblock a certain number of them.
40 class ThreadBlocker {
41 public:
42 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
44 void Block() {
46 base::AutoLock lock(lock_);
47 while (unblock_counter_ == 0)
48 cond_var_.Wait();
49 unblock_counter_--;
51 cond_var_.Signal();
54 void Unblock(size_t count) {
56 base::AutoLock lock(lock_);
57 DCHECK_EQ(unblock_counter_, 0u);
58 unblock_counter_ = count;
60 cond_var_.Signal();
63 private:
64 base::Lock lock_;
65 base::ConditionVariable cond_var_;
67 size_t unblock_counter_;
70 class DestructionDeadlockChecker
71 : public base::RefCountedThreadSafe<DestructionDeadlockChecker> {
72 public:
73 DestructionDeadlockChecker(const scoped_refptr<SequencedWorkerPool>& pool)
74 : pool_(pool) {}
76 protected:
77 virtual ~DestructionDeadlockChecker() {
78 // This method should not deadlock.
79 pool_->RunsTasksOnCurrentThread();
82 private:
83 scoped_refptr<SequencedWorkerPool> pool_;
84 friend class base::RefCountedThreadSafe<DestructionDeadlockChecker>;
87 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
88 public:
89 TestTracker()
90 : lock_(),
91 cond_var_(&lock_),
92 started_events_(0) {
95 // Each of these tasks appends the argument to the complete sequence vector
96 // so calling code can see what order they finished in.
97 void FastTask(int id) {
98 SignalWorkerDone(id);
101 void SlowTask(int id) {
102 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
103 SignalWorkerDone(id);
106 void BlockTask(int id, ThreadBlocker* blocker) {
107 // Note that this task has started and signal anybody waiting for that
108 // to happen.
110 base::AutoLock lock(lock_);
111 started_events_++;
113 cond_var_.Signal();
115 blocker->Block();
116 SignalWorkerDone(id);
119 void PostAdditionalTasks(
120 int id, SequencedWorkerPool* pool,
121 bool expected_return_value) {
122 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
123 EXPECT_EQ(expected_return_value,
124 pool->PostWorkerTaskWithShutdownBehavior(
125 FROM_HERE, fast_task,
126 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
127 EXPECT_EQ(expected_return_value,
128 pool->PostWorkerTaskWithShutdownBehavior(
129 FROM_HERE, fast_task,
130 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
131 pool->PostWorkerTaskWithShutdownBehavior(
132 FROM_HERE, fast_task,
133 SequencedWorkerPool::BLOCK_SHUTDOWN);
134 SignalWorkerDone(id);
137 // This task posts itself back onto the SequencedWorkerPool before it
138 // finishes running. Each instance of the task maintains a strong reference
139 // to a DestructionDeadlockChecker. The DestructionDeadlockChecker is only
140 // destroyed when the task is destroyed without being run, which only happens
141 // during destruction of the SequencedWorkerPool.
142 void PostRepostingTask(
143 const scoped_refptr<SequencedWorkerPool>& pool,
144 const scoped_refptr<DestructionDeadlockChecker>& checker) {
145 Closure reposting_task =
146 base::Bind(&TestTracker::PostRepostingTask, this, pool, checker);
147 pool->PostWorkerTaskWithShutdownBehavior(
148 FROM_HERE, reposting_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN);
151 // This task reposts itself back onto the SequencedWorkerPool before it
152 // finishes running.
153 void PostRepostingBlockingTask(
154 const scoped_refptr<SequencedWorkerPool>& pool,
155 const SequencedWorkerPool::SequenceToken& token) {
156 Closure reposting_task =
157 base::Bind(&TestTracker::PostRepostingBlockingTask, this, pool, token);
158 pool->PostSequencedWorkerTaskWithShutdownBehavior(token,
159 FROM_HERE, reposting_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
162 void PostBlockingTaskThenUnblockThreads(
163 const scoped_refptr<SequencedWorkerPool>& pool,
164 ThreadBlocker* blocker,
165 size_t threads_to_wake) {
166 Closure arbitrary_task = base::Bind(&TestTracker::FastTask, this, 0);
167 pool->PostWorkerTaskWithShutdownBehavior(
168 FROM_HERE, arbitrary_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
169 blocker->Unblock(threads_to_wake);
172 // Waits until the given number of tasks have started executing.
173 void WaitUntilTasksBlocked(size_t count) {
175 base::AutoLock lock(lock_);
176 while (started_events_ < count)
177 cond_var_.Wait();
179 cond_var_.Signal();
182 // Blocks the current thread until at least the given number of tasks are in
183 // the completed vector, and then returns a copy.
184 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
185 std::vector<int> ret;
187 base::AutoLock lock(lock_);
188 while (complete_sequence_.size() < num_tasks)
189 cond_var_.Wait();
190 ret = complete_sequence_;
192 cond_var_.Signal();
193 return ret;
196 size_t GetTasksCompletedCount() {
197 base::AutoLock lock(lock_);
198 return complete_sequence_.size();
201 void ClearCompleteSequence() {
202 base::AutoLock lock(lock_);
203 complete_sequence_.clear();
204 started_events_ = 0;
207 private:
208 friend class base::RefCountedThreadSafe<TestTracker>;
209 ~TestTracker() {}
211 void SignalWorkerDone(int id) {
213 base::AutoLock lock(lock_);
214 complete_sequence_.push_back(id);
216 cond_var_.Signal();
219 // Protects the complete_sequence.
220 base::Lock lock_;
222 base::ConditionVariable cond_var_;
224 // Protected by lock_.
225 std::vector<int> complete_sequence_;
227 // Counter of the number of "block" workers that have started.
228 size_t started_events_;
231 class SequencedWorkerPoolTest : public testing::Test {
232 public:
233 SequencedWorkerPoolTest()
234 : tracker_(new TestTracker) {
235 ResetPool();
238 void TearDown() override { pool()->Shutdown(); }
240 const scoped_refptr<SequencedWorkerPool>& pool() {
241 return pool_owner_->pool();
243 TestTracker* tracker() { return tracker_.get(); }
245 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
246 // down, and creates a new instance.
247 void ResetPool() {
248 pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
251 void SetWillWaitForShutdownCallback(const Closure& callback) {
252 pool_owner_->SetWillWaitForShutdownCallback(callback);
255 // Ensures that the given number of worker threads is created by adding
256 // tasks and waiting until they complete. Worker thread creation is
257 // serialized, can happen on background threads asynchronously, and doesn't
258 // happen any more at shutdown. This means that if a test posts a bunch of
259 // tasks and calls shutdown, fewer workers will be created than the test may
260 // expect.
262 // This function ensures that this condition can't happen so tests can make
263 // assumptions about the number of workers active. See the comment in
264 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
265 // details.
267 // It will post tasks to the queue with id -1. It also assumes this is the
268 // first thing called in a test since it will clear the complete_sequence_.
269 void EnsureAllWorkersCreated() {
270 // Create a bunch of threads, all waiting. This will cause that may
271 // workers to be created.
272 ThreadBlocker blocker;
273 for (size_t i = 0; i < kNumWorkerThreads; i++) {
274 pool()->PostWorkerTask(FROM_HERE,
275 base::Bind(&TestTracker::BlockTask,
276 tracker(), -1, &blocker));
278 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
280 // Now wake them up and wait until they're done.
281 blocker.Unblock(kNumWorkerThreads);
282 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
284 // Clean up the task IDs we added.
285 tracker()->ClearCompleteSequence();
288 int has_work_call_count() const {
289 return pool_owner_->has_work_call_count();
292 private:
293 MessageLoop message_loop_;
294 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
295 const scoped_refptr<TestTracker> tracker_;
298 // Checks that the given number of entries are in the tasks to complete of
299 // the given tracker, and then signals the given event the given number of
300 // times. This is used to wake up blocked background threads before blocking
301 // on shutdown.
302 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
303 size_t expected_tasks_to_complete,
304 ThreadBlocker* blocker,
305 size_t threads_to_awake) {
306 EXPECT_EQ(
307 expected_tasks_to_complete,
308 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
310 blocker->Unblock(threads_to_awake);
313 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
314 public:
315 explicit DeletionHelper(
316 const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
317 : deleted_flag_(deleted_flag) {
320 private:
321 friend class base::RefCountedThreadSafe<DeletionHelper>;
322 virtual ~DeletionHelper() { deleted_flag_->data = true; }
324 const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
325 DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
328 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
329 const scoped_refptr<DeletionHelper>& helper) {
330 ADD_FAILURE() << "Should never run";
333 // Tests that delayed tasks are deleted upon shutdown of the pool.
334 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
335 // Post something to verify the pool is started up.
336 EXPECT_TRUE(pool()->PostTask(
337 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
339 scoped_refptr<base::RefCountedData<bool> > deleted_flag(
340 new base::RefCountedData<bool>(false));
342 base::Time posted_at(base::Time::Now());
343 // Post something that shouldn't run.
344 EXPECT_TRUE(pool()->PostDelayedTask(
345 FROM_HERE,
346 base::Bind(&HoldPoolReference,
347 pool(),
348 make_scoped_refptr(new DeletionHelper(deleted_flag))),
349 TestTimeouts::action_timeout()));
351 std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
352 ASSERT_EQ(1u, completion_sequence.size());
353 ASSERT_EQ(1, completion_sequence[0]);
355 pool()->Shutdown();
356 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
357 // fully destroyed (and thus shut down).
358 ResetPool();
360 // Verify that we didn't block until the task was due.
361 ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
363 // Verify that the deferred task has not only not run, but has also been
364 // destroyed.
365 ASSERT_TRUE(deleted_flag->data);
368 // Tests that same-named tokens have the same ID.
369 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
370 const std::string name1("hello");
371 SequencedWorkerPool::SequenceToken token1 =
372 pool()->GetNamedSequenceToken(name1);
374 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
376 const std::string name3("goodbye");
377 SequencedWorkerPool::SequenceToken token3 =
378 pool()->GetNamedSequenceToken(name3);
380 // All 3 tokens should be different.
381 EXPECT_FALSE(token1.Equals(token2));
382 EXPECT_FALSE(token1.Equals(token3));
383 EXPECT_FALSE(token2.Equals(token3));
385 // Requesting the same name again should give the same value.
386 SequencedWorkerPool::SequenceToken token1again =
387 pool()->GetNamedSequenceToken(name1);
388 EXPECT_TRUE(token1.Equals(token1again));
390 SequencedWorkerPool::SequenceToken token3again =
391 pool()->GetNamedSequenceToken(name3);
392 EXPECT_TRUE(token3.Equals(token3again));
395 // Tests that posting a bunch of tasks (many more than the number of worker
396 // threads) runs them all.
397 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
398 pool()->PostWorkerTask(FROM_HERE,
399 base::Bind(&TestTracker::SlowTask, tracker(), 0));
401 const size_t kNumTasks = 20;
402 for (size_t i = 1; i < kNumTasks; i++) {
403 pool()->PostWorkerTask(FROM_HERE,
404 base::Bind(&TestTracker::FastTask, tracker(), i));
407 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
408 EXPECT_EQ(kNumTasks, result.size());
411 // Tests that posting a bunch of tasks (many more than the number of
412 // worker threads) to two pools simultaneously runs them all twice.
413 // This test is meant to shake out any concurrency issues between
414 // pools (like histograms).
415 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
416 SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
417 SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
419 base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
420 pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
421 pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
423 const size_t kNumTasks = 20;
424 for (size_t i = 1; i < kNumTasks; i++) {
425 base::Closure fast_task =
426 base::Bind(&TestTracker::FastTask, tracker(), i);
427 pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
428 pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
431 std::vector<int> result =
432 tracker()->WaitUntilTasksComplete(2*kNumTasks);
433 EXPECT_EQ(2 * kNumTasks, result.size());
435 pool2.pool()->Shutdown();
436 pool1.pool()->Shutdown();
439 // Test that tasks with the same sequence token are executed in order but don't
440 // affect other tasks.
441 TEST_F(SequencedWorkerPoolTest, Sequence) {
442 // Fill all the worker threads except one.
443 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
444 ThreadBlocker background_blocker;
445 for (size_t i = 0; i < kNumBackgroundTasks; i++) {
446 pool()->PostWorkerTask(FROM_HERE,
447 base::Bind(&TestTracker::BlockTask,
448 tracker(), i, &background_blocker));
450 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
452 // Create two tasks with the same sequence token, one that will block on the
453 // event, and one which will just complete quickly when it's run. Since there
454 // is one worker thread free, the first task will start and then block, and
455 // the second task should be waiting.
456 ThreadBlocker blocker;
457 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
458 pool()->PostSequencedWorkerTask(
459 token1, FROM_HERE,
460 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
461 pool()->PostSequencedWorkerTask(
462 token1, FROM_HERE,
463 base::Bind(&TestTracker::FastTask, tracker(), 101));
464 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
466 // Create another two tasks as above with a different token. These will be
467 // blocked since there are no slots to run.
468 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
469 pool()->PostSequencedWorkerTask(
470 token2, FROM_HERE,
471 base::Bind(&TestTracker::FastTask, tracker(), 200));
472 pool()->PostSequencedWorkerTask(
473 token2, FROM_HERE,
474 base::Bind(&TestTracker::FastTask, tracker(), 201));
475 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
477 // Let one background task complete. This should then let both tasks of
478 // token2 run to completion in order. The second task of token1 should still
479 // be blocked.
480 background_blocker.Unblock(1);
481 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
482 ASSERT_EQ(3u, result.size());
483 EXPECT_EQ(200, result[1]);
484 EXPECT_EQ(201, result[2]);
486 // Finish the rest of the background tasks. This should leave some workers
487 // free with the second token1 task still blocked on the first.
488 background_blocker.Unblock(kNumBackgroundTasks - 1);
489 EXPECT_EQ(kNumBackgroundTasks + 2,
490 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
492 // Allow the first task of token1 to complete. This should run the second.
493 blocker.Unblock(1);
494 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
495 ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
496 EXPECT_EQ(100, result[result.size() - 2]);
497 EXPECT_EQ(101, result[result.size() - 1]);
500 // Tests that any tasks posted after Shutdown are ignored.
501 // Disabled for flakiness. See http://crbug.com/166451.
502 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
503 // Start tasks to take all the threads and block them.
504 EnsureAllWorkersCreated();
505 ThreadBlocker blocker;
506 for (size_t i = 0; i < kNumWorkerThreads; i++) {
507 pool()->PostWorkerTask(FROM_HERE,
508 base::Bind(&TestTracker::BlockTask,
509 tracker(), i, &blocker));
511 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
513 SetWillWaitForShutdownCallback(
514 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
515 scoped_refptr<TestTracker>(tracker()), 0,
516 &blocker, kNumWorkerThreads));
518 // Shutdown the worker pool. This should discard all non-blocking tasks.
519 const int kMaxNewBlockingTasksAfterShutdown = 100;
520 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
522 int old_has_work_call_count = has_work_call_count();
524 std::vector<int> result =
525 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
527 // The kNumWorkerThread items should have completed, in no particular order.
528 ASSERT_EQ(kNumWorkerThreads, result.size());
529 for (size_t i = 0; i < kNumWorkerThreads; i++) {
530 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
531 result.end());
534 // No further tasks, regardless of shutdown mode, should be allowed.
535 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
536 FROM_HERE,
537 base::Bind(&TestTracker::FastTask, tracker(), 100),
538 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
539 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
540 FROM_HERE,
541 base::Bind(&TestTracker::FastTask, tracker(), 101),
542 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
543 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
544 FROM_HERE,
545 base::Bind(&TestTracker::FastTask, tracker(), 102),
546 SequencedWorkerPool::BLOCK_SHUTDOWN));
548 ASSERT_EQ(old_has_work_call_count, has_work_call_count());
551 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
552 // Test that <n> new blocking tasks are allowed provided they're posted
553 // by a running tasks.
554 EnsureAllWorkersCreated();
555 ThreadBlocker blocker;
557 // Start tasks to take all the threads and block them.
558 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
559 for (int i = 0; i < kNumBlockTasks; ++i) {
560 EXPECT_TRUE(pool()->PostWorkerTask(
561 FROM_HERE,
562 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
564 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
566 // Queue up shutdown blocking tasks behind those which will attempt to post
567 // additional tasks when run, PostAdditionalTasks attemtps to post 3
568 // new FastTasks, one for each shutdown_behavior.
569 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
570 for (int i = 0; i < kNumQueuedTasks; ++i) {
571 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
572 FROM_HERE,
573 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
574 false),
575 SequencedWorkerPool::BLOCK_SHUTDOWN));
578 // Setup to open the floodgates from within Shutdown().
579 SetWillWaitForShutdownCallback(
580 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
581 scoped_refptr<TestTracker>(tracker()),
582 0, &blocker, kNumBlockTasks));
584 // Allow half of the additional blocking tasks thru.
585 const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
586 pool()->Shutdown(kNumNewBlockingTasksToAllow);
588 // Ensure that the correct number of tasks actually got run.
589 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
590 kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
592 // Clean up the task IDs we added and go home.
593 tracker()->ClearCompleteSequence();
596 // Tests that blocking tasks can still be posted during shutdown, as long as
597 // the task is not being posted within the context of a running task.
598 TEST_F(SequencedWorkerPoolTest,
599 AllowsBlockingTasksDuringShutdownOutsideOfRunningTask) {
600 EnsureAllWorkersCreated();
601 ThreadBlocker blocker;
603 // Start tasks to take all the threads and block them.
604 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
605 for (int i = 0; i < kNumBlockTasks; ++i) {
606 EXPECT_TRUE(pool()->PostWorkerTask(
607 FROM_HERE,
608 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
610 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
612 // Setup to open the floodgates from within Shutdown().
613 SetWillWaitForShutdownCallback(
614 base::Bind(&TestTracker::PostBlockingTaskThenUnblockThreads,
615 scoped_refptr<TestTracker>(tracker()), pool(), &blocker,
616 kNumWorkerThreads));
617 pool()->Shutdown(kNumWorkerThreads + 1);
619 // Ensure that the correct number of tasks actually got run.
620 tracker()->WaitUntilTasksComplete(static_cast<size_t>(kNumWorkerThreads + 1));
621 tracker()->ClearCompleteSequence();
624 // Tests that unrun tasks are discarded properly according to their shutdown
625 // mode.
626 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
627 // Start tasks to take all the threads and block them.
628 EnsureAllWorkersCreated();
629 ThreadBlocker blocker;
630 for (size_t i = 0; i < kNumWorkerThreads; i++) {
631 pool()->PostWorkerTask(FROM_HERE,
632 base::Bind(&TestTracker::BlockTask,
633 tracker(), i, &blocker));
635 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
637 // Create some tasks with different shutdown modes.
638 pool()->PostWorkerTaskWithShutdownBehavior(
639 FROM_HERE,
640 base::Bind(&TestTracker::FastTask, tracker(), 100),
641 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
642 pool()->PostWorkerTaskWithShutdownBehavior(
643 FROM_HERE,
644 base::Bind(&TestTracker::FastTask, tracker(), 101),
645 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
646 pool()->PostWorkerTaskWithShutdownBehavior(
647 FROM_HERE,
648 base::Bind(&TestTracker::FastTask, tracker(), 102),
649 SequencedWorkerPool::BLOCK_SHUTDOWN);
651 // Shutdown the worker pool. This should discard all non-blocking tasks.
652 SetWillWaitForShutdownCallback(
653 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
654 scoped_refptr<TestTracker>(tracker()), 0,
655 &blocker, kNumWorkerThreads));
656 pool()->Shutdown();
658 std::vector<int> result =
659 tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
661 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
662 // one, in no particular order.
663 ASSERT_EQ(kNumWorkerThreads + 1, result.size());
664 for (size_t i = 0; i < kNumWorkerThreads; i++) {
665 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
666 result.end());
668 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
671 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
672 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
673 scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
674 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
675 scoped_refptr<SequencedTaskRunner> sequenced_runner(
676 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
677 pool()->GetSequenceToken(),
678 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
679 EnsureAllWorkersCreated();
680 ThreadBlocker blocker;
681 pool()->PostWorkerTaskWithShutdownBehavior(
682 FROM_HERE,
683 base::Bind(&TestTracker::BlockTask,
684 tracker(), 0, &blocker),
685 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
686 runner->PostTask(
687 FROM_HERE,
688 base::Bind(&TestTracker::BlockTask,
689 tracker(), 1, &blocker));
690 sequenced_runner->PostTask(
691 FROM_HERE,
692 base::Bind(&TestTracker::BlockTask,
693 tracker(), 2, &blocker));
695 tracker()->WaitUntilTasksBlocked(3);
697 // This should not block. If this test hangs, it means it failed.
698 pool()->Shutdown();
700 // The task should not have completed yet.
701 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
703 // Posting more tasks should fail.
704 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
705 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
706 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
707 EXPECT_FALSE(runner->PostTask(
708 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
709 EXPECT_FALSE(sequenced_runner->PostTask(
710 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
712 // Continue the background thread and make sure the tasks can complete.
713 blocker.Unblock(3);
714 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
715 EXPECT_EQ(3u, result.size());
718 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
719 // until they stop, but tasks not yet started do not.
720 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
721 // Start tasks to take all the threads and block them.
722 EnsureAllWorkersCreated();
723 ThreadBlocker blocker;
725 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
726 // return until these tasks have completed.
727 for (size_t i = 0; i < kNumWorkerThreads; i++) {
728 pool()->PostWorkerTaskWithShutdownBehavior(
729 FROM_HERE,
730 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
731 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
733 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
735 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
736 // executed once Shutdown() has been called.
737 pool()->PostWorkerTaskWithShutdownBehavior(
738 FROM_HERE,
739 base::Bind(&TestTracker::BlockTask,
740 tracker(), 0, &blocker),
741 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
743 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
744 // been started block shutdown.
745 SetWillWaitForShutdownCallback(
746 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
747 scoped_refptr<TestTracker>(tracker()), 0,
748 &blocker, kNumWorkerThreads));
750 // No tasks should have completed yet.
751 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
753 // This should not block. If this test hangs, it means it failed.
754 pool()->Shutdown();
756 // Shutdown should not return until all of the tasks have completed.
757 std::vector<int> result =
758 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
760 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
761 // allowed to complete. No additional non-blocking tasks should have been
762 // started.
763 ASSERT_EQ(kNumWorkerThreads, result.size());
764 for (size_t i = 0; i < kNumWorkerThreads; i++) {
765 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
766 result.end());
770 // Ensure all worker threads are created, and then trigger a spurious
771 // work signal. This shouldn't cause any other work signals to be
772 // triggered. This is a regression test for http://crbug.com/117469.
773 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
774 EnsureAllWorkersCreated();
775 int old_has_work_call_count = has_work_call_count();
776 pool()->SignalHasWorkForTesting();
777 // This is inherently racy, but can only produce false positives.
778 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
779 EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
782 void IsRunningOnCurrentThreadTask(
783 SequencedWorkerPool::SequenceToken test_positive_token,
784 SequencedWorkerPool::SequenceToken test_negative_token,
785 SequencedWorkerPool* pool,
786 SequencedWorkerPool* unused_pool) {
787 EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
788 EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
789 EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
790 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
791 EXPECT_FALSE(
792 unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
793 EXPECT_FALSE(
794 unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
797 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
798 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
799 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
800 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
801 SequencedWorkerPool::SequenceToken unsequenced_token;
803 scoped_refptr<SequencedWorkerPool> unused_pool =
804 new SequencedWorkerPool(2, "unused_pool");
806 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
807 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
808 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
809 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
810 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
811 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
812 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
813 EXPECT_FALSE(
814 unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
816 pool()->PostSequencedWorkerTask(
817 token1, FROM_HERE,
818 base::Bind(&IsRunningOnCurrentThreadTask,
819 token1, token2, pool(), unused_pool));
820 pool()->PostSequencedWorkerTask(
821 token2, FROM_HERE,
822 base::Bind(&IsRunningOnCurrentThreadTask,
823 token2, unsequenced_token, pool(), unused_pool));
824 pool()->PostWorkerTask(
825 FROM_HERE,
826 base::Bind(&IsRunningOnCurrentThreadTask,
827 unsequenced_token, token1, pool(), unused_pool));
828 pool()->Shutdown();
829 unused_pool->Shutdown();
832 // Checks that tasks are destroyed in the right context during shutdown. If a
833 // task is destroyed while SequencedWorkerPool's global lock is held,
834 // SequencedWorkerPool might deadlock.
835 TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) {
836 for (int i = 0; i < 4; ++i) {
837 scoped_refptr<DestructionDeadlockChecker> checker(
838 new DestructionDeadlockChecker(pool()));
839 tracker()->PostRepostingTask(pool(), checker);
842 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
843 // which in turn should not deadlock in their destructors.
844 pool()->Shutdown();
847 // Similar to the test AvoidsDeadlockOnShutdown, but there are now also
848 // sequenced, blocking tasks in the queue during shutdown.
849 TEST_F(SequencedWorkerPoolTest,
850 AvoidsDeadlockOnShutdownWithSequencedBlockingTasks) {
851 const std::string sequence_token_name("name");
852 for (int i = 0; i < 4; ++i) {
853 scoped_refptr<DestructionDeadlockChecker> checker(
854 new DestructionDeadlockChecker(pool()));
855 tracker()->PostRepostingTask(pool(), checker);
857 SequencedWorkerPool::SequenceToken token1 =
858 pool()->GetNamedSequenceToken(sequence_token_name);
859 tracker()->PostRepostingBlockingTask(pool(), token1);
862 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
863 // which in turn should not deadlock in their destructors.
864 pool()->Shutdown();
867 // Verify that FlushForTesting works as intended.
868 TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
869 // Should be fine to call on a new instance.
870 pool()->FlushForTesting();
872 // Queue up a bunch of work, including a long delayed task and
873 // a task that produces additional tasks as an artifact.
874 pool()->PostDelayedWorkerTask(
875 FROM_HERE,
876 base::Bind(&TestTracker::FastTask, tracker(), 0),
877 TimeDelta::FromMinutes(5));
878 pool()->PostWorkerTask(FROM_HERE,
879 base::Bind(&TestTracker::SlowTask, tracker(), 0));
880 const size_t kNumFastTasks = 20;
881 for (size_t i = 0; i < kNumFastTasks; i++) {
882 pool()->PostWorkerTask(FROM_HERE,
883 base::Bind(&TestTracker::FastTask, tracker(), 0));
885 pool()->PostWorkerTask(
886 FROM_HERE,
887 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
888 true));
890 // We expect all except the delayed task to have been run. We verify all
891 // closures have been deleted by looking at the refcount of the
892 // tracker.
893 EXPECT_FALSE(tracker()->HasOneRef());
894 pool()->FlushForTesting();
895 EXPECT_TRUE(tracker()->HasOneRef());
896 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
898 // Should be fine to call on an idle instance with all threads created, and
899 // spamming the method shouldn't deadlock or confuse the class.
900 pool()->FlushForTesting();
901 pool()->FlushForTesting();
903 // Should be fine to call after shutdown too.
904 pool()->Shutdown();
905 pool()->FlushForTesting();
908 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
909 MessageLoop loop;
910 scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
911 scoped_refptr<SequencedTaskRunner> task_runner =
912 pool->GetSequencedTaskRunnerWithShutdownBehavior(
913 pool->GetSequenceToken(),
914 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
916 // Upon test exit, should shut down without hanging.
917 pool->Shutdown();
920 class SequencedWorkerPoolTaskRunnerTestDelegate {
921 public:
922 SequencedWorkerPoolTaskRunnerTestDelegate() {}
924 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
926 void StartTaskRunner() {
927 pool_owner_.reset(
928 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
931 scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
932 return pool_owner_->pool();
935 void StopTaskRunner() {
936 // Make sure all tasks are run before shutting down. Delayed tasks are
937 // not run, they're simply deleted.
938 pool_owner_->pool()->FlushForTesting();
939 pool_owner_->pool()->Shutdown();
940 // Don't reset |pool_owner_| here, as the test may still hold a
941 // reference to the pool.
944 private:
945 MessageLoop message_loop_;
946 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
949 INSTANTIATE_TYPED_TEST_CASE_P(
950 SequencedWorkerPool, TaskRunnerTest,
951 SequencedWorkerPoolTaskRunnerTestDelegate);
953 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
954 public:
955 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
957 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
960 void StartTaskRunner() {
961 pool_owner_.reset(
962 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
963 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
964 SequencedWorkerPool::BLOCK_SHUTDOWN);
967 scoped_refptr<TaskRunner> GetTaskRunner() {
968 return task_runner_;
971 void StopTaskRunner() {
972 // Make sure all tasks are run before shutting down. Delayed tasks are
973 // not run, they're simply deleted.
974 pool_owner_->pool()->FlushForTesting();
975 pool_owner_->pool()->Shutdown();
976 // Don't reset |pool_owner_| here, as the test may still hold a
977 // reference to the pool.
980 private:
981 MessageLoop message_loop_;
982 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
983 scoped_refptr<TaskRunner> task_runner_;
986 INSTANTIATE_TYPED_TEST_CASE_P(
987 SequencedWorkerPoolTaskRunner, TaskRunnerTest,
988 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
990 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
991 public:
992 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
994 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
997 void StartTaskRunner() {
998 pool_owner_.reset(new SequencedWorkerPoolOwner(
999 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
1000 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
1001 pool_owner_->pool()->GetSequenceToken());
1004 scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
1005 return task_runner_;
1008 void StopTaskRunner() {
1009 // Make sure all tasks are run before shutting down. Delayed tasks are
1010 // not run, they're simply deleted.
1011 pool_owner_->pool()->FlushForTesting();
1012 pool_owner_->pool()->Shutdown();
1013 // Don't reset |pool_owner_| here, as the test may still hold a
1014 // reference to the pool.
1017 private:
1018 MessageLoop message_loop_;
1019 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
1020 scoped_refptr<SequencedTaskRunner> task_runner_;
1023 INSTANTIATE_TYPED_TEST_CASE_P(
1024 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
1025 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
1027 INSTANTIATE_TYPED_TEST_CASE_P(
1028 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
1029 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
1031 } // namespace
1033 } // namespace base