1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/renderer/scheduler/task_queue_manager.h"
10 #include "base/trace_event/trace_event.h"
11 #include "base/trace_event/trace_event_argument.h"
12 #include "cc/test/test_now_source.h"
13 #include "content/renderer/scheduler/task_queue_selector.h"
// Largest value of the 64-bit internal tick representation. DoWork() uses it
// as the initial value of next_pending_delayed_task and compares against it
// to detect that no delayed run time was recorded.
const int64_t kMaxTimeTicks = std::numeric_limits<int64_t>::max();
// A single task queue exposed as a base::SingleThreadTaskRunner. Posted tasks
// are appended to incoming_queue_ and are later moved into work_queue_.
// NOTE(review): this extract omits several source lines of the class (the
// embedded numbering has gaps), so access specifiers, some declarations and
// the closing of the class are not visible here.
22 class TaskQueue
: public base::SingleThreadTaskRunner
{
24 TaskQueue(TaskQueueManager
* task_queue_manager
);
26 // base::SingleThreadTaskRunner implementation.
27 bool RunsTasksOnCurrentThread() const override
;
// Forwards to PostDelayedTaskImpl() with the final argument set to true.
28 bool PostDelayedTask(const tracked_objects::Location
& from_here
,
29 const base::Closure
& task
,
30 base::TimeDelta delay
) override
{
31 return PostDelayedTaskImpl(from_here
, task
, delay
, true);
// Forwards to PostDelayedTaskImpl() with the final argument set to false.
34 bool PostNonNestableDelayedTask(const tracked_objects::Location
& from_here
,
35 const base::Closure
& task
,
36 base::TimeDelta delay
) override
{
37 return PostDelayedTaskImpl(from_here
, task
, delay
, false);
40 // Adds a task at the end of the incoming task queue and schedules a call to
41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
42 // pumping is enabled. Can be called on an arbitrary thread.
43 void EnqueueTask(const base::PendingTask
& pending_task
);
45 bool IsQueueEmpty() const;
47 void SetAutoPump(bool auto_pump
);
// May lower |*next_pending_delayed_task| to the earliest recorded delayed run
// time and may swap the incoming queue into the work queue.
50 bool UpdateWorkQueue(base::TimeTicks
* next_pending_delayed_task
);
51 base::PendingTask
TakeTaskFromWorkQueue();
// Clears the back-pointer to the manager (called from the manager's
// destructor).
53 void WillDeleteTaskQueueManager();
55 base::TaskQueue
& work_queue() { return work_queue_
; }
// Assigns |name| to name_, which is used as the trace counter name and in
// AsValueInto().
57 void set_name(const char* name
) { name_
= name
; }
59 void AsValueInto(base::debug::TracedValue
* state
) const;
62 ~TaskQueue() override
;
// Shared implementation of the two Post*DelayedTask() overrides above.
// NOTE(review): the last parameter (source line 67) is not visible in this
// extract; the definition passes a value called |nestable| to the PendingTask.
64 bool PostDelayedTaskImpl(const tracked_objects::Location
& from_here
,
65 const base::Closure
& task
,
66 base::TimeDelta delay
,
// The *Locked helpers assert that lock_ is already held.
69 void PumpQueueLocked();
70 void EnqueueTaskLocked(const base::PendingTask
& pending_task
);
72 void TraceWorkQueueSize() const;
// Tracing helpers that serialize a queue / a single task into |state|.
73 static void QueueAsValueInto(const base::TaskQueue
& queue
,
74 base::debug::TracedValue
* state
);
75 static void TaskAsValueInto(const base::PendingTask
& task
,
76 base::debug::TracedValue
* state
);
78 // This lock protects all members except the work queue.
79 mutable base::Lock lock_
;
80 TaskQueueManager
* task_queue_manager_
;
81 base::TaskQueue incoming_queue_
;
// Min-heap (std::greater) of delayed run times: an entry is pushed when a
// delayed task is posted and entries are popped when such a task is enqueued.
84 std::priority_queue
<base::TimeTicks
,
85 std::vector
<base::TimeTicks
>,
86 std::greater
<base::TimeTicks
>> delayed_task_run_times_
;
88 base::TaskQueue work_queue_
;
90 DISALLOW_COPY_AND_ASSIGN(TaskQueue
);
// Constructs a queue bound to |task_queue_manager|, which is stored in
// task_queue_manager_.
// NOTE(review): the remainder of the initializer list and the body are not
// visible in this extract (embedded numbering jumps from 94 to 99).
93 TaskQueue::TaskQueue(TaskQueueManager
* task_queue_manager
)
94 : task_queue_manager_(task_queue_manager
),
// Destructor. NOTE(review): its body is not visible in this extract.
99 TaskQueue::~TaskQueue() {
102 void TaskQueue::WillDeleteTaskQueueManager() {
103 base::AutoLock
lock(lock_
);
104 task_queue_manager_
= nullptr;
// Holds lock_ while reading task_queue_manager_ and, when a manager is still
// attached, forwards the query to it.
// NOTE(review): the statement executed when task_queue_manager_ is null
// (source line 110) is not visible in this extract.
107 bool TaskQueue::RunsTasksOnCurrentThread() const {
108 base::AutoLock
lock(lock_
);
109 if (!task_queue_manager_
)
111 return task_queue_manager_
->RunsTasksOnCurrentThread();
// Builds a PendingTask for |task| under lock_ and hands it to the manager's
// DidQueueTask(). With a positive |delay| the run time (manager's Now() +
// delay) is recorded in delayed_task_run_times_ and the manager is asked to
// call EnqueueTask() after |delay|; otherwise the task is enqueued right away
// via EnqueueTaskLocked().
// NOTE(review): source lines 117, 120 and 132 are missing from this extract
// (the last parameter, the statement taken when task_queue_manager_ is null,
// and the final return).
114 bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location
& from_here
,
115 const base::Closure
& task
,
116 base::TimeDelta delay
,
118 base::AutoLock
lock(lock_
);
119 if (!task_queue_manager_
)
// The PendingTask starts with a null (default) TimeTicks as its run time.
122 base::PendingTask
pending_task(from_here
, task
, base::TimeTicks(), nestable
);
123 task_queue_manager_
->DidQueueTask(&pending_task
);
125 if (delay
> base::TimeDelta()) {
126 pending_task
.delayed_run_time
= task_queue_manager_
->Now() + delay
;
127 delayed_task_run_times_
.push(pending_task
.delayed_run_time
);
// |this| is bound into the closure together with a copy of |pending_task|.
128 return task_queue_manager_
->PostDelayedTask(
129 from_here
, Bind(&TaskQueue::EnqueueTask
, this, pending_task
), delay
);
131 EnqueueTaskLocked(pending_task
);
// Tests work_queue_ first without taking lock_, then reports
// incoming_queue_.empty() while holding lock_.
// NOTE(review): the statement guarded by the work-queue test (source line 137)
// is not visible in this extract; presumably it returns false.
135 bool TaskQueue::IsQueueEmpty() const {
136 if (!work_queue_
.empty())
140 base::AutoLock
lock(lock_
);
141 return incoming_queue_
.empty();
// Refills work_queue_ from incoming_queue_. Under lock_ it first lowers
// |*next_pending_delayed_task| to the earliest entry of
// delayed_task_run_times_ (if any), then checks auto_pump_ and
// incoming_queue_.empty(), and finally swaps the incoming queue into the work
// queue and traces the new size.
// NOTE(review): the statements guarded by the two `if` checks without braces
// and the final return (source lines 147, 156, 159) are not visible here.
145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks
* next_pending_delayed_task
) {
146 if (!work_queue_
.empty())
150 base::AutoLock
lock(lock_
);
151 if (!delayed_task_run_times_
.empty()) {
152 *next_pending_delayed_task
=
153 std::min(*next_pending_delayed_task
, delayed_task_run_times_
.top());
155 if (!auto_pump_
|| incoming_queue_
.empty())
157 work_queue_
.Swap(&incoming_queue_
);
158 TraceWorkQueueSize();
// Copies the task at the front of work_queue_ and records the queue size via
// TraceWorkQueueSize().
// NOTE(review): source lines 165 and 167 are missing from this extract;
// presumably they pop the queue and return |pending_task|.
163 base::PendingTask
TaskQueue::TakeTaskFromWorkQueue() {
164 base::PendingTask pending_task
= work_queue_
.front();
166 TraceWorkQueueSize();
// Emits a trace counter named name_ in the disabled-by-default
// "renderer.scheduler" category.
// NOTE(review): the lines before the macro call and its remaining argument
// (source lines 171-172, 174) are not visible in this extract.
170 void TaskQueue::TraceWorkQueueSize() const {
173 TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_
,
177 void TaskQueue::EnqueueTask(const base::PendingTask
& pending_task
) {
178 base::AutoLock
lock(lock_
);
179 EnqueueTaskLocked(pending_task
);
// Appends |pending_task| to incoming_queue_; lock_ must already be held
// (asserted). When auto_pump_ is set and the incoming queue is empty before
// the push, the manager is asked to post DoWork. For a task that carries a
// delayed run time, entries of delayed_task_run_times_ that are not later than
// that run time are dropped and the run time on the queued copy is reset.
// NOTE(review): the statement taken when task_queue_manager_ is null (source
// line 185) is not visible in this extract.
182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask
& pending_task
) {
183 lock_
.AssertAcquired();
184 if (!task_queue_manager_
)
186 if (auto_pump_
&& incoming_queue_
.empty())
187 task_queue_manager_
->MaybePostDoWorkOnMainRunner();
188 incoming_queue_
.push(pending_task
);
190 if (!pending_task
.delayed_run_time
.is_null()) {
191 // Update the time of the next pending delayed task.
192 while (!delayed_task_run_times_
.empty() &&
193 delayed_task_run_times_
.top() <= pending_task
.delayed_run_time
) {
194 delayed_task_run_times_
.pop();
196 // Clear the delayed run time because we've already applied the delay
197 // before getting here.
198 incoming_queue_
.back().delayed_run_time
= base::TimeTicks();
// Changes the automatic pumping mode while holding lock_.
// NOTE(review): everything after the lock acquisition (source lines 204-210)
// is missing from this extract.
202 void TaskQueue::SetAutoPump(bool auto_pump
) {
203 base::AutoLock
lock(lock_
);
212 void TaskQueue::PumpQueueLocked() {
213 lock_
.AssertAcquired();
214 while (!incoming_queue_
.empty()) {
215 work_queue_
.push(incoming_queue_
.front());
216 incoming_queue_
.pop();
218 if (!work_queue_
.empty())
219 task_queue_manager_
->MaybePostDoWorkOnMainRunner();
// Acquires lock_ before pumping.
// NOTE(review): the statement after the lock acquisition (source line 224) is
// missing from this extract; presumably it calls PumpQueueLocked().
222 void TaskQueue::PumpQueue() {
223 base::AutoLock
lock(lock_
);
// Serializes this queue into |state| as a dictionary holding "name",
// "auto_pump" and the arrays "incoming_queue" and "work_queue". The whole
// dump happens while holding lock_.
// NOTE(review): source lines 230, 235 and 238 are missing from this extract
// (presumably a guard on name_ and the calls closing the two arrays).
// NOTE(review): work_queue_ is read here even though the class comment states
// that lock_ does not protect it — confirm this is only called on the thread
// that owns the work queue.
227 void TaskQueue::AsValueInto(base::debug::TracedValue
* state
) const {
228 base::AutoLock
lock(lock_
);
229 state
->BeginDictionary();
231 state
->SetString("name", name_
);
232 state
->SetBoolean("auto_pump", auto_pump_
);
233 state
->BeginArray("incoming_queue");
234 QueueAsValueInto(incoming_queue_
, state
);
236 state
->BeginArray("work_queue");
237 QueueAsValueInto(work_queue_
, state
);
239 state
->EndDictionary();
// Writes the tasks of |queue| into |state|. The loop works on a copy of the
// queue, so |queue| itself is left untouched; each front element is emitted
// through TaskAsValueInto().
// NOTE(review): the statement that advances the loop (source line 248) is not
// visible in this extract; presumably it pops |queue_copy|.
243 void TaskQueue::QueueAsValueInto(const base::TaskQueue
& queue
,
244 base::debug::TracedValue
* state
) {
245 base::TaskQueue
queue_copy(queue
);
246 while (!queue_copy
.empty()) {
247 TaskAsValueInto(queue_copy
.front(), state
);
// Writes |task| into |state| as a dictionary with the keys "posted_from",
// "sequence_num", "nestable" and "is_high_res", followed by a value derived
// from delayed_run_time.
// NOTE(review): the call that receives the delayed_run_time expression (source
// lines 260-261) is not visible in this extract.
253 void TaskQueue::TaskAsValueInto(const base::PendingTask
& task
,
254 base::debug::TracedValue
* state
) {
255 state
->BeginDictionary();
256 state
->SetString("posted_from", task
.posted_from
.ToString());
257 state
->SetInteger("sequence_num", task
.sequence_num
);
258 state
->SetBoolean("nestable", task
.nestable
);
259 state
->SetBoolean("is_high_res", task
.is_high_res
);
// Offset from a default-constructed TimeTicks, in microseconds, divided by
// 1000.0L (i.e. expressed in milliseconds).
262 (task
.delayed_run_time
- base::TimeTicks()).InMicroseconds() / 1000.0L);
263 state
->EndDictionary();
266 } // namespace internal
// Creates |task_queue_count| internal queues and registers their work queues
// with the selector. Must be constructed on the thread of |main_task_runner|
// (DCHECKed). Also records a trace "object created" event and caches a weak
// pointer to this object in task_queue_manager_weak_ptr_.
// NOTE(review): several initializer-list entries and macro arguments (source
// lines 273, 275, 281-282, 288) are missing from this extract.
268 TaskQueueManager::TaskQueueManager(
269 size_t task_queue_count
,
270 scoped_refptr
<base::SingleThreadTaskRunner
> main_task_runner
,
271 TaskQueueSelector
* selector
)
272 : main_task_runner_(main_task_runner
),
274 pending_dowork_count_(0),
276 time_source_(nullptr),
277 weak_factory_(this) {
278 DCHECK(main_task_runner
->RunsTasksOnCurrentThread());
279 TRACE_EVENT_OBJECT_CREATED_WITH_ID(
280 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
283 task_queue_manager_weak_ptr_
= weak_factory_
.GetWeakPtr();
// Each queue receives a raw back-pointer to this manager.
284 for (size_t i
= 0; i
< task_queue_count
; i
++) {
285 scoped_refptr
<internal::TaskQueue
> queue(
286 make_scoped_refptr(new internal::TaskQueue(this)));
287 queues_
.push_back(queue
);
// Hand the selector pointers to every queue's work queue.
290 std::vector
<const base::TaskQueue
*> work_queues
;
291 for (const auto& queue
: queues_
)
292 work_queues
.push_back(&queue
->work_queue());
293 selector_
->RegisterWorkQueues(work_queues
);
// Records a trace "object deleted" event and tells every queue that the
// manager is going away, which makes each queue clear its back-pointer.
// NOTE(review): the last macro argument (source line 299) is not visible in
// this extract.
296 TaskQueueManager::~TaskQueueManager() {
297 TRACE_EVENT_OBJECT_DELETED_WITH_ID(
298 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
300 for (auto& queue
: queues_
)
301 queue
->WillDeleteTaskQueueManager();
304 internal::TaskQueue
* TaskQueueManager::Queue(size_t queue_index
) const {
305 DCHECK_LT(queue_index
, queues_
.size());
306 return queues_
[queue_index
].get();
309 scoped_refptr
<base::SingleThreadTaskRunner
>
310 TaskQueueManager::TaskRunnerForQueue(size_t queue_index
) const {
311 return Queue(queue_index
);
314 bool TaskQueueManager::IsQueueEmpty(size_t queue_index
) const {
315 internal::TaskQueue
* queue
= Queue(queue_index
);
316 return queue
->IsQueueEmpty();
319 void TaskQueueManager::SetAutoPump(size_t queue_index
, bool auto_pump
) {
320 main_thread_checker_
.CalledOnValidThread();
321 internal::TaskQueue
* queue
= Queue(queue_index
);
322 queue
->SetAutoPump(auto_pump
);
// Looks up the queue at |queue_index| in order to pump it.
// NOTE(review): the statement that acts on |queue| (source line 328) is
// missing from this extract.
// NOTE(review): the result of CalledOnValidThread() is discarded below;
// it looks like it was meant to be wrapped in DCHECK — confirm.
325 void TaskQueueManager::PumpQueue(size_t queue_index
) {
326 main_thread_checker_
.CalledOnValidThread();
327 internal::TaskQueue
* queue
= Queue(queue_index
);
// Calls UpdateWorkQueue() on every queue, OR-ing the results into |has_work|
// and passing |next_pending_delayed_task| through to each queue. In debug
// builds it also checks that the front of every non-empty work queue has a
// null delayed_run_time.
// NOTE(review): the end of the function (source lines 344-347), including the
// return statement, is not visible in this extract.
// NOTE(review): the result of CalledOnValidThread() is discarded below;
// it looks like it was meant to be wrapped in DCHECK — confirm.
331 bool TaskQueueManager::UpdateWorkQueues(
332 base::TimeTicks
* next_pending_delayed_task
) {
333 // TODO(skyostil): This is not efficient when the number of queues grows very
334 // large due to the number of locks taken. Consider optimizing when we get
336 main_thread_checker_
.CalledOnValidThread();
337 bool has_work
= false;
338 for (auto& queue
: queues_
) {
339 has_work
|= queue
->UpdateWorkQueue(next_pending_delayed_task
);
340 if (!queue
->work_queue().empty()) {
341 // Currently we should not be getting tasks with delayed run times in any
342 // of the work queues.
343 DCHECK(queue
->work_queue().front().delayed_run_time
.is_null());
// Posts a DoWork() call, bound to the cached weak pointer, on the main task
// runner. When invoked on the main thread, pending_dowork_count_ is consulted
// first so that main-thread posts do not pile up.
// NOTE(review): the statement taken when a DoWork is already pending and the
// last Bind argument (source lines 355, 362) are not visible in this extract.
349 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
350 bool on_main_thread
= main_task_runner_
->BelongsToCurrentThread();
351 if (on_main_thread
) {
352 // We only want one pending DoWork posted from the main thread, or we risk
353 // an explosion of pending DoWorks which could starve out everything else.
354 if (pending_dowork_count_
> 0) {
357 pending_dowork_count_
++;
360 main_task_runner_
->PostTask(
361 FROM_HERE
, Bind(&TaskQueueManager::DoWork
, task_queue_manager_weak_ptr_
,
// Runs up to work_batch_size_ tasks. Each iteration refreshes the work queues,
// checks whether a recorded delayed task has become due, asks for a queue to
// service, re-arms DoWork and then processes one task from the selected queue.
// |posted_from_main_thread| tells whether this call was counted in
// pending_dowork_count_ and therefore has to be un-counted.
// NOTE(review): the statements guarded by the three brace-less `if` checks and
// the declaration of |queue_index| (source lines 376, 381, 383, 385) are not
// visible in this extract.
// NOTE(review): the result of CalledOnValidThread() is discarded below;
// it looks like it was meant to be wrapped in DCHECK — confirm.
365 void TaskQueueManager::DoWork(bool posted_from_main_thread
) {
366 if (posted_from_main_thread
) {
367 pending_dowork_count_
--;
368 DCHECK_GE(pending_dowork_count_
, 0);
370 main_thread_checker_
.CalledOnValidThread();
// kMaxTimeTicks serves as the "no delayed task recorded" marker.
372 base::TimeTicks
next_pending_delayed_task(
373 base::TimeTicks::FromInternalValue(kMaxTimeTicks
));
374 for (int i
= 0; i
< work_batch_size_
; i
++) {
375 if (!UpdateWorkQueues(&next_pending_delayed_task
))
378 // Interrupt the work batch if we should run the next delayed task.
379 if (i
> 0 && next_pending_delayed_task
.ToInternalValue() != kMaxTimeTicks
&&
380 Now() >= next_pending_delayed_task
)
384 if (!SelectWorkQueueToService(&queue_index
))
386 // Note that this function won't post another call to DoWork if one is
387 // already pending, so it is safe to call it in a loop.
388 MaybePostDoWorkOnMainRunner();
389 ProcessTaskFromWorkQueue(queue_index
);
// Asks the selector which work queue to service next, storing the answer in
// |*out_queue_index|, and records a trace snapshot of the manager state
// together with the selector's result.
// NOTE(review): the return statement (source line 398) is not visible in this
// extract; presumably it returns |should_run|.
// NOTE(review): |*out_queue_index| is passed to the snapshot regardless of
// |should_run| — confirm the selector always writes it.
393 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index
) {
394 bool should_run
= selector_
->SelectWorkQueueToService(out_queue_index
);
395 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
396 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
397 AsValueWithSelectorResult(should_run
, *out_queue_index
));
401 void TaskQueueManager::DidQueueTask(base::PendingTask
* pending_task
) {
402 pending_task
->sequence_num
= task_sequence_num_
.GetNext();
403 task_annotator_
.DidQueueTask("TaskQueueManager::PostTask", *pending_task
);
// Takes the next task from the work queue at |queue_index|. A non-nestable
// task is handed to main_task_runner_ via PostNonNestableTask(); the visible
// alternative runs the task through task_annotator_.RunTask().
// NOTE(review): the remaining PostNonNestableTask arguments and the branch
// structure between the two paths (source lines 414-415, 418) are not visible
// in this extract.
// NOTE(review): the result of CalledOnValidThread() is discarded below;
// it looks like it was meant to be wrapped in DCHECK — confirm.
406 void TaskQueueManager::ProcessTaskFromWorkQueue(size_t queue_index
) {
407 main_thread_checker_
.CalledOnValidThread();
408 internal::TaskQueue
* queue
= Queue(queue_index
);
409 base::PendingTask pending_task
= queue
->TakeTaskFromWorkQueue();
410 if (!pending_task
.nestable
) {
411 // Defer non-nestable work to the main task runner. NOTE these tasks can be
412 // arbitrarily delayed so the additional delay should not be a problem.
413 main_task_runner_
->PostNonNestableTask(pending_task
.posted_from
,
416 task_annotator_
.RunTask("TaskQueueManager::PostTask",
417 "TaskQueueManager::RunTask", pending_task
);
421 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
422 return main_task_runner_
->RunsTasksOnCurrentThread();
425 bool TaskQueueManager::PostDelayedTask(
426 const tracked_objects::Location
& from_here
,
427 const base::Closure
& task
,
428 base::TimeDelta delay
) {
429 DCHECK(delay
> base::TimeDelta());
430 return main_task_runner_
->PostDelayedTask(from_here
, task
, delay
);
433 void TaskQueueManager::SetQueueName(size_t queue_index
, const char* name
) {
434 main_thread_checker_
.CalledOnValidThread();
435 internal::TaskQueue
* queue
= Queue(queue_index
);
436 queue
->set_name(name
);
439 void TaskQueueManager::SetWorkBatchSize(int work_batch_size
) {
440 main_thread_checker_
.CalledOnValidThread();
441 DCHECK_GE(work_batch_size
, 1);
442 work_batch_size_
= work_batch_size
;
445 void TaskQueueManager::SetTimeSourceForTesting(
446 scoped_refptr
<cc::TestNowSource
> time_source
) {
447 main_thread_checker_
.CalledOnValidThread();
448 time_source_
= time_source
;
451 base::TimeTicks
TaskQueueManager::Now() const {
452 return UNLIKELY(time_source_
) ? time_source_
->Now() : base::TimeTicks::Now();
// Builds a TracedValue describing the manager: a "queues" array with one entry
// per queue, a "selector" dictionary filled in by the selector, and the
// "selected_queue" index.
// NOTE(review): source lines 464, 468 and 470 are missing from this extract
// (presumably the call closing the array, a guard on |should_run| before
// "selected_queue", and the return of |state|).
// NOTE(review): the result of CalledOnValidThread() is discarded below;
// it looks like it was meant to be wrapped in DCHECK — confirm.
455 scoped_refptr
<base::debug::ConvertableToTraceFormat
>
456 TaskQueueManager::AsValueWithSelectorResult(bool should_run
,
457 size_t selected_queue
) const {
458 main_thread_checker_
.CalledOnValidThread();
459 scoped_refptr
<base::debug::TracedValue
> state
=
460 new base::debug::TracedValue();
461 state
->BeginArray("queues");
462 for (auto& queue
: queues_
)
463 queue
->AsValueInto(state
.get());
465 state
->BeginDictionary("selector");
466 selector_
->AsValueInto(state
.get());
467 state
->EndDictionary();
469 state
->SetInteger("selected_queue", selected_queue
);
473 } // namespace content