Roll src/third_party/WebKit eac3800:0237a66 (svn 202606:202607)
[chromium-blink-merge.git] / base / threading / sequenced_worker_pool_unittest.cc
blobbf82b1103574549d32d14e41506c683ecc52ac7d
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/synchronization/condition_variable.h"
15 #include "base/synchronization/lock.h"
16 #include "base/test/sequenced_task_runner_test_template.h"
17 #include "base/test/sequenced_worker_pool_owner.h"
18 #include "base/test/task_runner_test_template.h"
19 #include "base/test/test_timeouts.h"
20 #include "base/threading/platform_thread.h"
21 #include "base/time/time.h"
22 #include "base/tracked_objects.h"
23 #include "testing/gtest/include/gtest/gtest.h"
25 namespace base {
27 // IMPORTANT NOTE:
29 // Many of these tests have failure modes where they'll hang forever. These
30 // tests should not be flaky, and hanging indicates a type of failure. Do not
31 // mark as flaky if they're hanging, it's likely an actual bug.
33 namespace {
35 const size_t kNumWorkerThreads = 3;
37 // Allows a number of threads to all be blocked on the same event, and
38 // provides a way to unblock a certain number of them.
39 class ThreadBlocker {
40 public:
41 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
43 void Block() {
45 base::AutoLock lock(lock_);
46 while (unblock_counter_ == 0)
47 cond_var_.Wait();
48 unblock_counter_--;
50 cond_var_.Signal();
53 void Unblock(size_t count) {
55 base::AutoLock lock(lock_);
56 DCHECK_EQ(unblock_counter_, 0u);
57 unblock_counter_ = count;
59 cond_var_.Signal();
62 private:
63 base::Lock lock_;
64 base::ConditionVariable cond_var_;
66 size_t unblock_counter_;
69 class DestructionDeadlockChecker
70 : public base::RefCountedThreadSafe<DestructionDeadlockChecker> {
71 public:
72 DestructionDeadlockChecker(const scoped_refptr<SequencedWorkerPool>& pool)
73 : pool_(pool) {}
75 protected:
76 virtual ~DestructionDeadlockChecker() {
77 // This method should not deadlock.
78 pool_->RunsTasksOnCurrentThread();
81 private:
82 scoped_refptr<SequencedWorkerPool> pool_;
83 friend class base::RefCountedThreadSafe<DestructionDeadlockChecker>;
86 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
87 public:
88 TestTracker()
89 : lock_(),
90 cond_var_(&lock_),
91 started_events_(0) {
94 // Each of these tasks appends the argument to the complete sequence vector
95 // so calling code can see what order they finished in.
96 void FastTask(int id) {
97 SignalWorkerDone(id);
100 void SlowTask(int id) {
101 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
102 SignalWorkerDone(id);
105 void BlockTask(int id, ThreadBlocker* blocker) {
106 // Note that this task has started and signal anybody waiting for that
107 // to happen.
109 base::AutoLock lock(lock_);
110 started_events_++;
112 cond_var_.Signal();
114 blocker->Block();
115 SignalWorkerDone(id);
118 void PostAdditionalTasks(
119 int id, SequencedWorkerPool* pool,
120 bool expected_return_value) {
121 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
122 EXPECT_EQ(expected_return_value,
123 pool->PostWorkerTaskWithShutdownBehavior(
124 FROM_HERE, fast_task,
125 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
126 EXPECT_EQ(expected_return_value,
127 pool->PostWorkerTaskWithShutdownBehavior(
128 FROM_HERE, fast_task,
129 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
130 pool->PostWorkerTaskWithShutdownBehavior(
131 FROM_HERE, fast_task,
132 SequencedWorkerPool::BLOCK_SHUTDOWN);
133 SignalWorkerDone(id);
136 // This task posts itself back onto the SequencedWorkerPool before it
137 // finishes running. Each instance of the task maintains a strong reference
138 // to a DestructionDeadlockChecker. The DestructionDeadlockChecker is only
139 // destroyed when the task is destroyed without being run, which only happens
140 // during destruction of the SequencedWorkerPool.
141 void PostRepostingTask(
142 const scoped_refptr<SequencedWorkerPool>& pool,
143 const scoped_refptr<DestructionDeadlockChecker>& checker) {
144 Closure reposting_task =
145 base::Bind(&TestTracker::PostRepostingTask, this, pool, checker);
146 pool->PostWorkerTaskWithShutdownBehavior(
147 FROM_HERE, reposting_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN);
150 // This task reposts itself back onto the SequencedWorkerPool before it
151 // finishes running.
152 void PostRepostingBlockingTask(
153 const scoped_refptr<SequencedWorkerPool>& pool,
154 const SequencedWorkerPool::SequenceToken& token) {
155 Closure reposting_task =
156 base::Bind(&TestTracker::PostRepostingBlockingTask, this, pool, token);
157 pool->PostSequencedWorkerTaskWithShutdownBehavior(token,
158 FROM_HERE, reposting_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
161 void PostBlockingTaskThenUnblockThreads(
162 const scoped_refptr<SequencedWorkerPool>& pool,
163 ThreadBlocker* blocker,
164 size_t threads_to_wake) {
165 Closure arbitrary_task = base::Bind(&TestTracker::FastTask, this, 0);
166 pool->PostWorkerTaskWithShutdownBehavior(
167 FROM_HERE, arbitrary_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
168 blocker->Unblock(threads_to_wake);
171 // Waits until the given number of tasks have started executing.
172 void WaitUntilTasksBlocked(size_t count) {
174 base::AutoLock lock(lock_);
175 while (started_events_ < count)
176 cond_var_.Wait();
178 cond_var_.Signal();
181 // Blocks the current thread until at least the given number of tasks are in
182 // the completed vector, and then returns a copy.
183 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
184 std::vector<int> ret;
186 base::AutoLock lock(lock_);
187 while (complete_sequence_.size() < num_tasks)
188 cond_var_.Wait();
189 ret = complete_sequence_;
191 cond_var_.Signal();
192 return ret;
195 size_t GetTasksCompletedCount() {
196 base::AutoLock lock(lock_);
197 return complete_sequence_.size();
200 void ClearCompleteSequence() {
201 base::AutoLock lock(lock_);
202 complete_sequence_.clear();
203 started_events_ = 0;
206 private:
207 friend class base::RefCountedThreadSafe<TestTracker>;
208 ~TestTracker() {}
210 void SignalWorkerDone(int id) {
212 base::AutoLock lock(lock_);
213 complete_sequence_.push_back(id);
215 cond_var_.Signal();
218 // Protects the complete_sequence.
219 base::Lock lock_;
221 base::ConditionVariable cond_var_;
223 // Protected by lock_.
224 std::vector<int> complete_sequence_;
226 // Counter of the number of "block" workers that have started.
227 size_t started_events_;
230 class SequencedWorkerPoolTest : public testing::Test {
231 public:
232 SequencedWorkerPoolTest()
233 : tracker_(new TestTracker) {
234 ResetPool();
237 void TearDown() override { pool()->Shutdown(); }
239 const scoped_refptr<SequencedWorkerPool>& pool() {
240 return pool_owner_->pool();
242 TestTracker* tracker() { return tracker_.get(); }
244 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
245 // down, and creates a new instance.
246 void ResetPool() {
247 pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
250 void SetWillWaitForShutdownCallback(const Closure& callback) {
251 pool_owner_->SetWillWaitForShutdownCallback(callback);
254 // Ensures that the given number of worker threads is created by adding
255 // tasks and waiting until they complete. Worker thread creation is
256 // serialized, can happen on background threads asynchronously, and doesn't
257 // happen any more at shutdown. This means that if a test posts a bunch of
258 // tasks and calls shutdown, fewer workers will be created than the test may
259 // expect.
261 // This function ensures that this condition can't happen so tests can make
262 // assumptions about the number of workers active. See the comment in
263 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
264 // details.
266 // It will post tasks to the queue with id -1. It also assumes this is the
267 // first thing called in a test since it will clear the complete_sequence_.
268 void EnsureAllWorkersCreated() {
269 // Create a bunch of threads, all waiting. This will cause that may
270 // workers to be created.
271 ThreadBlocker blocker;
272 for (size_t i = 0; i < kNumWorkerThreads; i++) {
273 pool()->PostWorkerTask(FROM_HERE,
274 base::Bind(&TestTracker::BlockTask,
275 tracker(), -1, &blocker));
277 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
279 // Now wake them up and wait until they're done.
280 blocker.Unblock(kNumWorkerThreads);
281 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
283 // Clean up the task IDs we added.
284 tracker()->ClearCompleteSequence();
287 int has_work_call_count() const {
288 return pool_owner_->has_work_call_count();
291 private:
292 MessageLoop message_loop_;
293 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
294 const scoped_refptr<TestTracker> tracker_;
297 // Checks that the given number of entries are in the tasks to complete of
298 // the given tracker, and then signals the given event the given number of
299 // times. This is used to wake up blocked background threads before blocking
300 // on shutdown.
301 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
302 size_t expected_tasks_to_complete,
303 ThreadBlocker* blocker,
304 size_t threads_to_awake) {
305 EXPECT_EQ(
306 expected_tasks_to_complete,
307 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
309 blocker->Unblock(threads_to_awake);
312 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
313 public:
314 explicit DeletionHelper(
315 const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
316 : deleted_flag_(deleted_flag) {
319 private:
320 friend class base::RefCountedThreadSafe<DeletionHelper>;
321 virtual ~DeletionHelper() { deleted_flag_->data = true; }
323 const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
324 DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
327 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
328 const scoped_refptr<DeletionHelper>& helper) {
329 ADD_FAILURE() << "Should never run";
332 // Tests that delayed tasks are deleted upon shutdown of the pool.
333 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
334 // Post something to verify the pool is started up.
335 EXPECT_TRUE(pool()->PostTask(
336 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
338 scoped_refptr<base::RefCountedData<bool> > deleted_flag(
339 new base::RefCountedData<bool>(false));
341 base::Time posted_at(base::Time::Now());
342 // Post something that shouldn't run.
343 EXPECT_TRUE(pool()->PostDelayedTask(
344 FROM_HERE,
345 base::Bind(&HoldPoolReference,
346 pool(),
347 make_scoped_refptr(new DeletionHelper(deleted_flag))),
348 TestTimeouts::action_timeout()));
350 std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
351 ASSERT_EQ(1u, completion_sequence.size());
352 ASSERT_EQ(1, completion_sequence[0]);
354 pool()->Shutdown();
355 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
356 // fully destroyed (and thus shut down).
357 ResetPool();
359 // Verify that we didn't block until the task was due.
360 ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
362 // Verify that the deferred task has not only not run, but has also been
363 // destroyed.
364 ASSERT_TRUE(deleted_flag->data);
367 // Tests that same-named tokens have the same ID.
368 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
369 const std::string name1("hello");
370 SequencedWorkerPool::SequenceToken token1 =
371 pool()->GetNamedSequenceToken(name1);
373 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
375 const std::string name3("goodbye");
376 SequencedWorkerPool::SequenceToken token3 =
377 pool()->GetNamedSequenceToken(name3);
379 // All 3 tokens should be different.
380 EXPECT_FALSE(token1.Equals(token2));
381 EXPECT_FALSE(token1.Equals(token3));
382 EXPECT_FALSE(token2.Equals(token3));
384 // Requesting the same name again should give the same value.
385 SequencedWorkerPool::SequenceToken token1again =
386 pool()->GetNamedSequenceToken(name1);
387 EXPECT_TRUE(token1.Equals(token1again));
389 SequencedWorkerPool::SequenceToken token3again =
390 pool()->GetNamedSequenceToken(name3);
391 EXPECT_TRUE(token3.Equals(token3again));
394 // Tests that posting a bunch of tasks (many more than the number of worker
395 // threads) runs them all.
396 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
397 pool()->PostWorkerTask(FROM_HERE,
398 base::Bind(&TestTracker::SlowTask, tracker(), 0));
400 const size_t kNumTasks = 20;
401 for (size_t i = 1; i < kNumTasks; i++) {
402 pool()->PostWorkerTask(FROM_HERE,
403 base::Bind(&TestTracker::FastTask, tracker(), i));
406 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
407 EXPECT_EQ(kNumTasks, result.size());
410 // Tests that posting a bunch of tasks (many more than the number of
411 // worker threads) to two pools simultaneously runs them all twice.
412 // This test is meant to shake out any concurrency issues between
413 // pools (like histograms).
414 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
415 SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
416 SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
418 base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
419 pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
420 pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
422 const size_t kNumTasks = 20;
423 for (size_t i = 1; i < kNumTasks; i++) {
424 base::Closure fast_task =
425 base::Bind(&TestTracker::FastTask, tracker(), i);
426 pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
427 pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
430 std::vector<int> result =
431 tracker()->WaitUntilTasksComplete(2*kNumTasks);
432 EXPECT_EQ(2 * kNumTasks, result.size());
434 pool2.pool()->Shutdown();
435 pool1.pool()->Shutdown();
438 // Test that tasks with the same sequence token are executed in order but don't
439 // affect other tasks.
440 TEST_F(SequencedWorkerPoolTest, Sequence) {
441 // Fill all the worker threads except one.
442 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
443 ThreadBlocker background_blocker;
444 for (size_t i = 0; i < kNumBackgroundTasks; i++) {
445 pool()->PostWorkerTask(FROM_HERE,
446 base::Bind(&TestTracker::BlockTask,
447 tracker(), i, &background_blocker));
449 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
451 // Create two tasks with the same sequence token, one that will block on the
452 // event, and one which will just complete quickly when it's run. Since there
453 // is one worker thread free, the first task will start and then block, and
454 // the second task should be waiting.
455 ThreadBlocker blocker;
456 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
457 pool()->PostSequencedWorkerTask(
458 token1, FROM_HERE,
459 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
460 pool()->PostSequencedWorkerTask(
461 token1, FROM_HERE,
462 base::Bind(&TestTracker::FastTask, tracker(), 101));
463 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
465 // Create another two tasks as above with a different token. These will be
466 // blocked since there are no slots to run.
467 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
468 pool()->PostSequencedWorkerTask(
469 token2, FROM_HERE,
470 base::Bind(&TestTracker::FastTask, tracker(), 200));
471 pool()->PostSequencedWorkerTask(
472 token2, FROM_HERE,
473 base::Bind(&TestTracker::FastTask, tracker(), 201));
474 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
476 // Let one background task complete. This should then let both tasks of
477 // token2 run to completion in order. The second task of token1 should still
478 // be blocked.
479 background_blocker.Unblock(1);
480 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
481 ASSERT_EQ(3u, result.size());
482 EXPECT_EQ(200, result[1]);
483 EXPECT_EQ(201, result[2]);
485 // Finish the rest of the background tasks. This should leave some workers
486 // free with the second token1 task still blocked on the first.
487 background_blocker.Unblock(kNumBackgroundTasks - 1);
488 EXPECT_EQ(kNumBackgroundTasks + 2,
489 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
491 // Allow the first task of token1 to complete. This should run the second.
492 blocker.Unblock(1);
493 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
494 ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
495 EXPECT_EQ(100, result[result.size() - 2]);
496 EXPECT_EQ(101, result[result.size() - 1]);
499 // Tests that any tasks posted after Shutdown are ignored.
500 // Disabled for flakiness. See http://crbug.com/166451.
501 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
502 // Start tasks to take all the threads and block them.
503 EnsureAllWorkersCreated();
504 ThreadBlocker blocker;
505 for (size_t i = 0; i < kNumWorkerThreads; i++) {
506 pool()->PostWorkerTask(FROM_HERE,
507 base::Bind(&TestTracker::BlockTask,
508 tracker(), i, &blocker));
510 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
512 SetWillWaitForShutdownCallback(
513 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
514 scoped_refptr<TestTracker>(tracker()), 0,
515 &blocker, kNumWorkerThreads));
517 // Shutdown the worker pool. This should discard all non-blocking tasks.
518 const int kMaxNewBlockingTasksAfterShutdown = 100;
519 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
521 int old_has_work_call_count = has_work_call_count();
523 std::vector<int> result =
524 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
526 // The kNumWorkerThread items should have completed, in no particular order.
527 ASSERT_EQ(kNumWorkerThreads, result.size());
528 for (size_t i = 0; i < kNumWorkerThreads; i++) {
529 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
530 result.end());
533 // No further tasks, regardless of shutdown mode, should be allowed.
534 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
535 FROM_HERE,
536 base::Bind(&TestTracker::FastTask, tracker(), 100),
537 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
538 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
539 FROM_HERE,
540 base::Bind(&TestTracker::FastTask, tracker(), 101),
541 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
542 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
543 FROM_HERE,
544 base::Bind(&TestTracker::FastTask, tracker(), 102),
545 SequencedWorkerPool::BLOCK_SHUTDOWN));
547 ASSERT_EQ(old_has_work_call_count, has_work_call_count());
550 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
551 // Test that <n> new blocking tasks are allowed provided they're posted
552 // by a running tasks.
553 EnsureAllWorkersCreated();
554 ThreadBlocker blocker;
556 // Start tasks to take all the threads and block them.
557 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
558 for (int i = 0; i < kNumBlockTasks; ++i) {
559 EXPECT_TRUE(pool()->PostWorkerTask(
560 FROM_HERE,
561 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
563 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
565 // Queue up shutdown blocking tasks behind those which will attempt to post
566 // additional tasks when run, PostAdditionalTasks attemtps to post 3
567 // new FastTasks, one for each shutdown_behavior.
568 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
569 for (int i = 0; i < kNumQueuedTasks; ++i) {
570 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
571 FROM_HERE,
572 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
573 false),
574 SequencedWorkerPool::BLOCK_SHUTDOWN));
577 // Setup to open the floodgates from within Shutdown().
578 SetWillWaitForShutdownCallback(
579 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
580 scoped_refptr<TestTracker>(tracker()),
581 0, &blocker, kNumBlockTasks));
583 // Allow half of the additional blocking tasks thru.
584 const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
585 pool()->Shutdown(kNumNewBlockingTasksToAllow);
587 // Ensure that the correct number of tasks actually got run.
588 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
589 kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
591 // Clean up the task IDs we added and go home.
592 tracker()->ClearCompleteSequence();
595 // Tests that blocking tasks can still be posted during shutdown, as long as
596 // the task is not being posted within the context of a running task.
597 TEST_F(SequencedWorkerPoolTest,
598 AllowsBlockingTasksDuringShutdownOutsideOfRunningTask) {
599 EnsureAllWorkersCreated();
600 ThreadBlocker blocker;
602 // Start tasks to take all the threads and block them.
603 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
604 for (int i = 0; i < kNumBlockTasks; ++i) {
605 EXPECT_TRUE(pool()->PostWorkerTask(
606 FROM_HERE,
607 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
609 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
611 // Setup to open the floodgates from within Shutdown().
612 SetWillWaitForShutdownCallback(
613 base::Bind(&TestTracker::PostBlockingTaskThenUnblockThreads,
614 scoped_refptr<TestTracker>(tracker()), pool(), &blocker,
615 kNumWorkerThreads));
616 pool()->Shutdown(kNumWorkerThreads + 1);
618 // Ensure that the correct number of tasks actually got run.
619 tracker()->WaitUntilTasksComplete(static_cast<size_t>(kNumWorkerThreads + 1));
620 tracker()->ClearCompleteSequence();
623 // Tests that unrun tasks are discarded properly according to their shutdown
624 // mode.
625 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
626 // Start tasks to take all the threads and block them.
627 EnsureAllWorkersCreated();
628 ThreadBlocker blocker;
629 for (size_t i = 0; i < kNumWorkerThreads; i++) {
630 pool()->PostWorkerTask(FROM_HERE,
631 base::Bind(&TestTracker::BlockTask,
632 tracker(), i, &blocker));
634 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
636 // Create some tasks with different shutdown modes.
637 pool()->PostWorkerTaskWithShutdownBehavior(
638 FROM_HERE,
639 base::Bind(&TestTracker::FastTask, tracker(), 100),
640 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
641 pool()->PostWorkerTaskWithShutdownBehavior(
642 FROM_HERE,
643 base::Bind(&TestTracker::FastTask, tracker(), 101),
644 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
645 pool()->PostWorkerTaskWithShutdownBehavior(
646 FROM_HERE,
647 base::Bind(&TestTracker::FastTask, tracker(), 102),
648 SequencedWorkerPool::BLOCK_SHUTDOWN);
650 // Shutdown the worker pool. This should discard all non-blocking tasks.
651 SetWillWaitForShutdownCallback(
652 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
653 scoped_refptr<TestTracker>(tracker()), 0,
654 &blocker, kNumWorkerThreads));
655 pool()->Shutdown();
657 std::vector<int> result =
658 tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
660 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
661 // one, in no particular order.
662 ASSERT_EQ(kNumWorkerThreads + 1, result.size());
663 for (size_t i = 0; i < kNumWorkerThreads; i++) {
664 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
665 result.end());
667 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
670 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
671 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
672 scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
673 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
674 scoped_refptr<SequencedTaskRunner> sequenced_runner(
675 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
676 pool()->GetSequenceToken(),
677 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
678 EnsureAllWorkersCreated();
679 ThreadBlocker blocker;
680 pool()->PostWorkerTaskWithShutdownBehavior(
681 FROM_HERE,
682 base::Bind(&TestTracker::BlockTask,
683 tracker(), 0, &blocker),
684 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
685 runner->PostTask(
686 FROM_HERE,
687 base::Bind(&TestTracker::BlockTask,
688 tracker(), 1, &blocker));
689 sequenced_runner->PostTask(
690 FROM_HERE,
691 base::Bind(&TestTracker::BlockTask,
692 tracker(), 2, &blocker));
694 tracker()->WaitUntilTasksBlocked(3);
696 // This should not block. If this test hangs, it means it failed.
697 pool()->Shutdown();
699 // The task should not have completed yet.
700 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
702 // Posting more tasks should fail.
703 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
704 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
705 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
706 EXPECT_FALSE(runner->PostTask(
707 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
708 EXPECT_FALSE(sequenced_runner->PostTask(
709 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
711 // Continue the background thread and make sure the tasks can complete.
712 blocker.Unblock(3);
713 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
714 EXPECT_EQ(3u, result.size());
717 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
718 // until they stop, but tasks not yet started do not.
719 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
720 // Start tasks to take all the threads and block them.
721 EnsureAllWorkersCreated();
722 ThreadBlocker blocker;
724 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
725 // return until these tasks have completed.
726 for (size_t i = 0; i < kNumWorkerThreads; i++) {
727 pool()->PostWorkerTaskWithShutdownBehavior(
728 FROM_HERE,
729 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
730 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
732 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
734 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
735 // executed once Shutdown() has been called.
736 pool()->PostWorkerTaskWithShutdownBehavior(
737 FROM_HERE,
738 base::Bind(&TestTracker::BlockTask,
739 tracker(), 0, &blocker),
740 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
742 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
743 // been started block shutdown.
744 SetWillWaitForShutdownCallback(
745 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
746 scoped_refptr<TestTracker>(tracker()), 0,
747 &blocker, kNumWorkerThreads));
749 // No tasks should have completed yet.
750 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
752 // This should not block. If this test hangs, it means it failed.
753 pool()->Shutdown();
755 // Shutdown should not return until all of the tasks have completed.
756 std::vector<int> result =
757 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
759 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
760 // allowed to complete. No additional non-blocking tasks should have been
761 // started.
762 ASSERT_EQ(kNumWorkerThreads, result.size());
763 for (size_t i = 0; i < kNumWorkerThreads; i++) {
764 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
765 result.end());
769 // Ensure all worker threads are created, and then trigger a spurious
770 // work signal. This shouldn't cause any other work signals to be
771 // triggered. This is a regression test for http://crbug.com/117469.
772 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
773 EnsureAllWorkersCreated();
774 int old_has_work_call_count = has_work_call_count();
775 pool()->SignalHasWorkForTesting();
776 // This is inherently racy, but can only produce false positives.
777 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
778 EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
781 void IsRunningOnCurrentThreadTask(
782 SequencedWorkerPool::SequenceToken test_positive_token,
783 SequencedWorkerPool::SequenceToken test_negative_token,
784 SequencedWorkerPool* pool,
785 SequencedWorkerPool* unused_pool) {
786 EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
787 EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
788 EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
789 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
790 EXPECT_FALSE(
791 unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
792 EXPECT_FALSE(
793 unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
796 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
797 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
798 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
799 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
800 SequencedWorkerPool::SequenceToken unsequenced_token;
802 scoped_refptr<SequencedWorkerPool> unused_pool =
803 new SequencedWorkerPool(2, "unused_pool");
805 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
806 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
807 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
808 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
809 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
810 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
811 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
812 EXPECT_FALSE(
813 unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
815 pool()->PostSequencedWorkerTask(
816 token1, FROM_HERE,
817 base::Bind(&IsRunningOnCurrentThreadTask,
818 token1, token2, pool(), unused_pool));
819 pool()->PostSequencedWorkerTask(
820 token2, FROM_HERE,
821 base::Bind(&IsRunningOnCurrentThreadTask,
822 token2, unsequenced_token, pool(), unused_pool));
823 pool()->PostWorkerTask(
824 FROM_HERE,
825 base::Bind(&IsRunningOnCurrentThreadTask,
826 unsequenced_token, token1, pool(), unused_pool));
827 pool()->Shutdown();
828 unused_pool->Shutdown();
831 // Checks that tasks are destroyed in the right context during shutdown. If a
832 // task is destroyed while SequencedWorkerPool's global lock is held,
833 // SequencedWorkerPool might deadlock.
834 TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) {
835 for (int i = 0; i < 4; ++i) {
836 scoped_refptr<DestructionDeadlockChecker> checker(
837 new DestructionDeadlockChecker(pool()));
838 tracker()->PostRepostingTask(pool(), checker);
841 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
842 // which in turn should not deadlock in their destructors.
843 pool()->Shutdown();
846 // Similar to the test AvoidsDeadlockOnShutdown, but there are now also
847 // sequenced, blocking tasks in the queue during shutdown.
848 TEST_F(SequencedWorkerPoolTest,
849 AvoidsDeadlockOnShutdownWithSequencedBlockingTasks) {
850 const std::string sequence_token_name("name");
851 for (int i = 0; i < 4; ++i) {
852 scoped_refptr<DestructionDeadlockChecker> checker(
853 new DestructionDeadlockChecker(pool()));
854 tracker()->PostRepostingTask(pool(), checker);
856 SequencedWorkerPool::SequenceToken token1 =
857 pool()->GetNamedSequenceToken(sequence_token_name);
858 tracker()->PostRepostingBlockingTask(pool(), token1);
861 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
862 // which in turn should not deadlock in their destructors.
863 pool()->Shutdown();
866 // Verify that FlushForTesting works as intended.
867 TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
868 // Should be fine to call on a new instance.
869 pool()->FlushForTesting();
871 // Queue up a bunch of work, including a long delayed task and
872 // a task that produces additional tasks as an artifact.
873 pool()->PostDelayedWorkerTask(
874 FROM_HERE,
875 base::Bind(&TestTracker::FastTask, tracker(), 0),
876 TimeDelta::FromMinutes(5));
877 pool()->PostWorkerTask(FROM_HERE,
878 base::Bind(&TestTracker::SlowTask, tracker(), 0));
879 const size_t kNumFastTasks = 20;
880 for (size_t i = 0; i < kNumFastTasks; i++) {
881 pool()->PostWorkerTask(FROM_HERE,
882 base::Bind(&TestTracker::FastTask, tracker(), 0));
884 pool()->PostWorkerTask(
885 FROM_HERE,
886 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
887 true));
889 // We expect all except the delayed task to have been run. We verify all
890 // closures have been deleted by looking at the refcount of the
891 // tracker.
892 EXPECT_FALSE(tracker()->HasOneRef());
893 pool()->FlushForTesting();
894 EXPECT_TRUE(tracker()->HasOneRef());
895 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
897 // Should be fine to call on an idle instance with all threads created, and
898 // spamming the method shouldn't deadlock or confuse the class.
899 pool()->FlushForTesting();
900 pool()->FlushForTesting();
902 // Should be fine to call after shutdown too.
903 pool()->Shutdown();
904 pool()->FlushForTesting();
907 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
908 MessageLoop loop;
909 scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
910 scoped_refptr<SequencedTaskRunner> task_runner =
911 pool->GetSequencedTaskRunnerWithShutdownBehavior(
912 pool->GetSequenceToken(),
913 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
915 // Upon test exit, should shut down without hanging.
916 pool->Shutdown();
919 class SequencedWorkerPoolTaskRunnerTestDelegate {
920 public:
921 SequencedWorkerPoolTaskRunnerTestDelegate() {}
923 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
925 void StartTaskRunner() {
926 pool_owner_.reset(
927 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
930 scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
931 return pool_owner_->pool();
934 void StopTaskRunner() {
935 // Make sure all tasks are run before shutting down. Delayed tasks are
936 // not run, they're simply deleted.
937 pool_owner_->pool()->FlushForTesting();
938 pool_owner_->pool()->Shutdown();
939 // Don't reset |pool_owner_| here, as the test may still hold a
940 // reference to the pool.
943 private:
944 MessageLoop message_loop_;
945 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
948 INSTANTIATE_TYPED_TEST_CASE_P(
949 SequencedWorkerPool, TaskRunnerTest,
950 SequencedWorkerPoolTaskRunnerTestDelegate);
951 INSTANTIATE_TYPED_TEST_CASE_P(SequencedWorkerPool, TaskRunnerAffinityTest,
952 SequencedWorkerPoolTaskRunnerTestDelegate);
954 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
955 public:
956 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
958 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
961 void StartTaskRunner() {
962 pool_owner_.reset(
963 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
964 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
965 SequencedWorkerPool::BLOCK_SHUTDOWN);
968 scoped_refptr<TaskRunner> GetTaskRunner() {
969 return task_runner_;
972 void StopTaskRunner() {
973 // Make sure all tasks are run before shutting down. Delayed tasks are
974 // not run, they're simply deleted.
975 pool_owner_->pool()->FlushForTesting();
976 pool_owner_->pool()->Shutdown();
977 // Don't reset |pool_owner_| here, as the test may still hold a
978 // reference to the pool.
981 private:
982 MessageLoop message_loop_;
983 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
984 scoped_refptr<TaskRunner> task_runner_;
987 INSTANTIATE_TYPED_TEST_CASE_P(
988 SequencedWorkerPoolTaskRunner, TaskRunnerTest,
989 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
990 INSTANTIATE_TYPED_TEST_CASE_P(
991 SequencedWorkerPoolTaskRunner, TaskRunnerAffinityTest,
992 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
994 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
995 public:
996 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
998 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
1001 void StartTaskRunner() {
1002 pool_owner_.reset(new SequencedWorkerPoolOwner(
1003 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
1004 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
1005 pool_owner_->pool()->GetSequenceToken());
1008 scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
1009 return task_runner_;
1012 void StopTaskRunner() {
1013 // Make sure all tasks are run before shutting down. Delayed tasks are
1014 // not run, they're simply deleted.
1015 pool_owner_->pool()->FlushForTesting();
1016 pool_owner_->pool()->Shutdown();
1017 // Don't reset |pool_owner_| here, as the test may still hold a
1018 // reference to the pool.
1021 private:
1022 MessageLoop message_loop_;
1023 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
1024 scoped_refptr<SequencedTaskRunner> task_runner_;
1027 INSTANTIATE_TYPED_TEST_CASE_P(
1028 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
1029 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
1030 INSTANTIATE_TYPED_TEST_CASE_P(
1031 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerAffinityTest,
1032 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
1034 INSTANTIATE_TYPED_TEST_CASE_P(
1035 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
1036 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
1037 INSTANTIATE_TYPED_TEST_CASE_P(
1038 SequencedWorkerPoolSequencedTaskRunner,
1039 SequencedTaskRunnerDelayedTest,
1040 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
1042 } // namespace
1044 } // namespace base