1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/drive/sync_client.h"
10 #include "base/thread_task_runner_handle.h"
11 #include "components/drive/drive.pb.h"
12 #include "components/drive/file_cache.h"
13 #include "components/drive/file_system/download_operation.h"
14 #include "components/drive/file_system/operation_delegate.h"
15 #include "components/drive/file_system_core_util.h"
16 #include "components/drive/job_scheduler.h"
17 #include "components/drive/sync/entry_update_performer.h"
18 #include "google_apis/drive/task_util.h"
25 // The delay constant is used to delay processing a sync task. We should not
26 // process SyncTasks immediately for the following reasons:
28 // 1) For fetching, the user may accidentally click on "Make available
29 // offline" checkbox on a file, and immediately cancel it in a second.
30 // It's a waste to fetch the file in this case.
32 // 2) For uploading, file writing via HTML5 file system API is performed in
33 // two steps: 1) truncate a file to 0 bytes, 2) write contents. We
34 // shouldn't start uploading right after the step 1). Besides, the user
35 // may edit the same file repeatedly in a short period of time.
37 // TODO(satorux): We should find a way to handle the upload case more nicely,
38 // and shorten the delay. crbug.com/134774
39 const int kDelaySeconds
= 1;
41 // The delay constant is used to delay retrying a sync task on server errors.
42 const int kLongDelaySeconds
= 600;
44 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
45 // fetched (not present locally), to |to_update| if the file needs update.
46 void CollectBacklog(ResourceMetadata
* metadata
,
47 std::vector
<std::string
>* to_fetch
,
48 std::vector
<std::string
>* to_update
) {
52 scoped_ptr
<ResourceMetadata::Iterator
> it
= metadata
->GetIterator();
53 for (; !it
->IsAtEnd(); it
->Advance()) {
54 const std::string
& local_id
= it
->GetID();
55 const ResourceEntry
& entry
= it
->GetValue();
56 if (entry
.parent_local_id() == util::kDriveTrashDirLocalId
) {
57 to_update
->push_back(local_id
);
61 bool should_update
= false;
62 switch (entry
.metadata_edit_state()) {
63 case ResourceEntry::CLEAN
:
65 case ResourceEntry::SYNCING
:
66 case ResourceEntry::DIRTY
:
71 if (entry
.file_specific_info().cache_state().is_pinned() &&
72 !entry
.file_specific_info().cache_state().is_present())
73 to_fetch
->push_back(local_id
);
75 if (entry
.file_specific_info().cache_state().is_dirty())
79 to_update
->push_back(local_id
);
81 DCHECK(!it
->HasError());
84 // Iterates cache entries and collects IDs of ones with obsolete cache files.
85 void CheckExistingPinnedFiles(ResourceMetadata
* metadata
,
87 std::vector
<std::string
>* local_ids
) {
88 scoped_ptr
<ResourceMetadata::Iterator
> it
= metadata
->GetIterator();
89 for (; !it
->IsAtEnd(); it
->Advance()) {
90 const ResourceEntry
& entry
= it
->GetValue();
91 const FileCacheEntry
& cache_state
=
92 entry
.file_specific_info().cache_state();
93 const std::string
& local_id
= it
->GetID();
94 if (!cache_state
.is_pinned() || !cache_state
.is_present())
97 // If MD5s don't match, it indicates the local cache file is stale, unless
98 // the file is dirty (the MD5 is "local"). We should never re-fetch the
99 // file when we have a locally modified version.
100 if (entry
.file_specific_info().md5() == cache_state
.md5() ||
101 cache_state
.is_dirty())
104 FileError error
= cache
->Remove(local_id
);
105 if (error
!= FILE_ERROR_OK
) {
106 LOG(WARNING
) << "Failed to remove cache entry: " << local_id
;
110 error
= cache
->Pin(local_id
);
111 if (error
!= FILE_ERROR_OK
) {
112 LOG(WARNING
) << "Failed to pin cache entry: " << local_id
;
116 local_ids
->push_back(local_id
);
118 DCHECK(!it
->HasError());
121 // Gets the parent entry of the entry specified by the ID.
122 FileError
GetParentResourceEntry(ResourceMetadata
* metadata
,
123 const std::string
& local_id
,
124 ResourceEntry
* parent
) {
126 FileError error
= metadata
->GetResourceEntryById(local_id
, &entry
);
127 if (error
!= FILE_ERROR_OK
)
129 return metadata
->GetResourceEntryById(entry
.parent_local_id(), parent
);
134 SyncClient::SyncTask::SyncTask()
135 : state(SUSPENDED
), context(BACKGROUND
), should_run_again(false) {}
136 SyncClient::SyncTask::~SyncTask() {}
138 SyncClient::SyncClient(base::SequencedTaskRunner
* blocking_task_runner
,
139 file_system::OperationDelegate
* delegate
,
140 JobScheduler
* scheduler
,
141 ResourceMetadata
* metadata
,
143 LoaderController
* loader_controller
,
144 const base::FilePath
& temporary_file_directory
)
145 : blocking_task_runner_(blocking_task_runner
),
146 operation_delegate_(delegate
),
149 download_operation_(new file_system::DownloadOperation(
150 blocking_task_runner
,
155 temporary_file_directory
)),
156 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner
,
162 delay_(base::TimeDelta::FromSeconds(kDelaySeconds
)),
163 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds
)),
164 weak_ptr_factory_(this) {
167 SyncClient::~SyncClient() {
168 DCHECK(thread_checker_
.CalledOnValidThread());
171 void SyncClient::StartProcessingBacklog() {
172 DCHECK(thread_checker_
.CalledOnValidThread());
174 std::vector
<std::string
>* to_fetch
= new std::vector
<std::string
>;
175 std::vector
<std::string
>* to_update
= new std::vector
<std::string
>;
176 blocking_task_runner_
->PostTaskAndReply(
178 base::Bind(&CollectBacklog
, metadata_
, to_fetch
, to_update
),
179 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog
,
180 weak_ptr_factory_
.GetWeakPtr(),
181 base::Owned(to_fetch
),
182 base::Owned(to_update
)));
185 void SyncClient::StartCheckingExistingPinnedFiles() {
186 DCHECK(thread_checker_
.CalledOnValidThread());
188 std::vector
<std::string
>* local_ids
= new std::vector
<std::string
>;
189 blocking_task_runner_
->PostTaskAndReply(
191 base::Bind(&CheckExistingPinnedFiles
,
195 base::Bind(&SyncClient::AddFetchTasks
,
196 weak_ptr_factory_
.GetWeakPtr(),
197 base::Owned(local_ids
)));
200 void SyncClient::AddFetchTask(const std::string
& local_id
) {
201 DCHECK(thread_checker_
.CalledOnValidThread());
202 AddFetchTaskInternal(local_id
, delay_
);
205 void SyncClient::RemoveFetchTask(const std::string
& local_id
) {
206 DCHECK(thread_checker_
.CalledOnValidThread());
208 SyncTasks::iterator it
= tasks_
.find(SyncTasks::key_type(FETCH
, local_id
));
209 if (it
== tasks_
.end())
212 SyncTask
* task
= &it
->second
;
213 switch (task
->state
) {
216 OnTaskComplete(FETCH
, local_id
, FILE_ERROR_ABORT
);
219 if (!task
->cancel_closure
.is_null())
220 task
->cancel_closure
.Run();
225 void SyncClient::AddUpdateTask(const ClientContext
& context
,
226 const std::string
& local_id
) {
227 DCHECK(thread_checker_
.CalledOnValidThread());
228 AddUpdateTaskInternal(context
, local_id
, delay_
);
231 bool SyncClient:: WaitForUpdateTaskToComplete(
232 const std::string
& local_id
,
233 const FileOperationCallback
& callback
) {
234 DCHECK(thread_checker_
.CalledOnValidThread());
236 SyncTasks::iterator it
= tasks_
.find(SyncTasks::key_type(UPDATE
, local_id
));
237 if (it
== tasks_
.end())
240 SyncTask
* task
= &it
->second
;
241 task
->waiting_callbacks
.push_back(callback
);
245 base::Closure
SyncClient::PerformFetchTask(const std::string
& local_id
,
246 const ClientContext
& context
) {
247 DCHECK(thread_checker_
.CalledOnValidThread());
248 return download_operation_
->EnsureFileDownloadedByLocalId(
251 GetFileContentInitializedCallback(),
252 google_apis::GetContentCallback(),
253 base::Bind(&SyncClient::OnFetchFileComplete
,
254 weak_ptr_factory_
.GetWeakPtr(),
258 void SyncClient::AddFetchTaskInternal(const std::string
& local_id
,
259 const base::TimeDelta
& delay
) {
260 DCHECK(thread_checker_
.CalledOnValidThread());
263 task
.state
= PENDING
;
264 task
.context
= ClientContext(BACKGROUND
);
265 task
.task
= base::Bind(&SyncClient::PerformFetchTask
,
266 base::Unretained(this),
268 AddTask(SyncTasks::key_type(FETCH
, local_id
), task
, delay
);
271 base::Closure
SyncClient::PerformUpdateTask(const std::string
& local_id
,
272 const ClientContext
& context
) {
273 DCHECK(thread_checker_
.CalledOnValidThread());
274 entry_update_performer_
->UpdateEntry(
277 base::Bind(&SyncClient::OnTaskComplete
,
278 weak_ptr_factory_
.GetWeakPtr(),
281 return base::Closure();
284 void SyncClient::AddUpdateTaskInternal(const ClientContext
& context
,
285 const std::string
& local_id
,
286 const base::TimeDelta
& delay
) {
287 DCHECK(thread_checker_
.CalledOnValidThread());
290 task
.state
= PENDING
;
291 task
.context
= context
;
292 task
.task
= base::Bind(&SyncClient::PerformUpdateTask
,
293 base::Unretained(this),
295 AddTask(SyncTasks::key_type(UPDATE
, local_id
), task
, delay
);
298 void SyncClient::AddTask(const SyncTasks::key_type
& key
,
299 const SyncTask
& task
,
300 const base::TimeDelta
& delay
) {
301 DCHECK(thread_checker_
.CalledOnValidThread());
303 SyncTasks::iterator it
= tasks_
.find(key
);
304 if (it
!= tasks_
.end()) {
305 switch (it
->second
.state
) {
307 // Activate the task.
308 it
->second
.state
= PENDING
;
311 // The same task will run, do nothing.
314 // Something has changed since the task started. Schedule rerun.
315 it
->second
.should_run_again
= true;
321 DCHECK_EQ(PENDING
, task
.state
);
322 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
324 base::Bind(&SyncClient::StartTask
, weak_ptr_factory_
.GetWeakPtr(), key
),
328 void SyncClient::StartTask(const SyncTasks::key_type
& key
) {
329 ResourceEntry
* parent
= new ResourceEntry
;
330 base::PostTaskAndReplyWithResult(
331 blocking_task_runner_
.get(),
333 base::Bind(&GetParentResourceEntry
, metadata_
, key
.second
, parent
),
334 base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry
,
335 weak_ptr_factory_
.GetWeakPtr(),
337 base::Owned(parent
)));
340 void SyncClient::StartTaskAfterGetParentResourceEntry(
341 const SyncTasks::key_type
& key
,
342 const ResourceEntry
* parent
,
344 const SyncType type
= key
.first
;
345 const std::string
& local_id
= key
.second
;
346 SyncTasks::iterator it
= tasks_
.find(key
);
347 if (it
== tasks_
.end())
350 SyncTask
* task
= &it
->second
;
351 switch (task
->state
) {
355 case RUNNING
: // Do nothing.
359 if (error
!= FILE_ERROR_OK
) {
360 OnTaskComplete(type
, local_id
, error
);
364 if (type
== UPDATE
&&
365 parent
->resource_id().empty() &&
366 parent
->local_id() != util::kDriveTrashDirLocalId
) {
367 // Parent entry needs to be synced to get a resource ID.
368 // Suspend the task and register it as a dependent task of the parent.
369 const SyncTasks::key_type
key_parent(type
, parent
->local_id());
370 SyncTasks::iterator it_parent
= tasks_
.find(key_parent
);
371 if (it_parent
== tasks_
.end()) {
372 OnTaskComplete(type
, local_id
, FILE_ERROR_INVALID_OPERATION
);
373 LOG(WARNING
) << "Parent task not found: type = " << type
<< ", id = "
374 << local_id
<< ", parent_id = " << parent
->local_id();
377 task
->state
= SUSPENDED
;
378 it_parent
->second
.dependent_tasks
.push_back(key
);
383 task
->state
= RUNNING
;
384 task
->cancel_closure
= task
->task
.Run(task
->context
);
387 void SyncClient::OnGetLocalIdsOfBacklog(
388 const std::vector
<std::string
>* to_fetch
,
389 const std::vector
<std::string
>* to_update
) {
390 DCHECK(thread_checker_
.CalledOnValidThread());
392 // Give priority to upload tasks over fetch tasks, so that dirty files are
393 // uploaded as soon as possible.
394 for (size_t i
= 0; i
< to_update
->size(); ++i
) {
395 const std::string
& local_id
= (*to_update
)[i
];
396 DVLOG(1) << "Queuing to update: " << local_id
;
397 AddUpdateTask(ClientContext(BACKGROUND
), local_id
);
400 for (size_t i
= 0; i
< to_fetch
->size(); ++i
) {
401 const std::string
& local_id
= (*to_fetch
)[i
];
402 DVLOG(1) << "Queuing to fetch: " << local_id
;
403 AddFetchTaskInternal(local_id
, delay_
);
407 void SyncClient::AddFetchTasks(const std::vector
<std::string
>* local_ids
) {
408 DCHECK(thread_checker_
.CalledOnValidThread());
410 for (size_t i
= 0; i
< local_ids
->size(); ++i
)
411 AddFetchTask((*local_ids
)[i
]);
414 void SyncClient::OnTaskComplete(SyncType type
,
415 const std::string
& local_id
,
417 DCHECK(thread_checker_
.CalledOnValidThread());
419 const SyncTasks::key_type
key(type
, local_id
);
420 SyncTasks::iterator it
= tasks_
.find(key
);
421 DCHECK(it
!= tasks_
.end());
423 base::TimeDelta retry_delay
= base::TimeDelta::FromSeconds(0);
427 DVLOG(1) << "Completed: type = " << type
<< ", id = " << local_id
;
429 case FILE_ERROR_ABORT
:
430 // Ignore it because this is caused by user's cancel operations.
432 case FILE_ERROR_NO_CONNECTION
:
433 // Run the task again so that we'll retry once the connection is back.
434 it
->second
.should_run_again
= true;
435 it
->second
.context
= ClientContext(BACKGROUND
);
437 case FILE_ERROR_SERVICE_UNAVAILABLE
:
438 // Run the task again so that we'll retry once the service is back.
439 it
->second
.should_run_again
= true;
440 it
->second
.context
= ClientContext(BACKGROUND
);
441 retry_delay
= long_delay_
;
442 operation_delegate_
->OnDriveSyncError(
443 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE
, local_id
);
446 operation_delegate_
->OnDriveSyncError(
447 file_system::DRIVE_SYNC_ERROR_MISC
, local_id
);
448 LOG(WARNING
) << "Failed: type = " << type
<< ", id = " << local_id
449 << ": " << FileErrorToString(error
);
452 for (size_t i
= 0; i
< it
->second
.waiting_callbacks
.size(); ++i
) {
453 base::ThreadTaskRunnerHandle::Get()->PostTask(
454 FROM_HERE
, base::Bind(it
->second
.waiting_callbacks
[i
], error
));
456 it
->second
.waiting_callbacks
.clear();
458 if (it
->second
.should_run_again
) {
459 DVLOG(1) << "Running again: type = " << type
<< ", id = " << local_id
;
460 it
->second
.state
= PENDING
;
461 it
->second
.should_run_again
= false;
462 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
464 base::Bind(&SyncClient::StartTask
, weak_ptr_factory_
.GetWeakPtr(), key
),
467 for (size_t i
= 0; i
< it
->second
.dependent_tasks
.size(); ++i
)
468 StartTask(it
->second
.dependent_tasks
[i
]);
473 void SyncClient::OnFetchFileComplete(const std::string
& local_id
,
475 const base::FilePath
& local_path
,
476 scoped_ptr
<ResourceEntry
> entry
) {
477 DCHECK(thread_checker_
.CalledOnValidThread());
478 OnTaskComplete(FETCH
, local_id
, error
);
479 if (error
== FILE_ERROR_ABORT
) {
480 // If user cancels download, unpin the file so that we do not sync the file
482 base::PostTaskAndReplyWithResult(
483 blocking_task_runner_
.get(),
485 base::Bind(&FileCache::Unpin
, base::Unretained(cache_
), local_id
),
486 base::Bind(&util::EmptyFileOperationCallback
));
490 } // namespace internal