[ServiceWorker] Implement WebServiceWorkerContextClient::openWindow().
[chromium-blink-merge.git] / content / renderer / scheduler / task_queue_manager.cc
blob083c7734acdd7f9e61fbbf6e4d3492ea5e021285
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/renderer/scheduler/task_queue_manager.h"
7 #include <queue>
9 #include "base/bind.h"
10 #include "base/trace_event/trace_event.h"
11 #include "base/trace_event/trace_event_argument.h"
12 #include "cc/test/test_now_source.h"
13 #include "content/renderer/scheduler/task_queue_selector.h"
namespace {
// Sentinel internal tick value used by DoWork() to mean "no delayed task is
// pending". The limit is queried for the same type the constant is declared
// with, so the two can never disagree.
const int64_t kMaxTimeTicks = std::numeric_limits<int64_t>::max();
}  // namespace
19 namespace content {
20 namespace internal {
22 class TaskQueue : public base::SingleThreadTaskRunner {
23 public:
24 TaskQueue(TaskQueueManager* task_queue_manager);
26 // base::SingleThreadTaskRunner implementation.
27 bool RunsTasksOnCurrentThread() const override;
28 bool PostDelayedTask(const tracked_objects::Location& from_here,
29 const base::Closure& task,
30 base::TimeDelta delay) override {
31 return PostDelayedTaskImpl(from_here, task, delay, true);
34 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
35 const base::Closure& task,
36 base::TimeDelta delay) override {
37 return PostDelayedTaskImpl(from_here, task, delay, false);
40 // Adds a task at the end of the incoming task queue and schedules a call to
41 // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
42 // pumping is enabled. Can be called on an arbitrary thread.
43 void EnqueueTask(const base::PendingTask& pending_task);
45 bool IsQueueEmpty() const;
47 void SetAutoPump(bool auto_pump);
48 void PumpQueue();
50 bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task);
51 base::PendingTask TakeTaskFromWorkQueue();
53 void WillDeleteTaskQueueManager();
55 base::TaskQueue& work_queue() { return work_queue_; }
57 void set_name(const char* name) { name_ = name; }
59 void AsValueInto(base::debug::TracedValue* state) const;
61 private:
62 ~TaskQueue() override;
64 bool PostDelayedTaskImpl(const tracked_objects::Location& from_here,
65 const base::Closure& task,
66 base::TimeDelta delay,
67 bool nestable);
69 void PumpQueueLocked();
70 void EnqueueTaskLocked(const base::PendingTask& pending_task);
72 void TraceWorkQueueSize() const;
73 static void QueueAsValueInto(const base::TaskQueue& queue,
74 base::debug::TracedValue* state);
75 static void TaskAsValueInto(const base::PendingTask& task,
76 base::debug::TracedValue* state);
78 // This lock protects all members except the work queue.
79 mutable base::Lock lock_;
80 TaskQueueManager* task_queue_manager_;
81 base::TaskQueue incoming_queue_;
82 bool auto_pump_;
83 const char* name_;
84 std::priority_queue<base::TimeTicks,
85 std::vector<base::TimeTicks>,
86 std::greater<base::TimeTicks>> delayed_task_run_times_;
88 base::TaskQueue work_queue_;
90 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
93 TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
94 : task_queue_manager_(task_queue_manager),
95 auto_pump_(true),
96 name_(nullptr) {
99 TaskQueue::~TaskQueue() {
102 void TaskQueue::WillDeleteTaskQueueManager() {
103 base::AutoLock lock(lock_);
104 task_queue_manager_ = nullptr;
107 bool TaskQueue::RunsTasksOnCurrentThread() const {
108 base::AutoLock lock(lock_);
109 if (!task_queue_manager_)
110 return false;
111 return task_queue_manager_->RunsTasksOnCurrentThread();
114 bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
115 const base::Closure& task,
116 base::TimeDelta delay,
117 bool nestable) {
118 base::AutoLock lock(lock_);
119 if (!task_queue_manager_)
120 return false;
122 base::PendingTask pending_task(from_here, task, base::TimeTicks(), nestable);
123 task_queue_manager_->DidQueueTask(&pending_task);
125 if (delay > base::TimeDelta()) {
126 pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
127 delayed_task_run_times_.push(pending_task.delayed_run_time);
128 return task_queue_manager_->PostDelayedTask(
129 from_here, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
131 EnqueueTaskLocked(pending_task);
132 return true;
135 bool TaskQueue::IsQueueEmpty() const {
136 if (!work_queue_.empty())
137 return false;
140 base::AutoLock lock(lock_);
141 return incoming_queue_.empty();
145 bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task) {
146 if (!work_queue_.empty())
147 return true;
150 base::AutoLock lock(lock_);
151 if (!delayed_task_run_times_.empty()) {
152 *next_pending_delayed_task =
153 std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
155 if (!auto_pump_ || incoming_queue_.empty())
156 return false;
157 work_queue_.Swap(&incoming_queue_);
158 TraceWorkQueueSize();
159 return true;
163 base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
164 base::PendingTask pending_task = work_queue_.front();
165 work_queue_.pop();
166 TraceWorkQueueSize();
167 return pending_task;
170 void TaskQueue::TraceWorkQueueSize() const {
171 if (!name_)
172 return;
173 TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
174 work_queue_.size());
177 void TaskQueue::EnqueueTask(const base::PendingTask& pending_task) {
178 base::AutoLock lock(lock_);
179 EnqueueTaskLocked(pending_task);
182 void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
183 lock_.AssertAcquired();
184 if (!task_queue_manager_)
185 return;
186 if (auto_pump_ && incoming_queue_.empty())
187 task_queue_manager_->MaybePostDoWorkOnMainRunner();
188 incoming_queue_.push(pending_task);
190 if (!pending_task.delayed_run_time.is_null()) {
191 // Update the time of the next pending delayed task.
192 while (!delayed_task_run_times_.empty() &&
193 delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
194 delayed_task_run_times_.pop();
196 // Clear the delayed run time because we've already applied the delay
197 // before getting here.
198 incoming_queue_.back().delayed_run_time = base::TimeTicks();
202 void TaskQueue::SetAutoPump(bool auto_pump) {
203 base::AutoLock lock(lock_);
204 if (auto_pump) {
205 auto_pump_ = true;
206 PumpQueueLocked();
207 } else {
208 auto_pump_ = false;
212 void TaskQueue::PumpQueueLocked() {
213 lock_.AssertAcquired();
214 while (!incoming_queue_.empty()) {
215 work_queue_.push(incoming_queue_.front());
216 incoming_queue_.pop();
218 if (!work_queue_.empty())
219 task_queue_manager_->MaybePostDoWorkOnMainRunner();
222 void TaskQueue::PumpQueue() {
223 base::AutoLock lock(lock_);
224 PumpQueueLocked();
227 void TaskQueue::AsValueInto(base::debug::TracedValue* state) const {
228 base::AutoLock lock(lock_);
229 state->BeginDictionary();
230 if (name_)
231 state->SetString("name", name_);
232 state->SetBoolean("auto_pump", auto_pump_);
233 state->BeginArray("incoming_queue");
234 QueueAsValueInto(incoming_queue_, state);
235 state->EndArray();
236 state->BeginArray("work_queue");
237 QueueAsValueInto(work_queue_, state);
238 state->EndArray();
239 state->EndDictionary();
242 // static
243 void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
244 base::debug::TracedValue* state) {
245 base::TaskQueue queue_copy(queue);
246 while (!queue_copy.empty()) {
247 TaskAsValueInto(queue_copy.front(), state);
248 queue_copy.pop();
252 // static
253 void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
254 base::debug::TracedValue* state) {
255 state->BeginDictionary();
256 state->SetString("posted_from", task.posted_from.ToString());
257 state->SetInteger("sequence_num", task.sequence_num);
258 state->SetBoolean("nestable", task.nestable);
259 state->SetBoolean("is_high_res", task.is_high_res);
260 state->SetDouble(
261 "delayed_run_time",
262 (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
263 state->EndDictionary();
266 } // namespace internal
268 TaskQueueManager::TaskQueueManager(
269 size_t task_queue_count,
270 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
271 TaskQueueSelector* selector)
272 : main_task_runner_(main_task_runner),
273 selector_(selector),
274 pending_dowork_count_(0),
275 work_batch_size_(1),
276 time_source_(nullptr),
277 weak_factory_(this) {
278 DCHECK(main_task_runner->RunsTasksOnCurrentThread());
279 TRACE_EVENT_OBJECT_CREATED_WITH_ID(
280 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
281 this);
283 task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
284 for (size_t i = 0; i < task_queue_count; i++) {
285 scoped_refptr<internal::TaskQueue> queue(
286 make_scoped_refptr(new internal::TaskQueue(this)));
287 queues_.push_back(queue);
290 std::vector<const base::TaskQueue*> work_queues;
291 for (const auto& queue: queues_)
292 work_queues.push_back(&queue->work_queue());
293 selector_->RegisterWorkQueues(work_queues);
296 TaskQueueManager::~TaskQueueManager() {
297 TRACE_EVENT_OBJECT_DELETED_WITH_ID(
298 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
299 this);
300 for (auto& queue : queues_)
301 queue->WillDeleteTaskQueueManager();
304 internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const {
305 DCHECK_LT(queue_index, queues_.size());
306 return queues_[queue_index].get();
309 scoped_refptr<base::SingleThreadTaskRunner>
310 TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const {
311 return Queue(queue_index);
314 bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const {
315 internal::TaskQueue* queue = Queue(queue_index);
316 return queue->IsQueueEmpty();
319 void TaskQueueManager::SetAutoPump(size_t queue_index, bool auto_pump) {
320 main_thread_checker_.CalledOnValidThread();
321 internal::TaskQueue* queue = Queue(queue_index);
322 queue->SetAutoPump(auto_pump);
325 void TaskQueueManager::PumpQueue(size_t queue_index) {
326 main_thread_checker_.CalledOnValidThread();
327 internal::TaskQueue* queue = Queue(queue_index);
328 queue->PumpQueue();
331 bool TaskQueueManager::UpdateWorkQueues(
332 base::TimeTicks* next_pending_delayed_task) {
333 // TODO(skyostil): This is not efficient when the number of queues grows very
334 // large due to the number of locks taken. Consider optimizing when we get
335 // there.
336 main_thread_checker_.CalledOnValidThread();
337 bool has_work = false;
338 for (auto& queue : queues_) {
339 has_work |= queue->UpdateWorkQueue(next_pending_delayed_task);
340 if (!queue->work_queue().empty()) {
341 // Currently we should not be getting tasks with delayed run times in any
342 // of the work queues.
343 DCHECK(queue->work_queue().front().delayed_run_time.is_null());
346 return has_work;
349 void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
350 bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
351 if (on_main_thread) {
352 // We only want one pending DoWork posted from the main thread, or we risk
353 // an explosion of pending DoWorks which could starve out everything else.
354 if (pending_dowork_count_ > 0) {
355 return;
357 pending_dowork_count_++;
360 main_task_runner_->PostTask(
361 FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_,
362 on_main_thread));
365 void TaskQueueManager::DoWork(bool posted_from_main_thread) {
366 if (posted_from_main_thread) {
367 pending_dowork_count_--;
368 DCHECK_GE(pending_dowork_count_, 0);
370 main_thread_checker_.CalledOnValidThread();
372 base::TimeTicks next_pending_delayed_task(
373 base::TimeTicks::FromInternalValue(kMaxTimeTicks));
374 for (int i = 0; i < work_batch_size_; i++) {
375 if (!UpdateWorkQueues(&next_pending_delayed_task))
376 return;
378 // Interrupt the work batch if we should run the next delayed task.
379 if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
380 Now() >= next_pending_delayed_task)
381 return;
383 size_t queue_index;
384 if (!SelectWorkQueueToService(&queue_index))
385 return;
386 // Note that this function won't post another call to DoWork if one is
387 // already pending, so it is safe to call it in a loop.
388 MaybePostDoWorkOnMainRunner();
389 ProcessTaskFromWorkQueue(queue_index);
393 bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
394 bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
395 TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
396 TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
397 AsValueWithSelectorResult(should_run, *out_queue_index));
398 return should_run;
401 void TaskQueueManager::DidQueueTask(base::PendingTask* pending_task) {
402 pending_task->sequence_num = task_sequence_num_.GetNext();
403 task_annotator_.DidQueueTask("TaskQueueManager::PostTask", *pending_task);
406 void TaskQueueManager::ProcessTaskFromWorkQueue(size_t queue_index) {
407 main_thread_checker_.CalledOnValidThread();
408 internal::TaskQueue* queue = Queue(queue_index);
409 base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
410 if (!pending_task.nestable) {
411 // Defer non-nestable work to the main task runner. NOTE these tasks can be
412 // arbitrarily delayed so the additional delay should not be a problem.
413 main_task_runner_->PostNonNestableTask(pending_task.posted_from,
414 pending_task.task);
415 } else {
416 task_annotator_.RunTask("TaskQueueManager::PostTask",
417 "TaskQueueManager::RunTask", pending_task);
421 bool TaskQueueManager::RunsTasksOnCurrentThread() const {
422 return main_task_runner_->RunsTasksOnCurrentThread();
425 bool TaskQueueManager::PostDelayedTask(
426 const tracked_objects::Location& from_here,
427 const base::Closure& task,
428 base::TimeDelta delay) {
429 DCHECK(delay > base::TimeDelta());
430 return main_task_runner_->PostDelayedTask(from_here, task, delay);
433 void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
434 main_thread_checker_.CalledOnValidThread();
435 internal::TaskQueue* queue = Queue(queue_index);
436 queue->set_name(name);
439 void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
440 main_thread_checker_.CalledOnValidThread();
441 DCHECK_GE(work_batch_size, 1);
442 work_batch_size_ = work_batch_size;
445 void TaskQueueManager::SetTimeSourceForTesting(
446 scoped_refptr<cc::TestNowSource> time_source) {
447 main_thread_checker_.CalledOnValidThread();
448 time_source_ = time_source;
451 base::TimeTicks TaskQueueManager::Now() const {
452 return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
455 scoped_refptr<base::debug::ConvertableToTraceFormat>
456 TaskQueueManager::AsValueWithSelectorResult(bool should_run,
457 size_t selected_queue) const {
458 main_thread_checker_.CalledOnValidThread();
459 scoped_refptr<base::debug::TracedValue> state =
460 new base::debug::TracedValue();
461 state->BeginArray("queues");
462 for (auto& queue : queues_)
463 queue->AsValueInto(state.get());
464 state->EndArray();
465 state->BeginDictionary("selector");
466 selector_->AsValueInto(state.get());
467 state->EndDictionary();
468 if (should_run)
469 state->SetInteger("selected_queue", selected_queue);
470 return state;
473 } // namespace content