Elim cr-checkbox
[chromium-blink-merge.git] / chrome / browser / sync_file_system / drive_backend / sync_task_manager.cc
blob16e1f9bf53804206a0503fd3ff7820d2a9fb8695
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/memory/scoped_ptr.h"
10 #include "base/sequenced_task_runner.h"
11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
13 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
15 using storage::FileSystemURL;
17 namespace sync_file_system {
18 namespace drive_backend {
20 namespace {
22 class SyncTaskAdapter : public ExclusiveTask {
23 public:
24 explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
25 ~SyncTaskAdapter() override {}
27 void RunExclusive(const SyncStatusCallback& callback) override {
28 task_.Run(callback);
31 private:
32 SyncTaskManager::Task task_;
34 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
37 } // namespace
39 SyncTaskManager::PendingTask::PendingTask() {}
41 SyncTaskManager::PendingTask::PendingTask(
42 const base::Closure& task, Priority pri, int seq)
43 : task(task), priority(pri), seq(seq) {}
45 SyncTaskManager::PendingTask::~PendingTask() {}
47 bool SyncTaskManager::PendingTaskComparator::operator()(
48 const PendingTask& left,
49 const PendingTask& right) const {
50 if (left.priority != right.priority)
51 return left.priority < right.priority;
52 return left.seq > right.seq;
55 SyncTaskManager::SyncTaskManager(
56 base::WeakPtr<Client> client,
57 size_t maximum_background_task,
58 const scoped_refptr<base::SequencedTaskRunner>& task_runner,
59 const scoped_refptr<base::SequencedWorkerPool>& worker_pool)
60 : client_(client),
61 maximum_background_task_(maximum_background_task),
62 pending_task_seq_(0),
63 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
64 task_runner_(task_runner),
65 worker_pool_(worker_pool),
66 weak_ptr_factory_(this) {
69 SyncTaskManager::~SyncTaskManager() {
70 weak_ptr_factory_.InvalidateWeakPtrs();
72 client_.reset();
73 token_.reset();
76 void SyncTaskManager::Initialize(SyncStatusCode status) {
77 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
78 DCHECK(!token_);
79 NotifyTaskDone(
80 SyncTaskToken::CreateForForegroundTask(
81 weak_ptr_factory_.GetWeakPtr(), task_runner_.get()),
82 status);
85 void SyncTaskManager::ScheduleTask(
86 const tracked_objects::Location& from_here,
87 const Task& task,
88 Priority priority,
89 const SyncStatusCallback& callback) {
90 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
92 ScheduleSyncTask(from_here,
93 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
94 priority,
95 callback);
98 void SyncTaskManager::ScheduleSyncTask(
99 const tracked_objects::Location& from_here,
100 scoped_ptr<SyncTask> task,
101 Priority priority,
102 const SyncStatusCallback& callback) {
103 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
105 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
106 if (!token) {
107 PushPendingTask(
108 base::Bind(&SyncTaskManager::ScheduleSyncTask,
109 weak_ptr_factory_.GetWeakPtr(), from_here,
110 base::Passed(&task), priority, callback),
111 priority);
112 return;
114 RunTask(token.Pass(), task.Pass());
117 bool SyncTaskManager::ScheduleTaskIfIdle(
118 const tracked_objects::Location& from_here,
119 const Task& task,
120 const SyncStatusCallback& callback) {
121 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
123 return ScheduleSyncTaskIfIdle(
124 from_here,
125 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
126 callback);
129 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
130 const tracked_objects::Location& from_here,
131 scoped_ptr<SyncTask> task,
132 const SyncStatusCallback& callback) {
133 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
135 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
136 if (!token)
137 return false;
138 RunTask(token.Pass(), task.Pass());
139 return true;
142 // static
143 void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
144 SyncStatusCode status) {
145 DCHECK(token);
147 SyncTaskManager* manager = token->manager();
148 if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
149 DCHECK(!manager);
150 SyncStatusCallback callback = token->callback();
151 token->clear_callback();
152 callback.Run(status);
153 return;
156 if (manager)
157 manager->NotifyTaskDoneBody(token.Pass(), status);
160 // static
161 void SyncTaskManager::UpdateTaskBlocker(
162 scoped_ptr<SyncTaskToken> current_task_token,
163 scoped_ptr<TaskBlocker> task_blocker,
164 const Continuation& continuation) {
165 DCHECK(current_task_token);
167 SyncTaskManager* manager = current_task_token->manager();
168 if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
169 DCHECK(!manager);
170 continuation.Run(current_task_token.Pass());
171 return;
174 if (!manager)
175 return;
177 scoped_ptr<SyncTaskToken> foreground_task_token;
178 scoped_ptr<SyncTaskToken> background_task_token;
179 scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
180 if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
181 foreground_task_token = current_task_token.Pass();
182 else
183 background_task_token = current_task_token.Pass();
185 manager->UpdateTaskBlockerBody(foreground_task_token.Pass(),
186 background_task_token.Pass(),
187 task_log.Pass(),
188 task_blocker.Pass(),
189 continuation);
192 bool SyncTaskManager::IsRunningTask(int64 token_id) const {
193 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
195 // If the client is gone, all task should be aborted.
196 if (!client_)
197 return false;
199 if (token_id == SyncTaskToken::kForegroundTaskTokenID)
200 return true;
202 return ContainsKey(running_background_tasks_, token_id);
205 void SyncTaskManager::DetachFromSequence() {
206 sequence_checker_.DetachFromSequence();
209 bool SyncTaskManager::ShouldTrackTaskToken() const {
210 return !worker_pool_ || !worker_pool_->IsShutdownInProgress();
213 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
214 SyncStatusCode status) {
215 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
216 DCHECK(token);
218 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
219 << " (" << SyncStatusCodeToString(status) << ")"
220 << " " << token->location().ToString();
222 if (token->task_blocker()) {
223 dependency_manager_.Erase(token->task_blocker());
224 token->clear_task_blocker();
227 if (client_) {
228 if (token->has_task_log()) {
229 token->FinalizeTaskLog(SyncStatusCodeToString(status));
230 client_->RecordTaskLog(token->PassTaskLog());
234 scoped_ptr<SyncTask> task;
235 SyncStatusCallback callback = token->callback();
236 token->clear_callback();
237 if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
238 token_ = token.Pass();
239 task = running_foreground_task_.Pass();
240 } else {
241 task = running_background_tasks_.take_and_erase(token->token_id());
244 // Acquire the token to prevent a new task to jump into the queue.
245 token = token_.Pass();
247 bool task_used_network = false;
248 if (task)
249 task_used_network = task->used_network();
251 if (client_)
252 client_->NotifyLastOperationStatus(status, task_used_network);
254 if (!callback.is_null())
255 callback.Run(status);
257 // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
258 // making the call-chaing longer.
259 task_runner_->PostTask(
260 FROM_HERE,
261 base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
262 weak_ptr_factory_.GetWeakPtr(), base::Passed(&token)));
265 void SyncTaskManager::UpdateTaskBlockerBody(
266 scoped_ptr<SyncTaskToken> foreground_task_token,
267 scoped_ptr<SyncTaskToken> background_task_token,
268 scoped_ptr<TaskLogger::TaskLog> task_log,
269 scoped_ptr<TaskBlocker> task_blocker,
270 const Continuation& continuation) {
271 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
273 // Run the task directly if the parallelization is disabled.
274 if (!maximum_background_task_) {
275 DCHECK(foreground_task_token);
276 DCHECK(!background_task_token);
277 foreground_task_token->SetTaskLog(task_log.Pass());
278 continuation.Run(foreground_task_token.Pass());
279 return;
282 // Clear existing |task_blocker| from |dependency_manager_| before
283 // getting |foreground_task_token|, so that we can avoid dead lock.
284 if (background_task_token && background_task_token->task_blocker()) {
285 dependency_manager_.Erase(background_task_token->task_blocker());
286 background_task_token->clear_task_blocker();
289 // Try to get |foreground_task_token|. If it's not available, wait for
290 // current foreground task to finish.
291 if (!foreground_task_token) {
292 DCHECK(background_task_token);
293 foreground_task_token = GetToken(background_task_token->location(),
294 SyncStatusCallback());
295 if (!foreground_task_token) {
296 PushPendingTask(
297 base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
298 weak_ptr_factory_.GetWeakPtr(),
299 base::Passed(&foreground_task_token),
300 base::Passed(&background_task_token),
301 base::Passed(&task_log),
302 base::Passed(&task_blocker),
303 continuation),
304 PRIORITY_HIGH);
305 MaybeStartNextForegroundTask(nullptr);
306 return;
310 // Check if the task can run as a background task now.
311 // If there are too many task running or any other task blocks current
312 // task, wait for any other task to finish.
313 bool task_number_limit_exceeded =
314 !background_task_token &&
315 running_background_tasks_.size() >= maximum_background_task_;
316 if (task_number_limit_exceeded ||
317 !dependency_manager_.Insert(task_blocker.get())) {
318 DCHECK(!running_background_tasks_.empty());
319 DCHECK(pending_backgrounding_task_.is_null());
321 // Wait for NotifyTaskDone to release a |task_blocker|.
322 pending_backgrounding_task_ =
323 base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
324 weak_ptr_factory_.GetWeakPtr(),
325 base::Passed(&foreground_task_token),
326 base::Passed(&background_task_token),
327 base::Passed(&task_log),
328 base::Passed(&task_blocker),
329 continuation);
330 return;
333 if (background_task_token) {
334 background_task_token->set_task_blocker(task_blocker.Pass());
335 } else {
336 tracked_objects::Location from_here = foreground_task_token->location();
337 SyncStatusCallback callback = foreground_task_token->callback();
338 foreground_task_token->clear_callback();
340 background_task_token =
341 SyncTaskToken::CreateForBackgroundTask(weak_ptr_factory_.GetWeakPtr(),
342 task_runner_.get(),
343 task_token_seq_++,
344 task_blocker.Pass());
345 background_task_token->UpdateTask(from_here, callback);
346 running_background_tasks_.set(background_task_token->token_id(),
347 running_foreground_task_.Pass());
350 token_ = foreground_task_token.Pass();
351 MaybeStartNextForegroundTask(nullptr);
352 background_task_token->SetTaskLog(task_log.Pass());
353 continuation.Run(background_task_token.Pass());
356 scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
357 const tracked_objects::Location& from_here,
358 const SyncStatusCallback& callback) {
359 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
361 if (!token_)
362 return nullptr;
363 token_->UpdateTask(from_here, callback);
364 return token_.Pass();
367 void SyncTaskManager::PushPendingTask(
368 const base::Closure& closure, Priority priority) {
369 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
371 pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
374 void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
375 scoped_ptr<SyncTask> task) {
376 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
377 DCHECK(!running_foreground_task_);
379 running_foreground_task_ = task.Pass();
380 running_foreground_task_->RunPreflight(token.Pass());
383 void SyncTaskManager::MaybeStartNextForegroundTask(
384 scoped_ptr<SyncTaskToken> token) {
385 DCHECK(sequence_checker_.CalledOnValidSequencedThread());
387 if (token) {
388 DCHECK(!token_);
389 token_ = token.Pass();
392 if (!pending_backgrounding_task_.is_null()) {
393 base::Closure closure = pending_backgrounding_task_;
394 pending_backgrounding_task_.Reset();
395 closure.Run();
396 return;
399 if (!token_)
400 return;
402 if (!pending_tasks_.empty()) {
403 base::Closure closure = pending_tasks_.top().task;
404 pending_tasks_.pop();
405 closure.Run();
406 return;
409 if (client_)
410 client_->MaybeScheduleNextTask();
413 } // namespace drive_backend
414 } // namespace sync_file_system