Roll src/third_party/WebKit 9f7fb92:f103b33 (svn 202621:202622)
[chromium-blink-merge.git] / components / drive / sync_client.cc
blob0b27928b772667848d18ac0bf85c06e80816c033
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/drive/sync_client.h"
7 #include <vector>
9 #include "base/bind.h"
10 #include "base/thread_task_runner_handle.h"
11 #include "components/drive/drive.pb.h"
12 #include "components/drive/file_cache.h"
13 #include "components/drive/file_system/download_operation.h"
14 #include "components/drive/file_system/operation_delegate.h"
15 #include "components/drive/file_system_core_util.h"
16 #include "components/drive/job_scheduler.h"
17 #include "components/drive/sync/entry_update_performer.h"
18 #include "google_apis/drive/task_util.h"
20 namespace drive {
21 namespace internal {
23 namespace {
// Delay, in seconds, applied before a newly queued sync task is started.
// Tasks are deliberately not processed right away:
//
// 1) Fetching: a user may tick the "Make available offline" checkbox on a
//    file by mistake and untick it within a second. Fetching the file in
//    that window would be wasted work.
//
// 2) Uploading: a write through the HTML5 file system API happens in two
//    steps (truncate the file to 0 bytes, then write the contents), and an
//    upload must not begin right after the truncation. The same file may
//    also be edited repeatedly within a short period of time.
//
// TODO(satorux): We should find a way to handle the upload case more nicely,
// and shorten the delay. crbug.com/134774
const int kDelaySeconds = 1;

// Delay, in seconds, applied before retrying a sync task after a server
// error.
const int kLongDelaySeconds = 600;
44 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
45 // fetched (not present locally), to |to_update| if the file needs update.
46 void CollectBacklog(ResourceMetadata* metadata,
47 std::vector<std::string>* to_fetch,
48 std::vector<std::string>* to_update) {
49 DCHECK(to_fetch);
50 DCHECK(to_update);
52 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
53 for (; !it->IsAtEnd(); it->Advance()) {
54 const std::string& local_id = it->GetID();
55 const ResourceEntry& entry = it->GetValue();
56 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
57 to_update->push_back(local_id);
58 continue;
61 bool should_update = false;
62 switch (entry.metadata_edit_state()) {
63 case ResourceEntry::CLEAN:
64 break;
65 case ResourceEntry::SYNCING:
66 case ResourceEntry::DIRTY:
67 should_update = true;
68 break;
71 if (entry.file_specific_info().cache_state().is_pinned() &&
72 !entry.file_specific_info().cache_state().is_present())
73 to_fetch->push_back(local_id);
75 if (entry.file_specific_info().cache_state().is_dirty())
76 should_update = true;
78 if (should_update)
79 to_update->push_back(local_id);
81 DCHECK(!it->HasError());
84 // Iterates cache entries and collects IDs of ones with obsolete cache files.
85 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
86 FileCache* cache,
87 std::vector<std::string>* local_ids) {
88 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
89 for (; !it->IsAtEnd(); it->Advance()) {
90 const ResourceEntry& entry = it->GetValue();
91 const FileCacheEntry& cache_state =
92 entry.file_specific_info().cache_state();
93 const std::string& local_id = it->GetID();
94 if (!cache_state.is_pinned() || !cache_state.is_present())
95 continue;
97 // If MD5s don't match, it indicates the local cache file is stale, unless
98 // the file is dirty (the MD5 is "local"). We should never re-fetch the
99 // file when we have a locally modified version.
100 if (entry.file_specific_info().md5() == cache_state.md5() ||
101 cache_state.is_dirty())
102 continue;
104 FileError error = cache->Remove(local_id);
105 if (error != FILE_ERROR_OK) {
106 LOG(WARNING) << "Failed to remove cache entry: " << local_id;
107 continue;
110 error = cache->Pin(local_id);
111 if (error != FILE_ERROR_OK) {
112 LOG(WARNING) << "Failed to pin cache entry: " << local_id;
113 continue;
116 local_ids->push_back(local_id);
118 DCHECK(!it->HasError());
121 // Gets the parent entry of the entry specified by the ID.
122 FileError GetParentResourceEntry(ResourceMetadata* metadata,
123 const std::string& local_id,
124 ResourceEntry* parent) {
125 ResourceEntry entry;
126 FileError error = metadata->GetResourceEntryById(local_id, &entry);
127 if (error != FILE_ERROR_OK)
128 return error;
129 return metadata->GetResourceEntryById(entry.parent_local_id(), parent);
132 } // namespace
134 SyncClient::SyncTask::SyncTask()
135 : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
136 SyncClient::SyncTask::~SyncTask() {}
138 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
139 file_system::OperationDelegate* delegate,
140 JobScheduler* scheduler,
141 ResourceMetadata* metadata,
142 FileCache* cache,
143 LoaderController* loader_controller,
144 const base::FilePath& temporary_file_directory)
145 : blocking_task_runner_(blocking_task_runner),
146 operation_delegate_(delegate),
147 metadata_(metadata),
148 cache_(cache),
149 download_operation_(new file_system::DownloadOperation(
150 blocking_task_runner,
151 delegate,
152 scheduler,
153 metadata,
154 cache,
155 temporary_file_directory)),
156 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
157 delegate,
158 scheduler,
159 metadata,
160 cache,
161 loader_controller)),
162 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
163 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
164 weak_ptr_factory_(this) {
167 SyncClient::~SyncClient() {
168 DCHECK(thread_checker_.CalledOnValidThread());
171 void SyncClient::StartProcessingBacklog() {
172 DCHECK(thread_checker_.CalledOnValidThread());
174 std::vector<std::string>* to_fetch = new std::vector<std::string>;
175 std::vector<std::string>* to_update = new std::vector<std::string>;
176 blocking_task_runner_->PostTaskAndReply(
177 FROM_HERE,
178 base::Bind(&CollectBacklog, metadata_, to_fetch, to_update),
179 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
180 weak_ptr_factory_.GetWeakPtr(),
181 base::Owned(to_fetch),
182 base::Owned(to_update)));
185 void SyncClient::StartCheckingExistingPinnedFiles() {
186 DCHECK(thread_checker_.CalledOnValidThread());
188 std::vector<std::string>* local_ids = new std::vector<std::string>;
189 blocking_task_runner_->PostTaskAndReply(
190 FROM_HERE,
191 base::Bind(&CheckExistingPinnedFiles,
192 metadata_,
193 cache_,
194 local_ids),
195 base::Bind(&SyncClient::AddFetchTasks,
196 weak_ptr_factory_.GetWeakPtr(),
197 base::Owned(local_ids)));
200 void SyncClient::AddFetchTask(const std::string& local_id) {
201 DCHECK(thread_checker_.CalledOnValidThread());
202 AddFetchTaskInternal(local_id, delay_);
205 void SyncClient::RemoveFetchTask(const std::string& local_id) {
206 DCHECK(thread_checker_.CalledOnValidThread());
208 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
209 if (it == tasks_.end())
210 return;
212 SyncTask* task = &it->second;
213 switch (task->state) {
214 case SUSPENDED:
215 case PENDING:
216 OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
217 break;
218 case RUNNING:
219 if (!task->cancel_closure.is_null())
220 task->cancel_closure.Run();
221 break;
225 void SyncClient::AddUpdateTask(const ClientContext& context,
226 const std::string& local_id) {
227 DCHECK(thread_checker_.CalledOnValidThread());
228 AddUpdateTaskInternal(context, local_id, delay_);
231 bool SyncClient:: WaitForUpdateTaskToComplete(
232 const std::string& local_id,
233 const FileOperationCallback& callback) {
234 DCHECK(thread_checker_.CalledOnValidThread());
236 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(UPDATE, local_id));
237 if (it == tasks_.end())
238 return false;
240 SyncTask* task = &it->second;
241 task->waiting_callbacks.push_back(callback);
242 return true;
245 base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
246 const ClientContext& context) {
247 DCHECK(thread_checker_.CalledOnValidThread());
248 return download_operation_->EnsureFileDownloadedByLocalId(
249 local_id,
250 context,
251 GetFileContentInitializedCallback(),
252 google_apis::GetContentCallback(),
253 base::Bind(&SyncClient::OnFetchFileComplete,
254 weak_ptr_factory_.GetWeakPtr(),
255 local_id));
258 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
259 const base::TimeDelta& delay) {
260 DCHECK(thread_checker_.CalledOnValidThread());
262 SyncTask task;
263 task.state = PENDING;
264 task.context = ClientContext(BACKGROUND);
265 task.task = base::Bind(&SyncClient::PerformFetchTask,
266 base::Unretained(this),
267 local_id);
268 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
271 base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
272 const ClientContext& context) {
273 DCHECK(thread_checker_.CalledOnValidThread());
274 entry_update_performer_->UpdateEntry(
275 local_id,
276 context,
277 base::Bind(&SyncClient::OnTaskComplete,
278 weak_ptr_factory_.GetWeakPtr(),
279 UPDATE,
280 local_id));
281 return base::Closure();
284 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
285 const std::string& local_id,
286 const base::TimeDelta& delay) {
287 DCHECK(thread_checker_.CalledOnValidThread());
289 SyncTask task;
290 task.state = PENDING;
291 task.context = context;
292 task.task = base::Bind(&SyncClient::PerformUpdateTask,
293 base::Unretained(this),
294 local_id);
295 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
298 void SyncClient::AddTask(const SyncTasks::key_type& key,
299 const SyncTask& task,
300 const base::TimeDelta& delay) {
301 DCHECK(thread_checker_.CalledOnValidThread());
303 SyncTasks::iterator it = tasks_.find(key);
304 if (it != tasks_.end()) {
305 switch (it->second.state) {
306 case SUSPENDED:
307 // Activate the task.
308 it->second.state = PENDING;
309 break;
310 case PENDING:
311 // The same task will run, do nothing.
312 return;
313 case RUNNING:
314 // Something has changed since the task started. Schedule rerun.
315 it->second.should_run_again = true;
316 return;
318 } else {
319 tasks_[key] = task;
321 DCHECK_EQ(PENDING, task.state);
322 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
323 FROM_HERE,
324 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
325 delay);
328 void SyncClient::StartTask(const SyncTasks::key_type& key) {
329 ResourceEntry* parent = new ResourceEntry;
330 base::PostTaskAndReplyWithResult(
331 blocking_task_runner_.get(),
332 FROM_HERE,
333 base::Bind(&GetParentResourceEntry, metadata_, key.second, parent),
334 base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry,
335 weak_ptr_factory_.GetWeakPtr(),
336 key,
337 base::Owned(parent)));
340 void SyncClient::StartTaskAfterGetParentResourceEntry(
341 const SyncTasks::key_type& key,
342 const ResourceEntry* parent,
343 FileError error) {
344 const SyncType type = key.first;
345 const std::string& local_id = key.second;
346 SyncTasks::iterator it = tasks_.find(key);
347 if (it == tasks_.end())
348 return;
350 SyncTask* task = &it->second;
351 switch (task->state) {
352 case SUSPENDED:
353 case PENDING:
354 break;
355 case RUNNING: // Do nothing.
356 return;
359 if (error != FILE_ERROR_OK) {
360 OnTaskComplete(type, local_id, error);
361 return;
364 if (type == UPDATE &&
365 parent->resource_id().empty() &&
366 parent->local_id() != util::kDriveTrashDirLocalId) {
367 // Parent entry needs to be synced to get a resource ID.
368 // Suspend the task and register it as a dependent task of the parent.
369 const SyncTasks::key_type key_parent(type, parent->local_id());
370 SyncTasks::iterator it_parent = tasks_.find(key_parent);
371 if (it_parent == tasks_.end()) {
372 OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION);
373 LOG(WARNING) << "Parent task not found: type = " << type << ", id = "
374 << local_id << ", parent_id = " << parent->local_id();
375 return;
377 task->state = SUSPENDED;
378 it_parent->second.dependent_tasks.push_back(key);
379 return;
382 // Run the task.
383 task->state = RUNNING;
384 task->cancel_closure = task->task.Run(task->context);
387 void SyncClient::OnGetLocalIdsOfBacklog(
388 const std::vector<std::string>* to_fetch,
389 const std::vector<std::string>* to_update) {
390 DCHECK(thread_checker_.CalledOnValidThread());
392 // Give priority to upload tasks over fetch tasks, so that dirty files are
393 // uploaded as soon as possible.
394 for (size_t i = 0; i < to_update->size(); ++i) {
395 const std::string& local_id = (*to_update)[i];
396 DVLOG(1) << "Queuing to update: " << local_id;
397 AddUpdateTask(ClientContext(BACKGROUND), local_id);
400 for (size_t i = 0; i < to_fetch->size(); ++i) {
401 const std::string& local_id = (*to_fetch)[i];
402 DVLOG(1) << "Queuing to fetch: " << local_id;
403 AddFetchTaskInternal(local_id, delay_);
407 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
408 DCHECK(thread_checker_.CalledOnValidThread());
410 for (size_t i = 0; i < local_ids->size(); ++i)
411 AddFetchTask((*local_ids)[i]);
414 void SyncClient::OnTaskComplete(SyncType type,
415 const std::string& local_id,
416 FileError error) {
417 DCHECK(thread_checker_.CalledOnValidThread());
419 const SyncTasks::key_type key(type, local_id);
420 SyncTasks::iterator it = tasks_.find(key);
421 DCHECK(it != tasks_.end());
423 base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
425 switch (error) {
426 case FILE_ERROR_OK:
427 DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
428 break;
429 case FILE_ERROR_ABORT:
430 // Ignore it because this is caused by user's cancel operations.
431 break;
432 case FILE_ERROR_NO_CONNECTION:
433 // Run the task again so that we'll retry once the connection is back.
434 it->second.should_run_again = true;
435 it->second.context = ClientContext(BACKGROUND);
436 break;
437 case FILE_ERROR_SERVICE_UNAVAILABLE:
438 // Run the task again so that we'll retry once the service is back.
439 it->second.should_run_again = true;
440 it->second.context = ClientContext(BACKGROUND);
441 retry_delay = long_delay_;
442 operation_delegate_->OnDriveSyncError(
443 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
444 break;
445 case FILE_ERROR_NO_SERVER_SPACE:
446 operation_delegate_->OnDriveSyncError(
447 file_system::DRIVE_SYNC_ERROR_NO_SERVER_SPACE, local_id);
448 break;
449 default:
450 operation_delegate_->OnDriveSyncError(
451 file_system::DRIVE_SYNC_ERROR_MISC, local_id);
452 LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
453 << ": " << FileErrorToString(error);
456 for (size_t i = 0; i < it->second.waiting_callbacks.size(); ++i) {
457 base::ThreadTaskRunnerHandle::Get()->PostTask(
458 FROM_HERE, base::Bind(it->second.waiting_callbacks[i], error));
460 it->second.waiting_callbacks.clear();
462 if (it->second.should_run_again) {
463 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
464 it->second.state = PENDING;
465 it->second.should_run_again = false;
466 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
467 FROM_HERE,
468 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
469 retry_delay);
470 } else {
471 for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
472 StartTask(it->second.dependent_tasks[i]);
473 tasks_.erase(it);
477 void SyncClient::OnFetchFileComplete(const std::string& local_id,
478 FileError error,
479 const base::FilePath& local_path,
480 scoped_ptr<ResourceEntry> entry) {
481 DCHECK(thread_checker_.CalledOnValidThread());
482 OnTaskComplete(FETCH, local_id, error);
483 if (error == FILE_ERROR_ABORT) {
484 // If user cancels download, unpin the file so that we do not sync the file
485 // again.
486 base::PostTaskAndReplyWithResult(
487 blocking_task_runner_.get(),
488 FROM_HERE,
489 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
490 base::Bind(&util::EmptyFileOperationCallback));
494 } // namespace internal
495 } // namespace drive