Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / components / drive / job_scheduler.cc
blob60d8ef5f7705936ad89ab9a17cc12454241e6e2a
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/drive/job_scheduler.h"
7 #include <algorithm>
9 #include "base/files/file_util.h"
10 #include "base/metrics/histogram.h"
11 #include "base/prefs/pref_service.h"
12 #include "base/rand_util.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/strings/stringprintf.h"
15 #include "base/thread_task_runner_handle.h"
16 #include "components/drive/drive_pref_names.h"
17 #include "components/drive/event_logger.h"
18 #include "google_apis/drive/drive_api_parser.h"
20 namespace drive {
22 namespace {
// Retry and throttling policy for jobs that fail because of throttling or a
// server error.
//
// Such a job is retried at most kMaxRetryCount times. The delay before the
// next job may start is shared among all jobs. It doubles on each consecutive
// server error and is capped once kMaxThrottleCount errors have accumulated,
// i.e. at 2^(kMaxThrottleCount - 1) seconds plus up to one second of random
// jitter (see JobScheduler::UpdateWait).
//
// According to the API documentation, kMaxRetryCount should be the same as
// kMaxThrottleCount (https://developers.google.com/drive/handle-errors), but
// it is currently multiplied by 2 to ensure upload related jobs are retried a
// sufficient number of times. crbug.com/269918
const int kMaxThrottleCount = 4;
const int kMaxRetryCount = 2 * kMaxThrottleCount;

// Batch limits handed to every JobQueue created by the JobScheduler
// constructor: the batch count limit and the batch size limit (10 * 2^20).
const size_t kMaxBatchCount = 20;
const size_t kMaxBatchSize = 1024 * 1024 * 10;
// DefaultValueCreator<T>::GetDefaultValue() returns a value-initialized T,
// i.e. the result of T().
template <typename T>
struct DefaultValueCreator {
  static T GetDefaultValue() { return T(); }
};

// Specialization for const-reference parameter types: the reference is
// stripped and a value-initialized T is returned by value, so the result never
// refers to a temporary.
template <typename T>
struct DefaultValueCreator<const T&> {
  static T GetDefaultValue() { return T(); }
};
45 // Helper of CreateErrorRunCallback implementation.
46 // Provides:
47 // - ResultType; the type of the Callback which should be returned by
48 // CreateErrorRunCallback.
49 // - Run(): a static function which takes the original |callback| and |error|,
50 // and runs the |callback|.Run() with the error code and default values
51 // for remaining arguments.
52 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
54 // CreateErrorRunCallback with two arguments.
55 template<typename P1>
56 struct CreateErrorRunCallbackHelper<void(google_apis::DriveApiErrorCode, P1)> {
57 static void Run(
58 const base::Callback<void(google_apis::DriveApiErrorCode, P1)>& callback,
59 google_apis::DriveApiErrorCode error) {
60 callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
64 // Returns a callback with the tail parameter bound to its default value.
65 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
66 template<typename CallbackType>
67 base::Callback<void(google_apis::DriveApiErrorCode)>
68 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
69 return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
72 // Parameter struct for RunUploadNewFile.
73 struct UploadNewFileParams {
74 std::string parent_resource_id;
75 base::FilePath local_file_path;
76 std::string title;
77 std::string content_type;
78 UploadNewFileOptions options;
79 UploadCompletionCallback callback;
80 google_apis::ProgressCallback progress_callback;
83 // Helper function to work around the arity limitation of base::Bind.
84 google_apis::CancelCallback RunUploadNewFile(
85 DriveUploaderInterface* uploader,
86 const UploadNewFileParams& params) {
87 return uploader->UploadNewFile(params.parent_resource_id,
88 params.local_file_path,
89 params.title,
90 params.content_type,
91 params.options,
92 params.callback,
93 params.progress_callback);
96 // Parameter struct for RunUploadExistingFile.
97 struct UploadExistingFileParams {
98 std::string resource_id;
99 base::FilePath local_file_path;
100 std::string content_type;
101 UploadExistingFileOptions options;
102 std::string etag;
103 UploadCompletionCallback callback;
104 google_apis::ProgressCallback progress_callback;
107 // Helper function to work around the arity limitation of base::Bind.
108 google_apis::CancelCallback RunUploadExistingFile(
109 DriveUploaderInterface* uploader,
110 const UploadExistingFileParams& params) {
111 return uploader->UploadExistingFile(params.resource_id,
112 params.local_file_path,
113 params.content_type,
114 params.options,
115 params.callback,
116 params.progress_callback);
119 // Parameter struct for RunResumeUploadFile.
120 struct ResumeUploadFileParams {
121 GURL upload_location;
122 base::FilePath local_file_path;
123 std::string content_type;
124 UploadCompletionCallback callback;
125 google_apis::ProgressCallback progress_callback;
128 // Helper function to adjust the return type.
129 google_apis::CancelCallback RunResumeUploadFile(
130 DriveUploaderInterface* uploader,
131 const ResumeUploadFileParams& params) {
132 return uploader->ResumeUploadFile(params.upload_location,
133 params.local_file_path,
134 params.content_type,
135 params.callback,
136 params.progress_callback);
139 // Records the size of a file copied or moved from or to Drive in the
// histogram named |histogram_name|; otherwise does nothing.
// |size| is given in bytes and is recorded in kilobytes (size / 1024).
140 // Temporary for crbug.com/229650.
141 void CollectCopyHistogramSample(const std::string& histogram_name, int64 size) {
142 base::HistogramBase* const counter =
143 base::Histogram::FactoryGet(histogram_name,
// NOTE(review): the embedded numbering jumps 143 -> 145 -> 147, so two
// argument lines of this call appear to be missing from this copy of the
// file; confirm them against the upstream source.
// The sample unit is KB, so 1024 * 1024 corresponds to 1 GB.
145 1024 * 1024 /* 1 GB */,
147 base::Histogram::kUmaTargetedHistogramFlag);
148 counter->Add(size / 1024);
151 } // namespace
153 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
154 const int JobScheduler::kMaxJobCount[] = {
155 5, // METADATA_QUEUE
156 1, // FILE_QUEUE
159 JobScheduler::JobEntry::JobEntry(JobType type)
160 : job_info(type),
161 context(ClientContext(USER_INITIATED)),
162 retry_count(0) {
165 JobScheduler::JobEntry::~JobEntry() {
166 DCHECK(thread_checker_.CalledOnValidThread());
169 struct JobScheduler::ResumeUploadParams {
170 base::FilePath drive_file_path;
171 base::FilePath local_file_path;
172 std::string content_type;
175 JobScheduler::JobScheduler(PrefService* pref_service,
176 EventLogger* logger,
177 DriveServiceInterface* drive_service,
178 base::SequencedTaskRunner* blocking_task_runner)
179 : throttle_count_(0),
180 wait_until_(base::Time::Now()),
181 disable_throttling_(false),
182 logger_(logger),
183 drive_service_(drive_service),
184 blocking_task_runner_(blocking_task_runner),
185 uploader_(new DriveUploader(drive_service, blocking_task_runner)),
186 pref_service_(pref_service),
187 weak_ptr_factory_(this) {
188 for (int i = 0; i < NUM_QUEUES; ++i)
189 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES,
190 kMaxBatchCount, kMaxBatchSize));
192 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
195 JobScheduler::~JobScheduler() {
196 DCHECK(thread_checker_.CalledOnValidThread());
198 size_t num_queued_jobs = 0;
199 for (int i = 0; i < NUM_QUEUES; ++i)
200 num_queued_jobs += queue_[i]->GetNumberOfJobs();
201 DCHECK_EQ(num_queued_jobs, job_map_.size());
203 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
206 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
207 std::vector<JobInfo> job_info_list;
208 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
209 job_info_list.push_back(iter.GetCurrentValue()->job_info);
210 return job_info_list;
213 void JobScheduler::AddObserver(JobListObserver* observer) {
214 DCHECK(thread_checker_.CalledOnValidThread());
215 observer_list_.AddObserver(observer);
218 void JobScheduler::RemoveObserver(JobListObserver* observer) {
219 DCHECK(thread_checker_.CalledOnValidThread());
220 observer_list_.RemoveObserver(observer);
223 void JobScheduler::CancelJob(JobID job_id) {
224 DCHECK(thread_checker_.CalledOnValidThread());
226 JobEntry* job = job_map_.Lookup(job_id);
227 if (job) {
228 if (job->job_info.state == STATE_RUNNING) {
229 // If the job is running an HTTP request, cancel it via |cancel_callback|
230 // returned from the request, and wait for termination in the normal
231 // callback handler, OnJobDone.
232 if (!job->cancel_callback.is_null())
233 job->cancel_callback.Run();
234 } else {
235 AbortNotRunningJob(job, google_apis::DRIVE_CANCELLED);
240 void JobScheduler::CancelAllJobs() {
241 DCHECK(thread_checker_.CalledOnValidThread());
243 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
244 // removable during iteration.
245 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
246 CancelJob(iter.GetCurrentKey());
249 void JobScheduler::GetAboutResource(
250 const google_apis::AboutResourceCallback& callback) {
251 DCHECK(thread_checker_.CalledOnValidThread());
252 DCHECK(!callback.is_null());
254 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
255 new_job->task = base::Bind(
256 &DriveServiceInterface::GetAboutResource,
257 base::Unretained(drive_service_),
258 base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
259 weak_ptr_factory_.GetWeakPtr(),
260 new_job->job_info.job_id,
261 callback));
262 new_job->abort_callback = CreateErrorRunCallback(callback);
263 StartJob(new_job);
266 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
267 DCHECK(thread_checker_.CalledOnValidThread());
268 DCHECK(!callback.is_null());
270 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
271 new_job->task = base::Bind(
272 &DriveServiceInterface::GetAppList,
273 base::Unretained(drive_service_),
274 base::Bind(&JobScheduler::OnGetAppListJobDone,
275 weak_ptr_factory_.GetWeakPtr(),
276 new_job->job_info.job_id,
277 callback));
278 new_job->abort_callback = CreateErrorRunCallback(callback);
279 StartJob(new_job);
282 void JobScheduler::GetAllFileList(
283 const google_apis::FileListCallback& callback) {
284 DCHECK(thread_checker_.CalledOnValidThread());
285 DCHECK(!callback.is_null());
287 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
288 new_job->task = base::Bind(
289 &DriveServiceInterface::GetAllFileList,
290 base::Unretained(drive_service_),
291 base::Bind(&JobScheduler::OnGetFileListJobDone,
292 weak_ptr_factory_.GetWeakPtr(),
293 new_job->job_info.job_id,
294 callback));
295 new_job->abort_callback = CreateErrorRunCallback(callback);
296 StartJob(new_job);
299 void JobScheduler::GetFileListInDirectory(
300 const std::string& directory_resource_id,
301 const google_apis::FileListCallback& callback) {
302 DCHECK(thread_checker_.CalledOnValidThread());
303 DCHECK(!callback.is_null());
305 JobEntry* new_job = CreateNewJob(
306 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
307 new_job->task = base::Bind(
308 &DriveServiceInterface::GetFileListInDirectory,
309 base::Unretained(drive_service_),
310 directory_resource_id,
311 base::Bind(&JobScheduler::OnGetFileListJobDone,
312 weak_ptr_factory_.GetWeakPtr(),
313 new_job->job_info.job_id,
314 callback));
315 new_job->abort_callback = CreateErrorRunCallback(callback);
316 StartJob(new_job);
319 void JobScheduler::Search(const std::string& search_query,
320 const google_apis::FileListCallback& callback) {
321 DCHECK(thread_checker_.CalledOnValidThread());
322 DCHECK(!callback.is_null());
324 JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
325 new_job->task = base::Bind(
326 &DriveServiceInterface::Search,
327 base::Unretained(drive_service_),
328 search_query,
329 base::Bind(&JobScheduler::OnGetFileListJobDone,
330 weak_ptr_factory_.GetWeakPtr(),
331 new_job->job_info.job_id,
332 callback));
333 new_job->abort_callback = CreateErrorRunCallback(callback);
334 StartJob(new_job);
337 void JobScheduler::GetChangeList(
338 int64 start_changestamp,
339 const google_apis::ChangeListCallback& callback) {
340 DCHECK(thread_checker_.CalledOnValidThread());
341 DCHECK(!callback.is_null());
343 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
344 new_job->task = base::Bind(
345 &DriveServiceInterface::GetChangeList,
346 base::Unretained(drive_service_),
347 start_changestamp,
348 base::Bind(&JobScheduler::OnGetChangeListJobDone,
349 weak_ptr_factory_.GetWeakPtr(),
350 new_job->job_info.job_id,
351 callback));
352 new_job->abort_callback = CreateErrorRunCallback(callback);
353 StartJob(new_job);
356 void JobScheduler::GetRemainingChangeList(
357 const GURL& next_link,
358 const google_apis::ChangeListCallback& callback) {
359 DCHECK(thread_checker_.CalledOnValidThread());
360 DCHECK(!callback.is_null());
362 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
363 new_job->task = base::Bind(
364 &DriveServiceInterface::GetRemainingChangeList,
365 base::Unretained(drive_service_),
366 next_link,
367 base::Bind(&JobScheduler::OnGetChangeListJobDone,
368 weak_ptr_factory_.GetWeakPtr(),
369 new_job->job_info.job_id,
370 callback));
371 new_job->abort_callback = CreateErrorRunCallback(callback);
372 StartJob(new_job);
375 void JobScheduler::GetRemainingFileList(
376 const GURL& next_link,
377 const google_apis::FileListCallback& callback) {
378 DCHECK(thread_checker_.CalledOnValidThread());
379 DCHECK(!callback.is_null());
381 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
382 new_job->task = base::Bind(
383 &DriveServiceInterface::GetRemainingFileList,
384 base::Unretained(drive_service_),
385 next_link,
386 base::Bind(&JobScheduler::OnGetFileListJobDone,
387 weak_ptr_factory_.GetWeakPtr(),
388 new_job->job_info.job_id,
389 callback));
390 new_job->abort_callback = CreateErrorRunCallback(callback);
391 StartJob(new_job);
394 void JobScheduler::GetFileResource(
395 const std::string& resource_id,
396 const ClientContext& context,
397 const google_apis::FileResourceCallback& callback) {
398 DCHECK(thread_checker_.CalledOnValidThread());
399 DCHECK(!callback.is_null());
401 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
402 new_job->context = context;
403 new_job->task = base::Bind(
404 &DriveServiceInterface::GetFileResource,
405 base::Unretained(drive_service_),
406 resource_id,
407 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
408 weak_ptr_factory_.GetWeakPtr(),
409 new_job->job_info.job_id,
410 callback));
411 new_job->abort_callback = CreateErrorRunCallback(callback);
412 StartJob(new_job);
415 void JobScheduler::GetShareUrl(
416 const std::string& resource_id,
417 const GURL& embed_origin,
418 const ClientContext& context,
419 const google_apis::GetShareUrlCallback& callback) {
420 DCHECK(thread_checker_.CalledOnValidThread());
421 DCHECK(!callback.is_null());
423 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
424 new_job->context = context;
425 new_job->task = base::Bind(
426 &DriveServiceInterface::GetShareUrl,
427 base::Unretained(drive_service_),
428 resource_id,
429 embed_origin,
430 base::Bind(&JobScheduler::OnGetShareUrlJobDone,
431 weak_ptr_factory_.GetWeakPtr(),
432 new_job->job_info.job_id,
433 callback));
434 new_job->abort_callback = CreateErrorRunCallback(callback);
435 StartJob(new_job);
438 void JobScheduler::TrashResource(
439 const std::string& resource_id,
440 const ClientContext& context,
441 const google_apis::EntryActionCallback& callback) {
442 DCHECK(thread_checker_.CalledOnValidThread());
443 DCHECK(!callback.is_null());
445 JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
446 new_job->context = context;
447 new_job->task = base::Bind(
448 &DriveServiceInterface::TrashResource,
449 base::Unretained(drive_service_),
450 resource_id,
451 base::Bind(&JobScheduler::OnEntryActionJobDone,
452 weak_ptr_factory_.GetWeakPtr(),
453 new_job->job_info.job_id,
454 callback));
455 new_job->abort_callback = callback;
456 StartJob(new_job);
459 void JobScheduler::CopyResource(
460 const std::string& resource_id,
461 const std::string& parent_resource_id,
462 const std::string& new_title,
463 const base::Time& last_modified,
464 const google_apis::FileResourceCallback& callback) {
465 DCHECK(thread_checker_.CalledOnValidThread());
466 DCHECK(!callback.is_null());
468 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
469 new_job->task = base::Bind(
470 &DriveServiceInterface::CopyResource,
471 base::Unretained(drive_service_),
472 resource_id,
473 parent_resource_id,
474 new_title,
475 last_modified,
476 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
477 weak_ptr_factory_.GetWeakPtr(),
478 new_job->job_info.job_id,
479 callback));
480 new_job->abort_callback = CreateErrorRunCallback(callback);
481 StartJob(new_job);
484 void JobScheduler::UpdateResource(
485 const std::string& resource_id,
486 const std::string& parent_resource_id,
487 const std::string& new_title,
488 const base::Time& last_modified,
489 const base::Time& last_viewed_by_me,
490 const google_apis::drive::Properties& properties,
491 const ClientContext& context,
492 const google_apis::FileResourceCallback& callback) {
493 DCHECK(thread_checker_.CalledOnValidThread());
494 DCHECK(!callback.is_null());
496 JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
497 new_job->context = context;
498 new_job->task = base::Bind(&DriveServiceInterface::UpdateResource,
499 base::Unretained(drive_service_), resource_id,
500 parent_resource_id, new_title, last_modified,
501 last_viewed_by_me, properties,
502 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
503 weak_ptr_factory_.GetWeakPtr(),
504 new_job->job_info.job_id, callback));
505 new_job->abort_callback = CreateErrorRunCallback(callback);
506 StartJob(new_job);
509 void JobScheduler::AddResourceToDirectory(
510 const std::string& parent_resource_id,
511 const std::string& resource_id,
512 const google_apis::EntryActionCallback& callback) {
513 DCHECK(thread_checker_.CalledOnValidThread());
514 DCHECK(!callback.is_null());
516 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
517 new_job->task = base::Bind(
518 &DriveServiceInterface::AddResourceToDirectory,
519 base::Unretained(drive_service_),
520 parent_resource_id,
521 resource_id,
522 base::Bind(&JobScheduler::OnEntryActionJobDone,
523 weak_ptr_factory_.GetWeakPtr(),
524 new_job->job_info.job_id,
525 callback));
526 new_job->abort_callback = callback;
527 StartJob(new_job);
530 void JobScheduler::RemoveResourceFromDirectory(
531 const std::string& parent_resource_id,
532 const std::string& resource_id,
533 const ClientContext& context,
534 const google_apis::EntryActionCallback& callback) {
535 DCHECK(thread_checker_.CalledOnValidThread());
537 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
538 new_job->context = context;
539 new_job->task = base::Bind(
540 &DriveServiceInterface::RemoveResourceFromDirectory,
541 base::Unretained(drive_service_),
542 parent_resource_id,
543 resource_id,
544 base::Bind(&JobScheduler::OnEntryActionJobDone,
545 weak_ptr_factory_.GetWeakPtr(),
546 new_job->job_info.job_id,
547 callback));
548 new_job->abort_callback = callback;
549 StartJob(new_job);
552 void JobScheduler::AddNewDirectory(
553 const std::string& parent_resource_id,
554 const std::string& directory_title,
555 const AddNewDirectoryOptions& options,
556 const ClientContext& context,
557 const google_apis::FileResourceCallback& callback) {
558 DCHECK(thread_checker_.CalledOnValidThread());
560 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
561 new_job->context = context;
562 new_job->task = base::Bind(
563 &DriveServiceInterface::AddNewDirectory,
564 base::Unretained(drive_service_),
565 parent_resource_id,
566 directory_title,
567 options,
568 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
569 weak_ptr_factory_.GetWeakPtr(),
570 new_job->job_info.job_id,
571 callback));
572 new_job->abort_callback = CreateErrorRunCallback(callback);
573 StartJob(new_job);
576 JobID JobScheduler::DownloadFile(
577 const base::FilePath& virtual_path,
578 int64 expected_file_size,
579 const base::FilePath& local_cache_path,
580 const std::string& resource_id,
581 const ClientContext& context,
582 const google_apis::DownloadActionCallback& download_action_callback,
583 const google_apis::GetContentCallback& get_content_callback) {
584 DCHECK(thread_checker_.CalledOnValidThread());
586 // Temporary histogram for crbug.com/229650.
587 CollectCopyHistogramSample("Drive.DownloadFromDriveFileSize",
588 expected_file_size);
590 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
591 new_job->job_info.file_path = virtual_path;
592 new_job->job_info.num_total_bytes = expected_file_size;
593 new_job->context = context;
594 new_job->task = base::Bind(
595 &DriveServiceInterface::DownloadFile,
596 base::Unretained(drive_service_),
597 local_cache_path,
598 resource_id,
599 base::Bind(&JobScheduler::OnDownloadActionJobDone,
600 weak_ptr_factory_.GetWeakPtr(),
601 new_job->job_info.job_id,
602 download_action_callback),
603 get_content_callback,
604 base::Bind(&JobScheduler::UpdateProgress,
605 weak_ptr_factory_.GetWeakPtr(),
606 new_job->job_info.job_id));
607 new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
608 StartJob(new_job);
609 return new_job->job_info.job_id;
612 void JobScheduler::UploadNewFile(
613 const std::string& parent_resource_id,
614 int64 expected_file_size,
615 const base::FilePath& drive_file_path,
616 const base::FilePath& local_file_path,
617 const std::string& title,
618 const std::string& content_type,
619 const UploadNewFileOptions& options,
620 const ClientContext& context,
621 const google_apis::FileResourceCallback& callback) {
622 DCHECK(thread_checker_.CalledOnValidThread());
624 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
625 new_job->job_info.file_path = drive_file_path;
626 new_job->job_info.num_total_bytes = expected_file_size;
627 new_job->context = context;
629 // Temporary histogram for crbug.com/229650.
630 CollectCopyHistogramSample("Drive.UploadToDriveFileSize", expected_file_size);
632 UploadNewFileParams params;
633 params.parent_resource_id = parent_resource_id;
634 params.local_file_path = local_file_path;
635 params.title = title;
636 params.content_type = content_type;
637 params.options = options;
639 ResumeUploadParams resume_params;
640 resume_params.local_file_path = params.local_file_path;
641 resume_params.content_type = params.content_type;
643 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
644 weak_ptr_factory_.GetWeakPtr(),
645 new_job->job_info.job_id,
646 resume_params,
647 callback);
648 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
649 weak_ptr_factory_.GetWeakPtr(),
650 new_job->job_info.job_id);
651 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
652 new_job->abort_callback = CreateErrorRunCallback(callback);
653 StartJob(new_job);
656 void JobScheduler::UploadExistingFile(
657 const std::string& resource_id,
658 int64 expected_file_size,
659 const base::FilePath& drive_file_path,
660 const base::FilePath& local_file_path,
661 const std::string& content_type,
662 const UploadExistingFileOptions& options,
663 const ClientContext& context,
664 const google_apis::FileResourceCallback& callback) {
665 DCHECK(thread_checker_.CalledOnValidThread());
667 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
668 new_job->job_info.file_path = drive_file_path;
669 new_job->job_info.num_total_bytes = expected_file_size;
670 new_job->context = context;
672 // Temporary histogram for crbug.com/229650.
673 CollectCopyHistogramSample("Drive.UploadToDriveFileSize", expected_file_size);
675 UploadExistingFileParams params;
676 params.resource_id = resource_id;
677 params.local_file_path = local_file_path;
678 params.content_type = content_type;
679 params.options = options;
681 ResumeUploadParams resume_params;
682 resume_params.local_file_path = params.local_file_path;
683 resume_params.content_type = params.content_type;
685 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
686 weak_ptr_factory_.GetWeakPtr(),
687 new_job->job_info.job_id,
688 resume_params,
689 callback);
690 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
691 weak_ptr_factory_.GetWeakPtr(),
692 new_job->job_info.job_id);
693 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
694 new_job->abort_callback = CreateErrorRunCallback(callback);
695 StartJob(new_job);
698 void JobScheduler::AddPermission(
699 const std::string& resource_id,
700 const std::string& email,
701 google_apis::drive::PermissionRole role,
702 const google_apis::EntryActionCallback& callback) {
703 DCHECK(thread_checker_.CalledOnValidThread());
704 DCHECK(!callback.is_null());
706 JobEntry* new_job = CreateNewJob(TYPE_ADD_PERMISSION);
707 new_job->task = base::Bind(&DriveServiceInterface::AddPermission,
708 base::Unretained(drive_service_),
709 resource_id,
710 email,
711 role,
712 base::Bind(&JobScheduler::OnEntryActionJobDone,
713 weak_ptr_factory_.GetWeakPtr(),
714 new_job->job_info.job_id,
715 callback));
716 new_job->abort_callback = callback;
717 StartJob(new_job);
720 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
721 JobEntry* job = new JobEntry(type);
722 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|.
723 return job;
726 void JobScheduler::StartJob(JobEntry* job) {
727 DCHECK(!job->task.is_null());
729 QueueJob(job->job_info.job_id);
730 NotifyJobAdded(job->job_info);
731 DoJobLoop(GetJobQueueType(job->job_info.job_type));
734 void JobScheduler::QueueJob(JobID job_id) {
735 DCHECK(thread_checker_.CalledOnValidThread());
737 JobEntry* job_entry = job_map_.Lookup(job_id);
738 DCHECK(job_entry);
739 const JobInfo& job_info = job_entry->job_info;
741 const QueueType queue_type = GetJobQueueType(job_info.job_type);
742 const bool batchable = job_info.job_type == TYPE_UPLOAD_EXISTING_FILE ||
743 job_info.job_type == TYPE_UPLOAD_NEW_FILE;
744 queue_[queue_type]->Push(job_id, job_entry->context.type, batchable,
745 job_info.num_total_bytes);
747 // Temporary histogram for crbug.com/229650.
748 if (job_info.job_type == TYPE_DOWNLOAD_FILE ||
749 job_info.job_type == TYPE_UPLOAD_EXISTING_FILE ||
750 job_info.job_type == TYPE_UPLOAD_NEW_FILE) {
751 std::vector<JobID> jobs_with_the_same_priority;
752 queue_[queue_type]->GetQueuedJobs(job_entry->context.type,
753 &jobs_with_the_same_priority);
754 DCHECK(!jobs_with_the_same_priority.empty());
756 const size_t blocking_jobs_count = jobs_with_the_same_priority.size() - 1;
757 UMA_HISTOGRAM_COUNTS_10000("Drive.TransferBlockedOnJobs",
758 blocking_jobs_count);
761 const std::string retry_prefix = job_entry->retry_count > 0 ?
762 base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
763 logger_->Log(logging::LOG_INFO,
764 "Job queued%s: %s - %s",
765 retry_prefix.c_str(),
766 job_info.ToString().c_str(),
767 GetQueueInfo(queue_type).c_str());
770 void JobScheduler::DoJobLoop(QueueType queue_type) {
771 DCHECK(thread_checker_.CalledOnValidThread());
773 const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
775 // Abort all USER_INITAITED jobs when not accepted.
776 if (accepted_priority < USER_INITIATED) {
777 std::vector<JobID> jobs;
778 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
779 for (size_t i = 0; i < jobs.size(); ++i) {
780 JobEntry* job = job_map_.Lookup(jobs[i]);
781 DCHECK(job);
782 AbortNotRunningJob(job, google_apis::DRIVE_NO_CONNECTION);
786 // Wait when throttled.
787 const base::Time now = base::Time::Now();
788 if (now < wait_until_) {
789 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
790 FROM_HERE,
791 base::Bind(&JobScheduler::DoJobLoop,
792 weak_ptr_factory_.GetWeakPtr(),
793 queue_type),
794 wait_until_ - now);
795 return;
798 // Run the job with the highest priority in the queue.
799 std::vector<JobID> job_ids;
800 queue_[queue_type]->PopForRun(accepted_priority, &job_ids);
801 if (job_ids.empty())
802 return;
804 if (job_ids.size() > 1)
805 uploader_->StartBatchProcessing();
807 for (JobID job_id : job_ids) {
808 JobEntry* entry = job_map_.Lookup(job_id);
809 DCHECK(entry);
811 JobInfo* job_info = &entry->job_info;
812 job_info->state = STATE_RUNNING;
813 job_info->start_time = now;
814 NotifyJobUpdated(*job_info);
816 entry->cancel_callback = entry->task.Run();
817 logger_->Log(logging::LOG_INFO, "Job started: %s - %s",
818 job_info->ToString().c_str(),
819 GetQueueInfo(queue_type).c_str());
822 if (job_ids.size() > 1)
823 uploader_->StopBatchProcessing();
825 UpdateWait();
828 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
829 DCHECK(thread_checker_.CalledOnValidThread());
831 const int kNoJobShouldRun = -1;
833 // Should stop if Drive was disabled while running the fetch loop.
834 if (pref_service_->GetBoolean(prefs::kDisableDrive))
835 return kNoJobShouldRun;
837 // Should stop if the network is not online.
838 if (net::NetworkChangeNotifier::IsOffline())
839 return kNoJobShouldRun;
841 // For the file queue, if it is on cellular network, only user initiated
842 // operations are allowed to start.
843 if (queue_type == FILE_QUEUE &&
844 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
845 net::NetworkChangeNotifier::IsConnectionCellular(
846 net::NetworkChangeNotifier::GetConnectionType()))
847 return USER_INITIATED;
849 // Otherwise, every operations including background tasks are allowed.
850 return BACKGROUND;
853 void JobScheduler::UpdateWait() {
854 DCHECK(thread_checker_.CalledOnValidThread());
856 if (disable_throttling_ || throttle_count_ == 0)
857 return;
859 // Exponential backoff: https://developers.google.com/drive/handle-errors.
860 base::TimeDelta delay =
861 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
862 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
863 VLOG(1) << "Throttling for " << delay.InMillisecondsF();
865 wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
868 bool JobScheduler::OnJobDone(JobID job_id,
869 google_apis::DriveApiErrorCode error) {
870 DCHECK(thread_checker_.CalledOnValidThread());
872 JobEntry* job_entry = job_map_.Lookup(job_id);
873 DCHECK(job_entry);
874 JobInfo* job_info = &job_entry->job_info;
875 QueueType queue_type = GetJobQueueType(job_info->job_type);
876 queue_[queue_type]->MarkFinished(job_id);
878 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
879 bool success = (GDataToFileError(error) == FILE_ERROR_OK);
880 logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
881 "Job done: %s => %s (elapsed time: %sms) - %s",
882 job_info->ToString().c_str(),
883 DriveApiErrorCodeToString(error).c_str(),
884 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
885 GetQueueInfo(queue_type).c_str());
887 // Retry, depending on the error.
888 const bool is_server_error =
889 error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
890 error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
891 if (is_server_error) {
892 if (throttle_count_ < kMaxThrottleCount)
893 ++throttle_count_;
894 UpdateWait();
895 } else {
896 throttle_count_ = 0;
899 const bool should_retry =
900 is_server_error && job_entry->retry_count < kMaxRetryCount;
901 if (should_retry) {
902 job_entry->cancel_callback.Reset();
903 job_info->state = STATE_RETRY;
904 NotifyJobUpdated(*job_info);
906 ++job_entry->retry_count;
908 // Requeue the job.
909 QueueJob(job_id);
910 } else {
911 NotifyJobDone(*job_info, error);
912 // The job has finished, no retry will happen in the scheduler. Now we can
913 // remove the job info from the map.
914 job_map_.Remove(job_id);
917 // Post a task to continue the job loop. This allows us to finish handling
918 // the current job before starting the next one.
919 base::ThreadTaskRunnerHandle::Get()->PostTask(
920 FROM_HERE,
921 base::Bind(&JobScheduler::DoJobLoop,
922 weak_ptr_factory_.GetWeakPtr(),
923 queue_type));
924 return !should_retry;
927 void JobScheduler::OnGetFileListJobDone(
928 JobID job_id,
929 const google_apis::FileListCallback& callback,
930 google_apis::DriveApiErrorCode error,
931 scoped_ptr<google_apis::FileList> file_list) {
932 DCHECK(thread_checker_.CalledOnValidThread());
933 DCHECK(!callback.is_null());
935 if (OnJobDone(job_id, error))
936 callback.Run(error, file_list.Pass());
939 void JobScheduler::OnGetChangeListJobDone(
940 JobID job_id,
941 const google_apis::ChangeListCallback& callback,
942 google_apis::DriveApiErrorCode error,
943 scoped_ptr<google_apis::ChangeList> change_list) {
944 DCHECK(thread_checker_.CalledOnValidThread());
945 DCHECK(!callback.is_null());
947 if (OnJobDone(job_id, error))
948 callback.Run(error, change_list.Pass());
951 void JobScheduler::OnGetFileResourceJobDone(
952 JobID job_id,
953 const google_apis::FileResourceCallback& callback,
954 google_apis::DriveApiErrorCode error,
955 scoped_ptr<google_apis::FileResource> entry) {
956 DCHECK(thread_checker_.CalledOnValidThread());
957 DCHECK(!callback.is_null());
959 if (OnJobDone(job_id, error))
960 callback.Run(error, entry.Pass());
963 void JobScheduler::OnGetAboutResourceJobDone(
964 JobID job_id,
965 const google_apis::AboutResourceCallback& callback,
966 google_apis::DriveApiErrorCode error,
967 scoped_ptr<google_apis::AboutResource> about_resource) {
968 DCHECK(thread_checker_.CalledOnValidThread());
969 DCHECK(!callback.is_null());
971 if (OnJobDone(job_id, error))
972 callback.Run(error, about_resource.Pass());
975 void JobScheduler::OnGetShareUrlJobDone(
976 JobID job_id,
977 const google_apis::GetShareUrlCallback& callback,
978 google_apis::DriveApiErrorCode error,
979 const GURL& share_url) {
980 DCHECK(thread_checker_.CalledOnValidThread());
981 DCHECK(!callback.is_null());
983 if (OnJobDone(job_id, error))
984 callback.Run(error, share_url);
987 void JobScheduler::OnGetAppListJobDone(
988 JobID job_id,
989 const google_apis::AppListCallback& callback,
990 google_apis::DriveApiErrorCode error,
991 scoped_ptr<google_apis::AppList> app_list) {
992 DCHECK(thread_checker_.CalledOnValidThread());
993 DCHECK(!callback.is_null());
995 if (OnJobDone(job_id, error))
996 callback.Run(error, app_list.Pass());
999 void JobScheduler::OnEntryActionJobDone(
1000 JobID job_id,
1001 const google_apis::EntryActionCallback& callback,
1002 google_apis::DriveApiErrorCode error) {
1003 DCHECK(thread_checker_.CalledOnValidThread());
1004 DCHECK(!callback.is_null());
1006 if (OnJobDone(job_id, error))
1007 callback.Run(error);
1010 void JobScheduler::OnDownloadActionJobDone(
1011 JobID job_id,
1012 const google_apis::DownloadActionCallback& callback,
1013 google_apis::DriveApiErrorCode error,
1014 const base::FilePath& temp_file) {
1015 DCHECK(thread_checker_.CalledOnValidThread());
1016 DCHECK(!callback.is_null());
1018 if (OnJobDone(job_id, error))
1019 callback.Run(error, temp_file);
1022 void JobScheduler::OnUploadCompletionJobDone(
1023 JobID job_id,
1024 const ResumeUploadParams& resume_params,
1025 const google_apis::FileResourceCallback& callback,
1026 google_apis::DriveApiErrorCode error,
1027 const GURL& upload_location,
1028 scoped_ptr<google_apis::FileResource> entry) {
1029 DCHECK(thread_checker_.CalledOnValidThread());
1030 DCHECK(!callback.is_null());
1032 if (!upload_location.is_empty()) {
1033 // If upload_location is available, update the task to resume the
1034 // upload process from the terminated point.
1035 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1036 // so OnJobDone called below will be in charge to re-queue the job.
1037 JobEntry* job_entry = job_map_.Lookup(job_id);
1038 DCHECK(job_entry);
1040 ResumeUploadFileParams params;
1041 params.upload_location = upload_location;
1042 params.local_file_path = resume_params.local_file_path;
1043 params.content_type = resume_params.content_type;
1044 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1045 weak_ptr_factory_.GetWeakPtr(),
1046 job_id,
1047 job_entry->task,
1048 callback);
1049 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1050 weak_ptr_factory_.GetWeakPtr(),
1051 job_id);
1052 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1055 if (OnJobDone(job_id, error))
1056 callback.Run(error, entry.Pass());
1059 void JobScheduler::OnResumeUploadFileDone(
1060 JobID job_id,
1061 const base::Callback<google_apis::CancelCallback()>& original_task,
1062 const google_apis::FileResourceCallback& callback,
1063 google_apis::DriveApiErrorCode error,
1064 const GURL& upload_location,
1065 scoped_ptr<google_apis::FileResource> entry) {
1066 DCHECK(thread_checker_.CalledOnValidThread());
1067 DCHECK(!original_task.is_null());
1068 DCHECK(!callback.is_null());
1070 if (upload_location.is_empty()) {
1071 // If upload_location is not available, we should discard it and stop trying
1072 // to resume. Restore the original task.
1073 JobEntry* job_entry = job_map_.Lookup(job_id);
1074 DCHECK(job_entry);
1075 job_entry->task = original_task;
1078 if (OnJobDone(job_id, error))
1079 callback.Run(error, entry.Pass());
1082 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1083 JobEntry* job_entry = job_map_.Lookup(job_id);
1084 DCHECK(job_entry);
1086 job_entry->job_info.num_completed_bytes = progress;
1087 if (total != -1)
1088 job_entry->job_info.num_total_bytes = total;
1089 NotifyJobUpdated(job_entry->job_info);
1092 void JobScheduler::OnConnectionTypeChanged(
1093 net::NetworkChangeNotifier::ConnectionType type) {
1094 DCHECK(thread_checker_.CalledOnValidThread());
1096 // Resume the job loop.
1097 // Note that we don't need to check the network connection status as it will
1098 // be checked in GetCurrentAcceptedPriority().
1099 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1100 DoJobLoop(static_cast<QueueType>(i));
1103 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1104 switch (type) {
1105 case TYPE_GET_ABOUT_RESOURCE:
1106 case TYPE_GET_APP_LIST:
1107 case TYPE_GET_ALL_RESOURCE_LIST:
1108 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1109 case TYPE_SEARCH:
1110 case TYPE_GET_CHANGE_LIST:
1111 case TYPE_GET_REMAINING_CHANGE_LIST:
1112 case TYPE_GET_REMAINING_FILE_LIST:
1113 case TYPE_GET_RESOURCE_ENTRY:
1114 case TYPE_GET_SHARE_URL:
1115 case TYPE_TRASH_RESOURCE:
1116 case TYPE_COPY_RESOURCE:
1117 case TYPE_UPDATE_RESOURCE:
1118 case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1119 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1120 case TYPE_ADD_NEW_DIRECTORY:
1121 case TYPE_ADD_PERMISSION:
1122 return METADATA_QUEUE;
1124 case TYPE_DOWNLOAD_FILE:
1125 case TYPE_UPLOAD_NEW_FILE:
1126 case TYPE_UPLOAD_EXISTING_FILE:
1127 return FILE_QUEUE;
1129 NOTREACHED();
1130 return FILE_QUEUE;
1133 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1134 google_apis::DriveApiErrorCode error) {
1135 DCHECK(thread_checker_.CalledOnValidThread());
1137 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1138 const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1139 logger_->Log(logging::LOG_INFO,
1140 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1141 job->job_info.ToString().c_str(),
1142 DriveApiErrorCodeToString(error).c_str(),
1143 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1144 GetQueueInfo(queue_type).c_str());
1146 base::Callback<void(google_apis::DriveApiErrorCode)> callback =
1147 job->abort_callback;
1148 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1149 NotifyJobDone(job->job_info, error);
1150 job_map_.Remove(job->job_info.job_id);
1151 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
1152 base::Bind(callback, error));
1155 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1156 DCHECK(thread_checker_.CalledOnValidThread());
1157 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1160 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1161 google_apis::DriveApiErrorCode error) {
1162 DCHECK(thread_checker_.CalledOnValidThread());
1163 FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1164 OnJobDone(job_info, GDataToFileError(error)));
1167 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1168 DCHECK(thread_checker_.CalledOnValidThread());
1169 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1172 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1173 return QueueTypeToString(type) + " " + queue_[type]->ToString();
1176 // static
1177 std::string JobScheduler::QueueTypeToString(QueueType type) {
1178 switch (type) {
1179 case METADATA_QUEUE:
1180 return "METADATA_QUEUE";
1181 case FILE_QUEUE:
1182 return "FILE_QUEUE";
1183 case NUM_QUEUES:
1184 break; // This value is just a sentinel. Should never be used.
1186 NOTREACHED();
1187 return "";
1190 } // namespace drive