Add ENABLE_MEDIA_ROUTER define to builds other than Android and iOS.
[chromium-blink-merge.git] / chrome / browser / chromeos / drive / sync_client.cc
blob267edb29fa85751176ce778c1f18916a7db9f02a
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/sync_client.h"
7 #include <vector>
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop_proxy.h"
11 #include "chrome/browser/chromeos/drive/drive.pb.h"
12 #include "chrome/browser/chromeos/drive/file_cache.h"
13 #include "chrome/browser/chromeos/drive/file_system/download_operation.h"
14 #include "chrome/browser/chromeos/drive/file_system/operation_delegate.h"
15 #include "chrome/browser/chromeos/drive/file_system_util.h"
16 #include "chrome/browser/chromeos/drive/job_scheduler.h"
17 #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
18 #include "content/public/browser/browser_thread.h"
19 #include "google_apis/drive/task_util.h"
21 using content::BrowserThread;
23 namespace drive {
24 namespace internal {
26 namespace {
28 // The delay constant is used to delay processing a sync task. We should not
29 // process SyncTasks immediately for the following reasons:
31 // 1) For fetching, the user may accidentally click on "Make available
32 // offline" checkbox on a file, and immediately cancel it in a second.
33 // It's a waste to fetch the file in this case.
35 // 2) For uploading, file writing via HTML5 file system API is performed in
36 // two steps: 1) truncate a file to 0 bytes, 2) write contents. We
37 // shouldn't start uploading right after the step 1). Besides, the user
38 // may edit the same file repeatedly in a short period of time.
40 // TODO(satorux): We should find a way to handle the upload case more nicely,
41 // and shorten the delay. crbug.com/134774
42 const int kDelaySeconds = 1;
44 // The delay constant is used to delay retrying a sync task on server errors.
45 const int kLongDelaySeconds = 600;
47 // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
48 // fetched (not present locally), to |to_update| if the file needs update.
49 void CollectBacklog(ResourceMetadata* metadata,
50 std::vector<std::string>* to_fetch,
51 std::vector<std::string>* to_update) {
52 DCHECK(to_fetch);
53 DCHECK(to_update);
55 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
56 for (; !it->IsAtEnd(); it->Advance()) {
57 const std::string& local_id = it->GetID();
58 const ResourceEntry& entry = it->GetValue();
59 if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
60 to_update->push_back(local_id);
61 continue;
64 bool should_update = false;
65 switch (entry.metadata_edit_state()) {
66 case ResourceEntry::CLEAN:
67 break;
68 case ResourceEntry::SYNCING:
69 case ResourceEntry::DIRTY:
70 should_update = true;
71 break;
74 if (entry.file_specific_info().cache_state().is_pinned() &&
75 !entry.file_specific_info().cache_state().is_present())
76 to_fetch->push_back(local_id);
78 if (entry.file_specific_info().cache_state().is_dirty())
79 should_update = true;
81 if (should_update)
82 to_update->push_back(local_id);
84 DCHECK(!it->HasError());
87 // Iterates cache entries and collects IDs of ones with obsolete cache files.
88 void CheckExistingPinnedFiles(ResourceMetadata* metadata,
89 FileCache* cache,
90 std::vector<std::string>* local_ids) {
91 scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
92 for (; !it->IsAtEnd(); it->Advance()) {
93 const ResourceEntry& entry = it->GetValue();
94 const FileCacheEntry& cache_state =
95 entry.file_specific_info().cache_state();
96 const std::string& local_id = it->GetID();
97 if (!cache_state.is_pinned() || !cache_state.is_present())
98 continue;
100 // If MD5s don't match, it indicates the local cache file is stale, unless
101 // the file is dirty (the MD5 is "local"). We should never re-fetch the
102 // file when we have a locally modified version.
103 if (entry.file_specific_info().md5() == cache_state.md5() ||
104 cache_state.is_dirty())
105 continue;
107 FileError error = cache->Remove(local_id);
108 if (error != FILE_ERROR_OK) {
109 LOG(WARNING) << "Failed to remove cache entry: " << local_id;
110 continue;
113 error = cache->Pin(local_id);
114 if (error != FILE_ERROR_OK) {
115 LOG(WARNING) << "Failed to pin cache entry: " << local_id;
116 continue;
119 local_ids->push_back(local_id);
121 DCHECK(!it->HasError());
124 // Gets the parent entry of the entry specified by the ID.
125 FileError GetParentResourceEntry(ResourceMetadata* metadata,
126 const std::string& local_id,
127 ResourceEntry* parent) {
128 ResourceEntry entry;
129 FileError error = metadata->GetResourceEntryById(local_id, &entry);
130 if (error != FILE_ERROR_OK)
131 return error;
132 return metadata->GetResourceEntryById(entry.parent_local_id(), parent);
135 } // namespace
137 SyncClient::SyncTask::SyncTask()
138 : state(SUSPENDED), context(BACKGROUND), should_run_again(false) {}
139 SyncClient::SyncTask::~SyncTask() {}
141 SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
142 file_system::OperationDelegate* delegate,
143 JobScheduler* scheduler,
144 ResourceMetadata* metadata,
145 FileCache* cache,
146 LoaderController* loader_controller,
147 const base::FilePath& temporary_file_directory)
148 : blocking_task_runner_(blocking_task_runner),
149 operation_delegate_(delegate),
150 metadata_(metadata),
151 cache_(cache),
152 download_operation_(new file_system::DownloadOperation(
153 blocking_task_runner,
154 delegate,
155 scheduler,
156 metadata,
157 cache,
158 temporary_file_directory)),
159 entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
160 delegate,
161 scheduler,
162 metadata,
163 cache,
164 loader_controller)),
165 delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
166 long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
167 weak_ptr_factory_(this) {
168 DCHECK_CURRENTLY_ON(BrowserThread::UI);
171 SyncClient::~SyncClient() {
172 DCHECK_CURRENTLY_ON(BrowserThread::UI);
175 void SyncClient::StartProcessingBacklog() {
176 DCHECK_CURRENTLY_ON(BrowserThread::UI);
178 std::vector<std::string>* to_fetch = new std::vector<std::string>;
179 std::vector<std::string>* to_update = new std::vector<std::string>;
180 blocking_task_runner_->PostTaskAndReply(
181 FROM_HERE,
182 base::Bind(&CollectBacklog, metadata_, to_fetch, to_update),
183 base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
184 weak_ptr_factory_.GetWeakPtr(),
185 base::Owned(to_fetch),
186 base::Owned(to_update)));
189 void SyncClient::StartCheckingExistingPinnedFiles() {
190 DCHECK_CURRENTLY_ON(BrowserThread::UI);
192 std::vector<std::string>* local_ids = new std::vector<std::string>;
193 blocking_task_runner_->PostTaskAndReply(
194 FROM_HERE,
195 base::Bind(&CheckExistingPinnedFiles,
196 metadata_,
197 cache_,
198 local_ids),
199 base::Bind(&SyncClient::AddFetchTasks,
200 weak_ptr_factory_.GetWeakPtr(),
201 base::Owned(local_ids)));
204 void SyncClient::AddFetchTask(const std::string& local_id) {
205 DCHECK_CURRENTLY_ON(BrowserThread::UI);
206 AddFetchTaskInternal(local_id, delay_);
209 void SyncClient::RemoveFetchTask(const std::string& local_id) {
210 DCHECK_CURRENTLY_ON(BrowserThread::UI);
212 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
213 if (it == tasks_.end())
214 return;
216 SyncTask* task = &it->second;
217 switch (task->state) {
218 case SUSPENDED:
219 case PENDING:
220 OnTaskComplete(FETCH, local_id, FILE_ERROR_ABORT);
221 break;
222 case RUNNING:
223 if (!task->cancel_closure.is_null())
224 task->cancel_closure.Run();
225 break;
229 void SyncClient::AddUpdateTask(const ClientContext& context,
230 const std::string& local_id) {
231 DCHECK_CURRENTLY_ON(BrowserThread::UI);
232 AddUpdateTaskInternal(context, local_id, delay_);
235 bool SyncClient:: WaitForUpdateTaskToComplete(
236 const std::string& local_id,
237 const FileOperationCallback& callback) {
238 DCHECK_CURRENTLY_ON(BrowserThread::UI);
240 SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(UPDATE, local_id));
241 if (it == tasks_.end())
242 return false;
244 SyncTask* task = &it->second;
245 task->waiting_callbacks.push_back(callback);
246 return true;
249 base::Closure SyncClient::PerformFetchTask(const std::string& local_id,
250 const ClientContext& context) {
251 DCHECK_CURRENTLY_ON(BrowserThread::UI);
252 return download_operation_->EnsureFileDownloadedByLocalId(
253 local_id,
254 context,
255 GetFileContentInitializedCallback(),
256 google_apis::GetContentCallback(),
257 base::Bind(&SyncClient::OnFetchFileComplete,
258 weak_ptr_factory_.GetWeakPtr(),
259 local_id));
262 void SyncClient::AddFetchTaskInternal(const std::string& local_id,
263 const base::TimeDelta& delay) {
264 DCHECK_CURRENTLY_ON(BrowserThread::UI);
266 SyncTask task;
267 task.state = PENDING;
268 task.context = ClientContext(BACKGROUND);
269 task.task = base::Bind(&SyncClient::PerformFetchTask,
270 base::Unretained(this),
271 local_id);
272 AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
275 base::Closure SyncClient::PerformUpdateTask(const std::string& local_id,
276 const ClientContext& context) {
277 DCHECK_CURRENTLY_ON(BrowserThread::UI);
278 entry_update_performer_->UpdateEntry(
279 local_id,
280 context,
281 base::Bind(&SyncClient::OnTaskComplete,
282 weak_ptr_factory_.GetWeakPtr(),
283 UPDATE,
284 local_id));
285 return base::Closure();
288 void SyncClient::AddUpdateTaskInternal(const ClientContext& context,
289 const std::string& local_id,
290 const base::TimeDelta& delay) {
291 DCHECK_CURRENTLY_ON(BrowserThread::UI);
293 SyncTask task;
294 task.state = PENDING;
295 task.context = context;
296 task.task = base::Bind(&SyncClient::PerformUpdateTask,
297 base::Unretained(this),
298 local_id);
299 AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
302 void SyncClient::AddTask(const SyncTasks::key_type& key,
303 const SyncTask& task,
304 const base::TimeDelta& delay) {
305 DCHECK_CURRENTLY_ON(BrowserThread::UI);
307 SyncTasks::iterator it = tasks_.find(key);
308 if (it != tasks_.end()) {
309 switch (it->second.state) {
310 case SUSPENDED:
311 // Activate the task.
312 it->second.state = PENDING;
313 break;
314 case PENDING:
315 // The same task will run, do nothing.
316 return;
317 case RUNNING:
318 // Something has changed since the task started. Schedule rerun.
319 it->second.should_run_again = true;
320 return;
322 } else {
323 tasks_[key] = task;
325 DCHECK_EQ(PENDING, task.state);
326 base::MessageLoopProxy::current()->PostDelayedTask(
327 FROM_HERE,
328 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
329 delay);
332 void SyncClient::StartTask(const SyncTasks::key_type& key) {
333 ResourceEntry* parent = new ResourceEntry;
334 base::PostTaskAndReplyWithResult(
335 blocking_task_runner_.get(),
336 FROM_HERE,
337 base::Bind(&GetParentResourceEntry, metadata_, key.second, parent),
338 base::Bind(&SyncClient::StartTaskAfterGetParentResourceEntry,
339 weak_ptr_factory_.GetWeakPtr(),
340 key,
341 base::Owned(parent)));
344 void SyncClient::StartTaskAfterGetParentResourceEntry(
345 const SyncTasks::key_type& key,
346 const ResourceEntry* parent,
347 FileError error) {
348 const SyncType type = key.first;
349 const std::string& local_id = key.second;
350 SyncTasks::iterator it = tasks_.find(key);
351 if (it == tasks_.end())
352 return;
354 SyncTask* task = &it->second;
355 switch (task->state) {
356 case SUSPENDED:
357 case PENDING:
358 break;
359 case RUNNING: // Do nothing.
360 return;
363 if (error != FILE_ERROR_OK) {
364 OnTaskComplete(type, local_id, error);
365 return;
368 if (type == UPDATE &&
369 parent->resource_id().empty() &&
370 parent->local_id() != util::kDriveTrashDirLocalId) {
371 // Parent entry needs to be synced to get a resource ID.
372 // Suspend the task and register it as a dependent task of the parent.
373 const SyncTasks::key_type key_parent(type, parent->local_id());
374 SyncTasks::iterator it_parent = tasks_.find(key_parent);
375 if (it_parent == tasks_.end()) {
376 OnTaskComplete(type, local_id, FILE_ERROR_INVALID_OPERATION);
377 LOG(WARNING) << "Parent task not found: type = " << type << ", id = "
378 << local_id << ", parent_id = " << parent->local_id();
379 return;
381 task->state = SUSPENDED;
382 it_parent->second.dependent_tasks.push_back(key);
383 return;
386 // Run the task.
387 task->state = RUNNING;
388 task->cancel_closure = task->task.Run(task->context);
391 void SyncClient::OnGetLocalIdsOfBacklog(
392 const std::vector<std::string>* to_fetch,
393 const std::vector<std::string>* to_update) {
394 DCHECK_CURRENTLY_ON(BrowserThread::UI);
396 // Give priority to upload tasks over fetch tasks, so that dirty files are
397 // uploaded as soon as possible.
398 for (size_t i = 0; i < to_update->size(); ++i) {
399 const std::string& local_id = (*to_update)[i];
400 DVLOG(1) << "Queuing to update: " << local_id;
401 AddUpdateTask(ClientContext(BACKGROUND), local_id);
404 for (size_t i = 0; i < to_fetch->size(); ++i) {
405 const std::string& local_id = (*to_fetch)[i];
406 DVLOG(1) << "Queuing to fetch: " << local_id;
407 AddFetchTaskInternal(local_id, delay_);
411 void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
412 DCHECK_CURRENTLY_ON(BrowserThread::UI);
414 for (size_t i = 0; i < local_ids->size(); ++i)
415 AddFetchTask((*local_ids)[i]);
418 void SyncClient::OnTaskComplete(SyncType type,
419 const std::string& local_id,
420 FileError error) {
421 DCHECK_CURRENTLY_ON(BrowserThread::UI);
423 const SyncTasks::key_type key(type, local_id);
424 SyncTasks::iterator it = tasks_.find(key);
425 DCHECK(it != tasks_.end());
427 base::TimeDelta retry_delay = base::TimeDelta::FromSeconds(0);
429 switch (error) {
430 case FILE_ERROR_OK:
431 DVLOG(1) << "Completed: type = " << type << ", id = " << local_id;
432 break;
433 case FILE_ERROR_ABORT:
434 // Ignore it because this is caused by user's cancel operations.
435 break;
436 case FILE_ERROR_NO_CONNECTION:
437 // Run the task again so that we'll retry once the connection is back.
438 it->second.should_run_again = true;
439 it->second.context = ClientContext(BACKGROUND);
440 break;
441 case FILE_ERROR_SERVICE_UNAVAILABLE:
442 // Run the task again so that we'll retry once the service is back.
443 it->second.should_run_again = true;
444 it->second.context = ClientContext(BACKGROUND);
445 retry_delay = long_delay_;
446 operation_delegate_->OnDriveSyncError(
447 file_system::DRIVE_SYNC_ERROR_SERVICE_UNAVAILABLE, local_id);
448 break;
449 default:
450 operation_delegate_->OnDriveSyncError(
451 file_system::DRIVE_SYNC_ERROR_MISC, local_id);
452 LOG(WARNING) << "Failed: type = " << type << ", id = " << local_id
453 << ": " << FileErrorToString(error);
456 for (size_t i = 0; i < it->second.waiting_callbacks.size(); ++i) {
457 base::MessageLoopProxy::current()->PostTask(
458 FROM_HERE, base::Bind(it->second.waiting_callbacks[i], error));
460 it->second.waiting_callbacks.clear();
462 if (it->second.should_run_again) {
463 DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
464 it->second.state = PENDING;
465 it->second.should_run_again = false;
466 base::MessageLoopProxy::current()->PostDelayedTask(
467 FROM_HERE,
468 base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
469 retry_delay);
470 } else {
471 for (size_t i = 0; i < it->second.dependent_tasks.size(); ++i)
472 StartTask(it->second.dependent_tasks[i]);
473 tasks_.erase(it);
477 void SyncClient::OnFetchFileComplete(const std::string& local_id,
478 FileError error,
479 const base::FilePath& local_path,
480 scoped_ptr<ResourceEntry> entry) {
481 DCHECK_CURRENTLY_ON(BrowserThread::UI);
482 OnTaskComplete(FETCH, local_id, error);
483 if (error == FILE_ERROR_ABORT) {
484 // If user cancels download, unpin the file so that we do not sync the file
485 // again.
486 base::PostTaskAndReplyWithResult(
487 blocking_task_runner_.get(),
488 FROM_HERE,
489 base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
490 base::Bind(&util::EmptyFileOperationCallback));
494 } // namespace internal
495 } // namespace drive