// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "content/renderer/scheduler/task_queue_manager.h"

#include "base/trace_event/trace_event.h"
#include "base/trace_event/trace_event_argument.h"
#include "content/renderer/scheduler/task_queue_selector.h"

namespace content {
namespace internal {

class TaskQueue : public base::SingleThreadTaskRunner {
 public:
  explicit TaskQueue(TaskQueueManager* task_queue_manager);

  // base::SingleThreadTaskRunner implementation.
  bool RunsTasksOnCurrentThread() const override;
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const base::Closure& task,
                       base::TimeDelta delay) override {
    return PostDelayedTaskImpl(from_here, task, delay, true);
  }
  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const base::Closure& task,
                                  base::TimeDelta delay) override {
    return PostDelayedTaskImpl(from_here, task, delay, false);
  }

  // Adds a task at the end of the incoming task queue and schedules a call to
  // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
  // pumping is enabled. Can be called on an arbitrary thread. (See the design
  // note after this class declaration.)
  void EnqueueTask(const base::PendingTask& pending_task);

  bool IsQueueEmpty() const;

  void SetAutoPump(bool auto_pump);
  void PumpQueue();

  bool UpdateWorkQueue();
  base::PendingTask TakeTaskFromWorkQueue();

  void WillDeleteTaskQueueManager();

  base::TaskQueue& work_queue() { return work_queue_; }

  void set_name(const char* name) { name_ = name; }

  void AsValueInto(base::debug::TracedValue* state) const;

 private:
  ~TaskQueue() override;

  bool PostDelayedTaskImpl(const tracked_objects::Location& from_here,
                           const base::Closure& task,
                           base::TimeDelta delay,
                           bool nestable);

  void PumpQueueLocked();
  void EnqueueTaskLocked(const base::PendingTask& pending_task);

  void TraceWorkQueueSize() const;
  static void QueueAsValueInto(const base::TaskQueue& queue,
                               base::debug::TracedValue* state);
  static void TaskAsValueInto(const base::PendingTask& task,
                              base::debug::TracedValue* state);

  // This lock protects all members except the work queue.
  mutable base::Lock lock_;
  TaskQueueManager* task_queue_manager_;
  base::TaskQueue incoming_queue_;
  bool auto_pump_;
  const char* name_;

  base::TaskQueue work_queue_;

  DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};
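
// Design note (a summary of the implementation below, not from the original
// file): each TaskQueue keeps two FIFOs. Tasks posted from any thread land on
// |incoming_queue_| under |lock_|; the main thread moves them in bulk onto
// |work_queue_| via UpdateWorkQueue() (a single Swap under the lock) and then
// runs them without holding any lock. When auto-pumping is disabled, tasks
// only move to the work queue on an explicit PumpQueue() call.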

TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
    : task_queue_manager_(task_queue_manager),
      auto_pump_(true),
      name_(nullptr) {
}

TaskQueue::~TaskQueue() {
}

void TaskQueue::WillDeleteTaskQueueManager() {
  base::AutoLock lock(lock_);
  task_queue_manager_ = nullptr;
}

bool TaskQueue::RunsTasksOnCurrentThread() const {
  base::AutoLock lock(lock_);
  if (!task_queue_manager_)
    return false;
  return task_queue_manager_->RunsTasksOnCurrentThread();
}

bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
                                    const base::Closure& task,
                                    base::TimeDelta delay,
                                    bool nestable) {
  base::AutoLock lock(lock_);
  if (!task_queue_manager_)
    return false;

  base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable);
  task_queue_manager_->DidQueueTask(&pending_task);

  if (delay > base::TimeDelta()) {
    // Post the delayed task to the underlying main task runner; when the
    // delay expires, EnqueueTask() moves it onto this queue.
    return task_queue_manager_->PostDelayedTask(
        from_here, base::Bind(&TaskQueue::EnqueueTask, this, pending_task),
        delay);
  }
  EnqueueTaskLocked(pending_task);
  return true;
}

bool TaskQueue::IsQueueEmpty() const {
  // The work queue is only touched on the main thread, so it can be checked
  // without taking |lock_|.
  if (!work_queue_.empty())
    return false;

  base::AutoLock lock(lock_);
  return incoming_queue_.empty();
}

bool TaskQueue::UpdateWorkQueue() {
  if (!work_queue_.empty())
    return true;

  base::AutoLock lock(lock_);
  if (!auto_pump_ || incoming_queue_.empty())
    return false;
  work_queue_.Swap(&incoming_queue_);
  TraceWorkQueueSize();
  return true;
}

base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
  base::PendingTask pending_task = work_queue_.front();
  work_queue_.pop();
  TraceWorkQueueSize();
  return pending_task;
}

void TaskQueue::TraceWorkQueueSize() const {
  // Only named queues get a trace counter.
  if (!name_)
    return;
  TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
                 work_queue_.size());
}

void TaskQueue::EnqueueTask(const base::PendingTask& pending_task) {
  base::AutoLock lock(lock_);
  EnqueueTaskLocked(pending_task);
}

void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
  lock_.AssertAcquired();
  if (!task_queue_manager_)
    return;
  if (auto_pump_ && incoming_queue_.empty())
    task_queue_manager_->MaybePostDoWorkOnMainRunner();
  incoming_queue_.push(pending_task);
}

void TaskQueue::SetAutoPump(bool auto_pump) {
  base::AutoLock lock(lock_);
  if (auto_pump) {
    auto_pump_ = true;
    // Flush any tasks that queued up while automatic pumping was disabled.
    PumpQueueLocked();
  } else {
    auto_pump_ = false;
  }
}

void TaskQueue::PumpQueueLocked() {
  lock_.AssertAcquired();
  while (!incoming_queue_.empty()) {
    work_queue_.push(incoming_queue_.front());
    incoming_queue_.pop();
  }
  if (!work_queue_.empty())
    task_queue_manager_->MaybePostDoWorkOnMainRunner();
}

void TaskQueue::PumpQueue() {
  base::AutoLock lock(lock_);
  PumpQueueLocked();
}

void TaskQueue::AsValueInto(base::debug::TracedValue* state) const {
  base::AutoLock lock(lock_);
  state->BeginDictionary();
  if (name_)
    state->SetString("name", name_);
  state->SetBoolean("auto_pump", auto_pump_);
  state->BeginArray("incoming_queue");
  QueueAsValueInto(incoming_queue_, state);
  state->EndArray();
  state->BeginArray("work_queue");
  QueueAsValueInto(work_queue_, state);
  state->EndArray();
  state->EndDictionary();
}

// static
void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
                                 base::debug::TracedValue* state) {
  // Serialize from a copy so the original queue is left untouched.
  base::TaskQueue queue_copy(queue);
  while (!queue_copy.empty()) {
    TaskAsValueInto(queue_copy.front(), state);
    queue_copy.pop();
  }
}

// static
void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
                                base::debug::TracedValue* state) {
  state->BeginDictionary();
  state->SetString("posted_from", task.posted_from.ToString());
  state->SetInteger("sequence_num", task.sequence_num);
  state->SetBoolean("nestable", task.nestable);
  state->SetBoolean("is_high_res", task.is_high_res);
  state->SetDouble(
      "delayed_run_time",
      (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
  state->EndDictionary();
}

}  // namespace internal

TaskQueueManager::TaskQueueManager(
    size_t task_queue_count,
    scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
    TaskQueueSelector* selector)
    : main_task_runner_(main_task_runner),
      selector_(selector),
      pending_dowork_count_(0),
      weak_factory_(this) {
  DCHECK(main_task_runner->RunsTasksOnCurrentThread());
  TRACE_EVENT_OBJECT_CREATED_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
      this);

  task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
  for (size_t i = 0; i < task_queue_count; i++) {
    scoped_refptr<internal::TaskQueue> queue(
        make_scoped_refptr(new internal::TaskQueue(this)));
    queues_.push_back(queue);
  }

  std::vector<const base::TaskQueue*> work_queues;
  for (const auto& queue : queues_)
    work_queues.push_back(&queue->work_queue());
  selector_->RegisterWorkQueues(work_queues);
}

TaskQueueManager::~TaskQueueManager() {
  TRACE_EVENT_OBJECT_DELETED_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
      this);
  for (auto& queue : queues_)
    queue->WillDeleteTaskQueueManager();
}

internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const {
  DCHECK_LT(queue_index, queues_.size());
  return queues_[queue_index].get();
}

scoped_refptr<base::SingleThreadTaskRunner>
TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const {
  return Queue(queue_index);
}

bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const {
  internal::TaskQueue* queue = Queue(queue_index);
  return queue->IsQueueEmpty();
}

void TaskQueueManager::SetAutoPump(size_t queue_index, bool auto_pump) {
  main_thread_checker_.CalledOnValidThread();
  internal::TaskQueue* queue = Queue(queue_index);
  queue->SetAutoPump(auto_pump);
}

void TaskQueueManager::PumpQueue(size_t queue_index) {
  main_thread_checker_.CalledOnValidThread();
  internal::TaskQueue* queue = Queue(queue_index);
  queue->PumpQueue();
}

bool TaskQueueManager::UpdateWorkQueues() {
  // TODO(skyostil): This is not efficient when the number of queues grows very
  // large due to the number of locks taken. Consider optimizing when we get
  // there.
  main_thread_checker_.CalledOnValidThread();
  bool has_work = false;
  for (auto& queue : queues_)
    has_work |= queue->UpdateWorkQueue();
  return has_work;
}

void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
  bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
  if (on_main_thread) {
    // We only want one pending DoWork posted from the main thread, or we risk
    // an explosion of pending DoWorks which could starve out everything else.
    if (pending_dowork_count_ > 0)
      return;
    pending_dowork_count_++;
  }

  main_task_runner_->PostTask(
      FROM_HERE, base::Bind(&TaskQueueManager::DoWork,
                            task_queue_manager_weak_ptr_, on_main_thread));
}
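
// DoWork() is the main-thread servicing step: it refreshes each queue's work
// queue, asks the selector which queue to service, posts a follow-up DoWork()
// (via MaybePostDoWorkOnMainRunner) and then runs a single task, so the
// remaining backlog is interleaved with other tasks on the main runner.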

void TaskQueueManager::DoWork(bool posted_from_main_thread) {
  if (posted_from_main_thread) {
    pending_dowork_count_--;
    DCHECK_GE(pending_dowork_count_, 0);
  }
  main_thread_checker_.CalledOnValidThread();
  if (!UpdateWorkQueues())
    return;

  size_t queue_index;
  if (!SelectWorkQueueToService(&queue_index))
    return;
  MaybePostDoWorkOnMainRunner();
  ProcessTaskFromWorkQueue(queue_index);
}

bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
  bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
  TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
      AsValueWithSelectorResult(should_run, *out_queue_index));
  return should_run;
}

void TaskQueueManager::DidQueueTask(base::PendingTask* pending_task) {
  pending_task->sequence_num = task_sequence_num_.GetNext();
  task_annotator_.DidQueueTask("TaskQueueManager::PostTask", *pending_task);
}

void TaskQueueManager::ProcessTaskFromWorkQueue(size_t queue_index) {
  main_thread_checker_.CalledOnValidThread();
  internal::TaskQueue* queue = Queue(queue_index);
  base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
  if (!pending_task.nestable) {
    // Defer non-nestable work to the main task runner. NOTE these tasks can be
    // arbitrarily delayed so the additional delay should not be a problem.
    main_task_runner_->PostNonNestableTask(pending_task.posted_from,
                                           pending_task.task);
  } else {
    task_annotator_.RunTask("TaskQueueManager::PostTask",
                            "TaskQueueManager::RunTask", pending_task);
  }
}

bool TaskQueueManager::RunsTasksOnCurrentThread() const {
  return main_task_runner_->RunsTasksOnCurrentThread();
}

bool TaskQueueManager::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const base::Closure& task,
    base::TimeDelta delay) {
  DCHECK(delay > base::TimeDelta());
  return main_task_runner_->PostDelayedTask(from_here, task, delay);
}

void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
  main_thread_checker_.CalledOnValidThread();
  internal::TaskQueue* queue = Queue(queue_index);
  queue->set_name(name);
}

scoped_refptr<base::debug::ConvertableToTraceFormat>
TaskQueueManager::AsValueWithSelectorResult(bool should_run,
                                            size_t selected_queue) const {
  main_thread_checker_.CalledOnValidThread();
  scoped_refptr<base::debug::TracedValue> state =
      new base::debug::TracedValue();
  state->BeginArray("queues");
  for (auto& queue : queues_)
    queue->AsValueInto(state.get());
  state->EndArray();
  state->BeginDictionary("selector");
  selector_->AsValueInto(state.get());
  state->EndDictionary();
  if (should_run)
    state->SetInteger("selected_queue", selected_queue);
  return state;
}

}  // namespace content