Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / components / drive / change_list_loader.cc
blob8efc135c3b5dfc24dc5865a2d8bf6aca9c773379
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/drive/change_list_loader.h"
7 #include <set>
9 #include "base/callback.h"
10 #include "base/callback_helpers.h"
11 #include "base/metrics/histogram.h"
12 #include "base/strings/string_number_conversions.h"
13 #include "base/synchronization/cancellation_flag.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/time/time.h"
16 #include "components/drive/change_list_loader_observer.h"
17 #include "components/drive/change_list_processor.h"
18 #include "components/drive/event_logger.h"
19 #include "components/drive/file_system_core_util.h"
20 #include "components/drive/job_scheduler.h"
21 #include "components/drive/resource_metadata.h"
22 #include "google_apis/drive/drive_api_parser.h"
23 #include "url/gurl.h"
25 namespace drive {
26 namespace internal {
28 typedef base::Callback<void(FileError, ScopedVector<ChangeList>)>
29 FeedFetcherCallback;
31 class ChangeListLoader::FeedFetcher {
32 public:
33 virtual ~FeedFetcher() {}
34 virtual void Run(const FeedFetcherCallback& callback) = 0;
37 namespace {
39 // Fetches all the (currently available) resource entries from the server.
40 class FullFeedFetcher : public ChangeListLoader::FeedFetcher {
41 public:
42 explicit FullFeedFetcher(JobScheduler* scheduler)
43 : scheduler_(scheduler),
44 weak_ptr_factory_(this) {
47 ~FullFeedFetcher() override {}
49 void Run(const FeedFetcherCallback& callback) override {
50 DCHECK(thread_checker_.CalledOnValidThread());
51 DCHECK(!callback.is_null());
53 // Remember the time stamp for usage stats.
54 start_time_ = base::TimeTicks::Now();
56 // This is full resource list fetch.
57 scheduler_->GetAllFileList(
58 base::Bind(&FullFeedFetcher::OnFileListFetched,
59 weak_ptr_factory_.GetWeakPtr(), callback));
62 private:
63 void OnFileListFetched(const FeedFetcherCallback& callback,
64 google_apis::DriveApiErrorCode status,
65 scoped_ptr<google_apis::FileList> file_list) {
66 DCHECK(thread_checker_.CalledOnValidThread());
67 DCHECK(!callback.is_null());
69 FileError error = GDataToFileError(status);
70 if (error != FILE_ERROR_OK) {
71 callback.Run(error, ScopedVector<ChangeList>());
72 return;
75 DCHECK(file_list);
76 change_lists_.push_back(new ChangeList(*file_list));
78 if (!file_list->next_link().is_empty()) {
79 // There is the remaining result so fetch it.
80 scheduler_->GetRemainingFileList(
81 file_list->next_link(),
82 base::Bind(&FullFeedFetcher::OnFileListFetched,
83 weak_ptr_factory_.GetWeakPtr(), callback));
84 return;
87 UMA_HISTOGRAM_LONG_TIMES("Drive.FullFeedLoadTime",
88 base::TimeTicks::Now() - start_time_);
90 // Note: The fetcher is managed by ChangeListLoader, and the instance
91 // will be deleted in the callback. Do not touch the fields after this
92 // invocation.
93 callback.Run(FILE_ERROR_OK, change_lists_.Pass());
96 JobScheduler* scheduler_;
97 ScopedVector<ChangeList> change_lists_;
98 base::TimeTicks start_time_;
99 base::ThreadChecker thread_checker_;
100 base::WeakPtrFactory<FullFeedFetcher> weak_ptr_factory_;
101 DISALLOW_COPY_AND_ASSIGN(FullFeedFetcher);
104 // Fetches the delta changes since |start_change_id|.
105 class DeltaFeedFetcher : public ChangeListLoader::FeedFetcher {
106 public:
107 DeltaFeedFetcher(JobScheduler* scheduler, int64 start_change_id)
108 : scheduler_(scheduler),
109 start_change_id_(start_change_id),
110 weak_ptr_factory_(this) {
113 ~DeltaFeedFetcher() override {}
115 void Run(const FeedFetcherCallback& callback) override {
116 DCHECK(thread_checker_.CalledOnValidThread());
117 DCHECK(!callback.is_null());
119 scheduler_->GetChangeList(
120 start_change_id_,
121 base::Bind(&DeltaFeedFetcher::OnChangeListFetched,
122 weak_ptr_factory_.GetWeakPtr(), callback));
125 private:
126 void OnChangeListFetched(const FeedFetcherCallback& callback,
127 google_apis::DriveApiErrorCode status,
128 scoped_ptr<google_apis::ChangeList> change_list) {
129 DCHECK(thread_checker_.CalledOnValidThread());
130 DCHECK(!callback.is_null());
132 FileError error = GDataToFileError(status);
133 if (error != FILE_ERROR_OK) {
134 callback.Run(error, ScopedVector<ChangeList>());
135 return;
138 DCHECK(change_list);
139 change_lists_.push_back(new ChangeList(*change_list));
141 if (!change_list->next_link().is_empty()) {
142 // There is the remaining result so fetch it.
143 scheduler_->GetRemainingChangeList(
144 change_list->next_link(),
145 base::Bind(&DeltaFeedFetcher::OnChangeListFetched,
146 weak_ptr_factory_.GetWeakPtr(), callback));
147 return;
150 // Note: The fetcher is managed by ChangeListLoader, and the instance
151 // will be deleted in the callback. Do not touch the fields after this
152 // invocation.
153 callback.Run(FILE_ERROR_OK, change_lists_.Pass());
156 JobScheduler* scheduler_;
157 int64 start_change_id_;
158 ScopedVector<ChangeList> change_lists_;
159 base::ThreadChecker thread_checker_;
160 base::WeakPtrFactory<DeltaFeedFetcher> weak_ptr_factory_;
161 DISALLOW_COPY_AND_ASSIGN(DeltaFeedFetcher);
164 } // namespace
166 LoaderController::LoaderController()
167 : lock_count_(0),
168 weak_ptr_factory_(this) {
171 LoaderController::~LoaderController() {
172 DCHECK(thread_checker_.CalledOnValidThread());
175 scoped_ptr<base::ScopedClosureRunner> LoaderController::GetLock() {
176 DCHECK(thread_checker_.CalledOnValidThread());
178 ++lock_count_;
179 return make_scoped_ptr(new base::ScopedClosureRunner(
180 base::Bind(&LoaderController::Unlock,
181 weak_ptr_factory_.GetWeakPtr())));
184 void LoaderController::ScheduleRun(const base::Closure& task) {
185 DCHECK(thread_checker_.CalledOnValidThread());
186 DCHECK(!task.is_null());
188 if (lock_count_ > 0) {
189 pending_tasks_.push_back(task);
190 } else {
191 task.Run();
195 void LoaderController::Unlock() {
196 DCHECK(thread_checker_.CalledOnValidThread());
197 DCHECK_LT(0, lock_count_);
199 if (--lock_count_ > 0)
200 return;
202 std::vector<base::Closure> tasks;
203 tasks.swap(pending_tasks_);
204 for (size_t i = 0; i < tasks.size(); ++i)
205 tasks[i].Run();
208 AboutResourceLoader::AboutResourceLoader(JobScheduler* scheduler)
209 : scheduler_(scheduler),
210 current_update_task_id_(-1),
211 weak_ptr_factory_(this) {
214 AboutResourceLoader::~AboutResourceLoader() {}
216 void AboutResourceLoader::GetAboutResource(
217 const google_apis::AboutResourceCallback& callback) {
218 DCHECK(thread_checker_.CalledOnValidThread());
219 DCHECK(!callback.is_null());
221 // If the latest UpdateAboutResource task is still running. Wait for it,
222 if (pending_callbacks_.count(current_update_task_id_)) {
223 pending_callbacks_[current_update_task_id_].push_back(callback);
224 return;
227 if (cached_about_resource_) {
228 base::ThreadTaskRunnerHandle::Get()->PostTask(
229 FROM_HERE,
230 base::Bind(
231 callback,
232 google_apis::HTTP_NO_CONTENT,
233 base::Passed(scoped_ptr<google_apis::AboutResource>(
234 new google_apis::AboutResource(*cached_about_resource_)))));
235 } else {
236 UpdateAboutResource(callback);
240 void AboutResourceLoader::UpdateAboutResource(
241 const google_apis::AboutResourceCallback& callback) {
242 DCHECK(thread_checker_.CalledOnValidThread());
243 DCHECK(!callback.is_null());
245 ++current_update_task_id_;
246 pending_callbacks_[current_update_task_id_].push_back(callback);
248 scheduler_->GetAboutResource(
249 base::Bind(&AboutResourceLoader::UpdateAboutResourceAfterGetAbout,
250 weak_ptr_factory_.GetWeakPtr(),
251 current_update_task_id_));
254 void AboutResourceLoader::UpdateAboutResourceAfterGetAbout(
255 int task_id,
256 google_apis::DriveApiErrorCode status,
257 scoped_ptr<google_apis::AboutResource> about_resource) {
258 DCHECK(thread_checker_.CalledOnValidThread());
259 FileError error = GDataToFileError(status);
261 const std::vector<google_apis::AboutResourceCallback> callbacks =
262 pending_callbacks_[task_id];
263 pending_callbacks_.erase(task_id);
265 if (error != FILE_ERROR_OK) {
266 for (size_t i = 0; i < callbacks.size(); ++i)
267 callbacks[i].Run(status, scoped_ptr<google_apis::AboutResource>());
268 return;
271 // Updates the cache when the resource is successfully obtained.
272 if (cached_about_resource_ &&
273 cached_about_resource_->largest_change_id() >
274 about_resource->largest_change_id()) {
275 LOG(WARNING) << "Local cached about resource is fresher than server, "
276 << "local = " << cached_about_resource_->largest_change_id()
277 << ", server = " << about_resource->largest_change_id();
279 cached_about_resource_.reset(new google_apis::AboutResource(*about_resource));
281 for (size_t i = 0; i < callbacks.size(); ++i) {
282 callbacks[i].Run(
283 status,
284 make_scoped_ptr(new google_apis::AboutResource(*about_resource)));
288 ChangeListLoader::ChangeListLoader(
289 EventLogger* logger,
290 base::SequencedTaskRunner* blocking_task_runner,
291 ResourceMetadata* resource_metadata,
292 JobScheduler* scheduler,
293 AboutResourceLoader* about_resource_loader,
294 LoaderController* loader_controller)
295 : logger_(logger),
296 blocking_task_runner_(blocking_task_runner),
297 in_shutdown_(new base::CancellationFlag),
298 resource_metadata_(resource_metadata),
299 scheduler_(scheduler),
300 about_resource_loader_(about_resource_loader),
301 loader_controller_(loader_controller),
302 loaded_(false),
303 weak_ptr_factory_(this) {
306 ChangeListLoader::~ChangeListLoader() {
307 in_shutdown_->Set();
308 // Delete |in_shutdown_| with the blocking task runner so that it gets deleted
309 // after all active ChangeListProcessors.
310 blocking_task_runner_->DeleteSoon(FROM_HERE, in_shutdown_.release());
313 bool ChangeListLoader::IsRefreshing() const {
314 // Callback for change list loading is stored in pending_load_callback_.
315 // It is non-empty if and only if there is an in-flight loading operation.
316 return !pending_load_callback_.empty();
319 void ChangeListLoader::AddObserver(ChangeListLoaderObserver* observer) {
320 DCHECK(thread_checker_.CalledOnValidThread());
321 observers_.AddObserver(observer);
324 void ChangeListLoader::RemoveObserver(ChangeListLoaderObserver* observer) {
325 DCHECK(thread_checker_.CalledOnValidThread());
326 observers_.RemoveObserver(observer);
329 void ChangeListLoader::CheckForUpdates(const FileOperationCallback& callback) {
330 DCHECK(thread_checker_.CalledOnValidThread());
331 DCHECK(!callback.is_null());
333 // We only start to check for updates iff the load is done.
334 // I.e., we ignore checking updates if not loaded to avoid starting the
335 // load without user's explicit interaction (such as opening Drive).
336 if (!loaded_ && !IsRefreshing())
337 return;
339 // For each CheckForUpdates() request, always refresh the changestamp info.
340 about_resource_loader_->UpdateAboutResource(
341 base::Bind(&ChangeListLoader::OnAboutResourceUpdated,
342 weak_ptr_factory_.GetWeakPtr()));
344 if (IsRefreshing()) {
345 // There is in-flight loading. So keep the callback here, and check for
346 // updates when the in-flight loading is completed.
347 pending_update_check_callback_ = callback;
348 return;
351 DCHECK(loaded_);
352 logger_->Log(logging::LOG_INFO, "Checking for updates");
353 Load(callback);
356 void ChangeListLoader::LoadIfNeeded(const FileOperationCallback& callback) {
357 DCHECK(thread_checker_.CalledOnValidThread());
358 DCHECK(!callback.is_null());
360 // If the metadata is not yet loaded, start loading.
361 if (!loaded_ && !IsRefreshing())
362 Load(callback);
365 void ChangeListLoader::Load(const FileOperationCallback& callback) {
366 DCHECK(thread_checker_.CalledOnValidThread());
367 DCHECK(!callback.is_null());
369 // Check if this is the first time this ChangeListLoader do loading.
370 // Note: IsRefreshing() depends on pending_load_callback_ so check in advance.
371 const bool is_initial_load = (!loaded_ && !IsRefreshing());
373 // Register the callback function to be called when it is loaded.
374 pending_load_callback_.push_back(callback);
376 // If loading task is already running, do nothing.
377 if (pending_load_callback_.size() > 1)
378 return;
380 // Check the current status of local metadata, and start loading if needed.
381 int64* local_changestamp = new int64(0);
382 base::PostTaskAndReplyWithResult(
383 blocking_task_runner_.get(),
384 FROM_HERE,
385 base::Bind(&ResourceMetadata::GetLargestChangestamp,
386 base::Unretained(resource_metadata_),
387 local_changestamp),
388 base::Bind(&ChangeListLoader::LoadAfterGetLargestChangestamp,
389 weak_ptr_factory_.GetWeakPtr(),
390 is_initial_load,
391 base::Owned(local_changestamp)));
394 void ChangeListLoader::LoadAfterGetLargestChangestamp(
395 bool is_initial_load,
396 const int64* local_changestamp,
397 FileError error) {
398 DCHECK(thread_checker_.CalledOnValidThread());
400 if (error != FILE_ERROR_OK) {
401 OnChangeListLoadComplete(error);
402 return;
405 if (is_initial_load && *local_changestamp > 0) {
406 // The local data is usable. Flush callbacks to tell loading was successful.
407 OnChangeListLoadComplete(FILE_ERROR_OK);
409 // Continues to load from server in background.
410 // Put dummy callbacks to indicate that fetching is still continuing.
411 pending_load_callback_.push_back(
412 base::Bind(&util::EmptyFileOperationCallback));
415 about_resource_loader_->GetAboutResource(
416 base::Bind(&ChangeListLoader::LoadAfterGetAboutResource,
417 weak_ptr_factory_.GetWeakPtr(),
418 *local_changestamp));
421 void ChangeListLoader::LoadAfterGetAboutResource(
422 int64 local_changestamp,
423 google_apis::DriveApiErrorCode status,
424 scoped_ptr<google_apis::AboutResource> about_resource) {
425 DCHECK(thread_checker_.CalledOnValidThread());
427 FileError error = GDataToFileError(status);
428 if (error != FILE_ERROR_OK) {
429 OnChangeListLoadComplete(error);
430 return;
433 DCHECK(about_resource);
435 int64 remote_changestamp = about_resource->largest_change_id();
436 int64 start_changestamp = local_changestamp > 0 ? local_changestamp + 1 : 0;
437 if (local_changestamp >= remote_changestamp) {
438 if (local_changestamp > remote_changestamp) {
439 LOG(WARNING) << "Local resource metadata is fresher than server, "
440 << "local = " << local_changestamp
441 << ", server = " << remote_changestamp;
444 // No changes detected, tell the client that the loading was successful.
445 OnChangeListLoadComplete(FILE_ERROR_OK);
446 } else {
447 // Start loading the change list.
448 LoadChangeListFromServer(start_changestamp);
452 void ChangeListLoader::OnChangeListLoadComplete(FileError error) {
453 DCHECK(thread_checker_.CalledOnValidThread());
455 if (!loaded_ && error == FILE_ERROR_OK) {
456 loaded_ = true;
457 FOR_EACH_OBSERVER(ChangeListLoaderObserver,
458 observers_,
459 OnInitialLoadComplete());
462 for (size_t i = 0; i < pending_load_callback_.size(); ++i) {
463 base::ThreadTaskRunnerHandle::Get()->PostTask(
464 FROM_HERE,
465 base::Bind(pending_load_callback_[i], error));
467 pending_load_callback_.clear();
469 // If there is pending update check, try to load the change from the server
470 // again, because there may exist an update during the completed loading.
471 if (!pending_update_check_callback_.is_null()) {
472 Load(base::ResetAndReturn(&pending_update_check_callback_));
476 void ChangeListLoader::OnAboutResourceUpdated(
477 google_apis::DriveApiErrorCode error,
478 scoped_ptr<google_apis::AboutResource> resource) {
479 DCHECK(thread_checker_.CalledOnValidThread());
481 if (drive::GDataToFileError(error) != drive::FILE_ERROR_OK) {
482 logger_->Log(logging::LOG_ERROR,
483 "Failed to update the about resource: %s",
484 google_apis::DriveApiErrorCodeToString(error).c_str());
485 return;
487 logger_->Log(logging::LOG_INFO,
488 "About resource updated to: %s",
489 base::Int64ToString(resource->largest_change_id()).c_str());
492 void ChangeListLoader::LoadChangeListFromServer(int64 start_changestamp) {
493 DCHECK(thread_checker_.CalledOnValidThread());
494 DCHECK(!change_feed_fetcher_);
495 DCHECK(about_resource_loader_->cached_about_resource());
497 bool is_delta_update = start_changestamp != 0;
499 // Set up feed fetcher.
500 if (is_delta_update) {
501 change_feed_fetcher_.reset(
502 new DeltaFeedFetcher(scheduler_, start_changestamp));
503 } else {
504 change_feed_fetcher_.reset(new FullFeedFetcher(scheduler_));
507 // Make a copy of cached_about_resource_ to remember at which changestamp we
508 // are fetching change list.
509 change_feed_fetcher_->Run(
510 base::Bind(&ChangeListLoader::LoadChangeListFromServerAfterLoadChangeList,
511 weak_ptr_factory_.GetWeakPtr(),
512 base::Passed(make_scoped_ptr(new google_apis::AboutResource(
513 *about_resource_loader_->cached_about_resource()))),
514 is_delta_update));
517 void ChangeListLoader::LoadChangeListFromServerAfterLoadChangeList(
518 scoped_ptr<google_apis::AboutResource> about_resource,
519 bool is_delta_update,
520 FileError error,
521 ScopedVector<ChangeList> change_lists) {
522 DCHECK(thread_checker_.CalledOnValidThread());
523 DCHECK(about_resource);
525 // Delete the fetcher first.
526 change_feed_fetcher_.reset();
528 if (error != FILE_ERROR_OK) {
529 OnChangeListLoadComplete(error);
530 return;
533 ChangeListProcessor* change_list_processor =
534 new ChangeListProcessor(resource_metadata_, in_shutdown_.get());
535 // Don't send directory content change notification while performing
536 // the initial content retrieval.
537 const bool should_notify_changed_directories = is_delta_update;
539 logger_->Log(logging::LOG_INFO,
540 "Apply change lists (is delta: %d)",
541 is_delta_update);
542 loader_controller_->ScheduleRun(base::Bind(
543 base::IgnoreResult(
544 &base::PostTaskAndReplyWithResult<FileError, FileError>),
545 blocking_task_runner_,
546 FROM_HERE,
547 base::Bind(&ChangeListProcessor::Apply,
548 base::Unretained(change_list_processor),
549 base::Passed(&about_resource),
550 base::Passed(&change_lists),
551 is_delta_update),
552 base::Bind(&ChangeListLoader::LoadChangeListFromServerAfterUpdate,
553 weak_ptr_factory_.GetWeakPtr(),
554 base::Owned(change_list_processor),
555 should_notify_changed_directories,
556 base::Time::Now())));
559 void ChangeListLoader::LoadChangeListFromServerAfterUpdate(
560 ChangeListProcessor* change_list_processor,
561 bool should_notify_changed_directories,
562 const base::Time& start_time,
563 FileError error) {
564 DCHECK(thread_checker_.CalledOnValidThread());
566 const base::TimeDelta elapsed = base::Time::Now() - start_time;
567 logger_->Log(logging::LOG_INFO,
568 "Change lists applied (elapsed time: %sms)",
569 base::Int64ToString(elapsed.InMilliseconds()).c_str());
571 if (should_notify_changed_directories) {
572 FOR_EACH_OBSERVER(ChangeListLoaderObserver,
573 observers_,
574 OnFileChanged(change_list_processor->changed_files()));
577 OnChangeListLoadComplete(error);
579 FOR_EACH_OBSERVER(ChangeListLoaderObserver,
580 observers_,
581 OnLoadFromServerComplete());
584 } // namespace internal
585 } // namespace drive