Allow only one bookmark to be added for multiple fast starring
[chromium-blink-merge.git] / components / scheduler / child / task_queue_manager.cc
blobc557665ab7a4b6a5fac49b02fa6f58a461135f56
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/scheduler/child/task_queue_manager.h"
7 #include <queue>
8 #include <set>
10 #include "base/bind.h"
11 #include "base/time/default_tick_clock.h"
12 #include "components/scheduler/child/lazy_now.h"
13 #include "components/scheduler/child/nestable_single_thread_task_runner.h"
14 #include "components/scheduler/child/task_queue_impl.h"
15 #include "components/scheduler/child/task_queue_selector.h"
16 #include "components/scheduler/child/task_queue_sets.h"
18 namespace {
19 const int64_t kMaxTimeTicks = std::numeric_limits<int64>::max();
22 namespace scheduler {
24 TaskQueueManager::TaskQueueManager(
25 scoped_refptr<NestableSingleThreadTaskRunner> main_task_runner,
26 const char* disabled_by_default_tracing_category,
27 const char* disabled_by_default_verbose_tracing_category)
28 : main_task_runner_(main_task_runner),
29 task_was_run_on_quiescence_monitored_queue_(false),
30 pending_dowork_count_(0),
31 work_batch_size_(1),
32 time_source_(new base::DefaultTickClock),
33 disabled_by_default_tracing_category_(
34 disabled_by_default_tracing_category),
35 disabled_by_default_verbose_tracing_category_(
36 disabled_by_default_verbose_tracing_category),
37 deletion_sentinel_(new DeletionSentinel()),
38 weak_factory_(this) {
39 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
40 TRACE_EVENT_OBJECT_CREATED_WITH_ID(disabled_by_default_tracing_category,
41 "TaskQueueManager", this);
42 selector_.SetTaskQueueSelectorObserver(this);
44 do_work_from_main_thread_closure_ =
45 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), true);
46 do_work_from_other_thread_closure_ =
47 base::Bind(&TaskQueueManager::DoWork, weak_factory_.GetWeakPtr(), false);
50 TaskQueueManager::~TaskQueueManager() {
51 TRACE_EVENT_OBJECT_DELETED_WITH_ID(disabled_by_default_tracing_category_,
52 "TaskQueueManager", this);
53 for (auto& queue : queues_)
54 queue->WillDeleteTaskQueueManager();
55 selector_.SetTaskQueueSelectorObserver(nullptr);
58 scoped_refptr<internal::TaskQueueImpl> TaskQueueManager::NewTaskQueue(
59 const TaskQueue::Spec& spec) {
60 DCHECK(main_thread_checker_.CalledOnValidThread());
61 scoped_refptr<internal::TaskQueueImpl> queue(
62 make_scoped_refptr(new internal::TaskQueueImpl(
63 this, spec, disabled_by_default_tracing_category_,
64 disabled_by_default_verbose_tracing_category_)));
65 queues_.insert(queue.get());
66 selector_.AddQueue(queue.get());
67 return queue;
70 base::TimeTicks TaskQueueManager::NextPendingDelayedTaskRunTime() {
71 DCHECK(main_thread_checker_.CalledOnValidThread());
72 bool found_pending_task = false;
73 base::TimeTicks next_pending_delayed_task(
74 base::TimeTicks::FromInternalValue(kMaxTimeTicks));
75 for (auto& queue : queues_) {
76 base::TimeTicks queues_next_pending_delayed_task;
77 if (queue->NextPendingDelayedTaskRunTime(
78 &queues_next_pending_delayed_task)) {
79 found_pending_task = true;
80 next_pending_delayed_task =
81 std::min(next_pending_delayed_task, queues_next_pending_delayed_task);
85 if (!found_pending_task)
86 return base::TimeTicks();
88 DCHECK_NE(next_pending_delayed_task,
89 base::TimeTicks::FromInternalValue(kMaxTimeTicks));
90 return next_pending_delayed_task;
93 void TaskQueueManager::RegisterAsUpdatableTaskQueue(
94 internal::TaskQueueImpl* queue) {
95 base::AutoLock lock(newly_updatable_lock_);
96 newly_updatable_.push_back(queue);
99 void TaskQueueManager::UnregisterAsUpdatableTaskQueue(
100 internal::TaskQueueImpl* queue) {
101 DCHECK(main_thread_checker_.CalledOnValidThread());
102 updatable_queue_set_.erase(queue);
105 void TaskQueueManager::UpdateWorkQueues(
106 bool should_trigger_wakeup,
107 const base::PendingTask* previous_task) {
108 DCHECK(main_thread_checker_.CalledOnValidThread());
109 internal::LazyNow lazy_now(this);
111 // Insert any newly updatable queues into the updatable_queue_set_.
113 base::AutoLock lock(newly_updatable_lock_);
114 while (!newly_updatable_.empty()) {
115 updatable_queue_set_.insert(newly_updatable_.back());
116 newly_updatable_.pop_back();
120 auto iter = updatable_queue_set_.begin();
121 while (iter != updatable_queue_set_.end()) {
122 internal::TaskQueueImpl* queue = *iter++;
123 // NOTE Update work queue may erase itself from |updatable_queue_set_|.
124 // This is fine, erasing an element won't invalidate any interator, as long
125 // as the iterator isn't the element being delated.
126 if (queue->work_queue().empty())
127 queue->UpdateWorkQueue(&lazy_now, should_trigger_wakeup, previous_task);
128 if (!queue->work_queue().empty()) {
129 // Currently we should not be getting tasks with delayed run times in any
130 // of the work queues.
131 DCHECK(queue->work_queue().front().delayed_run_time.is_null());
136 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
137 bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
138 if (on_main_thread) {
139 // We only want one pending DoWork posted from the main thread, or we risk
140 // an explosion of pending DoWorks which could starve out everything else.
141 if (pending_dowork_count_ > 0) {
142 return;
144 pending_dowork_count_++;
145 main_task_runner_->PostTask(FROM_HERE, do_work_from_main_thread_closure_);
146 } else {
147 main_task_runner_->PostTask(FROM_HERE, do_work_from_other_thread_closure_);
151 void TaskQueueManager::DoWork(bool posted_from_main_thread) {
152 if (posted_from_main_thread) {
153 pending_dowork_count_--;
154 DCHECK_GE(pending_dowork_count_, 0);
156 DCHECK(main_thread_checker_.CalledOnValidThread());
158 // Pass false and nullptr to UpdateWorkQueues here to prevent waking up a
159 // pump-after-wakeup queue.
160 UpdateWorkQueues(false, nullptr);
162 base::PendingTask previous_task((tracked_objects::Location()),
163 (base::Closure()));
164 for (int i = 0; i < work_batch_size_; i++) {
165 internal::TaskQueueImpl* queue;
166 if (!SelectQueueToService(&queue))
167 return;
168 // Note that this function won't post another call to DoWork if one is
169 // already pending, so it is safe to call it in a loop.
170 MaybePostDoWorkOnMainRunner();
172 if (ProcessTaskFromWorkQueue(queue, &previous_task))
173 return; // The TaskQueueManager got deleted, we must bail out.
175 bool should_trigger_wakeup = queue->wakeup_policy() ==
176 TaskQueue::WakeupPolicy::CAN_WAKE_OTHER_QUEUES;
177 UpdateWorkQueues(should_trigger_wakeup, &previous_task);
179 // Only run a single task per batch in nested run loops so that we can
180 // properly exit the nested loop when someone calls RunLoop::Quit().
181 if (main_task_runner_->IsNested())
182 break;
186 bool TaskQueueManager::SelectQueueToService(
187 internal::TaskQueueImpl** out_queue) {
188 bool should_run = selector_.SelectQueueToService(out_queue);
189 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
190 disabled_by_default_tracing_category_, "TaskQueueManager", this,
191 AsValueWithSelectorResult(should_run, *out_queue));
192 return should_run;
195 void TaskQueueManager::DidQueueTask(const base::PendingTask& pending_task) {
196 task_annotator_.DidQueueTask("TaskQueueManager::PostTask", pending_task);
199 bool TaskQueueManager::ProcessTaskFromWorkQueue(
200 internal::TaskQueueImpl* queue,
201 base::PendingTask* out_previous_task) {
202 DCHECK(main_thread_checker_.CalledOnValidThread());
203 scoped_refptr<DeletionSentinel> protect(deletion_sentinel_);
204 base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
206 if (queue->GetQuiescenceMonitored())
207 task_was_run_on_quiescence_monitored_queue_ = true;
209 if (!pending_task.nestable && main_task_runner_->IsNested()) {
210 // Defer non-nestable work to the main task runner. NOTE these tasks can be
211 // arbitrarily delayed so the additional delay should not be a problem.
212 main_task_runner_->PostNonNestableTask(pending_task.posted_from,
213 pending_task.task);
214 } else {
215 TRACE_TASK_EXECUTION("TaskQueueManager::ProcessTaskFromWorkQueue",
216 pending_task);
217 if (queue->GetShouldNotifyObservers()) {
218 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
219 WillProcessTask(pending_task));
221 task_annotator_.RunTask("TaskQueueManager::PostTask", pending_task);
223 // Detect if the TaskQueueManager just got deleted. If this happens we must
224 // not access any member variables after this point.
225 if (protect->HasOneRef())
226 return true;
228 if (queue->GetShouldNotifyObservers()) {
229 FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
230 DidProcessTask(pending_task));
233 pending_task.task.Reset();
234 *out_previous_task = pending_task;
236 return false;
239 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
240 return main_task_runner_->RunsTasksOnCurrentThread();
243 bool TaskQueueManager::PostDelayedTask(
244 const tracked_objects::Location& from_here,
245 const base::Closure& task,
246 base::TimeDelta delay) {
247 DCHECK_GE(delay, base::TimeDelta());
248 return main_task_runner_->PostDelayedTask(from_here, task, delay);
251 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
252 DCHECK(main_thread_checker_.CalledOnValidThread());
253 DCHECK_GE(work_batch_size, 1);
254 work_batch_size_ = work_batch_size;
257 void TaskQueueManager::AddTaskObserver(
258 base::MessageLoop::TaskObserver* task_observer) {
259 DCHECK(main_thread_checker_.CalledOnValidThread());
260 task_observers_.AddObserver(task_observer);
263 void TaskQueueManager::RemoveTaskObserver(
264 base::MessageLoop::TaskObserver* task_observer) {
265 DCHECK(main_thread_checker_.CalledOnValidThread());
266 task_observers_.RemoveObserver(task_observer);
269 void TaskQueueManager::SetTimeSourceForTesting(
270 scoped_ptr<base::TickClock> time_source) {
271 DCHECK(main_thread_checker_.CalledOnValidThread());
272 time_source_ = time_source.Pass();
275 bool TaskQueueManager::GetAndClearSystemIsQuiescentBit() {
276 bool task_was_run = task_was_run_on_quiescence_monitored_queue_;
277 task_was_run_on_quiescence_monitored_queue_ = false;
278 return !task_was_run;
281 base::TimeTicks TaskQueueManager::Now() const {
282 return time_source_->NowTicks();
285 int TaskQueueManager::GetNextSequenceNumber() {
286 return task_sequence_num_.GetNext();
289 scoped_refptr<base::trace_event::ConvertableToTraceFormat>
290 TaskQueueManager::AsValueWithSelectorResult(
291 bool should_run,
292 internal::TaskQueueImpl* selected_queue) const {
293 DCHECK(main_thread_checker_.CalledOnValidThread());
294 scoped_refptr<base::trace_event::TracedValue> state =
295 new base::trace_event::TracedValue();
296 state->BeginArray("queues");
297 for (auto& queue : queues_)
298 queue->AsValueInto(state.get());
299 state->EndArray();
300 state->BeginDictionary("selector");
301 selector_.AsValueInto(state.get());
302 state->EndDictionary();
303 if (should_run)
304 state->SetString("selected_queue", selected_queue->GetName());
306 state->BeginArray("updatable_queue_set");
307 for (auto& queue : updatable_queue_set_)
308 state->AppendString(queue->GetName());
309 state->EndArray();
310 return state;
313 void TaskQueueManager::OnTaskQueueEnabled() {
314 DCHECK(main_thread_checker_.CalledOnValidThread());
315 MaybePostDoWorkOnMainRunner();
318 } // namespace scheduler