1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
8 #include "base/location.h"
9 #include "base/memory/scoped_ptr.h"
10 #include "base/sequenced_task_runner.h"
11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
13 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
15 using storage::FileSystemURL
;
17 namespace sync_file_system
{
18 namespace drive_backend
{
// Wraps a SyncTaskManager::Task behind the ExclusiveTask interface. The
// wrapped task is copied into |task_| at construction.
// NOTE(review): the body of RunExclusive() and the access specifiers are not
// visible in this extract; presumably RunExclusive() runs |task_| with
// |callback| -- confirm against the full file.
17 class SyncTaskAdapter
: public ExclusiveTask
{
24 explicit SyncTaskAdapter(const SyncTaskManager::Task
& task
) : task_(task
) {}
25 ~SyncTaskAdapter() override
{}
27 void RunExclusive(const SyncStatusCallback
& callback
) override
{
// The adapted task, stored by value.
32 SyncTaskManager::Task task_
;
34 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter
);
// Default constructor: no member is explicitly initialized here.
39 SyncTaskManager::PendingTask::PendingTask() {}
// Builds a pending-task record from the closure to run (|task|), its
// priority (|pri|) and its sequence number (|seq|). The comparator uses the
// sequence number to order tasks that share a priority.
41 SyncTaskManager::PendingTask::PendingTask(
42 const base::Closure
& task
, Priority pri
, int seq
)
43 : task(task
), priority(pri
), seq(seq
) {}
// Destructor: no explicit cleanup.
45 SyncTaskManager::PendingTask::~PendingTask() {}
// Strict ordering of pending tasks. When the priorities differ, |left|
// orders before |right| iff its priority value is smaller. When they are
// equal, the task with the larger |seq| orders first.
// NOTE(review): |pending_tasks_| is used with push()/top()/pop(), so this
// presumably drives a priority queue in which top() is the highest-priority
// task and, within a priority, the one with the smallest |seq| -- confirm
// against the header.
47 bool SyncTaskManager::PendingTaskComparator::operator()(
48 const PendingTask
& left
,
49 const PendingTask
& right
) const {
50 if (left
.priority
!= right
.priority
)
51 return left
.priority
< right
.priority
;
// Same priority: fall back to the sequence number.
52 return left
.seq
> right
.seq
;
// Constructs the manager. The visible initializers store the background-task
// limit, the task runner and the worker pool, start |task_token_seq_| at
// SyncTaskToken::kMinimumBackgroundTaskTokenID, and bind |weak_ptr_factory_|
// to this object.
// NOTE(review): some member initializers (e.g. for |client|) and the
// constructor body are not visible in this extract.
55 SyncTaskManager::SyncTaskManager(
56 base::WeakPtr
<Client
> client
,
57 size_t maximum_background_task
,
58 const scoped_refptr
<base::SequencedTaskRunner
>& task_runner
,
59 const scoped_refptr
<base::SequencedWorkerPool
>& worker_pool
)
61 maximum_background_task_(maximum_background_task
),
63 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID
),
64 task_runner_(task_runner
),
65 worker_pool_(worker_pool
),
66 weak_ptr_factory_(this) {
// Destructor. Begins by invalidating every weak pointer handed out by
// |weak_ptr_factory_|.
// NOTE(review): any further teardown is not visible in this extract.
69 SyncTaskManager::~SyncTaskManager() {
70 weak_ptr_factory_
.InvalidateWeakPtrs();
// Initializes the manager; DCHECKs that the call is on the manager's
// sequence. Creates the foreground task token from a weak pointer to this
// manager and |task_runner_|.
// NOTE(review): the statement that consumes the new token, and how |status|
// is used, are not visible in this extract.
76 void SyncTaskManager::Initialize(SyncStatusCode status
) {
77 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
80 SyncTaskToken::CreateForForegroundTask(
81 weak_ptr_factory_
.GetWeakPtr(), task_runner_
.get()),
// Convenience overload: wraps |task| in a SyncTaskAdapter and forwards to
// ScheduleSyncTask(). DCHECKs that the call is on the manager's sequence.
// NOTE(review): the declarations of |task| (and any priority parameter) and
// the remaining forwarded arguments are not visible in this extract.
85 void SyncTaskManager::ScheduleTask(
86 const tracked_objects::Location
& from_here
,
89 const SyncStatusCallback
& callback
) {
90 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
92 ScheduleSyncTask(from_here
,
93 scoped_ptr
<SyncTask
>(new SyncTaskAdapter(task
)),
// Schedules |task|: requests the task token via GetToken() and runs the task
// with it through RunTask(). DCHECKs that the call is on the manager's
// sequence.
// NOTE(review): the branch around the base::Bind() below is not visible.
// Presumably, when no token is available, a closure that re-invokes
// ScheduleSyncTask() is queued as a pending task and the function returns
// early -- confirm against the full file.
98 void SyncTaskManager::ScheduleSyncTask(
99 const tracked_objects::Location
& from_here
,
100 scoped_ptr
<SyncTask
> task
,
102 const SyncStatusCallback
& callback
) {
103 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
105 scoped_ptr
<SyncTaskToken
> token(GetToken(from_here
, callback
));
// Retry closure: bound to a weak pointer of this manager, with ownership of
// |task| moved into the closure via base::Passed.
108 base::Bind(&SyncTaskManager::ScheduleSyncTask
,
109 weak_ptr_factory_
.GetWeakPtr(), from_here
,
110 base::Passed(&task
), priority
, callback
),
114 RunTask(token
.Pass(), task
.Pass());
// Convenience overload: wraps |task| in a SyncTaskAdapter and returns the
// result of ScheduleSyncTaskIfIdle(). DCHECKs that the call is on the
// manager's sequence.
// NOTE(review): the declaration of |task| and the remaining forwarded
// arguments are not visible in this extract.
117 bool SyncTaskManager::ScheduleTaskIfIdle(
118 const tracked_objects::Location
& from_here
,
120 const SyncStatusCallback
& callback
) {
121 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
123 return ScheduleSyncTaskIfIdle(
125 scoped_ptr
<SyncTask
>(new SyncTaskAdapter(task
)),
// Requests the task token via GetToken() and runs |task| with it through
// RunTask(). DCHECKs that the call is on the manager's sequence.
// NOTE(review): the check on |token| and the return statements are not
// visible; presumably this returns false without running when no token is
// available and true otherwise -- confirm against the full file.
129 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
130 const tracked_objects::Location
& from_here
,
131 scoped_ptr
<SyncTask
> task
,
132 const SyncStatusCallback
& callback
) {
133 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
135 scoped_ptr
<SyncTaskToken
> token(GetToken(from_here
, callback
));
138 RunTask(token
.Pass(), task
.Pass());
// Reports completion of the task that holds |token| with result |status|.
// A token whose id is kTestingTaskTokenID has its callback run directly;
// otherwise the work is delegated to NotifyTaskDoneBody() on the manager
// recorded in the token.
// NOTE(review): the lines between the testing branch and the delegation
// (presumably an early return and a check on |manager|) are not visible.
143 void SyncTaskManager::NotifyTaskDone(scoped_ptr
<SyncTaskToken
> token
,
144 SyncStatusCode status
) {
147 SyncTaskManager
* manager
= token
->manager();
148 if (token
->token_id() == SyncTaskToken::kTestingTaskTokenID
) {
// Copy the callback out and clear it on the token before running it.
150 SyncStatusCallback callback
= token
->callback();
151 token
->clear_callback();
152 callback
.Run(status
);
157 manager
->NotifyTaskDoneBody(token
.Pass(), status
);
// Entry point for a running task to change its TaskBlocker.
// |current_task_token| must be non-null (DCHECKed). A token whose id is
// kTestingTaskTokenID is handed straight to |continuation|. Otherwise the
// token is sorted into a foreground or a background slot by its id, its task
// log is taken out, and everything is forwarded to UpdateTaskBlockerBody()
// on the manager recorded in the token.
// NOTE(review): the end of the testing branch, the `else` before the
// background assignment and the trailing arguments of the final call are not
// visible in this extract.
161 void SyncTaskManager::UpdateTaskBlocker(
162 scoped_ptr
<SyncTaskToken
> current_task_token
,
163 scoped_ptr
<TaskBlocker
> task_blocker
,
164 const Continuation
& continuation
) {
165 DCHECK(current_task_token
);
167 SyncTaskManager
* manager
= current_task_token
->manager();
168 if (current_task_token
->token_id() == SyncTaskToken::kTestingTaskTokenID
) {
170 continuation
.Run(current_task_token
.Pass());
177 scoped_ptr
<SyncTaskToken
> foreground_task_token
;
178 scoped_ptr
<SyncTaskToken
> background_task_token
;
// Detach the task log from the token before the token is moved.
179 scoped_ptr
<TaskLogger::TaskLog
> task_log
= current_task_token
->PassTaskLog();
180 if (current_task_token
->token_id() == SyncTaskToken::kForegroundTaskTokenID
)
181 foreground_task_token
= current_task_token
.Pass();
183 background_task_token
= current_task_token
.Pass();
185 manager
->UpdateTaskBlockerBody(foreground_task_token
.Pass(),
186 background_task_token
.Pass(),
// Reports whether the task identified by |token_id| is running. For ids
// other than the foreground id, the answer is whether |token_id| is a key of
// |running_background_tasks_|. DCHECKs that the call is on the manager's
// sequence.
// NOTE(review): the client check announced by the comment below and the
// result returned for the foreground id are not visible in this extract.
192 bool SyncTaskManager::IsRunningTask(int64 token_id
) const {
193 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
195 // If the client is gone, all tasks should be aborted.
199 if (token_id
== SyncTaskToken::kForegroundTaskTokenID
)
202 return ContainsKey(running_background_tasks_
, token_id
);
// Detaches |sequence_checker_| from the sequence it is currently bound to.
205 void SyncTaskManager::DetachFromSequence() {
206 sequence_checker_
.DetachFromSequence();
// Returns true when there is no worker pool, or when the worker pool is not
// in the middle of shutting down; returns false only while a worker pool
// exists and its shutdown is in progress.
209 bool SyncTaskManager::ShouldTrackTaskToken() const {
210 return !worker_pool_
|| !worker_pool_
->IsShutdownInProgress();
// Completion handling for the task that holds |token|. In order: logs the
// status, releases the token's task blocker, records the task log with the
// client, takes back the finished task (foreground or background), reports
// the status to the client, runs the task's callback, and posts
// MaybeStartNextForegroundTask() to |task_runner_|. DCHECKs that the call is
// on the manager's sequence.
// NOTE(review): several closing braces, the `else` before the background
// branch, and the conditions guarding the token re-acquisition and
// |task_used_network| are not visible in this extract.
213 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr
<SyncTaskToken
> token
,
214 SyncStatusCode status
) {
215 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
218 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
219 << " (" << SyncStatusCodeToString(status
) << ")"
220 << " " << token
->location().ToString();
// Drop this task's blocker from |dependency_manager_| and from the token.
222 if (token
->task_blocker()) {
223 dependency_manager_
.Erase(token
->task_blocker());
224 token
->clear_task_blocker();
// Finalize the task log with the status string and hand it to the client.
228 if (token
->has_task_log()) {
229 token
->FinalizeTaskLog(SyncStatusCodeToString(status
));
230 client_
->RecordTaskLog(token
->PassTaskLog());
234 scoped_ptr
<SyncTask
> task
;
// Copy the callback out and clear it on the token; it is run further below.
235 SyncStatusCallback callback
= token
->callback();
236 token
->clear_callback();
// Foreground completion: the token goes back into |token_| and the finished
// task is taken from |running_foreground_task_|.
237 if (token
->token_id() == SyncTaskToken::kForegroundTaskTokenID
) {
238 token_
= token
.Pass();
239 task
= running_foreground_task_
.Pass();
// Background completion: remove the task from |running_background_tasks_|.
241 task
= running_background_tasks_
.take_and_erase(token
->token_id());
244 // Acquire the token to prevent a new task from jumping into the queue.
245 token
= token_
.Pass();
247 bool task_used_network
= false;
249 task_used_network
= task
->used_network();
252 client_
->NotifyLastOperationStatus(status
, task_used_network
);
254 if (!callback
.is_null())
255 callback
.Run(status
);
257 // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
258 // making the call chain longer.
259 task_runner_
->PostTask(
261 base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask
,
262 weak_ptr_factory_
.GetWeakPtr(), base::Passed(&token
)));
// Applies a new |task_blocker| to a running task and then invokes
// |continuation| with the token the task should keep using. Exactly one of
// |foreground_task_token| / |background_task_token| is expected on entry
// (see the DCHECKs). Visible steps:
//   1. With parallelization disabled (|maximum_background_task_| == 0), the
//      foreground token gets its task log back and goes to |continuation|.
//   2. A background token's existing blocker is erased from
//      |dependency_manager_|.
//   3. A background task obtains the foreground token through GetToken().
//   4. If the background-task limit is reached or the blocker cannot be
//      inserted into |dependency_manager_|, a closure that re-invokes this
//      function is stored in |pending_backgrounding_task_|.
//   5. Otherwise the blocker is attached to the background token. A task
//      that had no background token gets a new one and is moved from
//      |running_foreground_task_| into |running_background_tasks_|.
//   6. The foreground token returns to |token_|, the next foreground task may
//      start, and |continuation| runs with the background token.
// NOTE(review): early returns, `else` lines, closing braces and some call
// arguments are not visible in this extract; the step boundaries above are
// inferred from the visible statements and should be confirmed against the
// full file.
265 void SyncTaskManager::UpdateTaskBlockerBody(
266 scoped_ptr
<SyncTaskToken
> foreground_task_token
,
267 scoped_ptr
<SyncTaskToken
> background_task_token
,
268 scoped_ptr
<TaskLogger::TaskLog
> task_log
,
269 scoped_ptr
<TaskBlocker
> task_blocker
,
270 const Continuation
& continuation
) {
271 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
273 // Run the task directly if the parallelization is disabled.
274 if (!maximum_background_task_
) {
275 DCHECK(foreground_task_token
);
276 DCHECK(!background_task_token
);
277 foreground_task_token
->SetTaskLog(task_log
.Pass());
278 continuation
.Run(foreground_task_token
.Pass());
282 // Clear existing |task_blocker| from |dependency_manager_| before
283 // getting |foreground_task_token|, so that we can avoid deadlock.
284 if (background_task_token
&& background_task_token
->task_blocker()) {
285 dependency_manager_
.Erase(background_task_token
->task_blocker());
286 background_task_token
->clear_task_blocker();
289 // Try to get |foreground_task_token|. If it's not available, wait for
290 // current foreground task to finish.
291 if (!foreground_task_token
) {
292 DCHECK(background_task_token
);
293 foreground_task_token
= GetToken(background_task_token
->location(),
294 SyncStatusCallback());
295 if (!foreground_task_token
) {
// Retry closure bound to a weak pointer of this manager; ownership of all
// four scoped_ptr arguments moves into the closure via base::Passed.
297 base::Bind(&SyncTaskManager::UpdateTaskBlockerBody
,
298 weak_ptr_factory_
.GetWeakPtr(),
299 base::Passed(&foreground_task_token
),
300 base::Passed(&background_task_token
),
301 base::Passed(&task_log
),
302 base::Passed(&task_blocker
),
305 MaybeStartNextForegroundTask(nullptr);
310 // Check if the task can run as a background task now.
311 // If there are too many tasks running or any other task blocks the current
312 // task, wait for any other task to finish.
// The limit applies only to a task that does not already hold a background
// token.
313 bool task_number_limit_exceeded
=
314 !background_task_token
&&
315 running_background_tasks_
.size() >= maximum_background_task_
;
316 if (task_number_limit_exceeded
||
317 !dependency_manager_
.Insert(task_blocker
.get())) {
318 DCHECK(!running_background_tasks_
.empty());
319 DCHECK(pending_backgrounding_task_
.is_null());
321 // Wait for NotifyTaskDone to release a |task_blocker|.
322 pending_backgrounding_task_
=
323 base::Bind(&SyncTaskManager::UpdateTaskBlockerBody
,
324 weak_ptr_factory_
.GetWeakPtr(),
325 base::Passed(&foreground_task_token
),
326 base::Passed(&background_task_token
),
327 base::Passed(&task_log
),
328 base::Passed(&task_blocker
),
// Already a background task: just attach the new blocker to its token.
333 if (background_task_token
) {
334 background_task_token
->set_task_blocker(task_blocker
.Pass());
// Becoming a background task: carry the location and callback over from the
// foreground token to a newly created background token.
336 tracked_objects::Location from_here
= foreground_task_token
->location();
337 SyncStatusCallback callback
= foreground_task_token
->callback();
338 foreground_task_token
->clear_callback();
340 background_task_token
=
341 SyncTaskToken::CreateForBackgroundTask(weak_ptr_factory_
.GetWeakPtr(),
344 task_blocker
.Pass());
345 background_task_token
->UpdateTask(from_here
, callback
);
// Re-home the running task under the new background token id.
346 running_background_tasks_
.set(background_task_token
->token_id(),
347 running_foreground_task_
.Pass());
// Give the foreground token back so that another foreground task can start.
350 token_
= foreground_task_token
.Pass();
351 MaybeStartNextForegroundTask(nullptr);
352 background_task_token
->SetTaskLog(task_log
.Pass());
353 continuation
.Run(background_task_token
.Pass());
// Hands out the foreground token held in |token_|. The token is stamped with
// |from_here| and |callback| via UpdateTask(), and ownership moves to the
// caller. DCHECKs that the call is on the manager's sequence.
// NOTE(review): the lines before UpdateTask() are not visible. Callers test
// the result for null, so presumably a null token is returned when |token_|
// is not currently held -- confirm against the full file.
356 scoped_ptr
<SyncTaskToken
> SyncTaskManager::GetToken(
357 const tracked_objects::Location
& from_here
,
358 const SyncStatusCallback
& callback
) {
359 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
363 token_
->UpdateTask(from_here
, callback
);
364 return token_
.Pass();
// Queues |closure| in |pending_tasks_| with the given |priority|. The entry
// is stamped with the current value of |pending_task_seq_|, which is then
// incremented. DCHECKs that the call is on the manager's sequence.
367 void SyncTaskManager::PushPendingTask(
368 const base::Closure
& closure
, Priority priority
) {
369 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
371 pending_tasks_
.push(PendingTask(closure
, priority
, pending_task_seq_
++));
// Starts |task| as the foreground task: stores it in
// |running_foreground_task_| and calls its RunPreflight() with |token|.
// DCHECKs that the call is on the manager's sequence and that no foreground
// task is currently stored.
374 void SyncTaskManager::RunTask(scoped_ptr
<SyncTaskToken
> token
,
375 scoped_ptr
<SyncTask
> task
) {
376 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
377 DCHECK(!running_foreground_task_
);
379 running_foreground_task_
= task
.Pass();
380 running_foreground_task_
->RunPreflight(token
.Pass());
// Picks the next piece of work once the foreground token may be free. The
// visible order is: put |token| back into |token_|; then consider a pending
// backgrounding request; then the top entry of |pending_tasks_|; finally ask
// the client to schedule more work. DCHECKs that the call is on the
// manager's sequence.
// NOTE(review): the guards around the token assignment, the statements that
// use each |closure| (presumably Run() followed by an early return) and the
// closing braces are not visible in this extract.
383 void SyncTaskManager::MaybeStartNextForegroundTask(
384 scoped_ptr
<SyncTaskToken
> token
) {
385 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
389 token_
= token
.Pass();
// Copy the closure out before resetting the member.
392 if (!pending_backgrounding_task_
.is_null()) {
393 base::Closure closure
= pending_backgrounding_task_
;
394 pending_backgrounding_task_
.Reset();
// Copy the top entry's closure before popping it off the queue.
402 if (!pending_tasks_
.empty()) {
403 base::Closure closure
= pending_tasks_
.top().task
;
404 pending_tasks_
.pop();
410 client_
->MaybeScheduleNextTask();
413 } // namespace drive_backend
414 } // namespace sync_file_system