Explicitly add python-numpy dependency to install-build-deps.
[chromium-blink-merge.git] / base / threading / sequenced_worker_pool_unittest.cc
blobb1fe2764442c9159c9614fe3abe102f6a496a67c
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
5 #include "base/threading/sequenced_worker_pool.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/synchronization/lock.h"
17 #include "base/test/sequenced_task_runner_test_template.h"
18 #include "base/test/sequenced_worker_pool_owner.h"
19 #include "base/test/task_runner_test_template.h"
20 #include "base/test/test_timeouts.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/time/time.h"
23 #include "base/tracked_objects.h"
24 #include "testing/gtest/include/gtest/gtest.h"
26 namespace base {
// IMPORTANT NOTE:
//
// Many of these tests have failure modes where they'll hang forever. These
// tests should not be flaky, and hanging indicates a type of failure. Do not
// mark as flaky if they're hanging; it's likely an actual bug.
34 namespace {
// Number of worker threads given to the pools created by these tests.
const size_t kNumWorkerThreads = 3;
38 // Allows a number of threads to all be blocked on the same event, and
39 // provides a way to unblock a certain number of them.
40 class ThreadBlocker {
41 public:
42 ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
44 void Block() {
46 base::AutoLock lock(lock_);
47 while (unblock_counter_ == 0)
48 cond_var_.Wait();
49 unblock_counter_--;
51 cond_var_.Signal();
54 void Unblock(size_t count) {
56 base::AutoLock lock(lock_);
57 DCHECK(unblock_counter_ == 0);
58 unblock_counter_ = count;
60 cond_var_.Signal();
63 private:
64 base::Lock lock_;
65 base::ConditionVariable cond_var_;
67 size_t unblock_counter_;
70 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
71 public:
72 TestTracker()
73 : lock_(),
74 cond_var_(&lock_),
75 started_events_(0) {
78 // Each of these tasks appends the argument to the complete sequence vector
79 // so calling code can see what order they finished in.
80 void FastTask(int id) {
81 SignalWorkerDone(id);
84 void SlowTask(int id) {
85 base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
86 SignalWorkerDone(id);
89 void BlockTask(int id, ThreadBlocker* blocker) {
90 // Note that this task has started and signal anybody waiting for that
91 // to happen.
93 base::AutoLock lock(lock_);
94 started_events_++;
96 cond_var_.Signal();
98 blocker->Block();
99 SignalWorkerDone(id);
102 void PostAdditionalTasks(
103 int id, SequencedWorkerPool* pool,
104 bool expected_return_value) {
105 Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
106 EXPECT_EQ(expected_return_value,
107 pool->PostWorkerTaskWithShutdownBehavior(
108 FROM_HERE, fast_task,
109 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
110 EXPECT_EQ(expected_return_value,
111 pool->PostWorkerTaskWithShutdownBehavior(
112 FROM_HERE, fast_task,
113 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
114 pool->PostWorkerTaskWithShutdownBehavior(
115 FROM_HERE, fast_task,
116 SequencedWorkerPool::BLOCK_SHUTDOWN);
117 SignalWorkerDone(id);
120 // Waits until the given number of tasks have started executing.
121 void WaitUntilTasksBlocked(size_t count) {
123 base::AutoLock lock(lock_);
124 while (started_events_ < count)
125 cond_var_.Wait();
127 cond_var_.Signal();
130 // Blocks the current thread until at least the given number of tasks are in
131 // the completed vector, and then returns a copy.
132 std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
133 std::vector<int> ret;
135 base::AutoLock lock(lock_);
136 while (complete_sequence_.size() < num_tasks)
137 cond_var_.Wait();
138 ret = complete_sequence_;
140 cond_var_.Signal();
141 return ret;
144 size_t GetTasksCompletedCount() {
145 base::AutoLock lock(lock_);
146 return complete_sequence_.size();
149 void ClearCompleteSequence() {
150 base::AutoLock lock(lock_);
151 complete_sequence_.clear();
152 started_events_ = 0;
155 private:
156 friend class base::RefCountedThreadSafe<TestTracker>;
157 ~TestTracker() {}
159 void SignalWorkerDone(int id) {
161 base::AutoLock lock(lock_);
162 complete_sequence_.push_back(id);
164 cond_var_.Signal();
167 // Protects the complete_sequence.
168 base::Lock lock_;
170 base::ConditionVariable cond_var_;
172 // Protected by lock_.
173 std::vector<int> complete_sequence_;
175 // Counter of the number of "block" workers that have started.
176 size_t started_events_;
179 class SequencedWorkerPoolTest : public testing::Test {
180 public:
181 SequencedWorkerPoolTest()
182 : tracker_(new TestTracker) {
183 ResetPool();
186 virtual ~SequencedWorkerPoolTest() {}
188 virtual void SetUp() override {}
190 virtual void TearDown() override {
191 pool()->Shutdown();
194 const scoped_refptr<SequencedWorkerPool>& pool() {
195 return pool_owner_->pool();
197 TestTracker* tracker() { return tracker_.get(); }
199 // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
200 // down, and creates a new instance.
201 void ResetPool() {
202 pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
205 void SetWillWaitForShutdownCallback(const Closure& callback) {
206 pool_owner_->SetWillWaitForShutdownCallback(callback);
209 // Ensures that the given number of worker threads is created by adding
210 // tasks and waiting until they complete. Worker thread creation is
211 // serialized, can happen on background threads asynchronously, and doesn't
212 // happen any more at shutdown. This means that if a test posts a bunch of
213 // tasks and calls shutdown, fewer workers will be created than the test may
214 // expect.
216 // This function ensures that this condition can't happen so tests can make
217 // assumptions about the number of workers active. See the comment in
218 // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
219 // details.
221 // It will post tasks to the queue with id -1. It also assumes this is the
222 // first thing called in a test since it will clear the complete_sequence_.
223 void EnsureAllWorkersCreated() {
224 // Create a bunch of threads, all waiting. This will cause that may
225 // workers to be created.
226 ThreadBlocker blocker;
227 for (size_t i = 0; i < kNumWorkerThreads; i++) {
228 pool()->PostWorkerTask(FROM_HERE,
229 base::Bind(&TestTracker::BlockTask,
230 tracker(), -1, &blocker));
232 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
234 // Now wake them up and wait until they're done.
235 blocker.Unblock(kNumWorkerThreads);
236 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
238 // Clean up the task IDs we added.
239 tracker()->ClearCompleteSequence();
242 int has_work_call_count() const {
243 return pool_owner_->has_work_call_count();
246 private:
247 MessageLoop message_loop_;
248 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
249 const scoped_refptr<TestTracker> tracker_;
252 // Checks that the given number of entries are in the tasks to complete of
253 // the given tracker, and then signals the given event the given number of
254 // times. This is used to wakt up blocked background threads before blocking
255 // on shutdown.
256 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
257 size_t expected_tasks_to_complete,
258 ThreadBlocker* blocker,
259 size_t threads_to_awake) {
260 EXPECT_EQ(
261 expected_tasks_to_complete,
262 tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
264 blocker->Unblock(threads_to_awake);
267 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
268 public:
269 explicit DeletionHelper(
270 const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
271 : deleted_flag_(deleted_flag) {
274 private:
275 friend class base::RefCountedThreadSafe<DeletionHelper>;
276 virtual ~DeletionHelper() { deleted_flag_->data = true; }
278 const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
279 DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
282 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
283 const scoped_refptr<DeletionHelper>& helper) {
284 ADD_FAILURE() << "Should never run";
287 // Tests that delayed tasks are deleted upon shutdown of the pool.
288 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
289 // Post something to verify the pool is started up.
290 EXPECT_TRUE(pool()->PostTask(
291 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
293 scoped_refptr<base::RefCountedData<bool> > deleted_flag(
294 new base::RefCountedData<bool>(false));
296 base::Time posted_at(base::Time::Now());
297 // Post something that shouldn't run.
298 EXPECT_TRUE(pool()->PostDelayedTask(
299 FROM_HERE,
300 base::Bind(&HoldPoolReference,
301 pool(),
302 make_scoped_refptr(new DeletionHelper(deleted_flag))),
303 TestTimeouts::action_timeout()));
305 std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
306 ASSERT_EQ(1u, completion_sequence.size());
307 ASSERT_EQ(1, completion_sequence[0]);
309 pool()->Shutdown();
310 // Shutdown is asynchronous, so use ResetPool() to block until the pool is
311 // fully destroyed (and thus shut down).
312 ResetPool();
314 // Verify that we didn't block until the task was due.
315 ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
317 // Verify that the deferred task has not only not run, but has also been
318 // destroyed.
319 ASSERT_TRUE(deleted_flag->data);
322 // Tests that same-named tokens have the same ID.
323 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
324 const std::string name1("hello");
325 SequencedWorkerPool::SequenceToken token1 =
326 pool()->GetNamedSequenceToken(name1);
328 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
330 const std::string name3("goodbye");
331 SequencedWorkerPool::SequenceToken token3 =
332 pool()->GetNamedSequenceToken(name3);
334 // All 3 tokens should be different.
335 EXPECT_FALSE(token1.Equals(token2));
336 EXPECT_FALSE(token1.Equals(token3));
337 EXPECT_FALSE(token2.Equals(token3));
339 // Requesting the same name again should give the same value.
340 SequencedWorkerPool::SequenceToken token1again =
341 pool()->GetNamedSequenceToken(name1);
342 EXPECT_TRUE(token1.Equals(token1again));
344 SequencedWorkerPool::SequenceToken token3again =
345 pool()->GetNamedSequenceToken(name3);
346 EXPECT_TRUE(token3.Equals(token3again));
349 // Tests that posting a bunch of tasks (many more than the number of worker
350 // threads) runs them all.
351 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
352 pool()->PostWorkerTask(FROM_HERE,
353 base::Bind(&TestTracker::SlowTask, tracker(), 0));
355 const size_t kNumTasks = 20;
356 for (size_t i = 1; i < kNumTasks; i++) {
357 pool()->PostWorkerTask(FROM_HERE,
358 base::Bind(&TestTracker::FastTask, tracker(), i));
361 std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
362 EXPECT_EQ(kNumTasks, result.size());
365 // Tests that posting a bunch of tasks (many more than the number of
366 // worker threads) to two pools simultaneously runs them all twice.
367 // This test is meant to shake out any concurrency issues between
368 // pools (like histograms).
369 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
370 SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
371 SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
373 base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
374 pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
375 pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
377 const size_t kNumTasks = 20;
378 for (size_t i = 1; i < kNumTasks; i++) {
379 base::Closure fast_task =
380 base::Bind(&TestTracker::FastTask, tracker(), i);
381 pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
382 pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
385 std::vector<int> result =
386 tracker()->WaitUntilTasksComplete(2*kNumTasks);
387 EXPECT_EQ(2 * kNumTasks, result.size());
389 pool2.pool()->Shutdown();
390 pool1.pool()->Shutdown();
393 // Test that tasks with the same sequence token are executed in order but don't
394 // affect other tasks.
395 TEST_F(SequencedWorkerPoolTest, Sequence) {
396 // Fill all the worker threads except one.
397 const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
398 ThreadBlocker background_blocker;
399 for (size_t i = 0; i < kNumBackgroundTasks; i++) {
400 pool()->PostWorkerTask(FROM_HERE,
401 base::Bind(&TestTracker::BlockTask,
402 tracker(), i, &background_blocker));
404 tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
406 // Create two tasks with the same sequence token, one that will block on the
407 // event, and one which will just complete quickly when it's run. Since there
408 // is one worker thread free, the first task will start and then block, and
409 // the second task should be waiting.
410 ThreadBlocker blocker;
411 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
412 pool()->PostSequencedWorkerTask(
413 token1, FROM_HERE,
414 base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
415 pool()->PostSequencedWorkerTask(
416 token1, FROM_HERE,
417 base::Bind(&TestTracker::FastTask, tracker(), 101));
418 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
420 // Create another two tasks as above with a different token. These will be
421 // blocked since there are no slots to run.
422 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
423 pool()->PostSequencedWorkerTask(
424 token2, FROM_HERE,
425 base::Bind(&TestTracker::FastTask, tracker(), 200));
426 pool()->PostSequencedWorkerTask(
427 token2, FROM_HERE,
428 base::Bind(&TestTracker::FastTask, tracker(), 201));
429 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
431 // Let one background task complete. This should then let both tasks of
432 // token2 run to completion in order. The second task of token1 should still
433 // be blocked.
434 background_blocker.Unblock(1);
435 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
436 ASSERT_EQ(3u, result.size());
437 EXPECT_EQ(200, result[1]);
438 EXPECT_EQ(201, result[2]);
440 // Finish the rest of the background tasks. This should leave some workers
441 // free with the second token1 task still blocked on the first.
442 background_blocker.Unblock(kNumBackgroundTasks - 1);
443 EXPECT_EQ(kNumBackgroundTasks + 2,
444 tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
446 // Allow the first task of token1 to complete. This should run the second.
447 blocker.Unblock(1);
448 result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
449 ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
450 EXPECT_EQ(100, result[result.size() - 2]);
451 EXPECT_EQ(101, result[result.size() - 1]);
454 // Tests that any tasks posted after Shutdown are ignored.
455 // Disabled for flakiness. See http://crbug.com/166451.
456 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
457 // Start tasks to take all the threads and block them.
458 EnsureAllWorkersCreated();
459 ThreadBlocker blocker;
460 for (size_t i = 0; i < kNumWorkerThreads; i++) {
461 pool()->PostWorkerTask(FROM_HERE,
462 base::Bind(&TestTracker::BlockTask,
463 tracker(), i, &blocker));
465 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
467 SetWillWaitForShutdownCallback(
468 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
469 scoped_refptr<TestTracker>(tracker()), 0,
470 &blocker, kNumWorkerThreads));
472 // Shutdown the worker pool. This should discard all non-blocking tasks.
473 const int kMaxNewBlockingTasksAfterShutdown = 100;
474 pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
476 int old_has_work_call_count = has_work_call_count();
478 std::vector<int> result =
479 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
481 // The kNumWorkerThread items should have completed, in no particular order.
482 ASSERT_EQ(kNumWorkerThreads, result.size());
483 for (size_t i = 0; i < kNumWorkerThreads; i++) {
484 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
485 result.end());
488 // No further tasks, regardless of shutdown mode, should be allowed.
489 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
490 FROM_HERE,
491 base::Bind(&TestTracker::FastTask, tracker(), 100),
492 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
493 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
494 FROM_HERE,
495 base::Bind(&TestTracker::FastTask, tracker(), 101),
496 SequencedWorkerPool::SKIP_ON_SHUTDOWN));
497 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
498 FROM_HERE,
499 base::Bind(&TestTracker::FastTask, tracker(), 102),
500 SequencedWorkerPool::BLOCK_SHUTDOWN));
502 ASSERT_EQ(old_has_work_call_count, has_work_call_count());
505 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
506 // Test that <n> new blocking tasks are allowed provided they're posted
507 // by a running tasks.
508 EnsureAllWorkersCreated();
509 ThreadBlocker blocker;
511 // Start tasks to take all the threads and block them.
512 const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
513 for (int i = 0; i < kNumBlockTasks; ++i) {
514 EXPECT_TRUE(pool()->PostWorkerTask(
515 FROM_HERE,
516 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
518 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
520 // Queue up shutdown blocking tasks behind those which will attempt to post
521 // additional tasks when run, PostAdditionalTasks attemtps to post 3
522 // new FastTasks, one for each shutdown_behavior.
523 const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
524 for (int i = 0; i < kNumQueuedTasks; ++i) {
525 EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
526 FROM_HERE,
527 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
528 false),
529 SequencedWorkerPool::BLOCK_SHUTDOWN));
532 // Setup to open the floodgates from within Shutdown().
533 SetWillWaitForShutdownCallback(
534 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
535 scoped_refptr<TestTracker>(tracker()),
536 0, &blocker, kNumBlockTasks));
538 // Allow half of the additional blocking tasks thru.
539 const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
540 pool()->Shutdown(kNumNewBlockingTasksToAllow);
542 // Ensure that the correct number of tasks actually got run.
543 tracker()->WaitUntilTasksComplete(static_cast<size_t>(
544 kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
546 // Clean up the task IDs we added and go home.
547 tracker()->ClearCompleteSequence();
550 // Tests that unrun tasks are discarded properly according to their shutdown
551 // mode.
552 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
553 // Start tasks to take all the threads and block them.
554 EnsureAllWorkersCreated();
555 ThreadBlocker blocker;
556 for (size_t i = 0; i < kNumWorkerThreads; i++) {
557 pool()->PostWorkerTask(FROM_HERE,
558 base::Bind(&TestTracker::BlockTask,
559 tracker(), i, &blocker));
561 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
563 // Create some tasks with different shutdown modes.
564 pool()->PostWorkerTaskWithShutdownBehavior(
565 FROM_HERE,
566 base::Bind(&TestTracker::FastTask, tracker(), 100),
567 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
568 pool()->PostWorkerTaskWithShutdownBehavior(
569 FROM_HERE,
570 base::Bind(&TestTracker::FastTask, tracker(), 101),
571 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
572 pool()->PostWorkerTaskWithShutdownBehavior(
573 FROM_HERE,
574 base::Bind(&TestTracker::FastTask, tracker(), 102),
575 SequencedWorkerPool::BLOCK_SHUTDOWN);
577 // Shutdown the worker pool. This should discard all non-blocking tasks.
578 SetWillWaitForShutdownCallback(
579 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
580 scoped_refptr<TestTracker>(tracker()), 0,
581 &blocker, kNumWorkerThreads));
582 pool()->Shutdown();
584 std::vector<int> result =
585 tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
587 // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
588 // one, in no particular order.
589 ASSERT_EQ(kNumWorkerThreads + 1, result.size());
590 for (size_t i = 0; i < kNumWorkerThreads; i++) {
591 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
592 result.end());
594 EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
597 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
598 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
599 scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
600 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
601 scoped_refptr<SequencedTaskRunner> sequenced_runner(
602 pool()->GetSequencedTaskRunnerWithShutdownBehavior(
603 pool()->GetSequenceToken(),
604 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
605 EnsureAllWorkersCreated();
606 ThreadBlocker blocker;
607 pool()->PostWorkerTaskWithShutdownBehavior(
608 FROM_HERE,
609 base::Bind(&TestTracker::BlockTask,
610 tracker(), 0, &blocker),
611 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
612 runner->PostTask(
613 FROM_HERE,
614 base::Bind(&TestTracker::BlockTask,
615 tracker(), 1, &blocker));
616 sequenced_runner->PostTask(
617 FROM_HERE,
618 base::Bind(&TestTracker::BlockTask,
619 tracker(), 2, &blocker));
621 tracker()->WaitUntilTasksBlocked(3);
623 // This should not block. If this test hangs, it means it failed.
624 pool()->Shutdown();
626 // The task should not have completed yet.
627 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
629 // Posting more tasks should fail.
630 EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
631 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
632 SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
633 EXPECT_FALSE(runner->PostTask(
634 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
635 EXPECT_FALSE(sequenced_runner->PostTask(
636 FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
638 // Continue the background thread and make sure the tasks can complete.
639 blocker.Unblock(3);
640 std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
641 EXPECT_EQ(3u, result.size());
644 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
645 // until they stop, but tasks not yet started do not.
646 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
647 // Start tasks to take all the threads and block them.
648 EnsureAllWorkersCreated();
649 ThreadBlocker blocker;
651 // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
652 // return until these tasks have completed.
653 for (size_t i = 0; i < kNumWorkerThreads; i++) {
654 pool()->PostWorkerTaskWithShutdownBehavior(
655 FROM_HERE,
656 base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
657 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
659 tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
661 // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
662 // executed once Shutdown() has been called.
663 pool()->PostWorkerTaskWithShutdownBehavior(
664 FROM_HERE,
665 base::Bind(&TestTracker::BlockTask,
666 tracker(), 0, &blocker),
667 SequencedWorkerPool::SKIP_ON_SHUTDOWN);
669 // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
670 // been started block shutdown.
671 SetWillWaitForShutdownCallback(
672 base::Bind(&EnsureTasksToCompleteCountAndUnblock,
673 scoped_refptr<TestTracker>(tracker()), 0,
674 &blocker, kNumWorkerThreads));
676 // No tasks should have completed yet.
677 EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
679 // This should not block. If this test hangs, it means it failed.
680 pool()->Shutdown();
682 // Shutdown should not return until all of the tasks have completed.
683 std::vector<int> result =
684 tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
686 // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
687 // allowed to complete. No additional non-blocking tasks should have been
688 // started.
689 ASSERT_EQ(kNumWorkerThreads, result.size());
690 for (size_t i = 0; i < kNumWorkerThreads; i++) {
691 EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
692 result.end());
696 // Ensure all worker threads are created, and then trigger a spurious
697 // work signal. This shouldn't cause any other work signals to be
698 // triggered. This is a regression test for http://crbug.com/117469.
699 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
700 EnsureAllWorkersCreated();
701 int old_has_work_call_count = has_work_call_count();
702 pool()->SignalHasWorkForTesting();
703 // This is inherently racy, but can only produce false positives.
704 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
705 EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
708 void IsRunningOnCurrentThreadTask(
709 SequencedWorkerPool::SequenceToken test_positive_token,
710 SequencedWorkerPool::SequenceToken test_negative_token,
711 SequencedWorkerPool* pool,
712 SequencedWorkerPool* unused_pool) {
713 EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
714 EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
715 EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
716 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
717 EXPECT_FALSE(
718 unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
719 EXPECT_FALSE(
720 unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
723 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
724 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
725 SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
726 SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
727 SequencedWorkerPool::SequenceToken unsequenced_token;
729 scoped_refptr<SequencedWorkerPool> unused_pool =
730 new SequencedWorkerPool(2, "unused_pool");
732 EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
733 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
734 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
735 EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
736 EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
737 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
738 EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
739 EXPECT_FALSE(
740 unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
742 pool()->PostSequencedWorkerTask(
743 token1, FROM_HERE,
744 base::Bind(&IsRunningOnCurrentThreadTask,
745 token1, token2, pool(), unused_pool));
746 pool()->PostSequencedWorkerTask(
747 token2, FROM_HERE,
748 base::Bind(&IsRunningOnCurrentThreadTask,
749 token2, unsequenced_token, pool(), unused_pool));
750 pool()->PostWorkerTask(
751 FROM_HERE,
752 base::Bind(&IsRunningOnCurrentThreadTask,
753 unsequenced_token, token1, pool(), unused_pool));
754 pool()->Shutdown();
755 unused_pool->Shutdown();
758 // Verify that FlushForTesting works as intended.
759 TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
760 // Should be fine to call on a new instance.
761 pool()->FlushForTesting();
763 // Queue up a bunch of work, including a long delayed task and
764 // a task that produces additional tasks as an artifact.
765 pool()->PostDelayedWorkerTask(
766 FROM_HERE,
767 base::Bind(&TestTracker::FastTask, tracker(), 0),
768 TimeDelta::FromMinutes(5));
769 pool()->PostWorkerTask(FROM_HERE,
770 base::Bind(&TestTracker::SlowTask, tracker(), 0));
771 const size_t kNumFastTasks = 20;
772 for (size_t i = 0; i < kNumFastTasks; i++) {
773 pool()->PostWorkerTask(FROM_HERE,
774 base::Bind(&TestTracker::FastTask, tracker(), 0));
776 pool()->PostWorkerTask(
777 FROM_HERE,
778 base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
779 true));
781 // We expect all except the delayed task to have been run. We verify all
782 // closures have been deleted by looking at the refcount of the
783 // tracker.
784 EXPECT_FALSE(tracker()->HasOneRef());
785 pool()->FlushForTesting();
786 EXPECT_TRUE(tracker()->HasOneRef());
787 EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
789 // Should be fine to call on an idle instance with all threads created, and
790 // spamming the method shouldn't deadlock or confuse the class.
791 pool()->FlushForTesting();
792 pool()->FlushForTesting();
794 // Should be fine to call after shutdown too.
795 pool()->Shutdown();
796 pool()->FlushForTesting();
799 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
800 MessageLoop loop;
801 scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
802 scoped_refptr<SequencedTaskRunner> task_runner =
803 pool->GetSequencedTaskRunnerWithShutdownBehavior(
804 pool->GetSequenceToken(),
805 base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
807 // Upon test exit, should shut down without hanging.
808 pool->Shutdown();
811 class SequencedWorkerPoolTaskRunnerTestDelegate {
812 public:
813 SequencedWorkerPoolTaskRunnerTestDelegate() {}
815 ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
817 void StartTaskRunner() {
818 pool_owner_.reset(
819 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
822 scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
823 return pool_owner_->pool();
826 void StopTaskRunner() {
827 // Make sure all tasks are run before shutting down. Delayed tasks are
828 // not run, they're simply deleted.
829 pool_owner_->pool()->FlushForTesting();
830 pool_owner_->pool()->Shutdown();
831 // Don't reset |pool_owner_| here, as the test may still hold a
832 // reference to the pool.
835 private:
836 MessageLoop message_loop_;
837 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
840 INSTANTIATE_TYPED_TEST_CASE_P(
841 SequencedWorkerPool, TaskRunnerTest,
842 SequencedWorkerPoolTaskRunnerTestDelegate);
844 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
845 public:
846 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
848 ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
851 void StartTaskRunner() {
852 pool_owner_.reset(
853 new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
854 task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
855 SequencedWorkerPool::BLOCK_SHUTDOWN);
858 scoped_refptr<TaskRunner> GetTaskRunner() {
859 return task_runner_;
862 void StopTaskRunner() {
863 // Make sure all tasks are run before shutting down. Delayed tasks are
864 // not run, they're simply deleted.
865 pool_owner_->pool()->FlushForTesting();
866 pool_owner_->pool()->Shutdown();
867 // Don't reset |pool_owner_| here, as the test may still hold a
868 // reference to the pool.
871 private:
872 MessageLoop message_loop_;
873 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
874 scoped_refptr<TaskRunner> task_runner_;
877 INSTANTIATE_TYPED_TEST_CASE_P(
878 SequencedWorkerPoolTaskRunner, TaskRunnerTest,
879 SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
881 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
882 public:
883 SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
885 ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
888 void StartTaskRunner() {
889 pool_owner_.reset(new SequencedWorkerPoolOwner(
890 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
891 task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
892 pool_owner_->pool()->GetSequenceToken());
895 scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
896 return task_runner_;
899 void StopTaskRunner() {
900 // Make sure all tasks are run before shutting down. Delayed tasks are
901 // not run, they're simply deleted.
902 pool_owner_->pool()->FlushForTesting();
903 pool_owner_->pool()->Shutdown();
904 // Don't reset |pool_owner_| here, as the test may still hold a
905 // reference to the pool.
908 private:
909 MessageLoop message_loop_;
910 scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
911 scoped_refptr<SequencedTaskRunner> task_runner_;
914 INSTANTIATE_TYPED_TEST_CASE_P(
915 SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
916 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
918 INSTANTIATE_TYPED_TEST_CASE_P(
919 SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
920 SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
922 } // namespace
924 } // namespace base