Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / components / scheduler / child / task_queue_impl.cc
blob556332b2f9b98cadddff2e0b94e3fe0b31d1466a
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/scheduler/child/task_queue_impl.h"
7 #include "components/scheduler/child/task_queue_manager.h"
9 namespace scheduler {
10 namespace internal {
12 TaskQueueImpl::TaskQueueImpl(
13 TaskQueueManager* task_queue_manager,
14 const Spec& spec,
15 const char* disabled_by_default_tracing_category,
16 const char* disabled_by_default_verbose_tracing_category)
17 : thread_id_(base::PlatformThread::CurrentId()),
18 task_queue_manager_(task_queue_manager),
19 pump_policy_(spec.pump_policy),
20 name_(spec.name),
21 disabled_by_default_tracing_category_(
22 disabled_by_default_tracing_category),
23 disabled_by_default_verbose_tracing_category_(
24 disabled_by_default_verbose_tracing_category),
25 wakeup_policy_(spec.wakeup_policy),
26 set_index_(0),
27 should_monitor_quiescence_(spec.should_monitor_quiescence),
28 should_notify_observers_(spec.should_notify_observers) {}
30 TaskQueueImpl::~TaskQueueImpl() {}
32 TaskQueueImpl::Task::Task()
33 : PendingTask(tracked_objects::Location(),
34 base::Closure(),
35 base::TimeTicks(),
36 true),
37 #ifndef NDEBUG
38 enqueue_order_set_(false),
39 #endif
40 enqueue_order_(0) {
41 sequence_num = 0;
44 TaskQueueImpl::Task::Task(const tracked_objects::Location& posted_from,
45 const base::Closure& task,
46 int sequence_number,
47 bool nestable)
48 : PendingTask(posted_from, task, base::TimeTicks(), nestable),
49 #ifndef NDEBUG
50 enqueue_order_set_(false),
51 #endif
52 enqueue_order_(0) {
53 sequence_num = sequence_number;
56 void TaskQueueImpl::UnregisterTaskQueue() {
57 if (!task_queue_manager_)
58 return;
59 task_queue_manager_->UnregisterTaskQueue(make_scoped_refptr(this));
62 base::AutoLock lock(lock_);
63 task_queue_manager_ = nullptr;
64 delayed_task_queue_ = std::priority_queue<Task>();
65 incoming_queue_ = std::queue<Task>();
66 work_queue_ = std::queue<Task>();
70 bool TaskQueueImpl::RunsTasksOnCurrentThread() const {
71 base::AutoLock lock(lock_);
72 return base::PlatformThread::CurrentId() == thread_id_;
75 bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location& from_here,
76 const base::Closure& task,
77 base::TimeDelta delay) {
78 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL);
81 bool TaskQueueImpl::PostNonNestableDelayedTask(
82 const tracked_objects::Location& from_here,
83 const base::Closure& task,
84 base::TimeDelta delay) {
85 return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE);
88 bool TaskQueueImpl::PostDelayedTaskAt(
89 const tracked_objects::Location& from_here,
90 const base::Closure& task,
91 base::TimeTicks desired_run_time) {
92 base::AutoLock lock(lock_);
93 if (!task_queue_manager_)
94 return false;
95 LazyNow lazy_now(task_queue_manager_);
96 return PostDelayedTaskLocked(&lazy_now, from_here, task, desired_run_time,
97 TaskType::NORMAL);
100 bool TaskQueueImpl::PostDelayedTaskImpl(
101 const tracked_objects::Location& from_here,
102 const base::Closure& task,
103 base::TimeDelta delay,
104 TaskType task_type) {
105 base::AutoLock lock(lock_);
106 if (!task_queue_manager_)
107 return false;
108 LazyNow lazy_now(task_queue_manager_);
109 base::TimeTicks desired_run_time;
110 if (delay > base::TimeDelta())
111 desired_run_time = lazy_now.Now() + delay;
112 return PostDelayedTaskLocked(&lazy_now, from_here, task, desired_run_time,
113 task_type);
116 bool TaskQueueImpl::PostDelayedTaskLocked(
117 LazyNow* lazy_now,
118 const tracked_objects::Location& from_here,
119 const base::Closure& task,
120 base::TimeTicks desired_run_time,
121 TaskType task_type) {
122 lock_.AssertAcquired();
123 DCHECK(task_queue_manager_);
124 Task pending_task(from_here, task,
125 task_queue_manager_->GetNextSequenceNumber(),
126 task_type != TaskType::NON_NESTABLE);
127 task_queue_manager_->DidQueueTask(pending_task);
129 if (!desired_run_time.is_null()) {
130 pending_task.delayed_run_time = std::max(lazy_now->Now(), desired_run_time);
131 // TODO(alexclarke): consider emplace() when C++11 library features allowed.
132 delayed_task_queue_.push(pending_task);
133 TraceQueueSize(true);
134 // Schedule a later call to MoveReadyDelayedTasksToIncomingQueue.
135 task_queue_manager_->ScheduleDelayedWork(this, desired_run_time, lazy_now);
136 return true;
138 pending_task.set_enqueue_order(pending_task.sequence_num);
139 EnqueueTaskLocked(pending_task);
140 return true;
143 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue(LazyNow* lazy_now) {
144 base::AutoLock lock(lock_);
145 if (!task_queue_manager_)
146 return;
148 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
151 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked(
152 LazyNow* lazy_now) {
153 lock_.AssertAcquired();
154 // Enqueue all delayed tasks that should be running now.
155 while (!delayed_task_queue_.empty() &&
156 delayed_task_queue_.top().delayed_run_time <= lazy_now->Now()) {
157 // TODO(alexclarke): consider std::move() when allowed.
158 EnqueueDelayedTaskLocked(delayed_task_queue_.top());
159 delayed_task_queue_.pop();
161 TraceQueueSize(true);
164 bool TaskQueueImpl::IsQueueEnabled() const {
165 DCHECK(main_thread_checker_.CalledOnValidThread());
166 if (!task_queue_manager_)
167 return false;
169 return task_queue_manager_->selector_.IsQueueEnabled(this);
172 TaskQueue::QueueState TaskQueueImpl::GetQueueState() const {
173 DCHECK(main_thread_checker_.CalledOnValidThread());
174 if (!work_queue_.empty())
175 return QueueState::HAS_WORK;
178 base::AutoLock lock(lock_);
179 if (incoming_queue_.empty()) {
180 return QueueState::EMPTY;
181 } else {
182 return QueueState::NEEDS_PUMPING;
187 bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const Task* task) {
188 lock_.AssertAcquired();
189 // A null task is passed when UpdateQueue is called before any task is run.
190 // In this case we don't want to pump an after_wakeup queue, so return true
191 // here.
192 if (!task)
193 return true;
195 // Return false if there are no task in the incoming queue.
196 if (incoming_queue_.empty())
197 return false;
199 const TaskQueueImpl::Task& oldest_queued_task = incoming_queue_.front();
200 return task->enqueue_order() < oldest_queued_task.enqueue_order();
203 bool TaskQueueImpl::ShouldAutoPumpQueueLocked(bool should_trigger_wakeup,
204 const Task* previous_task) {
205 lock_.AssertAcquired();
206 if (pump_policy_ == PumpPolicy::MANUAL)
207 return false;
208 if (pump_policy_ == PumpPolicy::AFTER_WAKEUP &&
209 (!should_trigger_wakeup || TaskIsOlderThanQueuedTasks(previous_task)))
210 return false;
211 if (incoming_queue_.empty())
212 return false;
213 return true;
216 bool TaskQueueImpl::NextPendingDelayedTaskRunTime(
217 base::TimeTicks* next_pending_delayed_task) {
218 base::AutoLock lock(lock_);
219 if (delayed_task_queue_.empty())
220 return false;
221 *next_pending_delayed_task = delayed_task_queue_.top().delayed_run_time;
222 return true;
225 void TaskQueueImpl::UpdateWorkQueue(LazyNow* lazy_now,
226 bool should_trigger_wakeup,
227 const Task* previous_task) {
228 DCHECK(work_queue_.empty());
229 base::AutoLock lock(lock_);
230 if (!ShouldAutoPumpQueueLocked(should_trigger_wakeup, previous_task))
231 return;
232 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now);
233 std::swap(work_queue_, incoming_queue_);
234 // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no
235 // longer needs to consider this queue for reloading.
236 task_queue_manager_->UnregisterAsUpdatableTaskQueue(this);
237 if (!work_queue_.empty()) {
238 DCHECK(task_queue_manager_);
239 task_queue_manager_->selector_.GetTaskQueueSets()->OnPushQueue(this);
240 TraceQueueSize(true);
244 TaskQueueImpl::Task TaskQueueImpl::TakeTaskFromWorkQueue() {
245 // TODO(alexclarke): consider std::move() when allowed.
246 Task pending_task = work_queue_.front();
247 work_queue_.pop();
248 DCHECK(task_queue_manager_);
249 task_queue_manager_->selector_.GetTaskQueueSets()->OnPopQueue(this);
250 TraceQueueSize(false);
251 return pending_task;
254 void TaskQueueImpl::TraceQueueSize(bool is_locked) const {
255 bool is_tracing;
256 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_,
257 &is_tracing);
258 if (!is_tracing)
259 return;
260 if (!is_locked)
261 lock_.Acquire();
262 else
263 lock_.AssertAcquired();
264 TRACE_COUNTER1(
265 disabled_by_default_tracing_category_, GetName(),
266 incoming_queue_.size() + work_queue_.size() + delayed_task_queue_.size());
267 if (!is_locked)
268 lock_.Release();
271 void TaskQueueImpl::EnqueueTaskLocked(const Task& pending_task) {
272 lock_.AssertAcquired();
273 if (!task_queue_manager_)
274 return;
275 if (incoming_queue_.empty())
276 task_queue_manager_->RegisterAsUpdatableTaskQueue(this);
277 if (pump_policy_ == PumpPolicy::AUTO && incoming_queue_.empty()) {
278 task_queue_manager_->MaybePostDoWorkOnMainRunner();
280 // TODO(alexclarke): consider std::move() when allowed.
281 incoming_queue_.push(pending_task);
282 TraceQueueSize(true);
285 void TaskQueueImpl::EnqueueDelayedTaskLocked(const Task& pending_task) {
286 lock_.AssertAcquired();
287 if (!task_queue_manager_)
288 return;
289 if (incoming_queue_.empty())
290 task_queue_manager_->RegisterAsUpdatableTaskQueue(this);
291 // TODO(alexclarke): consider std::move() when allowed.
292 incoming_queue_.push(pending_task);
293 incoming_queue_.back().set_enqueue_order(
294 task_queue_manager_->GetNextSequenceNumber());
295 TraceQueueSize(true);
298 void TaskQueueImpl::SetPumpPolicy(PumpPolicy pump_policy) {
299 base::AutoLock lock(lock_);
300 if (pump_policy == PumpPolicy::AUTO && pump_policy_ != PumpPolicy::AUTO) {
301 PumpQueueLocked();
303 pump_policy_ = pump_policy;
306 void TaskQueueImpl::PumpQueueLocked() {
307 lock_.AssertAcquired();
308 if (!task_queue_manager_)
309 return;
311 LazyNow lazy_now(task_queue_manager_);
312 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now);
314 bool was_empty = work_queue_.empty();
315 while (!incoming_queue_.empty()) {
316 // TODO(alexclarke): consider std::move() when allowed.
317 work_queue_.push(incoming_queue_.front());
318 incoming_queue_.pop();
320 // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no longer
321 // needs to consider this queue for reloading.
322 task_queue_manager_->UnregisterAsUpdatableTaskQueue(this);
323 if (!work_queue_.empty()) {
324 if (was_empty)
325 task_queue_manager_->selector_.GetTaskQueueSets()->OnPushQueue(this);
326 task_queue_manager_->MaybePostDoWorkOnMainRunner();
330 void TaskQueueImpl::PumpQueue() {
331 base::AutoLock lock(lock_);
332 PumpQueueLocked();
335 const char* TaskQueueImpl::GetName() const {
336 return name_;
339 bool TaskQueueImpl::GetWorkQueueFrontTaskEnqueueOrder(
340 int* enqueue_order) const {
341 if (work_queue_.empty())
342 return false;
343 *enqueue_order = work_queue_.front().enqueue_order();
344 return true;
347 void TaskQueueImpl::PushTaskOntoWorkQueueForTest(const Task& task) {
348 work_queue_.push(task);
351 void TaskQueueImpl::PopTaskFromWorkQueueForTest() {
352 work_queue_.pop();
355 void TaskQueueImpl::SetQueuePriority(QueuePriority priority) {
356 DCHECK(main_thread_checker_.CalledOnValidThread());
357 if (!task_queue_manager_)
358 return;
360 task_queue_manager_->selector_.SetQueuePriority(this, priority);
363 // static
364 const char* TaskQueueImpl::PumpPolicyToString(
365 TaskQueue::PumpPolicy pump_policy) {
366 switch (pump_policy) {
367 case TaskQueue::PumpPolicy::AUTO:
368 return "auto";
369 case TaskQueue::PumpPolicy::AFTER_WAKEUP:
370 return "after_wakeup";
371 case TaskQueue::PumpPolicy::MANUAL:
372 return "manual";
373 default:
374 NOTREACHED();
375 return nullptr;
379 // static
380 const char* TaskQueueImpl::WakeupPolicyToString(
381 TaskQueue::WakeupPolicy wakeup_policy) {
382 switch (wakeup_policy) {
383 case TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES:
384 return "can_wake_other_queues";
385 case TaskQueue::WakeupPolicy::DONT_WAKE_OTHER_QUEUES:
386 return "dont_wake_other_queues";
387 default:
388 NOTREACHED();
389 return nullptr;
393 // static
394 const char* TaskQueueImpl::PriorityToString(QueuePriority priority) {
395 switch (priority) {
396 case CONTROL_PRIORITY:
397 return "control";
398 case HIGH_PRIORITY:
399 return "high";
400 case NORMAL_PRIORITY:
401 return "normal";
402 case BEST_EFFORT_PRIORITY:
403 return "best_effort";
404 case DISABLED_PRIORITY:
405 return "disabled";
406 default:
407 NOTREACHED();
408 return nullptr;
412 void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const {
413 base::AutoLock lock(lock_);
414 state->BeginDictionary();
415 state->SetString("name", GetName());
416 state->SetString("pump_policy", PumpPolicyToString(pump_policy_));
417 state->SetString("wakeup_policy", WakeupPolicyToString(wakeup_policy_));
418 bool verbose_tracing_enabled = false;
419 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
420 disabled_by_default_verbose_tracing_category_, &verbose_tracing_enabled);
421 state->SetInteger("incoming_queue_size", incoming_queue_.size());
422 state->SetInteger("work_queue_size", work_queue_.size());
423 state->SetInteger("delayed_task_queue_size", delayed_task_queue_.size());
424 if (verbose_tracing_enabled) {
425 state->BeginArray("incoming_queue");
426 QueueAsValueInto(incoming_queue_, state);
427 state->EndArray();
428 state->BeginArray("work_queue");
429 QueueAsValueInto(work_queue_, state);
430 state->EndArray();
431 state->BeginArray("delayed_task_queue");
432 QueueAsValueInto(delayed_task_queue_, state);
433 state->EndArray();
435 state->SetString("priority",
436 PriorityToString(static_cast<QueuePriority>(set_index_)));
437 state->EndDictionary();
440 void TaskQueueImpl::AddTaskObserver(
441 base::MessageLoop::TaskObserver* task_observer) {
442 DCHECK(main_thread_checker_.CalledOnValidThread());
443 task_observers_.AddObserver(task_observer);
446 void TaskQueueImpl::RemoveTaskObserver(
447 base::MessageLoop::TaskObserver* task_observer) {
448 DCHECK(main_thread_checker_.CalledOnValidThread());
449 task_observers_.RemoveObserver(task_observer);
452 void TaskQueueImpl::NotifyWillProcessTask(
453 const base::PendingTask& pending_task) {
454 DCHECK(main_thread_checker_.CalledOnValidThread());
455 DCHECK(should_notify_observers_);
456 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
457 WillProcessTask(pending_task));
460 void TaskQueueImpl::NotifyDidProcessTask(
461 const base::PendingTask& pending_task) {
462 DCHECK(main_thread_checker_.CalledOnValidThread());
463 DCHECK(should_notify_observers_);
464 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
465 DidProcessTask(pending_task));
468 // static
469 void TaskQueueImpl::QueueAsValueInto(const std::queue<Task>& queue,
470 base::trace_event::TracedValue* state) {
471 std::queue<Task> queue_copy(queue);
472 while (!queue_copy.empty()) {
473 TaskAsValueInto(queue_copy.front(), state);
474 queue_copy.pop();
478 // static
479 void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue,
480 base::trace_event::TracedValue* state) {
481 std::priority_queue<Task> queue_copy(queue);
482 while (!queue_copy.empty()) {
483 TaskAsValueInto(queue_copy.top(), state);
484 queue_copy.pop();
488 // static
489 void TaskQueueImpl::TaskAsValueInto(const Task& task,
490 base::trace_event::TracedValue* state) {
491 state->BeginDictionary();
492 state->SetString("posted_from", task.posted_from.ToString());
493 state->SetInteger("enqueue_order", task.enqueue_order());
494 state->SetInteger("sequence_num", task.sequence_num);
495 state->SetBoolean("nestable", task.nestable);
496 state->SetBoolean("is_high_res", task.is_high_res);
497 state->SetDouble(
498 "delayed_run_time",
499 (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
500 state->EndDictionary();
503 } // namespace internal
504 } // namespace scheduler