Port Android relocation packer to chromium build
[chromium-blink-merge.git] / base / threading / sequenced_worker_pool_unittest.cc
blob5d0880c236fc2cc4b80674006c9e8e7027f9540e
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/synchronization/lock.h"
17 #include "base/test/sequenced_task_runner_test_template.h"
18 #include "base/test/sequenced_worker_pool_owner.h"
19 #include "base/test/task_runner_test_template.h"
20 #include "base/test/test_timeouts.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/time/time.h"
23 #include "base/tracked_objects.h"
24 #include "testing/gtest/include/gtest/gtest.h"
26 namespace base {
28 // IMPORTANT NOTE:
30 // Many of these tests have failure modes where they'll hang forever. These
31 // tests should not be flaky, and hanging indicates a type of failure. Do not
32 // mark as flaky if they're hanging, it's likely an actual bug.
34 namespace {
36 const size_t kNumWorkerThreads = 3;
38 // Allows a number of threads to all be blocked on the same event, and
39 // provides a way to unblock a certain number of them.
40 class ThreadBlocker {
41 public:
42 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
44 void Block() {
46 base::AutoLock lock(lock_);
47 while (unblock_counter_ == 0)
48 cond_var_.Wait();
49 unblock_counter_--;
51 cond_var_.Signal();
54 void Unblock(size_t count) {
56 base::AutoLock lock(lock_);
57 DCHECK_EQ(unblock_counter_, 0u);
58 unblock_counter_ = count;
60 cond_var_.Signal();
63 private:
64 base::Lock lock_;
65 base::ConditionVariable cond_var_;
67 size_t unblock_counter_;
70 class DestructionDeadlockChecker
71 : public base::RefCountedThreadSafe<DestructionDeadlockChecker> {
72 public:
73 DestructionDeadlockChecker(const scoped_refptr<SequencedWorkerPool>& pool)
74 : pool_(pool) {}
76 protected:
77 virtual ~DestructionDeadlockChecker() {
78 // This method should not deadlock.
79 pool_->RunsTasksOnCurrentThread();
82 private:
83 scoped_refptr<SequencedWorkerPool> pool_;
84 friend class base::RefCountedThreadSafe<DestructionDeadlockChecker>;
87 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
88 public:
89 TestTracker()
90 : lock_(),
91 cond_var_(&lock_),
92 started_events_(0) {
95 // Each of these tasks appends the argument to the complete sequence vector
96 // so calling code can see what order they finished in.
97 void FastTask(int id) {
98 SignalWorkerDone(id);
101 void SlowTask(int id) {
102 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
103 SignalWorkerDone(id);
106 void BlockTask(int id, ThreadBlocker* blocker) {
107 // Note that this task has started and signal anybody waiting for that
108 // to happen.
110 base::AutoLock lock(lock_);
111 started_events_++;
113 cond_var_.Signal();
115 blocker->Block();
116 SignalWorkerDone(id);
119 void PostAdditionalTasks(
120 int id, SequencedWorkerPool* pool,
121 bool expected_return_value) {
122 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
123 EXPECT_EQ(expected_return_value,
124 pool->PostWorkerTaskWithShutdownBehavior(
125 FROM_HERE, fast_task,
126 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
127 EXPECT_EQ(expected_return_value,
128 pool->PostWorkerTaskWithShutdownBehavior(
129 FROM_HERE, fast_task,
130 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
131 pool->PostWorkerTaskWithShutdownBehavior(
132 FROM_HERE, fast_task,
133 SequencedWorkerPool::BLOCK_SHUTDOWN);
134 SignalWorkerDone(id);
137 // This task posts itself back onto the SequencedWorkerPool before it
138 // finishes running. Each instance of the task maintains a strong reference
139 // to a DestructionDeadlockChecker. The DestructionDeadlockChecker is only
140 // destroyed when the task is destroyed without being run, which only happens
141 // during destruction of the SequencedWorkerPool.
142 void PostRepostingTask(
143 const scoped_refptr<SequencedWorkerPool>& pool,
144 const scoped_refptr<DestructionDeadlockChecker>& checker) {
145 Closure reposting_task =
146 base::Bind(&TestTracker::PostRepostingTask, this, pool, checker);
147 pool->PostWorkerTaskWithShutdownBehavior(
148 FROM_HERE, reposting_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN);
151 // This task reposts itself back onto the SequencedWorkerPool before it
152 // finishes running.
153 void PostRepostingBlockingTask(
154 const scoped_refptr<SequencedWorkerPool>& pool,
155 const SequencedWorkerPool::SequenceToken& token) {
156 Closure reposting_task =
157 base::Bind(&TestTracker::PostRepostingBlockingTask, this, pool, token);
158 pool->PostSequencedWorkerTaskWithShutdownBehavior(token,
159 FROM_HERE, reposting_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
162 // Waits until the given number of tasks have started executing.
163 void WaitUntilTasksBlocked(size_t count) {
165 base::AutoLock lock(lock_);
166 while (started_events_ < count)
167 cond_var_.Wait();
169 cond_var_.Signal();
172 // Blocks the current thread until at least the given number of tasks are in
173 // the completed vector, and then returns a copy.
174 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
175 std::vector<int> ret;
177 base::AutoLock lock(lock_);
178 while (complete_sequence_.size() < num_tasks)
179 cond_var_.Wait();
180 ret = complete_sequence_;
182 cond_var_.Signal();
183 return ret;
186 size_t GetTasksCompletedCount() {
187 base::AutoLock lock(lock_);
188 return complete_sequence_.size();
191 void ClearCompleteSequence() {
192 base::AutoLock lock(lock_);
193 complete_sequence_.clear();
194 started_events_ = 0;
197 private:
198 friend class base::RefCountedThreadSafe<TestTracker>;
199 ~TestTracker() {}
201 void SignalWorkerDone(int id) {
203 base::AutoLock lock(lock_);
204 complete_sequence_.push_back(id);
206 cond_var_.Signal();
209 // Protects the complete_sequence.
210 base::Lock lock_;
212 base::ConditionVariable cond_var_;
214 // Protected by lock_.
215 std::vector<int> complete_sequence_;
217 // Counter of the number of "block" workers that have started.
218 size_t started_events_;
221 class SequencedWorkerPoolTest : public testing::Test {
222 public:
223 SequencedWorkerPoolTest()
224 : tracker_(new TestTracker) {
225 ResetPool();
228 void TearDown() override { pool()->Shutdown(); }
230 const scoped_refptr<SequencedWorkerPool>& pool() {
231 return pool_owner_->pool();
233 TestTracker* tracker() { return tracker_.get(); }
235 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
236 // down, and creates a new instance.
237 void ResetPool() {
238 pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
241 void SetWillWaitForShutdownCallback(const Closure& callback) {
242 pool_owner_->SetWillWaitForShutdownCallback(callback);
245 // Ensures that the given number of worker threads is created by adding
246 // tasks and waiting until they complete. Worker thread creation is
247 // serialized, can happen on background threads asynchronously, and doesn't
248 // happen any more at shutdown. This means that if a test posts a bunch of
249 // tasks and calls shutdown, fewer workers will be created than the test may
250 // expect.
252 // This function ensures that this condition can't happen so tests can make
253 // assumptions about the number of workers active. See the comment in
254 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
255 // details.
257 // It will post tasks to the queue with id -1. It also assumes this is the
258 // first thing called in a test since it will clear the complete_sequence_.
259 void EnsureAllWorkersCreated() {
260 // Create a bunch of threads, all waiting. This will cause that may
261 // workers to be created.
262 ThreadBlocker blocker;
263 for (size_t i = 0; i < kNumWorkerThreads; i++) {
264 pool()->PostWorkerTask(FROM_HERE,
265 base::Bind(&TestTracker::BlockTask,
266 tracker(), -1, &blocker));
268 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
270 // Now wake them up and wait until they're done.
271 blocker.Unblock(kNumWorkerThreads);
272 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
274 // Clean up the task IDs we added.
275 tracker()->ClearCompleteSequence();
278 int has_work_call_count() const {
279 return pool_owner_->has_work_call_count();
282 private:
283 MessageLoop message_loop_;
284 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
285 const scoped_refptr<TestTracker> tracker_;
288 // Checks that the given number of entries are in the tasks to complete of
289 // the given tracker, and then signals the given event the given number of
290 // times. This is used to wakt up blocked background threads before blocking
291 // on shutdown.
292 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
293 size_t expected_tasks_to_complete,
294 ThreadBlocker* blocker,
295 size_t threads_to_awake) {
296 EXPECT_EQ(
297 expected_tasks_to_complete,
298 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
300 blocker->Unblock(threads_to_awake);
303 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
304 public:
305 explicit DeletionHelper(
306 const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
307 : deleted_flag_(deleted_flag) {
310 private:
311 friend class base::RefCountedThreadSafe<DeletionHelper>;
312 virtual ~DeletionHelper() { deleted_flag_->data = true; }
314 const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
315 DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
318 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
319 const scoped_refptr<DeletionHelper>& helper) {
320 ADD_FAILURE() << "Should never run";
323 // Tests that delayed tasks are deleted upon shutdown of the pool.
324 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
325 // Post something to verify the pool is started up.
326 EXPECT_TRUE(pool()->PostTask(
327 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
329 scoped_refptr<base::RefCountedData<bool> > deleted_flag(
330 new base::RefCountedData<bool>(false));
332 base::Time posted_at(base::Time::Now());
333 // Post something that shouldn't run.
334 EXPECT_TRUE(pool()->PostDelayedTask(
335 FROM_HERE,
336 base::Bind(&HoldPoolReference,
337 pool(),
338 make_scoped_refptr(new DeletionHelper(deleted_flag))),
339 TestTimeouts::action_timeout()));
341 std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
342 ASSERT_EQ(1u, completion_sequence.size());
343 ASSERT_EQ(1, completion_sequence[0]);
345 pool()->Shutdown();
346 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
347 // fully destroyed (and thus shut down).
348 ResetPool();
350 // Verify that we didn't block until the task was due.
351 ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
353 // Verify that the deferred task has not only not run, but has also been
354 // destroyed.
355 ASSERT_TRUE(deleted_flag->data);
358 // Tests that same-named tokens have the same ID.
359 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
360 const std::string name1("hello");
361 SequencedWorkerPool::SequenceToken token1 =
362 pool()->GetNamedSequenceToken(name1);
364 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
366 const std::string name3("goodbye");
367 SequencedWorkerPool::SequenceToken token3 =
368 pool()->GetNamedSequenceToken(name3);
370 // All 3 tokens should be different.
371 EXPECT_FALSE(token1.Equals(token2));
372 EXPECT_FALSE(token1.Equals(token3));
373 EXPECT_FALSE(token2.Equals(token3));
375 // Requesting the same name again should give the same value.
376 SequencedWorkerPool::SequenceToken token1again =
377 pool()->GetNamedSequenceToken(name1);
378 EXPECT_TRUE(token1.Equals(token1again));
380 SequencedWorkerPool::SequenceToken token3again =
381 pool()->GetNamedSequenceToken(name3);
382 EXPECT_TRUE(token3.Equals(token3again));
385 // Tests that posting a bunch of tasks (many more than the number of worker
386 // threads) runs them all.
387 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
388 pool()->PostWorkerTask(FROM_HERE,
389 base::Bind(&TestTracker::SlowTask, tracker(), 0));
391 const size_t kNumTasks = 20;
392 for (size_t i = 1; i < kNumTasks; i++) {
393 pool()->PostWorkerTask(FROM_HERE,
394 base::Bind(&TestTracker::FastTask, tracker(), i));
397 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
398 EXPECT_EQ(kNumTasks, result.size());
401 // Tests that posting a bunch of tasks (many more than the number of
402 // worker threads) to two pools simultaneously runs them all twice.
403 // This test is meant to shake out any concurrency issues between
404 // pools (like histograms).
405 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
406 SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
407 SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
409 base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
410 pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
411 pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
413 const size_t kNumTasks = 20;
414 for (size_t i = 1; i < kNumTasks; i++) {
415 base::Closure fast_task =
416 base::Bind(&TestTracker::FastTask, tracker(), i);
417 pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
418 pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
421 std::vector<int> result =
422 tracker()->WaitUntilTasksComplete(2*kNumTasks);
423 EXPECT_EQ(2 * kNumTasks, result.size());
425 pool2.pool()->Shutdown();
426 pool1.pool()->Shutdown();
429 // Test that tasks with the same sequence token are executed in order but don't
430 // affect other tasks.
431 TEST_F(SequencedWorkerPoolTest, Sequence) {
432 // Fill all the worker threads except one.
433 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
434 ThreadBlocker background_blocker;
435 for (size_t i = 0; i < kNumBackgroundTasks; i++) {
436 pool()->PostWorkerTask(FROM_HERE,
437 base::Bind(&TestTracker::BlockTask,
438 tracker(), i, &background_blocker));
440 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
442 // Create two tasks with the same sequence token, one that will block on the
443 // event, and one which will just complete quickly when it's run. Since there
444 // is one worker thread free, the first task will start and then block, and
445 // the second task should be waiting.
446 ThreadBlocker blocker;
447 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
448 pool()->PostSequencedWorkerTask(
449 token1, FROM_HERE,
450 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
451 pool()->PostSequencedWorkerTask(
452 token1, FROM_HERE,
453 base::Bind(&TestTracker::FastTask, tracker(), 101));
454 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
456 // Create another two tasks as above with a different token. These will be
457 // blocked since there are no slots to run.
458 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
459 pool()->PostSequencedWorkerTask(
460 token2, FROM_HERE,
461 base::Bind(&TestTracker::FastTask, tracker(), 200));
462 pool()->PostSequencedWorkerTask(
463 token2, FROM_HERE,
464 base::Bind(&TestTracker::FastTask, tracker(), 201));
465 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
467 // Let one background task complete. This should then let both tasks of
468 // token2 run to completion in order. The second task of token1 should still
469 // be blocked.
470 background_blocker.Unblock(1);
471 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
472 ASSERT_EQ(3u, result.size());
473 EXPECT_EQ(200, result[1]);
474 EXPECT_EQ(201, result[2]);
476 // Finish the rest of the background tasks. This should leave some workers
477 // free with the second token1 task still blocked on the first.
478 background_blocker.Unblock(kNumBackgroundTasks - 1);
479 EXPECT_EQ(kNumBackgroundTasks + 2,
480 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
482 // Allow the first task of token1 to complete. This should run the second.
483 blocker.Unblock(1);
484 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
485 ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
486 EXPECT_EQ(100, result[result.size() - 2]);
487 EXPECT_EQ(101, result[result.size() - 1]);
490 // Tests that any tasks posted after Shutdown are ignored.
491 // Disabled for flakiness. See http://crbug.com/166451.
492 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
493 // Start tasks to take all the threads and block them.
494 EnsureAllWorkersCreated();
495 ThreadBlocker blocker;
496 for (size_t i = 0; i < kNumWorkerThreads; i++) {
497 pool()->PostWorkerTask(FROM_HERE,
498 base::Bind(&TestTracker::BlockTask,
499 tracker(), i, &blocker));
501 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
503 SetWillWaitForShutdownCallback(
504 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
505 scoped_refptr<TestTracker>(tracker()), 0,
506 &blocker, kNumWorkerThreads));
508 // Shutdown the worker pool. This should discard all non-blocking tasks.
509 const int kMaxNewBlockingTasksAfterShutdown = 100;
510 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
512 int old_has_work_call_count = has_work_call_count();
514 std::vector<int> result =
515 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
517 // The kNumWorkerThread items should have completed, in no particular order.
518 ASSERT_EQ(kNumWorkerThreads, result.size());
519 for (size_t i = 0; i < kNumWorkerThreads; i++) {
520 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
521 result.end());
524 // No further tasks, regardless of shutdown mode, should be allowed.
525 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
526 FROM_HERE,
527 base::Bind(&TestTracker::FastTask, tracker(), 100),
528 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
529 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
530 FROM_HERE,
531 base::Bind(&TestTracker::FastTask, tracker(), 101),
532 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
533 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
534 FROM_HERE,
535 base::Bind(&TestTracker::FastTask, tracker(), 102),
536 SequencedWorkerPool::BLOCK_SHUTDOWN));
538 ASSERT_EQ(old_has_work_call_count, has_work_call_count());
541 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
542 // Test that <n> new blocking tasks are allowed provided they're posted
543 // by a running tasks.
544 EnsureAllWorkersCreated();
545 ThreadBlocker blocker;
547 // Start tasks to take all the threads and block them.
548 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
549 for (int i = 0; i < kNumBlockTasks; ++i) {
550 EXPECT_TRUE(pool()->PostWorkerTask(
551 FROM_HERE,
552 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
554 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
556 // Queue up shutdown blocking tasks behind those which will attempt to post
557 // additional tasks when run, PostAdditionalTasks attemtps to post 3
558 // new FastTasks, one for each shutdown_behavior.
559 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
560 for (int i = 0; i < kNumQueuedTasks; ++i) {
561 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
562 FROM_HERE,
563 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
564 false),
565 SequencedWorkerPool::BLOCK_SHUTDOWN));
568 // Setup to open the floodgates from within Shutdown().
569 SetWillWaitForShutdownCallback(
570 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
571 scoped_refptr<TestTracker>(tracker()),
572 0, &blocker, kNumBlockTasks));
574 // Allow half of the additional blocking tasks thru.
575 const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
576 pool()->Shutdown(kNumNewBlockingTasksToAllow);
578 // Ensure that the correct number of tasks actually got run.
579 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
580 kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
582 // Clean up the task IDs we added and go home.
583 tracker()->ClearCompleteSequence();
586 // Tests that unrun tasks are discarded properly according to their shutdown
587 // mode.
588 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
589 // Start tasks to take all the threads and block them.
590 EnsureAllWorkersCreated();
591 ThreadBlocker blocker;
592 for (size_t i = 0; i < kNumWorkerThreads; i++) {
593 pool()->PostWorkerTask(FROM_HERE,
594 base::Bind(&TestTracker::BlockTask,
595 tracker(), i, &blocker));
597 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
599 // Create some tasks with different shutdown modes.
600 pool()->PostWorkerTaskWithShutdownBehavior(
601 FROM_HERE,
602 base::Bind(&TestTracker::FastTask, tracker(), 100),
603 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
604 pool()->PostWorkerTaskWithShutdownBehavior(
605 FROM_HERE,
606 base::Bind(&TestTracker::FastTask, tracker(), 101),
607 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
608 pool()->PostWorkerTaskWithShutdownBehavior(
609 FROM_HERE,
610 base::Bind(&TestTracker::FastTask, tracker(), 102),
611 SequencedWorkerPool::BLOCK_SHUTDOWN);
613 // Shutdown the worker pool. This should discard all non-blocking tasks.
614 SetWillWaitForShutdownCallback(
615 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
616 scoped_refptr<TestTracker>(tracker()), 0,
617 &blocker, kNumWorkerThreads));
618 pool()->Shutdown();
620 std::vector<int> result =
621 tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
623 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
624 // one, in no particular order.
625 ASSERT_EQ(kNumWorkerThreads + 1, result.size());
626 for (size_t i = 0; i < kNumWorkerThreads; i++) {
627 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
628 result.end());
630 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
633 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
634 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
635 scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
636 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
637 scoped_refptr<SequencedTaskRunner> sequenced_runner(
638 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
639 pool()->GetSequenceToken(),
640 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
641 EnsureAllWorkersCreated();
642 ThreadBlocker blocker;
643 pool()->PostWorkerTaskWithShutdownBehavior(
644 FROM_HERE,
645 base::Bind(&TestTracker::BlockTask,
646 tracker(), 0, &blocker),
647 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
648 runner->PostTask(
649 FROM_HERE,
650 base::Bind(&TestTracker::BlockTask,
651 tracker(), 1, &blocker));
652 sequenced_runner->PostTask(
653 FROM_HERE,
654 base::Bind(&TestTracker::BlockTask,
655 tracker(), 2, &blocker));
657 tracker()->WaitUntilTasksBlocked(3);
659 // This should not block. If this test hangs, it means it failed.
660 pool()->Shutdown();
662 // The task should not have completed yet.
663 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
665 // Posting more tasks should fail.
666 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
667 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
668 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
669 EXPECT_FALSE(runner->PostTask(
670 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
671 EXPECT_FALSE(sequenced_runner->PostTask(
672 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
674 // Continue the background thread and make sure the tasks can complete.
675 blocker.Unblock(3);
676 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
677 EXPECT_EQ(3u, result.size());
680 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
681 // until they stop, but tasks not yet started do not.
682 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
683 // Start tasks to take all the threads and block them.
684 EnsureAllWorkersCreated();
685 ThreadBlocker blocker;
687 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
688 // return until these tasks have completed.
689 for (size_t i = 0; i < kNumWorkerThreads; i++) {
690 pool()->PostWorkerTaskWithShutdownBehavior(
691 FROM_HERE,
692 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
693 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
695 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
697 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
698 // executed once Shutdown() has been called.
699 pool()->PostWorkerTaskWithShutdownBehavior(
700 FROM_HERE,
701 base::Bind(&TestTracker::BlockTask,
702 tracker(), 0, &blocker),
703 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
705 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
706 // been started block shutdown.
707 SetWillWaitForShutdownCallback(
708 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
709 scoped_refptr<TestTracker>(tracker()), 0,
710 &blocker, kNumWorkerThreads));
712 // No tasks should have completed yet.
713 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
715 // This should not block. If this test hangs, it means it failed.
716 pool()->Shutdown();
718 // Shutdown should not return until all of the tasks have completed.
719 std::vector<int> result =
720 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
722 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
723 // allowed to complete. No additional non-blocking tasks should have been
724 // started.
725 ASSERT_EQ(kNumWorkerThreads, result.size());
726 for (size_t i = 0; i < kNumWorkerThreads; i++) {
727 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
728 result.end());
732 // Ensure all worker threads are created, and then trigger a spurious
733 // work signal. This shouldn't cause any other work signals to be
734 // triggered. This is a regression test for http://crbug.com/117469.
735 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
736 EnsureAllWorkersCreated();
737 int old_has_work_call_count = has_work_call_count();
738 pool()->SignalHasWorkForTesting();
739 // This is inherently racy, but can only produce false positives.
740 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
741 EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
744 void IsRunningOnCurrentThreadTask(
745 SequencedWorkerPool::SequenceToken test_positive_token,
746 SequencedWorkerPool::SequenceToken test_negative_token,
747 SequencedWorkerPool* pool,
748 SequencedWorkerPool* unused_pool) {
749 EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
750 EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
751 EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
752 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
753 EXPECT_FALSE(
754 unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
755 EXPECT_FALSE(
756 unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
759 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
760 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
761 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
762 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
763 SequencedWorkerPool::SequenceToken unsequenced_token;
765 scoped_refptr<SequencedWorkerPool> unused_pool =
766 new SequencedWorkerPool(2, "unused_pool");
768 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
769 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
770 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
771 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
772 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
773 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
774 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
775 EXPECT_FALSE(
776 unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
778 pool()->PostSequencedWorkerTask(
779 token1, FROM_HERE,
780 base::Bind(&IsRunningOnCurrentThreadTask,
781 token1, token2, pool(), unused_pool));
782 pool()->PostSequencedWorkerTask(
783 token2, FROM_HERE,
784 base::Bind(&IsRunningOnCurrentThreadTask,
785 token2, unsequenced_token, pool(), unused_pool));
786 pool()->PostWorkerTask(
787 FROM_HERE,
788 base::Bind(&IsRunningOnCurrentThreadTask,
789 unsequenced_token, token1, pool(), unused_pool));
790 pool()->Shutdown();
791 unused_pool->Shutdown();
794 // Checks that tasks are destroyed in the right context during shutdown. If a
795 // task is destroyed while SequencedWorkerPool's global lock is held,
796 // SequencedWorkerPool might deadlock.
797 TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) {
798 for (int i = 0; i < 4; ++i) {
799 scoped_refptr<DestructionDeadlockChecker> checker(
800 new DestructionDeadlockChecker(pool()));
801 tracker()->PostRepostingTask(pool(), checker);
804 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
805 // which in turn should not deadlock in their destructors.
806 pool()->Shutdown();
809 // Similar to the test AvoidsDeadlockOnShutdown, but there are now also
810 // sequenced, blocking tasks in the queue during shutdown.
811 TEST_F(SequencedWorkerPoolTest,
812 AvoidsDeadlockOnShutdownWithSequencedBlockingTasks) {
813 const std::string sequence_token_name("name");
814 for (int i = 0; i < 4; ++i) {
815 scoped_refptr<DestructionDeadlockChecker> checker(
816 new DestructionDeadlockChecker(pool()));
817 tracker()->PostRepostingTask(pool(), checker);
819 SequencedWorkerPool::SequenceToken token1 =
820 pool()->GetNamedSequenceToken(sequence_token_name);
821 tracker()->PostRepostingBlockingTask(pool(), token1);
824 // Shutting down the pool should destroy the DestructionDeadlockCheckers,
825 // which in turn should not deadlock in their destructors.
826 pool()->Shutdown();
829 // Verify that FlushForTesting works as intended.
830 TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
831 // Should be fine to call on a new instance.
832 pool()->FlushForTesting();
834 // Queue up a bunch of work, including a long delayed task and
835 // a task that produces additional tasks as an artifact.
836 pool()->PostDelayedWorkerTask(
837 FROM_HERE,
838 base::Bind(&TestTracker::FastTask, tracker(), 0),
839 TimeDelta::FromMinutes(5));
840 pool()->PostWorkerTask(FROM_HERE,
841 base::Bind(&TestTracker::SlowTask, tracker(), 0));
842 const size_t kNumFastTasks = 20;
843 for (size_t i = 0; i < kNumFastTasks; i++) {
844 pool()->PostWorkerTask(FROM_HERE,
845 base::Bind(&TestTracker::FastTask, tracker(), 0));
847 pool()->PostWorkerTask(
848 FROM_HERE,
849 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
850 true));
852 // We expect all except the delayed task to have been run. We verify all
853 // closures have been deleted by looking at the refcount of the
854 // tracker.
855 EXPECT_FALSE(tracker()->HasOneRef());
856 pool()->FlushForTesting();
857 EXPECT_TRUE(tracker()->HasOneRef());
858 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
860 // Should be fine to call on an idle instance with all threads created, and
861 // spamming the method shouldn't deadlock or confuse the class.
862 pool()->FlushForTesting();
863 pool()->FlushForTesting();
865 // Should be fine to call after shutdown too.
866 pool()->Shutdown();
867 pool()->FlushForTesting();
870 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
871 MessageLoop loop;
872 scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
873 scoped_refptr<SequencedTaskRunner> task_runner =
874 pool->GetSequencedTaskRunnerWithShutdownBehavior(
875 pool->GetSequenceToken(),
876 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
878 // Upon test exit, should shut down without hanging.
879 pool->Shutdown();
882 class SequencedWorkerPoolTaskRunnerTestDelegate {
883 public:
884 SequencedWorkerPoolTaskRunnerTestDelegate() {}
886 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
888 void StartTaskRunner() {
889 pool_owner_.reset(
890 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
893 scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
894 return pool_owner_->pool();
897 void StopTaskRunner() {
898 // Make sure all tasks are run before shutting down. Delayed tasks are
899 // not run, they're simply deleted.
900 pool_owner_->pool()->FlushForTesting();
901 pool_owner_->pool()->Shutdown();
902 // Don't reset |pool_owner_| here, as the test may still hold a
903 // reference to the pool.
906 private:
907 MessageLoop message_loop_;
908 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
911 INSTANTIATE_TYPED_TEST_CASE_P(
912 SequencedWorkerPool, TaskRunnerTest,
913 SequencedWorkerPoolTaskRunnerTestDelegate);
915 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
916 public:
917 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
919 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
922 void StartTaskRunner() {
923 pool_owner_.reset(
924 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
925 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
926 SequencedWorkerPool::BLOCK_SHUTDOWN);
929 scoped_refptr<TaskRunner> GetTaskRunner() {
930 return task_runner_;
933 void StopTaskRunner() {
934 // Make sure all tasks are run before shutting down. Delayed tasks are
935 // not run, they're simply deleted.
936 pool_owner_->pool()->FlushForTesting();
937 pool_owner_->pool()->Shutdown();
938 // Don't reset |pool_owner_| here, as the test may still hold a
939 // reference to the pool.
942 private:
943 MessageLoop message_loop_;
944 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
945 scoped_refptr<TaskRunner> task_runner_;
948 INSTANTIATE_TYPED_TEST_CASE_P(
949 SequencedWorkerPoolTaskRunner, TaskRunnerTest,
950 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
952 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
953 public:
954 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
956 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
959 void StartTaskRunner() {
960 pool_owner_.reset(new SequencedWorkerPoolOwner(
961 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
962 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
963 pool_owner_->pool()->GetSequenceToken());
966 scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
967 return task_runner_;
970 void StopTaskRunner() {
971 // Make sure all tasks are run before shutting down. Delayed tasks are
972 // not run, they're simply deleted.
973 pool_owner_->pool()->FlushForTesting();
974 pool_owner_->pool()->Shutdown();
975 // Don't reset |pool_owner_| here, as the test may still hold a
976 // reference to the pool.
979 private:
980 MessageLoop message_loop_;
981 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
982 scoped_refptr<SequencedTaskRunner> task_runner_;
985 INSTANTIATE_TYPED_TEST_CASE_P(
986 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
987 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
989 INSTANTIATE_TYPED_TEST_CASE_P(
990 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
991 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
993 } // namespace
995 } // namespace base