1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/scheduler/child/task_queue_manager.h"
10 #include "base/bind.h"
11 #include "base/time/default_tick_clock.h"
12 #include "components/scheduler/child/lazy_now.h"
13 #include "components/scheduler/child/nestable_single_thread_task_runner.h"
14 #include "components/scheduler/child/task_queue_impl.h"
15 #include "components/scheduler/child/task_queue_selector.h"
16 #include "components/scheduler/child/task_queue_sets.h"
19 const int64_t kMaxTimeTicks
= std::numeric_limits
<int64
>::max();
24 TaskQueueManager::TaskQueueManager(
25 scoped_refptr
<NestableSingleThreadTaskRunner
> main_task_runner
,
26 const char* disabled_by_default_tracing_category
,
27 const char* disabled_by_default_verbose_tracing_category
)
28 : main_task_runner_(main_task_runner
),
29 task_was_run_on_quiescence_monitored_queue_(false),
30 pending_dowork_count_(0),
32 time_source_(new base::DefaultTickClock
),
33 disabled_by_default_tracing_category_(
34 disabled_by_default_tracing_category
),
35 disabled_by_default_verbose_tracing_category_(
36 disabled_by_default_verbose_tracing_category
),
37 deletion_sentinel_(new DeletionSentinel()),
39 DCHECK(main_task_runner
->RunsTasksOnCurrentThread());
40 TRACE_EVENT_OBJECT_CREATED_WITH_ID(disabled_by_default_tracing_category
,
41 "TaskQueueManager", this);
42 selector_
.SetTaskQueueSelectorObserver(this);
44 do_work_from_main_thread_closure_
=
45 base::Bind(&TaskQueueManager::DoWork
, weak_factory_
.GetWeakPtr(), true);
46 do_work_from_other_thread_closure_
=
47 base::Bind(&TaskQueueManager::DoWork
, weak_factory_
.GetWeakPtr(), false);
50 TaskQueueManager::~TaskQueueManager() {
51 TRACE_EVENT_OBJECT_DELETED_WITH_ID(disabled_by_default_tracing_category_
,
52 "TaskQueueManager", this);
53 for (auto& queue
: queues_
)
54 queue
->WillDeleteTaskQueueManager();
55 selector_
.SetTaskQueueSelectorObserver(nullptr);
58 scoped_refptr
<internal::TaskQueueImpl
> TaskQueueManager::NewTaskQueue(
59 const TaskQueue::Spec
& spec
) {
60 DCHECK(main_thread_checker_
.CalledOnValidThread());
61 scoped_refptr
<internal::TaskQueueImpl
> queue(
62 make_scoped_refptr(new internal::TaskQueueImpl(
63 this, spec
, disabled_by_default_tracing_category_
,
64 disabled_by_default_verbose_tracing_category_
)));
65 queues_
.insert(queue
.get());
66 selector_
.AddQueue(queue
.get());
70 base::TimeTicks
TaskQueueManager::NextPendingDelayedTaskRunTime() {
71 DCHECK(main_thread_checker_
.CalledOnValidThread());
72 bool found_pending_task
= false;
73 base::TimeTicks
next_pending_delayed_task(
74 base::TimeTicks::FromInternalValue(kMaxTimeTicks
));
75 for (auto& queue
: queues_
) {
76 base::TimeTicks queues_next_pending_delayed_task
;
77 if (queue
->NextPendingDelayedTaskRunTime(
78 &queues_next_pending_delayed_task
)) {
79 found_pending_task
= true;
80 next_pending_delayed_task
=
81 std::min(next_pending_delayed_task
, queues_next_pending_delayed_task
);
85 if (!found_pending_task
)
86 return base::TimeTicks();
88 DCHECK_NE(next_pending_delayed_task
,
89 base::TimeTicks::FromInternalValue(kMaxTimeTicks
));
90 return next_pending_delayed_task
;
93 void TaskQueueManager::RegisterAsUpdatableTaskQueue(
94 internal::TaskQueueImpl
* queue
) {
95 base::AutoLock
lock(newly_updatable_lock_
);
96 newly_updatable_
.push_back(queue
);
99 void TaskQueueManager::UnregisterAsUpdatableTaskQueue(
100 internal::TaskQueueImpl
* queue
) {
101 DCHECK(main_thread_checker_
.CalledOnValidThread());
102 updatable_queue_set_
.erase(queue
);
105 void TaskQueueManager::UpdateWorkQueues(
106 bool should_trigger_wakeup
,
107 const base::PendingTask
* previous_task
) {
108 DCHECK(main_thread_checker_
.CalledOnValidThread());
109 internal::LazyNow
lazy_now(this);
111 // Insert any newly updatable queues into the updatable_queue_set_.
113 base::AutoLock
lock(newly_updatable_lock_
);
114 while (!newly_updatable_
.empty()) {
115 updatable_queue_set_
.insert(newly_updatable_
.back());
116 newly_updatable_
.pop_back();
120 auto iter
= updatable_queue_set_
.begin();
121 while (iter
!= updatable_queue_set_
.end()) {
122 internal::TaskQueueImpl
* queue
= *iter
++;
123 // NOTE Update work queue may erase itself from |updatable_queue_set_|.
124 // This is fine, erasing an element won't invalidate any interator, as long
125 // as the iterator isn't the element being delated.
126 if (queue
->work_queue().empty())
127 queue
->UpdateWorkQueue(&lazy_now
, should_trigger_wakeup
, previous_task
);
128 if (!queue
->work_queue().empty()) {
129 // Currently we should not be getting tasks with delayed run times in any
130 // of the work queues.
131 DCHECK(queue
->work_queue().front().delayed_run_time
.is_null());
136 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
137 bool on_main_thread
= main_task_runner_
->BelongsToCurrentThread();
138 if (on_main_thread
) {
139 // We only want one pending DoWork posted from the main thread, or we risk
140 // an explosion of pending DoWorks which could starve out everything else.
141 if (pending_dowork_count_
> 0) {
144 pending_dowork_count_
++;
145 main_task_runner_
->PostTask(FROM_HERE
, do_work_from_main_thread_closure_
);
147 main_task_runner_
->PostTask(FROM_HERE
, do_work_from_other_thread_closure_
);
151 void TaskQueueManager::DoWork(bool posted_from_main_thread
) {
152 if (posted_from_main_thread
) {
153 pending_dowork_count_
--;
154 DCHECK_GE(pending_dowork_count_
, 0);
156 DCHECK(main_thread_checker_
.CalledOnValidThread());
158 // Pass false and nullptr to UpdateWorkQueues here to prevent waking up a
159 // pump-after-wakeup queue.
160 UpdateWorkQueues(false, nullptr);
162 base::PendingTask
previous_task((tracked_objects::Location()),
164 for (int i
= 0; i
< work_batch_size_
; i
++) {
165 internal::TaskQueueImpl
* queue
;
166 if (!SelectQueueToService(&queue
))
168 // Note that this function won't post another call to DoWork if one is
169 // already pending, so it is safe to call it in a loop.
170 MaybePostDoWorkOnMainRunner();
172 if (ProcessTaskFromWorkQueue(queue
, &previous_task
))
173 return; // The TaskQueueManager got deleted, we must bail out.
175 bool should_trigger_wakeup
= queue
->wakeup_policy() ==
176 TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES
;
177 UpdateWorkQueues(should_trigger_wakeup
, &previous_task
);
179 // Only run a single task per batch in nested run loops so that we can
180 // properly exit the nested loop when someone calls RunLoop::Quit().
181 if (main_task_runner_
->IsNested())
186 bool TaskQueueManager::SelectQueueToService(
187 internal::TaskQueueImpl
** out_queue
) {
188 bool should_run
= selector_
.SelectQueueToService(out_queue
);
189 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
190 disabled_by_default_tracing_category_
, "TaskQueueManager", this,
191 AsValueWithSelectorResult(should_run
, *out_queue
));
195 void TaskQueueManager::DidQueueTask(const base::PendingTask
& pending_task
) {
196 task_annotator_
.DidQueueTask("TaskQueueManager::PostTask", pending_task
);
199 bool TaskQueueManager::ProcessTaskFromWorkQueue(
200 internal::TaskQueueImpl
* queue
,
201 base::PendingTask
* out_previous_task
) {
202 DCHECK(main_thread_checker_
.CalledOnValidThread());
203 scoped_refptr
<DeletionSentinel
> protect(deletion_sentinel_
);
204 base::PendingTask pending_task
= queue
->TakeTaskFromWorkQueue();
206 if (queue
->GetQuiescenceMonitored())
207 task_was_run_on_quiescence_monitored_queue_
= true;
209 if (!pending_task
.nestable
&& main_task_runner_
->IsNested()) {
210 // Defer non-nestable work to the main task runner. NOTE these tasks can be
211 // arbitrarily delayed so the additional delay should not be a problem.
212 main_task_runner_
->PostNonNestableTask(pending_task
.posted_from
,
215 TRACE_TASK_EXECUTION("TaskQueueManager::ProcessTaskFromWorkQueue",
217 if (queue
->GetShouldNotifyObservers()) {
218 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver
, task_observers_
,
219 WillProcessTask(pending_task
));
221 task_annotator_
.RunTask("TaskQueueManager::PostTask", pending_task
);
223 // Detect if the TaskQueueManager just got deleted. If this happens we must
224 // not access any member variables after this point.
225 if (protect
->HasOneRef())
228 if (queue
->GetShouldNotifyObservers()) {
229 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver
, task_observers_
,
230 DidProcessTask(pending_task
));
233 pending_task
.task
.Reset();
234 *out_previous_task
= pending_task
;
239 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
240 return main_task_runner_
->RunsTasksOnCurrentThread();
243 bool TaskQueueManager::PostDelayedTask(
244 const tracked_objects::Location
& from_here
,
245 const base::Closure
& task
,
246 base::TimeDelta delay
) {
247 DCHECK_GE(delay
, base::TimeDelta());
248 return main_task_runner_
->PostDelayedTask(from_here
, task
, delay
);
251 void TaskQueueManager::SetWorkBatchSize(int work_batch_size
) {
252 DCHECK(main_thread_checker_
.CalledOnValidThread());
253 DCHECK_GE(work_batch_size
, 1);
254 work_batch_size_
= work_batch_size
;
257 void TaskQueueManager::AddTaskObserver(
258 base::MessageLoop::TaskObserver
* task_observer
) {
259 DCHECK(main_thread_checker_
.CalledOnValidThread());
260 task_observers_
.AddObserver(task_observer
);
263 void TaskQueueManager::RemoveTaskObserver(
264 base::MessageLoop::TaskObserver
* task_observer
) {
265 DCHECK(main_thread_checker_
.CalledOnValidThread());
266 task_observers_
.RemoveObserver(task_observer
);
269 void TaskQueueManager::SetTimeSourceForTesting(
270 scoped_ptr
<base::TickClock
> time_source
) {
271 DCHECK(main_thread_checker_
.CalledOnValidThread());
272 time_source_
= time_source
.Pass();
275 bool TaskQueueManager::GetAndClearSystemIsQuiescentBit() {
276 bool task_was_run
= task_was_run_on_quiescence_monitored_queue_
;
277 task_was_run_on_quiescence_monitored_queue_
= false;
278 return !task_was_run
;
281 base::TimeTicks
TaskQueueManager::Now() const {
282 return time_source_
->NowTicks();
285 int TaskQueueManager::GetNextSequenceNumber() {
286 return task_sequence_num_
.GetNext();
289 scoped_refptr
<base::trace_event::ConvertableToTraceFormat
>
290 TaskQueueManager::AsValueWithSelectorResult(
292 internal::TaskQueueImpl
* selected_queue
) const {
293 DCHECK(main_thread_checker_
.CalledOnValidThread());
294 scoped_refptr
<base::trace_event::TracedValue
> state
=
295 new base::trace_event::TracedValue();
296 state
->BeginArray("queues");
297 for (auto& queue
: queues_
)
298 queue
->AsValueInto(state
.get());
300 state
->BeginDictionary("selector");
301 selector_
.AsValueInto(state
.get());
302 state
->EndDictionary();
304 state
->SetString("selected_queue", selected_queue
->GetName());
306 state
->BeginArray("updatable_queue_set");
307 for (auto& queue
: updatable_queue_set_
)
308 state
->AppendString(queue
->GetName());
313 void TaskQueueManager::OnTaskQueueEnabled() {
314 DCHECK(main_thread_checker_
.CalledOnValidThread());
315 MaybePostDoWorkOnMainRunner();
318 } // namespace scheduler