// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "content/child/scheduler/task_queue_manager.h"

#include <stdint.h>

#include <algorithm>
#include <limits>
#include <set>
#include <vector>

#include "base/bind.h"
#include "base/logging.h"
#include "base/trace_event/trace_event.h"
#include "base/trace_event/trace_event_argument.h"
#include "cc/test/test_now_source.h"
#include "content/child/scheduler/nestable_single_thread_task_runner.h"
#include "content/child/scheduler/task_queue_selector.h"

namespace content {
namespace {
const int64_t kMaxTimeTicks = std::numeric_limits<int64_t>::max();
}  // namespace

namespace internal {

// Now() is somewhat expensive so it makes sense not to call Now() unless we
// really need to.
class LazyNow {
 public:
  explicit LazyNow(base::TimeTicks now)
      : task_queue_manager_(nullptr), now_(now) {
    DCHECK(!now.is_null());
  }

  explicit LazyNow(TaskQueueManager* task_queue_manager)
      : task_queue_manager_(task_queue_manager) {}

  base::TimeTicks Now() {
    if (now_.is_null())
      now_ = task_queue_manager_->Now();
    return now_;
  }

 private:
  TaskQueueManager* task_queue_manager_;  // NOT OWNED
  base::TimeTicks now_;
};

class TaskQueue : public base::SingleThreadTaskRunner {
 public:
  TaskQueue(TaskQueueManager* task_queue_manager,
            const char* disabled_by_default_tracing_category);

  // base::SingleThreadTaskRunner implementation.
  bool RunsTasksOnCurrentThread() const override;
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const base::Closure& task,
                       base::TimeDelta delay) override {
    return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL);
  }

  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const base::Closure& task,
                                  base::TimeDelta delay) override {
    return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE);
  }

  bool IsQueueEmpty() const;

  void SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy);
  void PumpQueue();

  bool NextPendingDelayedTaskRunTime(
      base::TimeTicks* next_pending_delayed_task);

  bool UpdateWorkQueue(LazyNow* lazy_now,
                       const base::PendingTask* previous_task);
  base::PendingTask TakeTaskFromWorkQueue();

  void WillDeleteTaskQueueManager();

  base::TaskQueue& work_queue() { return work_queue_; }

  void set_name(const char* name) { name_ = name; }

  void AsValueInto(base::trace_event::TracedValue* state) const;

 private:
  enum class TaskType {
    NORMAL,
    NON_NESTABLE,
  };

  ~TaskQueue() override;

  bool PostDelayedTaskImpl(const tracked_objects::Location& from_here,
                           const base::Closure& task,
                           base::TimeDelta delay,
                           TaskType task_type);

  // Delayed task posted to the underlying run loop, which locks |lock_| and
  // calls MoveReadyDelayedTasksToIncomingQueueLocked to process delayed tasks
  // that need to be run now.
  void MoveReadyDelayedTasksToIncomingQueue();

  // Enqueues any delayed tasks which should be run now on the incoming_queue_
  // and calls ScheduleDelayedWorkLocked to ensure future tasks are scheduled.
  // Must be called with |lock_| locked.
  void MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now);

  // Posts MoveReadyDelayedTasksToIncomingQueue if there isn't already a task
  // posted on the underlying runloop for the next task's scheduled run time.
  void ScheduleDelayedWorkLocked(LazyNow* lazy_now);

  void PumpQueueLocked();
  bool TaskIsOlderThanQueuedTasks(const base::PendingTask* task);
  bool ShouldAutoPumpQueueLocked(const base::PendingTask* previous_task);
  void EnqueueTaskLocked(const base::PendingTask& pending_task);

  void TraceQueueSize(bool is_locked) const;
  static const char* PumpPolicyToString(
      TaskQueueManager::PumpPolicy pump_policy);
  static void QueueAsValueInto(const base::TaskQueue& queue,
                               base::trace_event::TracedValue* state);
  static void QueueAsValueInto(const base::DelayedTaskQueue& queue,
                               base::trace_event::TracedValue* state);
  static void TaskAsValueInto(const base::PendingTask& task,
                              base::trace_event::TracedValue* state);

  // This lock protects all members except the work queue and the
  // main_thread_checker_.
  mutable base::Lock lock_;
  base::PlatformThreadId thread_id_;
  TaskQueueManager* task_queue_manager_;
  base::TaskQueue incoming_queue_;
  TaskQueueManager::PumpPolicy pump_policy_;
  const char* name_;
  const char* disabled_by_default_tracing_category_;
  base::DelayedTaskQueue delayed_task_queue_;
  std::set<base::TimeTicks> in_flight_kick_delayed_tasks_;

  base::ThreadChecker main_thread_checker_;
  base::TaskQueue work_queue_;

  DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};

TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager,
                     const char* disabled_by_default_tracing_category)
    : thread_id_(base::PlatformThread::CurrentId()),
      task_queue_manager_(task_queue_manager),
      pump_policy_(TaskQueueManager::PumpPolicy::AUTO),
      name_(nullptr),
      disabled_by_default_tracing_category_(
          disabled_by_default_tracing_category) {
}

TaskQueue::~TaskQueue() {
}

void TaskQueue::WillDeleteTaskQueueManager() {
  base::AutoLock lock(lock_);
  task_queue_manager_ = nullptr;
  delayed_task_queue_ = base::DelayedTaskQueue();
  incoming_queue_ = base::TaskQueue();
  work_queue_ = base::TaskQueue();
}

bool TaskQueue::RunsTasksOnCurrentThread() const {
  base::AutoLock lock(lock_);
  return base::PlatformThread::CurrentId() == thread_id_;
}
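
// Shared implementation of PostDelayedTask and PostNonNestableDelayedTask.
// Stamps the task via DidQueueTask and then either enqueues it immediately
// (zero delay) or parks it on |delayed_task_queue_| until its run time.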
bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
                                    const base::Closure& task,
                                    base::TimeDelta delay,
                                    TaskType task_type) {
  base::AutoLock lock(lock_);
  if (!task_queue_manager_)
    return false;

  base::PendingTask pending_task(from_here, task, base::TimeTicks(),
                                 task_type != TaskType::NON_NESTABLE);
  task_queue_manager_->DidQueueTask(&pending_task);

  if (delay > base::TimeDelta()) {
    base::TimeTicks now = task_queue_manager_->Now();
    pending_task.delayed_run_time = now + delay;
    delayed_task_queue_.push(pending_task);
    TraceQueueSize(true);
    // If we changed the topmost task, then it is time to reschedule.
    if (delayed_task_queue_.top().task.Equals(pending_task.task)) {
      LazyNow lazy_now(now);
      ScheduleDelayedWorkLocked(&lazy_now);
    }
    return true;
  }
  EnqueueTaskLocked(pending_task);
  return true;
}

void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  base::AutoLock lock(lock_);
  if (!task_queue_manager_)
    return;

  LazyNow lazy_now(task_queue_manager_);
  MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
}

void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) {
  lock_.AssertAcquired();
  // Enqueue all delayed tasks that should be running now.
  while (!delayed_task_queue_.empty() &&
         delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) {
    in_flight_kick_delayed_tasks_.erase(
        delayed_task_queue_.top().delayed_run_time);
    EnqueueTaskLocked(delayed_task_queue_.top());
    delayed_task_queue_.pop();
  }
  TraceQueueSize(true);
  ScheduleDelayedWorkLocked(lazy_now);
}

void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) {
  lock_.AssertAcquired();
  // Any remaining tasks are in the future, so queue a task to kick them.
  if (!delayed_task_queue_.empty()) {
    base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time;
    DCHECK_GT(next_run_time, lazy_now->Now());
    // Make sure we don't have more than one
    // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled
    // run time (note it's fine to have multiple ones in flight for distinct
    // run times).
    if (in_flight_kick_delayed_tasks_.find(next_run_time) ==
        in_flight_kick_delayed_tasks_.end()) {
      in_flight_kick_delayed_tasks_.insert(next_run_time);
      base::TimeDelta delay = next_run_time - lazy_now->Now();
      task_queue_manager_->PostDelayedTask(
          FROM_HERE,
          base::Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this),
          delay);
    }
  }
}

bool TaskQueue::IsQueueEmpty() const {
  if (!work_queue_.empty())
    return false;

  base::AutoLock lock(lock_);
  return incoming_queue_.empty();
}

bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) {
  lock_.AssertAcquired();
  // A null task is passed when UpdateQueue is called before any task is run.
  // In this case we don't want to pump an after_wakeup queue, so return true
  // here.
  if (!task)
    return true;

  // Return false if there are no tasks in the incoming queue.
  if (incoming_queue_.empty())
    return false;

  base::PendingTask oldest_queued_task = incoming_queue_.front();
  DCHECK(oldest_queued_task.delayed_run_time.is_null());
  DCHECK(task->delayed_run_time.is_null());

  // Note: the comparison is correct due to the fact that the PendingTask
  // operator inverts its comparison operation in order to work well in a heap
  // based priority queue.
  return oldest_queued_task < *task;
}

bool TaskQueue::ShouldAutoPumpQueueLocked(
    const base::PendingTask* previous_task) {
  lock_.AssertAcquired();
  if (pump_policy_ == TaskQueueManager::PumpPolicy::MANUAL)
    return false;
  if (pump_policy_ == TaskQueueManager::PumpPolicy::AFTER_WAKEUP &&
      TaskIsOlderThanQueuedTasks(previous_task))
    return false;
  if (incoming_queue_.empty())
    return false;
  return true;
}

bool TaskQueue::NextPendingDelayedTaskRunTime(
    base::TimeTicks* next_pending_delayed_task) {
  base::AutoLock lock(lock_);
  if (delayed_task_queue_.empty())
    return false;
  *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time;
  return true;
}
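
// Moves any ready delayed tasks into the incoming queue and, if the pump
// policy allows it, swaps the incoming queue into the work queue. Returns
// true if the work queue ends up non-empty.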
bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now,
                                const base::PendingTask* previous_task) {
  if (!work_queue_.empty())
    return true;

  base::AutoLock lock(lock_);
  MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
  if (!ShouldAutoPumpQueueLocked(previous_task))
    return false;
  work_queue_.Swap(&incoming_queue_);
  TraceQueueSize(true);
  return true;
}

base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
  base::PendingTask pending_task = work_queue_.front();
  work_queue_.pop();
  TraceQueueSize(false);
  return pending_task;
}
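
// Emits a trace counter with the total number of queued tasks (incoming +
// work + delayed) for this queue. Only does anything when tracing is enabled
// and the queue has been given a name.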
void TaskQueue::TraceQueueSize(bool is_locked) const {
  bool is_tracing;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_,
                                     &is_tracing);
  if (!is_tracing || !name_)
    return;
  if (!is_locked)
    lock_.Acquire();
  else
    lock_.AssertAcquired();
  TRACE_COUNTER1(
      disabled_by_default_tracing_category_, name_,
      incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size());
  if (!is_locked)
    lock_.Release();
}

void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
  lock_.AssertAcquired();
  if (!task_queue_manager_)
    return;
  if (pump_policy_ == TaskQueueManager::PumpPolicy::AUTO &&
      incoming_queue_.empty())
    task_queue_manager_->MaybePostDoWorkOnMainRunner();
  incoming_queue_.push(pending_task);

  if (!pending_task.delayed_run_time.is_null()) {
    // Clear the delayed run time because we've already applied the delay
    // before getting here.
    incoming_queue_.back().delayed_run_time = base::TimeTicks();
  }
  TraceQueueSize(true);
}
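
// Note: switching a queue back to PumpPolicy::AUTO pumps it immediately so
// that tasks which accumulated under MANUAL or AFTER_WAKEUP become runnable.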
void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
  base::AutoLock lock(lock_);
  if (pump_policy == TaskQueueManager::PumpPolicy::AUTO &&
      pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) {
    PumpQueueLocked();
  }
  pump_policy_ = pump_policy;
}

void TaskQueue::PumpQueueLocked() {
  lock_.AssertAcquired();
  if (task_queue_manager_) {
    LazyNow lazy_now(task_queue_manager_);
    MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
  }
  while (!incoming_queue_.empty()) {
    work_queue_.push(incoming_queue_.front());
    incoming_queue_.pop();
  }
  if (!work_queue_.empty())
    task_queue_manager_->MaybePostDoWorkOnMainRunner();
}

void TaskQueue::PumpQueue() {
  base::AutoLock lock(lock_);
  PumpQueueLocked();
}

void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const {
  base::AutoLock lock(lock_);
  state->BeginDictionary();
  if (name_)
    state->SetString("name", name_);
  state->SetString("pump_policy", PumpPolicyToString(pump_policy_));
  state->BeginArray("incoming_queue");
  QueueAsValueInto(incoming_queue_, state);
  state->EndArray();
  state->BeginArray("work_queue");
  QueueAsValueInto(work_queue_, state);
  state->EndArray();
  state->BeginArray("delayed_task_queue");
  QueueAsValueInto(delayed_task_queue_, state);
  state->EndArray();
  state->EndDictionary();
}

// static
const char* TaskQueue::PumpPolicyToString(
    TaskQueueManager::PumpPolicy pump_policy) {
  switch (pump_policy) {
    case TaskQueueManager::PumpPolicy::AUTO:
      return "auto";
    case TaskQueueManager::PumpPolicy::AFTER_WAKEUP:
      return "after_wakeup";
    case TaskQueueManager::PumpPolicy::MANUAL:
      return "manual";
    default:
      NOTREACHED();
      return nullptr;
  }
}

// static
void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
                                 base::trace_event::TracedValue* state) {
  base::TaskQueue queue_copy(queue);
  while (!queue_copy.empty()) {
    TaskAsValueInto(queue_copy.front(), state);
    queue_copy.pop();
  }
}

// static
void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue,
                                 base::trace_event::TracedValue* state) {
  base::DelayedTaskQueue queue_copy(queue);
  while (!queue_copy.empty()) {
    TaskAsValueInto(queue_copy.top(), state);
    queue_copy.pop();
  }
}

// static
void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
                                base::trace_event::TracedValue* state) {
  state->BeginDictionary();
  state->SetString("posted_from", task.posted_from.ToString());
  state->SetInteger("sequence_num", task.sequence_num);
  state->SetBoolean("nestable", task.nestable);
  state->SetBoolean("is_high_res", task.is_high_res);
  state->SetDouble(
      "delayed_run_time",
      (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
  state->EndDictionary();
}

}  // namespace internal

TaskQueueManager::TaskQueueManager(
    size_t task_queue_count,
    scoped_refptr<NestableSingleThreadTaskRunner> main_task_runner,
    TaskQueueSelector* selector,
    const char* disabled_by_default_tracing_category)
    : main_task_runner_(main_task_runner),
      selector_(selector),
      pending_dowork_count_(0),
      work_batch_size_(1),
      time_source_(nullptr),
      disabled_by_default_tracing_category_(
          disabled_by_default_tracing_category),
      weak_factory_(this) {
  DCHECK(main_task_runner->RunsTasksOnCurrentThread());
  TRACE_EVENT_OBJECT_CREATED_WITH_ID(disabled_by_default_tracing_category,
                                     "TaskQueueManager", this);

  task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
  for (size_t i = 0; i < task_queue_count; i++) {
    scoped_refptr<internal::TaskQueue> queue(make_scoped_refptr(
        new internal::TaskQueue(this, disabled_by_default_tracing_category)));
    queues_.push_back(queue);
  }

  std::vector<const base::TaskQueue*> work_queues;
  for (const auto& queue : queues_)
    work_queues.push_back(&queue->work_queue());
  selector_->RegisterWorkQueues(work_queues);
}

TaskQueueManager::~TaskQueueManager() {
  TRACE_EVENT_OBJECT_DELETED_WITH_ID(disabled_by_default_tracing_category_,
                                     "TaskQueueManager", this);
  for (auto& queue : queues_)
    queue->WillDeleteTaskQueueManager();
}

internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const {
  DCHECK_LT(queue_index, queues_.size());
  return queues_[queue_index].get();
}

scoped_refptr<base::SingleThreadTaskRunner>
TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const {
  return Queue(queue_index);
}

bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const {
  internal::TaskQueue* queue = Queue(queue_index);
  return queue->IsQueueEmpty();
}
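
// Returns the earliest scheduled run time across the delayed task queues of
// all queues, or a null TimeTicks if no delayed tasks are pending.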
base::TimeTicks TaskQueueManager::NextPendingDelayedTaskRunTime() {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  bool found_pending_task = false;
  base::TimeTicks next_pending_delayed_task(
      base::TimeTicks::FromInternalValue(kMaxTimeTicks));
  for (auto& queue : queues_) {
    base::TimeTicks queues_next_pending_delayed_task;
    if (queue->NextPendingDelayedTaskRunTime(
            &queues_next_pending_delayed_task)) {
      found_pending_task = true;
      next_pending_delayed_task = std::min(next_pending_delayed_task,
                                           queues_next_pending_delayed_task);
    }
  }

  if (!found_pending_task)
    return base::TimeTicks();

  DCHECK_NE(next_pending_delayed_task,
            base::TimeTicks::FromInternalValue(kMaxTimeTicks));
  return next_pending_delayed_task;
}

void TaskQueueManager::SetPumpPolicy(size_t queue_index,
                                     PumpPolicy pump_policy) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->SetPumpPolicy(pump_policy);
}

void TaskQueueManager::PumpQueue(size_t queue_index) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->PumpQueue();
}

bool TaskQueueManager::UpdateWorkQueues(
    const base::PendingTask* previous_task) {
  // TODO(skyostil): This is not efficient when the number of queues grows very
  // large due to the number of locks taken. Consider optimizing when we get
  // there.
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::LazyNow lazy_now(this);
  bool has_work = false;
  for (auto& queue : queues_) {
    has_work |= queue->UpdateWorkQueue(&lazy_now, previous_task);
    if (!queue->work_queue().empty()) {
      // Currently we should not be getting tasks with delayed run times in any
      // of the work queues.
      DCHECK(queue->work_queue().front().delayed_run_time.is_null());
    }
  }
  return has_work;
}
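
// Posts a DoWork task to the main task runner, unless a DoWork posted from
// the main thread is already pending.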
void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
  bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
  if (on_main_thread) {
    // We only want one pending DoWork posted from the main thread, or we risk
    // an explosion of pending DoWorks which could starve out everything else.
    if (pending_dowork_count_ > 0) {
      return;
    }
    pending_dowork_count_++;
  }

  main_task_runner_->PostTask(
      FROM_HERE, base::Bind(&TaskQueueManager::DoWork,
                            task_queue_manager_weak_ptr_, on_main_thread));
}
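
// Runs up to |work_batch_size_| tasks, refreshing the work queues between
// tasks. |posted_from_main_thread| is true when this DoWork was posted from
// the main thread, in which case the pending DoWork count is decremented
// before doing any work.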
void TaskQueueManager::DoWork(bool posted_from_main_thread) {
  if (posted_from_main_thread) {
    pending_dowork_count_--;
    DCHECK_GE(pending_dowork_count_, 0);
  }
  DCHECK(main_thread_checker_.CalledOnValidThread());

  // Pass nullptr to UpdateWorkQueues here to prevent waking up a
  // pump-after-wakeup queue.
  if (!UpdateWorkQueues(nullptr))
    return;

  base::PendingTask previous_task((tracked_objects::Location()),
                                  (base::Closure()));
  for (int i = 0; i < work_batch_size_; i++) {
    size_t queue_index;
    if (!SelectWorkQueueToService(&queue_index))
      return;
    // Note that this function won't post another call to DoWork if one is
    // already pending, so it is safe to call it in a loop.
    MaybePostDoWorkOnMainRunner();
    ProcessTaskFromWorkQueue(queue_index, i > 0, &previous_task);

    if (!UpdateWorkQueues(&previous_task))
      return;
  }
}

bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
  bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
  TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
      disabled_by_default_tracing_category_, "TaskQueueManager", this,
      AsValueWithSelectorResult(should_run, *out_queue_index));
  return should_run;
}

void TaskQueueManager::DidQueueTask(base::PendingTask* pending_task) {
  pending_task->sequence_num = task_sequence_num_.GetNext();
  task_annotator_.DidQueueTask("TaskQueueManager::PostTask", *pending_task);
}
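
// Takes the next task from |queue_index|'s work queue and runs it, deferring
// it to the main runner instead if the task is non-nestable and the message
// loop is nested. Task observer notifications at the batch boundaries are
// suppressed to avoid duplicates.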
void TaskQueueManager::ProcessTaskFromWorkQueue(
    size_t queue_index,
    bool has_previous_task,
    base::PendingTask* previous_task) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
  if (!pending_task.nestable && main_task_runner_->IsNested()) {
    // Defer non-nestable work to the main task runner. NOTE these tasks can be
    // arbitrarily delayed so the additional delay should not be a problem.
    main_task_runner_->PostNonNestableTask(pending_task.posted_from,
                                           pending_task.task);
  } else {
    // Suppress "will" task observer notifications for the first and "did"
    // notifications for the last task in the batch to avoid duplicate
    // notifications.
    if (has_previous_task) {
      FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
                        DidProcessTask(*previous_task));
      FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
                        WillProcessTask(pending_task));
    }
    task_annotator_.RunTask("TaskQueueManager::PostTask",
                            "TaskQueueManager::RunTask", pending_task);
    pending_task.task.Reset();
    *previous_task = pending_task;
  }
}

bool TaskQueueManager::RunsTasksOnCurrentThread() const {
  return main_task_runner_->RunsTasksOnCurrentThread();
}

bool TaskQueueManager::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const base::Closure& task,
    base::TimeDelta delay) {
  DCHECK(delay > base::TimeDelta());
  return main_task_runner_->PostDelayedTask(from_here, task, delay);
}

void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->set_name(name);
}

void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  DCHECK_GE(work_batch_size, 1);
  work_batch_size_ = work_batch_size;
}

void TaskQueueManager::AddTaskObserver(
    base::MessageLoop::TaskObserver* task_observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  base::MessageLoop::current()->AddTaskObserver(task_observer);
  task_observers_.AddObserver(task_observer);
}

void TaskQueueManager::RemoveTaskObserver(
    base::MessageLoop::TaskObserver* task_observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  base::MessageLoop::current()->RemoveTaskObserver(task_observer);
  task_observers_.RemoveObserver(task_observer);
}

void TaskQueueManager::SetTimeSourceForTesting(
    scoped_refptr<cc::TestNowSource> time_source) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  time_source_ = time_source;
}

base::TimeTicks TaskQueueManager::Now() const {
  return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
}

scoped_refptr<base::trace_event::ConvertableToTraceFormat>
TaskQueueManager::AsValueWithSelectorResult(bool should_run,
                                            size_t selected_queue) const {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  scoped_refptr<base::trace_event::TracedValue> state =
      new base::trace_event::TracedValue();
  state->BeginArray("queues");
  for (auto& queue : queues_)
    queue->AsValueInto(state.get());
  state->EndArray();
  state->BeginDictionary("selector");
  selector_->AsValueInto(state.get());
  state->EndDictionary();
  if (should_run)
    state->SetInteger("selected_queue", selected_queue);
  return state;
}

}  // namespace content