1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/renderer/scheduler/task_queue_manager.h"
10 #include "base/trace_event/trace_event.h"
11 #include "base/trace_event/trace_event_argument.h"
12 #include "cc/test/test_now_source.h"
13 #include "content/renderer/scheduler/task_queue_selector.h"
// Largest representable 64-bit tick value. DoWork() seeds its
// |next_pending_delayed_task| with this value and later compares against it
// to tell whether any queue reported a pending delayed task.
// The limit is taken from the same type the constant is declared with
// (int64_t) rather than from the separate legacy |int64| typedef.
const int64_t kMaxTimeTicks = std::numeric_limits<int64_t>::max();
// Task queue that implements base::SingleThreadTaskRunner and keeps a pointer
// to the TaskQueueManager passed to its constructor. Both Post*DelayedTask
// overrides forward to PostDelayedTaskImpl() with the matching TaskType.
// Tasks are held in an incoming queue and a separate work queue (see the
// member declarations at the end of the class).
// NOTE(review): this listing appears to have lost lines (access specifiers,
// closing braces of the inline overrides, the TaskType declaration, the last
// parameter of PostDelayedTaskImpl, the declarations of PumpQueue() and
// name_, and the closing "};") -- confirm against the complete source.
22 class TaskQueue
: public base::SingleThreadTaskRunner
{
24 TaskQueue(TaskQueueManager
* task_queue_manager
);
26 // base::SingleThreadTaskRunner implementation.
27 bool RunsTasksOnCurrentThread() const override
;
28 bool PostDelayedTask(const tracked_objects::Location
& from_here
,
29 const base::Closure
& task
,
30 base::TimeDelta delay
) override
{
31 return PostDelayedTaskImpl(from_here
, task
, delay
, TaskType::NORMAL
);
34 bool PostNonNestableDelayedTask(const tracked_objects::Location
& from_here
,
35 const base::Closure
& task
,
36 base::TimeDelta delay
) override
{
37 return PostDelayedTaskImpl(from_here
, task
, delay
, TaskType::NON_NESTABLE
);
40 bool IsQueueEmpty() const;
42 void SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy
);
45 bool UpdateWorkQueue(base::TimeTicks
* next_pending_delayed_task
,
46 const base::PendingTask
* previous_task
);
47 base::PendingTask
TakeTaskFromWorkQueue();
49 void WillDeleteTaskQueueManager();
// Exposes the work queue by mutable reference; no lock is taken here.
51 base::TaskQueue
& work_queue() { return work_queue_
; }
// Stores the pointer as-is; the string is not copied.
53 void set_name(const char* name
) { name_
= name
; }
55 void AsValueInto(base::trace_event::TracedValue
* state
) const;
63 ~TaskQueue() override
;
// Shared implementation behind the two Post*DelayedTask overrides above.
65 bool PostDelayedTaskImpl(const tracked_objects::Location
& from_here
,
66 const base::Closure
& task
,
67 base::TimeDelta delay
,
70 // Adds a task at the end of the incoming task queue and schedules a call to
71 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
72 // pumping is enabled. Can be called on an arbitrary thread.
73 void EnqueueTask(const base::PendingTask
& pending_task
);
75 void PumpQueueLocked();
76 bool TaskIsOlderThanQueuedTasks(const base::PendingTask
* task
);
77 bool ShouldAutoPumpQueueLocked(const base::PendingTask
* previous_task
);
78 void EnqueueTaskLocked(const base::PendingTask
& pending_task
);
80 void TraceQueueSize(bool is_locked
) const;
81 static const char* PumpPolicyToString(
82 TaskQueueManager::PumpPolicy pump_policy
);
83 static void QueueAsValueInto(const base::TaskQueue
& queue
,
84 base::trace_event::TracedValue
* state
);
85 static void TaskAsValueInto(const base::PendingTask
& task
,
86 base::trace_event::TracedValue
* state
);
88 // This lock protects all members except the work queue.
89 mutable base::Lock lock_
;
90 TaskQueueManager
* task_queue_manager_
;
91 base::TaskQueue incoming_queue_
;
92 TaskQueueManager::PumpPolicy pump_policy_
;
// Run times pushed by PostDelayedTaskImpl() for delayed tasks and popped by
// EnqueueTaskLocked(); std::greater ordering keeps the earliest time on top.
94 std::priority_queue
<base::TimeTicks
,
95 std::vector
<base::TimeTicks
>,
96 std::greater
<base::TimeTicks
>> delayed_task_run_times_
;
98 base::TaskQueue work_queue_
;
100 DISALLOW_COPY_AND_ASSIGN(TaskQueue
);
// Constructs a queue that remembers |task_queue_manager| and starts with the
// pump policy set to PumpPolicy::AUTO.
// NOTE(review): the initializer list ends on a trailing comma here, so the
// remaining initializers and the constructor body appear to be missing from
// this listing.
103 TaskQueue::TaskQueue(TaskQueueManager
* task_queue_manager
)
104 : task_queue_manager_(task_queue_manager
),
105 pump_policy_(TaskQueueManager::PumpPolicy::AUTO
),
// Destructor.
// NOTE(review): the body and closing brace are not visible in this listing.
109 TaskQueue::~TaskQueue() {
// Clears task_queue_manager_ while holding lock_. The other methods of this
// class test task_queue_manager_ for null before dereferencing it.
112 void TaskQueue::WillDeleteTaskQueueManager() {
113 base::AutoLock
lock(lock_
);
114 task_queue_manager_
= nullptr;
// While holding lock_, forwards the query to
// TaskQueueManager::RunsTasksOnCurrentThread().
// NOTE(review): the statement guarded by the null check on
// task_queue_manager_ (inner line 120) is missing from this listing; as
// shown, the forwarding call would be the guarded statement -- confirm.
117 bool TaskQueue::RunsTasksOnCurrentThread() const {
118 base::AutoLock
lock(lock_
);
119 if (!task_queue_manager_
)
121 return task_queue_manager_
->RunsTasksOnCurrentThread();
// Common path for posting a task, executed with lock_ held.
//  - Builds a base::PendingTask with a null run time; its last constructor
//    argument is true unless |task_type| is NON_NESTABLE.
//  - Lets the manager stamp the task via DidQueueTask().
//  - If |delay| is positive: sets delayed_run_time to Now() + delay, records
//    that time in delayed_task_run_times_, and asks the manager to post
//    EnqueueTask(pending_task) after |delay|, returning that result.
//  - Otherwise hands the task to EnqueueTaskLocked() directly.
// NOTE(review): the statement guarded by the null check (inner lines
// 130-131), the closing brace of the delay branch and the final return are
// missing from this listing.
124 bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location
& from_here
,
125 const base::Closure
& task
,
126 base::TimeDelta delay
,
127 TaskType task_type
) {
128 base::AutoLock
lock(lock_
);
129 if (!task_queue_manager_
)
132 base::PendingTask
pending_task(from_here
, task
, base::TimeTicks(),
133 task_type
!= TaskType::NON_NESTABLE
);
134 task_queue_manager_
->DidQueueTask(&pending_task
);
136 if (delay
> base::TimeDelta()) {
137 pending_task
.delayed_run_time
= task_queue_manager_
->Now() + delay
;
138 delayed_task_run_times_
.push(pending_task
.delayed_run_time
);
139 return task_queue_manager_
->PostDelayedTask(
140 FROM_HERE
, Bind(&TaskQueue::EnqueueTask
, this, pending_task
), delay
);
142 EnqueueTaskLocked(pending_task
);
// Checks work_queue_ first without taking lock_, then acquires lock_ and
// returns whether incoming_queue_ is empty.
// NOTE(review): the statement executed when work_queue_ is non-empty (inner
// lines 148-150) is missing from this listing.
146 bool TaskQueue::IsQueueEmpty() const {
147 if (!work_queue_
.empty())
151 base::AutoLock
lock(lock_
);
152 return incoming_queue_
.empty();
// Compares |task| against the front of incoming_queue_ using PendingTask's
// operator< and returns the result of `front < *task`. Requires lock_ to be
// held. Both tasks are DCHECKed to have a null delayed_run_time.
// NOTE(review): the null-|task| check and the statements returned for the
// null and empty-queue cases (inner lines 160-163, 166-167) are missing from
// this listing.
156 bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask
* task
) {
157 lock_
.AssertAcquired();
158 // A null task is passed when UpdateQueue is called before any task is run.
159 // In this case we don't want to pump an after_wakeup queue, so return true
164 // Return false if there are no tasks in the incoming queue.
165 if (incoming_queue_
.empty())
168 base::PendingTask oldest_queued_task
= incoming_queue_
.front();
169 DCHECK(oldest_queued_task
.delayed_run_time
.is_null());
170 DCHECK(task
->delayed_run_time
.is_null());
172 // Note: the comparison is correct due to the fact that the PendingTask
173 // operator inverts its comparison operation in order to work well in a heap
174 // based priority queue.
175 return oldest_queued_task
< *task
;
// Decides whether the incoming queue should be pumped automatically; lock_
// must be held. The visible conditions are, in order: the policy is MANUAL;
// the policy is AFTER_WAKEUP and TaskIsOlderThanQueuedTasks(previous_task)
// holds; the incoming queue is empty.
// NOTE(review): the value returned for each condition and the final return
// are missing from this listing -- presumably each condition vetoes pumping;
// confirm against the complete source.
178 bool TaskQueue::ShouldAutoPumpQueueLocked(
179 const base::PendingTask
* previous_task
) {
180 lock_
.AssertAcquired();
181 if (pump_policy_
== TaskQueueManager::PumpPolicy::MANUAL
)
183 if (pump_policy_
== TaskQueueManager::PumpPolicy::AFTER_WAKEUP
&&
184 TaskIsOlderThanQueuedTasks(previous_task
))
186 if (incoming_queue_
.empty())
// Refills work_queue_ from incoming_queue_ when allowed.
// Checks work_queue_ before taking lock_. Under the lock, lowers
// |*next_pending_delayed_task| to the earliest recorded delayed run time (if
// any), consults ShouldAutoPumpQueueLocked(previous_task), then swaps the
// incoming queue into the work queue and traces the new size.
// NOTE(review): the statements guarded by the two `if`s without bodies, the
// closing brace of the delayed-run-time block and the final return are
// missing from this listing.
191 bool TaskQueue::UpdateWorkQueue(
192 base::TimeTicks
* next_pending_delayed_task
,
193 const base::PendingTask
* previous_task
) {
194 if (!work_queue_
.empty())
198 base::AutoLock
lock(lock_
);
199 if (!delayed_task_run_times_
.empty()) {
200 *next_pending_delayed_task
=
201 std::min(*next_pending_delayed_task
, delayed_task_run_times_
.top());
203 if (!ShouldAutoPumpQueueLocked(previous_task
))
205 work_queue_
.Swap(&incoming_queue_
);
206 TraceQueueSize(true);
// Copies the task at the front of work_queue_ and traces the queue size with
// is_locked == false (lock_ is not taken in the visible code).
// NOTE(review): inner lines 213 and 215 are missing from this listing --
// presumably the pop of work_queue_ and the return of |pending_task|;
// confirm.
211 base::PendingTask
TaskQueue::TakeTaskFromWorkQueue() {
212 base::PendingTask pending_task
= work_queue_
.front();
214 TraceQueueSize(false);
// Emits a trace counter named name_ in the disabled-by-default
// "renderer.scheduler" category whose value is the combined size of the
// incoming and work queues. Does nothing useful when tracing for that
// category is off or name_ is null (that check is visible below).
// NOTE(review): the declaration of |is_tracing|, the statement guarded by the
// check, and the code that uses |is_locked| (inner lines 219, 223-226,
// 230-231) are missing from this listing; only the AssertAcquired() call
// remains visible.
218 void TaskQueue::TraceQueueSize(bool is_locked
) const {
220 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
221 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), &is_tracing
);
222 if (!is_tracing
|| !name_
)
227 lock_
.AssertAcquired();
228 TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_
,
229 incoming_queue_
.size() + work_queue_
.size());
// Locking wrapper: acquires lock_ and forwards |pending_task| to
// EnqueueTaskLocked(). PostDelayedTaskImpl() binds this method as the
// callback it posts for delayed tasks.
234 void TaskQueue::EnqueueTask(const base::PendingTask
& pending_task
) {
235 base::AutoLock
lock(lock_
);
236 EnqueueTaskLocked(pending_task
);
// Appends |pending_task| to incoming_queue_; lock_ must be held.
// If the pump policy is AUTO and the incoming queue is empty beforehand, the
// manager is asked to post a DoWork (MaybePostDoWorkOnMainRunner) before the
// push. For a task that carries a delayed run time, every entry of
// delayed_task_run_times_ that is <= that time is popped, and the queued
// copy's delayed_run_time is reset to a null TimeTicks.
// NOTE(review): the statement guarded by the null check on
// task_queue_manager_ (inner line 242) and several closing braces are
// missing from this listing.
239 void TaskQueue::EnqueueTaskLocked(const base::PendingTask
& pending_task
) {
240 lock_
.AssertAcquired();
241 if (!task_queue_manager_
)
243 if (pump_policy_
== TaskQueueManager::PumpPolicy::AUTO
&&
244 incoming_queue_
.empty())
245 task_queue_manager_
->MaybePostDoWorkOnMainRunner();
246 incoming_queue_
.push(pending_task
);
248 if (!pending_task
.delayed_run_time
.is_null()) {
249 // Update the time of the next pending delayed task.
250 while (!delayed_task_run_times_
.empty() &&
251 delayed_task_run_times_
.top() <= pending_task
.delayed_run_time
) {
252 delayed_task_run_times_
.pop();
254 // Clear the delayed run time because we've already applied the delay
255 // before getting here.
256 incoming_queue_
.back().delayed_run_time
= base::TimeTicks();
258 TraceQueueSize(true);
// Stores |pump_policy| in pump_policy_ while holding lock_. A transition from
// a non-AUTO policy to AUTO is detected and handled specially first.
// NOTE(review): the body of that transition branch (inner lines 265-266) is
// missing from this listing -- confirm what it does against the complete
// source.
261 void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy
) {
262 base::AutoLock
lock(lock_
);
263 if (pump_policy
== TaskQueueManager::PumpPolicy::AUTO
&&
264 pump_policy_
!= TaskQueueManager::PumpPolicy::AUTO
) {
267 pump_policy_
= pump_policy
;
// Moves every task from incoming_queue_ to the back of work_queue_, keeping
// their order, then asks the manager to post a DoWork if the work queue is
// non-empty. lock_ must be held.
// NOTE(review): task_queue_manager_ is dereferenced here without a visible
// null check; the loop's closing brace (inner line 275) is missing from this
// listing.
270 void TaskQueue::PumpQueueLocked() {
271 lock_
.AssertAcquired();
272 while (!incoming_queue_
.empty()) {
273 work_queue_
.push(incoming_queue_
.front());
274 incoming_queue_
.pop();
276 if (!work_queue_
.empty())
277 task_queue_manager_
->MaybePostDoWorkOnMainRunner();
// Acquires lock_.
// NOTE(review): the rest of the body (inner line 282) is missing from this
// listing -- presumably a call to PumpQueueLocked(); confirm.
280 void TaskQueue::PumpQueue() {
281 base::AutoLock
lock(lock_
);
// Serialises this queue into |state| for tracing while holding lock_: opens a
// dictionary, writes "name" and "pump_policy", then the contents of the
// incoming and work queues as the arrays "incoming_queue" and "work_queue",
// and closes the dictionary.
// NOTE(review): inner lines 288, 293 and 296 are missing from this listing;
// the latter two are presumably the EndArray() calls matching the
// BeginArray() calls below.
285 void TaskQueue::AsValueInto(base::trace_event::TracedValue
* state
) const {
286 base::AutoLock
lock(lock_
);
287 state
->BeginDictionary();
289 state
->SetString("name", name_
);
290 state
->SetString("pump_policy", PumpPolicyToString(pump_policy_
));
291 state
->BeginArray("incoming_queue");
292 QueueAsValueInto(incoming_queue_
, state
);
294 state
->BeginArray("work_queue");
295 QueueAsValueInto(work_queue_
, state
);
297 state
->EndDictionary();
// Maps a PumpPolicy value to a string literal for tracing output.
// AFTER_WAKEUP yields "after_wakeup".
// NOTE(review): the strings returned for AUTO and MANUAL, any default
// handling and the closing braces are missing from this listing.
301 const char* TaskQueue::PumpPolicyToString(
302 TaskQueueManager::PumpPolicy pump_policy
) {
303 switch (pump_policy
) {
304 case TaskQueueManager::PumpPolicy::AUTO
:
306 case TaskQueueManager::PumpPolicy::AFTER_WAKEUP
:
307 return "after_wakeup";
308 case TaskQueueManager::PumpPolicy::MANUAL
:
// Writes each task of |queue| into |state| via TaskAsValueInto(). Works on a
// local copy so the caller's queue is left untouched.
// NOTE(review): the statement that advances the loop (inner line 322,
// presumably queue_copy.pop()) is missing from this listing.
317 void TaskQueue::QueueAsValueInto(const base::TaskQueue
& queue
,
318 base::trace_event::TracedValue
* state
) {
319 base::TaskQueue
queue_copy(queue
);
320 while (!queue_copy
.empty()) {
321 TaskAsValueInto(queue_copy
.front(), state
);
// Writes one task into |state| as a dictionary with the keys "posted_from",
// "sequence_num", "nestable" and "is_high_res".
// The trailing expression converts delayed_run_time (measured from a null
// TimeTicks) from microseconds to milliseconds.
// NOTE(review): the call that consumes that expression (inner lines 334-335)
// is missing from this listing, so the key it is stored under is not visible.
327 void TaskQueue::TaskAsValueInto(const base::PendingTask
& task
,
328 base::trace_event::TracedValue
* state
) {
329 state
->BeginDictionary();
330 state
->SetString("posted_from", task
.posted_from
.ToString());
331 state
->SetInteger("sequence_num", task
.sequence_num
);
332 state
->SetBoolean("nestable", task
.nestable
);
333 state
->SetBoolean("is_high_res", task
.is_high_res
);
336 (task
.delayed_run_time
- base::TimeTicks()).InMicroseconds() / 1000.0L);
337 state
->EndDictionary();
340 } // namespace internal
// Creates |task_queue_count| internal::TaskQueue instances bound to this
// manager, then hands the selector a vector of pointers to their work queues.
// DCHECKs that |main_task_runner| runs tasks on the constructing thread and
// caches a weak pointer to this manager in task_queue_manager_weak_ptr_.
// NOTE(review): the initializers for selector_ and the other members on
// inner lines 347 and 349, the tail of the trace macro call and the loop's
// closing brace are missing from this listing.
342 TaskQueueManager::TaskQueueManager(
343 size_t task_queue_count
,
344 scoped_refptr
<base::SingleThreadTaskRunner
> main_task_runner
,
345 TaskQueueSelector
* selector
)
346 : main_task_runner_(main_task_runner
),
348 pending_dowork_count_(0),
350 time_source_(nullptr),
351 weak_factory_(this) {
352 DCHECK(main_task_runner
->RunsTasksOnCurrentThread());
353 TRACE_EVENT_OBJECT_CREATED_WITH_ID(
354 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
357 task_queue_manager_weak_ptr_
= weak_factory_
.GetWeakPtr();
358 for (size_t i
= 0; i
< task_queue_count
; i
++) {
359 scoped_refptr
<internal::TaskQueue
> queue(
360 make_scoped_refptr(new internal::TaskQueue(this)));
361 queues_
.push_back(queue
);
364 std::vector
<const base::TaskQueue
*> work_queues
;
365 for (const auto& queue
: queues_
)
366 work_queues
.push_back(&queue
->work_queue());
367 selector_
->RegisterWorkQueues(work_queues
);
// Emits the object-deleted trace event and tells every queue that the manager
// is going away (WillDeleteTaskQueueManager), since the queues are
// ref-counted and hold a raw pointer back to this manager.
// NOTE(review): the tail of the trace macro call (inner line 373) is missing
// from this listing.
370 TaskQueueManager::~TaskQueueManager() {
371 TRACE_EVENT_OBJECT_DELETED_WITH_ID(
372 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
374 for (auto& queue
: queues_
)
375 queue
->WillDeleteTaskQueueManager();
// Returns a raw pointer to the queue at |queue_index|. The index is only
// range-checked by a DCHECK.
378 internal::TaskQueue
* TaskQueueManager::Queue(size_t queue_index
) const {
379 DCHECK_LT(queue_index
, queues_
.size());
380 return queues_
[queue_index
].get();
// Returns the queue at |queue_index| as a ref-counted
// base::SingleThreadTaskRunner (internal::TaskQueue derives from it).
383 scoped_refptr
<base::SingleThreadTaskRunner
>
384 TaskQueueManager::TaskRunnerForQueue(size_t queue_index
) const {
385 return Queue(queue_index
);
// Forwards to internal::TaskQueue::IsQueueEmpty() for the queue at
// |queue_index|.
388 bool TaskQueueManager::IsQueueEmpty(size_t queue_index
) const {
389 internal::TaskQueue
* queue
= Queue(queue_index
);
390 return queue
->IsQueueEmpty();
// Sets the pump policy of the queue at |queue_index|. DCHECKs that it is
// called on the thread main_thread_checker_ is bound to.
393 void TaskQueueManager::SetPumpPolicy(size_t queue_index
,
394 PumpPolicy pump_policy
) {
395 DCHECK(main_thread_checker_
.CalledOnValidThread());
396 internal::TaskQueue
* queue
= Queue(queue_index
);
397 queue
->SetPumpPolicy(pump_policy
);
// Looks up the queue at |queue_index| after DCHECKing the calling thread.
// NOTE(review): the statement that acts on |queue| (inner line 403,
// presumably queue->PumpQueue()) is missing from this listing.
400 void TaskQueueManager::PumpQueue(size_t queue_index
) {
401 DCHECK(main_thread_checker_
.CalledOnValidThread());
402 internal::TaskQueue
* queue
= Queue(queue_index
);
// Calls UpdateWorkQueue() on every queue, OR-ing the results into |has_work|
// and letting each queue lower |*next_pending_delayed_task|. For queues whose
// work queue is non-empty afterwards, DCHECKs that the front task has no
// delayed run time.
// NOTE(review): the second argument of the UpdateWorkQueue() call (inner line
// 416), the closing braces and the final return are missing from this
// listing; the TODO below is also cut off mid-sentence.
406 bool TaskQueueManager::UpdateWorkQueues(
407 base::TimeTicks
* next_pending_delayed_task
,
408 const base::PendingTask
* previous_task
) {
409 // TODO(skyostil): This is not efficient when the number of queues grows very
410 // large due to the number of locks taken. Consider optimizing when we get
412 DCHECK(main_thread_checker_
.CalledOnValidThread());
413 bool has_work
= false;
414 for (auto& queue
: queues_
) {
415 has_work
|= queue
->UpdateWorkQueue(next_pending_delayed_task
,
417 if (!queue
->work_queue().empty()) {
418 // Currently we should not be getting tasks with delayed run times in any
419 // of the work queues.
420 DCHECK(queue
->work_queue().front().delayed_run_time
.is_null());
// Posts a DoWork task, bound to the manager's weak pointer, onto
// main_task_runner_. When called on the main thread, pending_dowork_count_
// is checked first and incremented, so that only one main-thread-originated
// DoWork is outstanding at a time.
// NOTE(review): the statement executed when a DoWork is already pending
// (inner lines 432-433, presumably an early return), the closing braces and
// the last Bind() argument are missing from this listing.
426 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
427 bool on_main_thread
= main_task_runner_
->BelongsToCurrentThread();
428 if (on_main_thread
) {
429 // We only want one pending DoWork posted from the main thread, or we risk
430 // an explosion of pending DoWorks which could starve out everything else.
431 if (pending_dowork_count_
> 0) {
434 pending_dowork_count_
++;
437 main_task_runner_
->PostTask(
438 FROM_HERE
, Bind(&TaskQueueManager::DoWork
, task_queue_manager_weak_ptr_
,
// Runs a batch of up to work_batch_size_ tasks.
//  - Decrements pending_dowork_count_ when this call was posted from the main
//    thread (mirrors the increment in MaybePostDoWorkOnMainRunner()).
//  - Refreshes all work queues, starting |next_pending_delayed_task| at
//    kMaxTimeTicks, which marks "no delayed task reported".
//  - Per iteration: checks whether a delayed task has become due (only after
//    the first task), selects a queue, re-arms DoWork, runs one task via
//    ProcessTaskFromWorkQueue(), and refreshes the work queues again.
// NOTE(review): the statements guarded by the body-less `if`s (presumably
// early exits), the declaration of |queue_index|, the remaining constructor
// arguments of |previous_task| and the closing braces are missing from this
// listing.
442 void TaskQueueManager::DoWork(bool posted_from_main_thread
) {
443 if (posted_from_main_thread
) {
444 pending_dowork_count_
--;
445 DCHECK_GE(pending_dowork_count_
, 0);
447 DCHECK(main_thread_checker_
.CalledOnValidThread());
449 base::TimeTicks
next_pending_delayed_task(
450 base::TimeTicks::FromInternalValue(kMaxTimeTicks
));
452 // Pass nullptr to UpdateWorkQueues here to prevent waking up a
453 // pump-after-wakeup queue.
454 if (!UpdateWorkQueues(&next_pending_delayed_task
, nullptr))
457 base::PendingTask
previous_task((tracked_objects::Location()),
459 for (int i
= 0; i
< work_batch_size_
; i
++) {
460 // Interrupt the work batch if we should run the next delayed task.
461 if (i
> 0 && next_pending_delayed_task
.ToInternalValue() != kMaxTimeTicks
&&
462 Now() >= next_pending_delayed_task
)
466 if (!SelectWorkQueueToService(&queue_index
))
468 // Note that this function won't post another call to DoWork if one is
469 // already pending, so it is safe to call it in a loop.
470 MaybePostDoWorkOnMainRunner();
471 ProcessTaskFromWorkQueue(queue_index
, i
> 0, &previous_task
);
473 if (!UpdateWorkQueues(&next_pending_delayed_task
, &previous_task
))
// Asks the selector which work queue to service next, writing the choice to
// |*out_queue_index|, and records a trace snapshot of the manager state
// together with the selector's answer.
// NOTE(review): the return statement (inner line 483, presumably returning
// |should_run|) is missing from this listing. |*out_queue_index| is read for
// the snapshot even when the selector reports nothing to run.
478 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index
) {
479 bool should_run
= selector_
->SelectWorkQueueToService(out_queue_index
);
480 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
481 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
482 AsValueWithSelectorResult(should_run
, *out_queue_index
));
// Assigns the next value of task_sequence_num_ to |pending_task| and notifies
// task_annotator_ that the task was queued.
486 void TaskQueueManager::DidQueueTask(base::PendingTask
* pending_task
) {
487 pending_task
->sequence_num
= task_sequence_num_
.GetNext();
488 task_annotator_
.DidQueueTask("TaskQueueManager::PostTask", *pending_task
);
// Takes one task from the selected queue and processes it.
// Non-nestable tasks are re-posted to main_task_runner_ as non-nestable
// tasks. In the other visible path, observers get DidProcessTask for
// |*previous_task| and WillProcessTask for the new task when
// |has_previous_task| is set, the task is run through task_annotator_, its
// closure is reset, and the task is stored into |*previous_task|.
// NOTE(review): the |queue_index| parameter (inner line 492), the remaining
// arguments of PostNonNestableTask, the `else` separating the two paths and
// the closing braces are missing from this listing; the comment before the
// observer notifications is also cut off mid-sentence.
491 void TaskQueueManager::ProcessTaskFromWorkQueue(
493 bool has_previous_task
,
494 base::PendingTask
* previous_task
) {
495 DCHECK(main_thread_checker_
.CalledOnValidThread());
496 internal::TaskQueue
* queue
= Queue(queue_index
);
497 base::PendingTask pending_task
= queue
->TakeTaskFromWorkQueue();
498 if (!pending_task
.nestable
) {
499 // Defer non-nestable work to the main task runner. NOTE these tasks can be
500 // arbitrarily delayed so the additional delay should not be a problem.
501 main_task_runner_
->PostNonNestableTask(pending_task
.posted_from
,
504 // Suppress "will" task observer notifications for the first and "did"
505 // notifications for the last task in the batch to avoid duplicate
507 if (has_previous_task
) {
508 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver
, task_observers_
,
509 DidProcessTask(*previous_task
));
510 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver
, task_observers_
,
511 WillProcessTask(pending_task
));
513 task_annotator_
.RunTask("TaskQueueManager::PostTask",
514 "TaskQueueManager::RunTask", pending_task
);
515 pending_task
.task
.Reset();
516 *previous_task
= pending_task
;
// Forwards the query to main_task_runner_.
520 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
521 return main_task_runner_
->RunsTasksOnCurrentThread();
// Posts |task| on main_task_runner_ after |delay| and returns that runner's
// result. |delay| must be strictly positive (DCHECKed).
524 bool TaskQueueManager::PostDelayedTask(
525 const tracked_objects::Location
& from_here
,
526 const base::Closure
& task
,
527 base::TimeDelta delay
) {
528 DCHECK(delay
> base::TimeDelta());
529 return main_task_runner_
->PostDelayedTask(from_here
, task
, delay
);
// Sets the name of the queue at |queue_index|. The queue stores the |name|
// pointer itself (see TaskQueue::set_name), so the string must outlive it.
532 void TaskQueueManager::SetQueueName(size_t queue_index
, const char* name
) {
533 DCHECK(main_thread_checker_
.CalledOnValidThread());
534 internal::TaskQueue
* queue
= Queue(queue_index
);
535 queue
->set_name(name
);
// Sets the upper bound on the number of loop iterations DoWork() performs per
// call. |work_batch_size| must be at least 1 (DCHECKed).
538 void TaskQueueManager::SetWorkBatchSize(int work_batch_size
) {
539 DCHECK(main_thread_checker_
.CalledOnValidThread());
540 DCHECK_GE(work_batch_size
, 1);
541 work_batch_size_
= work_batch_size
;
// Registers |task_observer| both with the current base::MessageLoop and with
// this manager's own task_observers_ list.
544 void TaskQueueManager::AddTaskObserver(
545 base::MessageLoop::TaskObserver
* task_observer
) {
546 DCHECK(main_thread_checker_
.CalledOnValidThread());
547 base::MessageLoop::current()->AddTaskObserver(task_observer
);
548 task_observers_
.AddObserver(task_observer
);
// Unregisters |task_observer| from both the current base::MessageLoop and
// this manager's task_observers_ list (the inverse of AddTaskObserver()).
551 void TaskQueueManager::RemoveTaskObserver(
552 base::MessageLoop::TaskObserver
* task_observer
) {
553 DCHECK(main_thread_checker_
.CalledOnValidThread());
554 base::MessageLoop::current()->RemoveTaskObserver(task_observer
);
555 task_observers_
.RemoveObserver(task_observer
);
// Installs |time_source| as the clock consulted by Now(); intended for tests,
// as the name says.
558 void TaskQueueManager::SetTimeSourceForTesting(
559 scoped_refptr
<cc::TestNowSource
> time_source
) {
560 DCHECK(main_thread_checker_
.CalledOnValidThread());
561 time_source_
= time_source
;
// Returns the current time: from time_source_ when one has been installed,
// otherwise from base::TimeTicks::Now(). The test source is marked UNLIKELY
// because it is the uncommon case.
564 base::TimeTicks
TaskQueueManager::Now() const {
565 return UNLIKELY(time_source_
) ? time_source_
->Now() : base::TimeTicks::Now();
// Builds a TracedValue describing the manager: a "queues" array with every
// queue's state, a "selector" dictionary filled in by the selector, and the
// integer "selected_queue".
// NOTE(review): inner lines 577, 581 and 583-584 are missing from this
// listing -- presumably the EndArray() call, the condition on |should_run|
// guarding "selected_queue", and the return of |state|; confirm.
568 scoped_refptr
<base::trace_event::ConvertableToTraceFormat
>
569 TaskQueueManager::AsValueWithSelectorResult(bool should_run
,
570 size_t selected_queue
) const {
571 DCHECK(main_thread_checker_
.CalledOnValidThread());
572 scoped_refptr
<base::trace_event::TracedValue
> state
=
573 new base::trace_event::TracedValue();
574 state
->BeginArray("queues");
575 for (auto& queue
: queues_
)
576 queue
->AsValueInto(state
.get());
578 state
->BeginDictionary("selector");
579 selector_
->AsValueInto(state
.get());
580 state
->EndDictionary();
582 state
->SetInteger("selected_queue", selected_queue
);
586 } // namespace content