// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "content/renderer/scheduler/task_queue_manager.h"

#include <algorithm>
#include <functional>
#include <limits>
#include <queue>
#include <vector>

#include "base/bind.h"
#include "base/trace_event/trace_event.h"
#include "base/trace_event/trace_event_argument.h"
#include "cc/test/test_now_source.h"
#include "content/renderer/scheduler/task_queue_selector.h"

namespace {
const int64_t kMaxTimeTicks = std::numeric_limits<int64_t>::max();
}  // namespace

namespace content {
namespace internal {

class TaskQueue : public base::SingleThreadTaskRunner {
 public:
  TaskQueue(TaskQueueManager* task_queue_manager);

  // base::SingleThreadTaskRunner implementation.
  bool RunsTasksOnCurrentThread() const override;
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const base::Closure& task,
                       base::TimeDelta delay) override {
    return PostDelayedTaskImpl(from_here, task, delay, TaskType::NORMAL);
  }

  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const base::Closure& task,
                                  base::TimeDelta delay) override {
    return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE);
  }

  bool IsQueueEmpty() const;

  void SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy);
  void PumpQueue();

  bool UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task,
                       const base::PendingTask* previous_task);
  base::PendingTask TakeTaskFromWorkQueue();

  void WillDeleteTaskQueueManager();

  base::TaskQueue& work_queue() { return work_queue_; }

  void set_name(const char* name) { name_ = name; }

  void AsValueInto(base::trace_event::TracedValue* state) const;

 private:
  enum class TaskType {
    NORMAL,
    NON_NESTABLE,
  };

  ~TaskQueue() override;

  bool PostDelayedTaskImpl(const tracked_objects::Location& from_here,
                           const base::Closure& task,
                           base::TimeDelta delay,
                           TaskType task_type);

  // Adds a task at the end of the incoming task queue and schedules a call to
  // TaskQueueManager::DoWork() if the incoming queue was empty and automatic
  // pumping is enabled. Can be called on an arbitrary thread.
  void EnqueueTask(const base::PendingTask& pending_task);

  void PumpQueueLocked();
  bool TaskIsOlderThanQueuedTasks(const base::PendingTask* task);
  bool ShouldAutoPumpQueueLocked(const base::PendingTask* previous_task);
  void EnqueueTaskLocked(const base::PendingTask& pending_task);

  void TraceQueueSize(bool is_locked) const;
  static const char* PumpPolicyToString(
      TaskQueueManager::PumpPolicy pump_policy);
  static void QueueAsValueInto(const base::TaskQueue& queue,
                               base::trace_event::TracedValue* state);
  static void TaskAsValueInto(const base::PendingTask& task,
                              base::trace_event::TracedValue* state);

  // This lock protects all members except the work queue.
  mutable base::Lock lock_;
  TaskQueueManager* task_queue_manager_;
  base::TaskQueue incoming_queue_;
  TaskQueueManager::PumpPolicy pump_policy_;
  const char* name_;
  std::priority_queue<base::TimeTicks,
                      std::vector<base::TimeTicks>,
                      std::greater<base::TimeTicks>> delayed_task_run_times_;
  base::TaskQueue work_queue_;

  DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};
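
// TaskQueue pairs a |lock_|-protected incoming queue, which may be filled
// from any thread, with a work queue that is only touched on the main thread
// by the TaskQueueManager. The pump policy (see SetPumpPolicy() below)
// controls when tasks migrate from the incoming queue to the work queue.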

TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager)
    : task_queue_manager_(task_queue_manager),
      pump_policy_(TaskQueueManager::PumpPolicy::AUTO),
      name_(nullptr) {
}

TaskQueue::~TaskQueue() {
}

void TaskQueue::WillDeleteTaskQueueManager() {
  base::AutoLock lock(lock_);
  task_queue_manager_ = nullptr;
}

bool TaskQueue::RunsTasksOnCurrentThread() const {
  base::AutoLock lock(lock_);
  if (!task_queue_manager_)
    return false;
  return task_queue_manager_->RunsTasksOnCurrentThread();
}
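
// Note on delayed tasks: PostDelayedTaskImpl() does not place delayed tasks
// on the incoming queue directly. Instead the TaskQueueManager posts a
// delayed EnqueueTask() callback on the main task runner, and the target run
// time is recorded in |delayed_task_run_times_| so that DoWork() can cut a
// work batch short when a delayed task becomes due.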

bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here,
                                    const base::Closure& task,
                                    base::TimeDelta delay,
                                    TaskType task_type) {
  base::AutoLock lock(lock_);
  if (!task_queue_manager_)
    return false;

  base::PendingTask pending_task(from_here, task, base::TimeTicks(),
                                 task_type != TaskType::NON_NESTABLE);
  task_queue_manager_->DidQueueTask(&pending_task);

  if (delay > base::TimeDelta()) {
    pending_task.delayed_run_time = task_queue_manager_->Now() + delay;
    delayed_task_run_times_.push(pending_task.delayed_run_time);
    return task_queue_manager_->PostDelayedTask(
        FROM_HERE, Bind(&TaskQueue::EnqueueTask, this, pending_task), delay);
  }
  EnqueueTaskLocked(pending_task);
  return true;
}

bool TaskQueue::IsQueueEmpty() const {
  if (!work_queue_.empty())
    return false;

  {
    base::AutoLock lock(lock_);
    return incoming_queue_.empty();
  }
}

bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) {
  lock_.AssertAcquired();
  // A null task is passed when UpdateWorkQueue() is called before any task is
  // run. In this case we don't want to pump an after_wakeup queue, so return
  // true here.
  if (!task)
    return true;

  // Return false if there are no tasks in the incoming queue.
  if (incoming_queue_.empty())
    return false;

  base::PendingTask oldest_queued_task = incoming_queue_.front();
  DCHECK(oldest_queued_task.delayed_run_time.is_null());
  DCHECK(task->delayed_run_time.is_null());

  // Note: the comparison is correct because PendingTask's comparison operator
  // is inverted so that it works in a heap-based priority queue (i.e. a < b
  // means a should run after b).
  return oldest_queued_task < *task;
}

bool TaskQueue::ShouldAutoPumpQueueLocked(
    const base::PendingTask* previous_task) {
  lock_.AssertAcquired();
  if (pump_policy_ == TaskQueueManager::PumpPolicy::MANUAL)
    return false;
  if (pump_policy_ == TaskQueueManager::PumpPolicy::AFTER_WAKEUP &&
      TaskIsOlderThanQueuedTasks(previous_task))
    return false;
  if (incoming_queue_.empty())
    return false;
  return true;
}
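
// UpdateWorkQueue() (below) refills the work queue from the incoming queue
// when the pump policy allows it, records the earliest pending delayed run
// time in |next_pending_delayed_task|, and returns true if this queue has
// work ready to run.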

bool TaskQueue::UpdateWorkQueue(base::TimeTicks* next_pending_delayed_task,
                                const base::PendingTask* previous_task) {
  if (!work_queue_.empty())
    return true;

  {
    base::AutoLock lock(lock_);
    if (!delayed_task_run_times_.empty()) {
      *next_pending_delayed_task =
          std::min(*next_pending_delayed_task, delayed_task_run_times_.top());
    }
    if (!ShouldAutoPumpQueueLocked(previous_task))
      return false;
    work_queue_.Swap(&incoming_queue_);
    TraceQueueSize(true);
    return true;
  }
}

base::PendingTask TaskQueue::TakeTaskFromWorkQueue() {
  base::PendingTask pending_task = work_queue_.front();
  work_queue_.pop();
  TraceQueueSize(false);
  return pending_task;
}
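
// TraceQueueSize() reports the combined incoming + work queue depth as a
// trace counter when the disabled-by-default "renderer.scheduler" category is
// enabled. It acquires |lock_| itself unless the caller already holds it.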

void TaskQueue::TraceQueueSize(bool is_locked) const {
  bool is_tracing;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), &is_tracing);
  if (!is_tracing || !name_)
    return;
  if (!is_locked)
    lock_.Acquire();
  else
    lock_.AssertAcquired();
  TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), name_,
                 incoming_queue_.size() + work_queue_.size());
  if (!is_locked)
    lock_.Release();
}

void TaskQueue::EnqueueTask(const base::PendingTask& pending_task) {
  base::AutoLock lock(lock_);
  EnqueueTaskLocked(pending_task);
}

void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) {
  lock_.AssertAcquired();
  if (!task_queue_manager_)
    return;
  if (pump_policy_ == TaskQueueManager::PumpPolicy::AUTO &&
      incoming_queue_.empty())
    task_queue_manager_->MaybePostDoWorkOnMainRunner();
  incoming_queue_.push(pending_task);

  if (!pending_task.delayed_run_time.is_null()) {
    // Update the time of the next pending delayed task.
    while (!delayed_task_run_times_.empty() &&
           delayed_task_run_times_.top() <= pending_task.delayed_run_time) {
      delayed_task_run_times_.pop();
    }
    // Clear the delayed run time because we've already applied the delay
    // before getting here.
    incoming_queue_.back().delayed_run_time = base::TimeTicks();
  }
  TraceQueueSize(true);
}
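
// Pump policy behaviour as implemented by EnqueueTaskLocked(),
// ShouldAutoPumpQueueLocked() and PumpQueueLocked():
//  - AUTO: tasks become runnable as soon as they are enqueued.
//  - AFTER_WAKEUP: tasks become runnable only once some other task has run
//    after they were posted (see TaskIsOlderThanQueuedTasks()).
//  - MANUAL: tasks accumulate until PumpQueue() is called explicitly.
// Switching a queue back to AUTO pumps anything that accumulated meanwhile.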

void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) {
  base::AutoLock lock(lock_);
  if (pump_policy == TaskQueueManager::PumpPolicy::AUTO &&
      pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) {
    PumpQueueLocked();
  }
  pump_policy_ = pump_policy;
}

void TaskQueue::PumpQueueLocked() {
  lock_.AssertAcquired();
  while (!incoming_queue_.empty()) {
    work_queue_.push(incoming_queue_.front());
    incoming_queue_.pop();
  }
  if (!work_queue_.empty())
    task_queue_manager_->MaybePostDoWorkOnMainRunner();
}

void TaskQueue::PumpQueue() {
  base::AutoLock lock(lock_);
  PumpQueueLocked();
}

void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const {
  base::AutoLock lock(lock_);
  state->BeginDictionary();
  if (name_)
    state->SetString("name", name_);
  state->SetString("pump_policy", PumpPolicyToString(pump_policy_));
  state->BeginArray("incoming_queue");
  QueueAsValueInto(incoming_queue_, state);
  state->EndArray();
  state->BeginArray("work_queue");
  QueueAsValueInto(work_queue_, state);
  state->EndArray();
  state->EndDictionary();
}

// static
const char* TaskQueue::PumpPolicyToString(
    TaskQueueManager::PumpPolicy pump_policy) {
  switch (pump_policy) {
    case TaskQueueManager::PumpPolicy::AUTO:
      return "auto";
    case TaskQueueManager::PumpPolicy::AFTER_WAKEUP:
      return "after_wakeup";
    case TaskQueueManager::PumpPolicy::MANUAL:
      return "manual";
    default:
      NOTREACHED();
      return nullptr;
  }
}

// static
void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue,
                                 base::trace_event::TracedValue* state) {
  base::TaskQueue queue_copy(queue);
  while (!queue_copy.empty()) {
    TaskAsValueInto(queue_copy.front(), state);
    queue_copy.pop();
  }
}

// static
void TaskQueue::TaskAsValueInto(const base::PendingTask& task,
                                base::trace_event::TracedValue* state) {
  state->BeginDictionary();
  state->SetString("posted_from", task.posted_from.ToString());
  state->SetInteger("sequence_num", task.sequence_num);
  state->SetBoolean("nestable", task.nestable);
  state->SetBoolean("is_high_res", task.is_high_res);
  state->SetDouble(
      "delayed_run_time",
      (task.delayed_run_time - base::TimeTicks()).InMicroseconds() / 1000.0L);
  state->EndDictionary();
}

}  // namespace internal

TaskQueueManager::TaskQueueManager(
    size_t task_queue_count,
    scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
    TaskQueueSelector* selector)
    : main_task_runner_(main_task_runner),
      selector_(selector),
      pending_dowork_count_(0),
      work_batch_size_(1),
      time_source_(nullptr),
      weak_factory_(this) {
  DCHECK(main_task_runner->RunsTasksOnCurrentThread());
  TRACE_EVENT_OBJECT_CREATED_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
      this);

  task_queue_manager_weak_ptr_ = weak_factory_.GetWeakPtr();
  for (size_t i = 0; i < task_queue_count; i++) {
    scoped_refptr<internal::TaskQueue> queue(
        make_scoped_refptr(new internal::TaskQueue(this)));
    queues_.push_back(queue);
  }

  std::vector<const base::TaskQueue*> work_queues;
  for (const auto& queue : queues_)
    work_queues.push_back(&queue->work_queue());
  selector_->RegisterWorkQueues(work_queues);
}
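
// Rough usage sketch (illustrative only, not part of this file): |selector|
// and |main_runner| stand for objects supplied by the embedder, and SomeTask
// is a hypothetical function.
//
//   TaskQueueManager manager(2, main_runner, selector);
//   manager.SetQueueName(0, "default_tq");
//   manager.SetPumpPolicy(1, TaskQueueManager::PumpPolicy::MANUAL);
//   scoped_refptr<base::SingleThreadTaskRunner> runner =
//       manager.TaskRunnerForQueue(0);
//   runner->PostTask(FROM_HERE, base::Bind(&SomeTask));
//   manager.PumpQueue(1);  // Releases any tasks held on the manual queue.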

TaskQueueManager::~TaskQueueManager() {
  TRACE_EVENT_OBJECT_DELETED_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager",
      this);
  for (auto& queue : queues_)
    queue->WillDeleteTaskQueueManager();
}

internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const {
  DCHECK_LT(queue_index, queues_.size());
  return queues_[queue_index].get();
}

scoped_refptr<base::SingleThreadTaskRunner>
TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const {
  return Queue(queue_index);
}

bool TaskQueueManager::IsQueueEmpty(size_t queue_index) const {
  internal::TaskQueue* queue = Queue(queue_index);
  return queue->IsQueueEmpty();
}

void TaskQueueManager::SetPumpPolicy(size_t queue_index,
                                     PumpPolicy pump_policy) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->SetPumpPolicy(pump_policy);
}

void TaskQueueManager::PumpQueue(size_t queue_index) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->PumpQueue();
}

bool TaskQueueManager::UpdateWorkQueues(
    base::TimeTicks* next_pending_delayed_task,
    const base::PendingTask* previous_task) {
  // TODO(skyostil): This is not efficient when the number of queues grows very
  // large due to the number of locks taken. Consider optimizing when we get
  // there.
  DCHECK(main_thread_checker_.CalledOnValidThread());
  bool has_work = false;
  for (auto& queue : queues_) {
    has_work |= queue->UpdateWorkQueue(next_pending_delayed_task,
                                       previous_task);
    if (!queue->work_queue().empty()) {
      // Currently we should not be getting tasks with delayed run times in any
      // of the work queues.
      DCHECK(queue->work_queue().front().delayed_run_time.is_null());
    }
  }
  return has_work;
}

void TaskQueueManager::MaybePostDoWorkOnMainRunner() {
  bool on_main_thread = main_task_runner_->BelongsToCurrentThread();
  if (on_main_thread) {
    // We only want one pending DoWork posted from the main thread, or we risk
    // an explosion of pending DoWorks which could starve out everything else.
    if (pending_dowork_count_ > 0) {
      return;
    }
    pending_dowork_count_++;
  }

  main_task_runner_->PostTask(
      FROM_HERE, Bind(&TaskQueueManager::DoWork, task_queue_manager_weak_ptr_,
                      on_main_thread));
}
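
// DoWork() runs a batch of up to |work_batch_size_| tasks chosen by
// |selector_|, refreshing the work queues between tasks. The batch is cut
// short if a pending delayed task becomes due, and MaybePostDoWorkOnMainRunner
// is called before each task so that any remaining work gets another DoWork
// (deduplicated on the main thread).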

void TaskQueueManager::DoWork(bool posted_from_main_thread) {
  if (posted_from_main_thread) {
    pending_dowork_count_--;
    DCHECK_GE(pending_dowork_count_, 0);
  }
  DCHECK(main_thread_checker_.CalledOnValidThread());

  base::TimeTicks next_pending_delayed_task(
      base::TimeTicks::FromInternalValue(kMaxTimeTicks));

  // Pass nullptr to UpdateWorkQueues here to prevent waking up a
  // pump-after-wakeup queue.
  if (!UpdateWorkQueues(&next_pending_delayed_task, nullptr))
    return;

  base::PendingTask previous_task((tracked_objects::Location()),
                                  (base::Closure()));
  for (int i = 0; i < work_batch_size_; i++) {
    // Interrupt the work batch if we should run the next delayed task.
    if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks &&
        Now() >= next_pending_delayed_task)
      return;

    size_t queue_index;
    if (!SelectWorkQueueToService(&queue_index))
      return;
    // Note that this function won't post another call to DoWork if one is
    // already pending, so it is safe to call it in a loop.
    MaybePostDoWorkOnMainRunner();
    ProcessTaskFromWorkQueue(queue_index, i > 0, &previous_task);

    if (!UpdateWorkQueues(&next_pending_delayed_task, &previous_task))
      return;
  }
}

bool TaskQueueManager::SelectWorkQueueToService(size_t* out_queue_index) {
  bool should_run = selector_->SelectWorkQueueToService(out_queue_index);
  TRACE_EVENT_OBJECT_SNAPSHOT_WITH_ID(
      TRACE_DISABLED_BY_DEFAULT("renderer.scheduler"), "TaskQueueManager", this,
      AsValueWithSelectorResult(should_run, *out_queue_index));
  return should_run;
}

void TaskQueueManager::DidQueueTask(base::PendingTask* pending_task) {
  pending_task->sequence_num = task_sequence_num_.GetNext();
  task_annotator_.DidQueueTask("TaskQueueManager::PostTask", *pending_task);
}
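
// ProcessTaskFromWorkQueue() pops one task from the selected queue. Nestable
// tasks run here via |task_annotator_|, with "will"/"did" observer
// notifications elided at the edges of a work batch; non-nestable tasks are
// re-posted to the main task runner instead of running synchronously here.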

void TaskQueueManager::ProcessTaskFromWorkQueue(
    size_t queue_index,
    bool has_previous_task,
    base::PendingTask* previous_task) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  base::PendingTask pending_task = queue->TakeTaskFromWorkQueue();
  if (!pending_task.nestable) {
    // Defer non-nestable work to the main task runner. NOTE these tasks can be
    // arbitrarily delayed so the additional delay should not be a problem.
    main_task_runner_->PostNonNestableTask(pending_task.posted_from,
                                           pending_task.task);
  } else {
    // Suppress "will" task observer notifications for the first and "did"
    // notifications for the last task in the batch to avoid duplicate
    // notifications.
    if (has_previous_task) {
      FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
                        DidProcessTask(*previous_task));
      FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_,
                        WillProcessTask(pending_task));
    }
    task_annotator_.RunTask("TaskQueueManager::PostTask",
                            "TaskQueueManager::RunTask", pending_task);
    pending_task.task.Reset();
    *previous_task = pending_task;
  }
}

bool TaskQueueManager::RunsTasksOnCurrentThread() const {
  return main_task_runner_->RunsTasksOnCurrentThread();
}

bool TaskQueueManager::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const base::Closure& task,
    base::TimeDelta delay) {
  DCHECK(delay > base::TimeDelta());
  return main_task_runner_->PostDelayedTask(from_here, task, delay);
}

void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  internal::TaskQueue* queue = Queue(queue_index);
  queue->set_name(name);
}

void TaskQueueManager::SetWorkBatchSize(int work_batch_size) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  DCHECK_GE(work_batch_size, 1);
  work_batch_size_ = work_batch_size;
}

void TaskQueueManager::AddTaskObserver(
    base::MessageLoop::TaskObserver* task_observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  base::MessageLoop::current()->AddTaskObserver(task_observer);
  task_observers_.AddObserver(task_observer);
}

void TaskQueueManager::RemoveTaskObserver(
    base::MessageLoop::TaskObserver* task_observer) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  base::MessageLoop::current()->RemoveTaskObserver(task_observer);
  task_observers_.RemoveObserver(task_observer);
}

void TaskQueueManager::SetTimeSourceForTesting(
    scoped_refptr<cc::TestNowSource> time_source) {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  time_source_ = time_source;
}

base::TimeTicks TaskQueueManager::Now() const {
  return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now();
}

scoped_refptr<base::trace_event::ConvertableToTraceFormat>
TaskQueueManager::AsValueWithSelectorResult(bool should_run,
                                            size_t selected_queue) const {
  DCHECK(main_thread_checker_.CalledOnValidThread());
  scoped_refptr<base::trace_event::TracedValue> state =
      new base::trace_event::TracedValue();
  state->BeginArray("queues");
  for (auto& queue : queues_)
    queue->AsValueInto(state.get());
  state->EndArray();
  state->BeginDictionary("selector");
  selector_->AsValueInto(state.get());
  state->EndDictionary();
  if (should_run)
    state->SetInteger("selected_queue", selected_queue);
  return state;
}

}  // namespace content