1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/scheduler/child/task_queue_impl.h"
7 #include "components/scheduler/child/task_queue_manager.h"
12 TaskQueueImpl::TaskQueueImpl(
13 TaskQueueManager
* task_queue_manager
,
15 const char* disabled_by_default_tracing_category
,
16 const char* disabled_by_default_verbose_tracing_category
)
17 : thread_id_(base::PlatformThread::CurrentId()),
18 task_queue_manager_(task_queue_manager
),
19 pump_policy_(spec
.pump_policy
),
20 delayed_task_sequence_number_(0),
22 disabled_by_default_tracing_category_(
23 disabled_by_default_tracing_category
),
24 disabled_by_default_verbose_tracing_category_(
25 disabled_by_default_verbose_tracing_category
),
26 wakeup_policy_(spec
.wakeup_policy
),
28 should_monitor_quiescence_(spec
.should_monitor_quiescence
),
29 should_notify_observers_(spec
.should_notify_observers
) {}
31 TaskQueueImpl::~TaskQueueImpl() {}
33 void TaskQueueImpl::WillDeleteTaskQueueManager() {
34 base::AutoLock
lock(lock_
);
35 task_queue_manager_
= nullptr;
36 delayed_task_queue_
= base::DelayedTaskQueue();
37 incoming_queue_
= base::TaskQueue();
38 work_queue_
= base::TaskQueue();
41 bool TaskQueueImpl::RunsTasksOnCurrentThread() const {
42 base::AutoLock
lock(lock_
);
43 return base::PlatformThread::CurrentId() == thread_id_
;
46 bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location
& from_here
,
47 const base::Closure
& task
,
48 base::TimeDelta delay
) {
49 return PostDelayedTaskImpl(from_here
, task
, delay
, TaskType::NORMAL
);
52 bool TaskQueueImpl::PostNonNestableDelayedTask(
53 const tracked_objects::Location
& from_here
,
54 const base::Closure
& task
,
55 base::TimeDelta delay
) {
56 return PostDelayedTaskImpl(from_here
, task
, delay
, TaskType::NON_NESTABLE
);
59 bool TaskQueueImpl::PostDelayedTaskAt(
60 const tracked_objects::Location
& from_here
,
61 const base::Closure
& task
,
62 base::TimeTicks desired_run_time
) {
63 base::AutoLock
lock(lock_
);
64 if (!task_queue_manager_
)
66 LazyNow
lazy_now(task_queue_manager_
);
67 return PostDelayedTaskLocked(&lazy_now
, from_here
, task
, desired_run_time
,
71 bool TaskQueueImpl::PostDelayedTaskImpl(
72 const tracked_objects::Location
& from_here
,
73 const base::Closure
& task
,
74 base::TimeDelta delay
,
76 base::AutoLock
lock(lock_
);
77 if (!task_queue_manager_
)
79 LazyNow
lazy_now(task_queue_manager_
);
80 base::TimeTicks desired_run_time
;
81 if (delay
> base::TimeDelta())
82 desired_run_time
= lazy_now
.Now() + delay
;
83 return PostDelayedTaskLocked(&lazy_now
, from_here
, task
, desired_run_time
,
87 bool TaskQueueImpl::PostDelayedTaskLocked(
89 const tracked_objects::Location
& from_here
,
90 const base::Closure
& task
,
91 base::TimeTicks desired_run_time
,
93 lock_
.AssertAcquired();
94 DCHECK(task_queue_manager_
);
96 base::PendingTask
pending_task(from_here
, task
, base::TimeTicks(),
97 task_type
!= TaskType::NON_NESTABLE
);
98 task_queue_manager_
->DidQueueTask(pending_task
);
100 if (!desired_run_time
.is_null()) {
101 pending_task
.delayed_run_time
= std::max(lazy_now
->Now(), desired_run_time
);
102 pending_task
.sequence_num
= delayed_task_sequence_number_
++;
103 delayed_task_queue_
.push(pending_task
);
104 TraceQueueSize(true);
105 // If we changed the topmost task, then it is time to reschedule.
106 if (delayed_task_queue_
.top().task
.Equals(pending_task
.task
))
107 ScheduleDelayedWorkLocked(lazy_now
);
110 EnqueueTaskLocked(pending_task
);
114 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue() {
115 DCHECK(main_thread_checker_
.CalledOnValidThread());
116 base::AutoLock
lock(lock_
);
117 if (!task_queue_manager_
)
120 LazyNow
lazy_now(task_queue_manager_
);
121 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now
);
124 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked(
126 lock_
.AssertAcquired();
127 // Enqueue all delayed tasks that should be running now.
128 while (!delayed_task_queue_
.empty() &&
129 delayed_task_queue_
.top().delayed_run_time
<= lazy_now
->Now()) {
130 in_flight_kick_delayed_tasks_
.erase(
131 delayed_task_queue_
.top().delayed_run_time
);
132 EnqueueTaskLocked(delayed_task_queue_
.top());
133 delayed_task_queue_
.pop();
135 TraceQueueSize(true);
136 ScheduleDelayedWorkLocked(lazy_now
);
139 void TaskQueueImpl::ScheduleDelayedWorkLocked(LazyNow
* lazy_now
) {
140 lock_
.AssertAcquired();
141 // Any remaining tasks are in the future, so queue a task to kick them.
142 if (!delayed_task_queue_
.empty()) {
143 base::TimeTicks next_run_time
= delayed_task_queue_
.top().delayed_run_time
;
144 DCHECK_GE(next_run_time
, lazy_now
->Now());
145 // Make sure we don't have more than one
146 // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled
147 // run time (note it's fine to have multiple ones in flight for distinct
149 if (in_flight_kick_delayed_tasks_
.find(next_run_time
) ==
150 in_flight_kick_delayed_tasks_
.end()) {
151 in_flight_kick_delayed_tasks_
.insert(next_run_time
);
152 base::TimeDelta delay
= next_run_time
- lazy_now
->Now();
153 task_queue_manager_
->PostDelayedTask(
155 Bind(&TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue
, this),
161 bool TaskQueueImpl::IsQueueEnabled() const {
162 DCHECK(main_thread_checker_
.CalledOnValidThread());
163 if (!task_queue_manager_
)
166 return task_queue_manager_
->selector_
.IsQueueEnabled(this);
169 TaskQueue::QueueState
TaskQueueImpl::GetQueueState() const {
170 DCHECK(main_thread_checker_
.CalledOnValidThread());
171 if (!work_queue_
.empty())
172 return QueueState::HAS_WORK
;
175 base::AutoLock
lock(lock_
);
176 if (incoming_queue_
.empty()) {
177 return QueueState::EMPTY
;
179 return QueueState::NEEDS_PUMPING
;
184 bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const base::PendingTask
* task
) {
185 lock_
.AssertAcquired();
186 // A null task is passed when UpdateQueue is called before any task is run.
187 // In this case we don't want to pump an after_wakeup queue, so return true
192 // Return false if there are no task in the incoming queue.
193 if (incoming_queue_
.empty())
196 base::PendingTask oldest_queued_task
= incoming_queue_
.front();
197 DCHECK(oldest_queued_task
.delayed_run_time
.is_null());
198 DCHECK(task
->delayed_run_time
.is_null());
200 // Note: the comparison is correct due to the fact that the PendingTask
201 // operator inverts its comparison operation in order to work well in a heap
202 // based priority queue.
203 return oldest_queued_task
< *task
;
206 bool TaskQueueImpl::ShouldAutoPumpQueueLocked(
207 bool should_trigger_wakeup
,
208 const base::PendingTask
* previous_task
) {
209 lock_
.AssertAcquired();
210 if (pump_policy_
== PumpPolicy::MANUAL
)
212 if (pump_policy_
== PumpPolicy::AFTER_WAKEUP
&&
213 (!should_trigger_wakeup
|| TaskIsOlderThanQueuedTasks(previous_task
)))
215 if (incoming_queue_
.empty())
220 bool TaskQueueImpl::NextPendingDelayedTaskRunTime(
221 base::TimeTicks
* next_pending_delayed_task
) {
222 base::AutoLock
lock(lock_
);
223 if (delayed_task_queue_
.empty())
225 *next_pending_delayed_task
= delayed_task_queue_
.top().delayed_run_time
;
229 void TaskQueueImpl::UpdateWorkQueue(LazyNow
* lazy_now
,
230 bool should_trigger_wakeup
,
231 const base::PendingTask
* previous_task
) {
232 DCHECK(work_queue_
.empty());
233 base::AutoLock
lock(lock_
);
234 if (!ShouldAutoPumpQueueLocked(should_trigger_wakeup
, previous_task
))
236 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now
);
237 work_queue_
.Swap(&incoming_queue_
);
238 // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no
239 // longer needs to consider this queue for reloading.
240 task_queue_manager_
->UnregisterAsUpdatableTaskQueue(this);
241 if (!work_queue_
.empty()) {
242 DCHECK(task_queue_manager_
);
243 task_queue_manager_
->selector_
.GetTaskQueueSets()->OnPushQueue(this);
244 TraceQueueSize(true);
248 base::PendingTask
TaskQueueImpl::TakeTaskFromWorkQueue() {
249 base::PendingTask pending_task
= work_queue_
.front();
251 DCHECK(task_queue_manager_
);
252 task_queue_manager_
->selector_
.GetTaskQueueSets()->OnPopQueue(this);
253 TraceQueueSize(false);
257 void TaskQueueImpl::TraceQueueSize(bool is_locked
) const {
259 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_
,
266 lock_
.AssertAcquired();
268 disabled_by_default_tracing_category_
, GetName(),
269 incoming_queue_
.size() + work_queue_
.size() + delayed_task_queue_
.size());
274 void TaskQueueImpl::EnqueueTaskLocked(const base::PendingTask
& pending_task
) {
275 lock_
.AssertAcquired();
276 if (!task_queue_manager_
)
278 if (incoming_queue_
.empty())
279 task_queue_manager_
->RegisterAsUpdatableTaskQueue(this);
280 if (pump_policy_
== PumpPolicy::AUTO
&& incoming_queue_
.empty())
281 task_queue_manager_
->MaybePostDoWorkOnMainRunner();
282 incoming_queue_
.push(pending_task
);
283 incoming_queue_
.back().sequence_num
=
284 task_queue_manager_
->GetNextSequenceNumber();
286 if (!pending_task
.delayed_run_time
.is_null()) {
287 // Clear the delayed run time because we've already applied the delay
288 // before getting here.
289 incoming_queue_
.back().delayed_run_time
= base::TimeTicks();
291 TraceQueueSize(true);
294 void TaskQueueImpl::SetPumpPolicy(PumpPolicy pump_policy
) {
295 base::AutoLock
lock(lock_
);
296 if (pump_policy
== PumpPolicy::AUTO
&& pump_policy_
!= PumpPolicy::AUTO
) {
299 pump_policy_
= pump_policy
;
302 void TaskQueueImpl::PumpQueueLocked() {
303 lock_
.AssertAcquired();
304 if (!task_queue_manager_
)
307 LazyNow
lazy_now(task_queue_manager_
);
308 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now
);
310 bool was_empty
= work_queue_
.empty();
311 while (!incoming_queue_
.empty()) {
312 work_queue_
.push(incoming_queue_
.front());
313 incoming_queue_
.pop();
315 // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no longer
316 // needs to consider this queue for reloading.
317 task_queue_manager_
->UnregisterAsUpdatableTaskQueue(this);
318 if (!work_queue_
.empty()) {
320 task_queue_manager_
->selector_
.GetTaskQueueSets()->OnPushQueue(this);
321 task_queue_manager_
->MaybePostDoWorkOnMainRunner();
325 void TaskQueueImpl::PumpQueue() {
326 base::AutoLock
lock(lock_
);
330 const char* TaskQueueImpl::GetName() const {
334 bool TaskQueueImpl::GetWorkQueueFrontTaskAge(int* age
) const {
335 if (work_queue_
.empty())
337 *age
= work_queue_
.front().sequence_num
;
341 void TaskQueueImpl::PushTaskOntoWorkQueueForTest(
342 const base::PendingTask
& task
) {
343 work_queue_
.push(task
);
346 void TaskQueueImpl::PopTaskFromWorkQueueForTest() {
350 void TaskQueueImpl::SetQueuePriority(QueuePriority priority
) {
351 DCHECK(main_thread_checker_
.CalledOnValidThread());
352 if (!task_queue_manager_
)
355 task_queue_manager_
->selector_
.SetQueuePriority(this, priority
);
359 const char* TaskQueueImpl::PumpPolicyToString(
360 TaskQueue::PumpPolicy pump_policy
) {
361 switch (pump_policy
) {
362 case TaskQueue::PumpPolicy::AUTO
:
364 case TaskQueue::PumpPolicy::AFTER_WAKEUP
:
365 return "after_wakeup";
366 case TaskQueue::PumpPolicy::MANUAL
:
375 const char* TaskQueueImpl::WakeupPolicyToString(
376 TaskQueue::WakeupPolicy wakeup_policy
) {
377 switch (wakeup_policy
) {
378 case TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES
:
379 return "can_wake_other_queues";
380 case TaskQueue::WakeupPolicy::DONT_WAKE_OTHER_QUEUES
:
381 return "dont_wake_other_queues";
389 const char* TaskQueueImpl::PriorityToString(QueuePriority priority
) {
391 case CONTROL_PRIORITY
:
395 case NORMAL_PRIORITY
:
397 case BEST_EFFORT_PRIORITY
:
398 return "best_effort";
399 case DISABLED_PRIORITY
:
407 void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue
* state
) const {
408 base::AutoLock
lock(lock_
);
409 state
->BeginDictionary();
410 state
->SetString("name", GetName());
411 state
->SetString("pump_policy", PumpPolicyToString(pump_policy_
));
412 state
->SetString("wakeup_policy", WakeupPolicyToString(wakeup_policy_
));
413 bool verbose_tracing_enabled
= false;
414 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
415 disabled_by_default_verbose_tracing_category_
, &verbose_tracing_enabled
);
416 state
->SetInteger("incoming_queue_size", incoming_queue_
.size());
417 state
->SetInteger("work_queue_size", work_queue_
.size());
418 state
->SetInteger("delayed_task_queue_size", delayed_task_queue_
.size());
419 if (verbose_tracing_enabled
) {
420 state
->BeginArray("incoming_queue");
421 QueueAsValueInto(incoming_queue_
, state
);
423 state
->BeginArray("work_queue");
424 QueueAsValueInto(work_queue_
, state
);
426 state
->BeginArray("delayed_task_queue");
427 QueueAsValueInto(delayed_task_queue_
, state
);
430 state
->SetString("priority",
431 PriorityToString(static_cast<QueuePriority
>(set_index_
)));
432 state
->EndDictionary();
436 void TaskQueueImpl::QueueAsValueInto(const base::TaskQueue
& queue
,
437 base::trace_event::TracedValue
* state
) {
438 base::TaskQueue
queue_copy(queue
);
439 while (!queue_copy
.empty()) {
440 TaskAsValueInto(queue_copy
.front(), state
);
446 void TaskQueueImpl::QueueAsValueInto(const base::DelayedTaskQueue
& queue
,
447 base::trace_event::TracedValue
* state
) {
448 base::DelayedTaskQueue
queue_copy(queue
);
449 while (!queue_copy
.empty()) {
450 TaskAsValueInto(queue_copy
.top(), state
);
456 void TaskQueueImpl::TaskAsValueInto(const base::PendingTask
& task
,
457 base::trace_event::TracedValue
* state
) {
458 state
->BeginDictionary();
459 state
->SetString("posted_from", task
.posted_from
.ToString());
460 state
->SetInteger("sequence_num", task
.sequence_num
);
461 state
->SetBoolean("nestable", task
.nestable
);
462 state
->SetBoolean("is_high_res", task
.is_high_res
);
465 (task
.delayed_run_time
- base::TimeTicks()).InMicroseconds() / 1000.0L);
466 state
->EndDictionary();
469 } // namespace internal
470 } // namespace scheduler