// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
5 #include "components/scheduler/child/task_queue_impl.h"
7 #include "components/scheduler/child/task_queue_manager.h"
12 TaskQueueImpl::TaskQueueImpl(
13 TaskQueueManager
* task_queue_manager
,
15 const char* disabled_by_default_tracing_category
,
16 const char* disabled_by_default_verbose_tracing_category
)
17 : thread_id_(base::PlatformThread::CurrentId()),
18 task_queue_manager_(task_queue_manager
),
19 pump_policy_(spec
.pump_policy
),
21 disabled_by_default_tracing_category_(
22 disabled_by_default_tracing_category
),
23 disabled_by_default_verbose_tracing_category_(
24 disabled_by_default_verbose_tracing_category
),
25 wakeup_policy_(spec
.wakeup_policy
),
27 should_monitor_quiescence_(spec
.should_monitor_quiescence
),
28 should_notify_observers_(spec
.should_notify_observers
) {}
30 TaskQueueImpl::~TaskQueueImpl() {}
32 TaskQueueImpl::Task::Task()
33 : PendingTask(tracked_objects::Location(),
38 enqueue_order_set_(false),
44 TaskQueueImpl::Task::Task(const tracked_objects::Location
& posted_from
,
45 const base::Closure
& task
,
48 : PendingTask(posted_from
, task
, base::TimeTicks(), nestable
),
50 enqueue_order_set_(false),
53 sequence_num
= sequence_number
;
56 void TaskQueueImpl::UnregisterTaskQueue() {
57 if (!task_queue_manager_
)
59 task_queue_manager_
->UnregisterTaskQueue(make_scoped_refptr(this));
62 base::AutoLock
lock(lock_
);
63 task_queue_manager_
= nullptr;
64 delayed_task_queue_
= std::priority_queue
<Task
>();
65 incoming_queue_
= std::queue
<Task
>();
66 work_queue_
= std::queue
<Task
>();
70 bool TaskQueueImpl::RunsTasksOnCurrentThread() const {
71 base::AutoLock
lock(lock_
);
72 return base::PlatformThread::CurrentId() == thread_id_
;
75 bool TaskQueueImpl::PostDelayedTask(const tracked_objects::Location
& from_here
,
76 const base::Closure
& task
,
77 base::TimeDelta delay
) {
78 return PostDelayedTaskImpl(from_here
, task
, delay
, TaskType::NORMAL
);
81 bool TaskQueueImpl::PostNonNestableDelayedTask(
82 const tracked_objects::Location
& from_here
,
83 const base::Closure
& task
,
84 base::TimeDelta delay
) {
85 return PostDelayedTaskImpl(from_here
, task
, delay
, TaskType::NON_NESTABLE
);
88 bool TaskQueueImpl::PostDelayedTaskAt(
89 const tracked_objects::Location
& from_here
,
90 const base::Closure
& task
,
91 base::TimeTicks desired_run_time
) {
92 base::AutoLock
lock(lock_
);
93 if (!task_queue_manager_
)
95 LazyNow
lazy_now(task_queue_manager_
);
96 return PostDelayedTaskLocked(&lazy_now
, from_here
, task
, desired_run_time
,
100 bool TaskQueueImpl::PostDelayedTaskImpl(
101 const tracked_objects::Location
& from_here
,
102 const base::Closure
& task
,
103 base::TimeDelta delay
,
104 TaskType task_type
) {
105 base::AutoLock
lock(lock_
);
106 if (!task_queue_manager_
)
108 LazyNow
lazy_now(task_queue_manager_
);
109 base::TimeTicks desired_run_time
;
110 if (delay
> base::TimeDelta())
111 desired_run_time
= lazy_now
.Now() + delay
;
112 return PostDelayedTaskLocked(&lazy_now
, from_here
, task
, desired_run_time
,
116 bool TaskQueueImpl::PostDelayedTaskLocked(
118 const tracked_objects::Location
& from_here
,
119 const base::Closure
& task
,
120 base::TimeTicks desired_run_time
,
121 TaskType task_type
) {
122 lock_
.AssertAcquired();
123 DCHECK(task_queue_manager_
);
124 Task
pending_task(from_here
, task
,
125 task_queue_manager_
->GetNextSequenceNumber(),
126 task_type
!= TaskType::NON_NESTABLE
);
127 task_queue_manager_
->DidQueueTask(pending_task
);
129 if (!desired_run_time
.is_null()) {
130 pending_task
.delayed_run_time
= std::max(lazy_now
->Now(), desired_run_time
);
131 // TODO(alexclarke): consider emplace() when C++11 library features allowed.
132 delayed_task_queue_
.push(pending_task
);
133 TraceQueueSize(true);
134 // Schedule a later call to MoveReadyDelayedTasksToIncomingQueue.
135 task_queue_manager_
->ScheduleDelayedWork(this, desired_run_time
, lazy_now
);
138 pending_task
.set_enqueue_order(pending_task
.sequence_num
);
139 EnqueueTaskLocked(pending_task
);
143 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue(LazyNow
* lazy_now
) {
144 base::AutoLock
lock(lock_
);
145 if (!task_queue_manager_
)
148 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now
);
151 void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked(
153 lock_
.AssertAcquired();
154 // Enqueue all delayed tasks that should be running now.
155 while (!delayed_task_queue_
.empty() &&
156 delayed_task_queue_
.top().delayed_run_time
<= lazy_now
->Now()) {
157 // TODO(alexclarke): consider std::move() when allowed.
158 EnqueueDelayedTaskLocked(delayed_task_queue_
.top());
159 delayed_task_queue_
.pop();
161 TraceQueueSize(true);
164 bool TaskQueueImpl::IsQueueEnabled() const {
165 DCHECK(main_thread_checker_
.CalledOnValidThread());
166 if (!task_queue_manager_
)
169 return task_queue_manager_
->selector_
.IsQueueEnabled(this);
172 TaskQueue::QueueState
TaskQueueImpl::GetQueueState() const {
173 DCHECK(main_thread_checker_
.CalledOnValidThread());
174 if (!work_queue_
.empty())
175 return QueueState::HAS_WORK
;
178 base::AutoLock
lock(lock_
);
179 if (incoming_queue_
.empty()) {
180 return QueueState::EMPTY
;
182 return QueueState::NEEDS_PUMPING
;
187 bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const Task
* task
) {
188 lock_
.AssertAcquired();
189 // A null task is passed when UpdateQueue is called before any task is run.
190 // In this case we don't want to pump an after_wakeup queue, so return true
195 // Return false if there are no task in the incoming queue.
196 if (incoming_queue_
.empty())
199 const TaskQueueImpl::Task
& oldest_queued_task
= incoming_queue_
.front();
200 return task
->enqueue_order() < oldest_queued_task
.enqueue_order();
203 bool TaskQueueImpl::ShouldAutoPumpQueueLocked(bool should_trigger_wakeup
,
204 const Task
* previous_task
) {
205 lock_
.AssertAcquired();
206 if (pump_policy_
== PumpPolicy::MANUAL
)
208 if (pump_policy_
== PumpPolicy::AFTER_WAKEUP
&&
209 (!should_trigger_wakeup
|| TaskIsOlderThanQueuedTasks(previous_task
)))
211 if (incoming_queue_
.empty())
216 bool TaskQueueImpl::NextPendingDelayedTaskRunTime(
217 base::TimeTicks
* next_pending_delayed_task
) {
218 base::AutoLock
lock(lock_
);
219 if (delayed_task_queue_
.empty())
221 *next_pending_delayed_task
= delayed_task_queue_
.top().delayed_run_time
;
225 void TaskQueueImpl::UpdateWorkQueue(LazyNow
* lazy_now
,
226 bool should_trigger_wakeup
,
227 const Task
* previous_task
) {
228 DCHECK(work_queue_
.empty());
229 base::AutoLock
lock(lock_
);
230 if (!ShouldAutoPumpQueueLocked(should_trigger_wakeup
, previous_task
))
232 MoveReadyDelayedTasksToIncomingQueueLocked(lazy_now
);
233 std::swap(work_queue_
, incoming_queue_
);
234 // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no
235 // longer needs to consider this queue for reloading.
236 task_queue_manager_
->UnregisterAsUpdatableTaskQueue(this);
237 if (!work_queue_
.empty()) {
238 DCHECK(task_queue_manager_
);
239 task_queue_manager_
->selector_
.GetTaskQueueSets()->OnPushQueue(this);
240 TraceQueueSize(true);
244 TaskQueueImpl::Task
TaskQueueImpl::TakeTaskFromWorkQueue() {
245 // TODO(alexclarke): consider std::move() when allowed.
246 Task pending_task
= work_queue_
.front();
248 DCHECK(task_queue_manager_
);
249 task_queue_manager_
->selector_
.GetTaskQueueSets()->OnPopQueue(this);
250 TraceQueueSize(false);
254 void TaskQueueImpl::TraceQueueSize(bool is_locked
) const {
256 TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_
,
263 lock_
.AssertAcquired();
265 disabled_by_default_tracing_category_
, GetName(),
266 incoming_queue_
.size() + work_queue_
.size() + delayed_task_queue_
.size());
271 void TaskQueueImpl::EnqueueTaskLocked(const Task
& pending_task
) {
272 lock_
.AssertAcquired();
273 if (!task_queue_manager_
)
275 if (incoming_queue_
.empty())
276 task_queue_manager_
->RegisterAsUpdatableTaskQueue(this);
277 if (pump_policy_
== PumpPolicy::AUTO
&& incoming_queue_
.empty()) {
278 task_queue_manager_
->MaybePostDoWorkOnMainRunner();
280 // TODO(alexclarke): consider std::move() when allowed.
281 incoming_queue_
.push(pending_task
);
282 TraceQueueSize(true);
285 void TaskQueueImpl::EnqueueDelayedTaskLocked(const Task
& pending_task
) {
286 lock_
.AssertAcquired();
287 if (!task_queue_manager_
)
289 if (incoming_queue_
.empty())
290 task_queue_manager_
->RegisterAsUpdatableTaskQueue(this);
291 // TODO(alexclarke): consider std::move() when allowed.
292 incoming_queue_
.push(pending_task
);
293 incoming_queue_
.back().set_enqueue_order(
294 task_queue_manager_
->GetNextSequenceNumber());
295 TraceQueueSize(true);
298 void TaskQueueImpl::SetPumpPolicy(PumpPolicy pump_policy
) {
299 base::AutoLock
lock(lock_
);
300 if (pump_policy
== PumpPolicy::AUTO
&& pump_policy_
!= PumpPolicy::AUTO
) {
303 pump_policy_
= pump_policy
;
306 void TaskQueueImpl::PumpQueueLocked() {
307 lock_
.AssertAcquired();
308 if (!task_queue_manager_
)
311 LazyNow
lazy_now(task_queue_manager_
);
312 MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now
);
314 bool was_empty
= work_queue_
.empty();
315 while (!incoming_queue_
.empty()) {
316 // TODO(alexclarke): consider std::move() when allowed.
317 work_queue_
.push(incoming_queue_
.front());
318 incoming_queue_
.pop();
320 // |incoming_queue_| is now empty so TaskQueueManager::UpdateQueues no longer
321 // needs to consider this queue for reloading.
322 task_queue_manager_
->UnregisterAsUpdatableTaskQueue(this);
323 if (!work_queue_
.empty()) {
325 task_queue_manager_
->selector_
.GetTaskQueueSets()->OnPushQueue(this);
326 task_queue_manager_
->MaybePostDoWorkOnMainRunner();
330 void TaskQueueImpl::PumpQueue() {
331 base::AutoLock
lock(lock_
);
335 const char* TaskQueueImpl::GetName() const {
339 bool TaskQueueImpl::GetWorkQueueFrontTaskEnqueueOrder(
340 int* enqueue_order
) const {
341 if (work_queue_
.empty())
343 *enqueue_order
= work_queue_
.front().enqueue_order();
347 void TaskQueueImpl::PushTaskOntoWorkQueueForTest(const Task
& task
) {
348 work_queue_
.push(task
);
351 void TaskQueueImpl::PopTaskFromWorkQueueForTest() {
355 void TaskQueueImpl::SetQueuePriority(QueuePriority priority
) {
356 DCHECK(main_thread_checker_
.CalledOnValidThread());
357 if (!task_queue_manager_
)
360 task_queue_manager_
->selector_
.SetQueuePriority(this, priority
);
364 const char* TaskQueueImpl::PumpPolicyToString(
365 TaskQueue::PumpPolicy pump_policy
) {
366 switch (pump_policy
) {
367 case TaskQueue::PumpPolicy::AUTO
:
369 case TaskQueue::PumpPolicy::AFTER_WAKEUP
:
370 return "after_wakeup";
371 case TaskQueue::PumpPolicy::MANUAL
:
380 const char* TaskQueueImpl::WakeupPolicyToString(
381 TaskQueue::WakeupPolicy wakeup_policy
) {
382 switch (wakeup_policy
) {
383 case TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES
:
384 return "can_wake_other_queues";
385 case TaskQueue::WakeupPolicy::DONT_WAKE_OTHER_QUEUES
:
386 return "dont_wake_other_queues";
394 const char* TaskQueueImpl::PriorityToString(QueuePriority priority
) {
396 case CONTROL_PRIORITY
:
400 case NORMAL_PRIORITY
:
402 case BEST_EFFORT_PRIORITY
:
403 return "best_effort";
404 case DISABLED_PRIORITY
:
412 void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue
* state
) const {
413 base::AutoLock
lock(lock_
);
414 state
->BeginDictionary();
415 state
->SetString("name", GetName());
416 state
->SetString("pump_policy", PumpPolicyToString(pump_policy_
));
417 state
->SetString("wakeup_policy", WakeupPolicyToString(wakeup_policy_
));
418 bool verbose_tracing_enabled
= false;
419 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
420 disabled_by_default_verbose_tracing_category_
, &verbose_tracing_enabled
);
421 state
->SetInteger("incoming_queue_size", incoming_queue_
.size());
422 state
->SetInteger("work_queue_size", work_queue_
.size());
423 state
->SetInteger("delayed_task_queue_size", delayed_task_queue_
.size());
424 if (verbose_tracing_enabled
) {
425 state
->BeginArray("incoming_queue");
426 QueueAsValueInto(incoming_queue_
, state
);
428 state
->BeginArray("work_queue");
429 QueueAsValueInto(work_queue_
, state
);
431 state
->BeginArray("delayed_task_queue");
432 QueueAsValueInto(delayed_task_queue_
, state
);
435 state
->SetString("priority",
436 PriorityToString(static_cast<QueuePriority
>(set_index_
)));
437 state
->EndDictionary();
440 void TaskQueueImpl::AddTaskObserver(
441 base::MessageLoop::TaskObserver
* task_observer
) {
442 DCHECK(main_thread_checker_
.CalledOnValidThread());
443 task_observers_
.AddObserver(task_observer
);
446 void TaskQueueImpl::RemoveTaskObserver(
447 base::MessageLoop::TaskObserver
* task_observer
) {
448 DCHECK(main_thread_checker_
.CalledOnValidThread());
449 task_observers_
.RemoveObserver(task_observer
);
452 void TaskQueueImpl::NotifyWillProcessTask(
453 const base::PendingTask
& pending_task
) {
454 DCHECK(main_thread_checker_
.CalledOnValidThread());
455 DCHECK(should_notify_observers_
);
456 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver
, task_observers_
,
457 WillProcessTask(pending_task
));
460 void TaskQueueImpl::NotifyDidProcessTask(
461 const base::PendingTask
& pending_task
) {
462 DCHECK(main_thread_checker_
.CalledOnValidThread());
463 DCHECK(should_notify_observers_
);
464 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver
, task_observers_
,
465 DidProcessTask(pending_task
));
469 void TaskQueueImpl::QueueAsValueInto(const std::queue
<Task
>& queue
,
470 base::trace_event::TracedValue
* state
) {
471 std::queue
<Task
> queue_copy(queue
);
472 while (!queue_copy
.empty()) {
473 TaskAsValueInto(queue_copy
.front(), state
);
479 void TaskQueueImpl::QueueAsValueInto(const std::priority_queue
<Task
>& queue
,
480 base::trace_event::TracedValue
* state
) {
481 std::priority_queue
<Task
> queue_copy(queue
);
482 while (!queue_copy
.empty()) {
483 TaskAsValueInto(queue_copy
.top(), state
);
489 void TaskQueueImpl::TaskAsValueInto(const Task
& task
,
490 base::trace_event::TracedValue
* state
) {
491 state
->BeginDictionary();
492 state
->SetString("posted_from", task
.posted_from
.ToString());
493 state
->SetInteger("enqueue_order", task
.enqueue_order());
494 state
->SetInteger("sequence_num", task
.sequence_num
);
495 state
->SetBoolean("nestable", task
.nestable
);
496 state
->SetBoolean("is_high_res", task
.is_high_res
);
499 (task
.delayed_run_time
- base::TimeTicks()).InMicroseconds() / 1000.0L);
500 state
->EndDictionary();
503 } // namespace internal
504 } // namespace scheduler