Add ENABLE_MEDIA_ROUTER define to builds other than Android and iOS.
[chromium-blink-merge.git] / chrome / browser / chromeos / drive / job_scheduler.cc
blob58700a32300c30af4a511ee4453eeb1cea7b95b4
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
7 #include "base/files/file_util.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h"
10 #include "base/prefs/pref_service.h"
11 #include "base/rand_util.h"
12 #include "base/strings/string_number_conversions.h"
13 #include "base/strings/stringprintf.h"
14 #include "chrome/browser/drive/event_logger.h"
15 #include "chrome/common/pref_names.h"
16 #include "content/public/browser/browser_thread.h"
17 #include "google_apis/drive/drive_api_parser.h"
19 using content::BrowserThread;
21 namespace drive {
23 namespace {
25 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
26 // throttling or server error. The delay before retrying a job is shared among
27 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
29 // According to the API documentation, kMaxRetryCount should be the same as
30 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
31 // But currently multiplied by 2 to ensure upload related jobs retried for a
32 // sufficient number of times. crbug.com/269918
33 const int kMaxThrottleCount = 4;
34 const int kMaxRetryCount = 2 * kMaxThrottleCount;
36 // GetDefaultValue returns a value constructed by the default constructor.
37 template<typename T> struct DefaultValueCreator {
38 static T GetDefaultValue() { return T(); }
40 template<typename T> struct DefaultValueCreator<const T&> {
41 static T GetDefaultValue() { return T(); }
44 // Helper of CreateErrorRunCallback implementation.
45 // Provides:
46 // - ResultType; the type of the Callback which should be returned by
47 // CreateErrorRunCallback.
48 // - Run(): a static function which takes the original |callback| and |error|,
49 // and runs the |callback|.Run() with the error code and default values
50 // for remaining arguments.
51 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
53 // CreateErrorRunCallback with two arguments.
54 template<typename P1>
55 struct CreateErrorRunCallbackHelper<void(google_apis::DriveApiErrorCode, P1)> {
56 static void Run(
57 const base::Callback<void(google_apis::DriveApiErrorCode, P1)>& callback,
58 google_apis::DriveApiErrorCode error) {
59 callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
63 // Returns a callback with the tail parameter bound to its default value.
64 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
65 template<typename CallbackType>
66 base::Callback<void(google_apis::DriveApiErrorCode)>
67 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
68 return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
71 // Parameter struct for RunUploadNewFile.
72 struct UploadNewFileParams {
73 std::string parent_resource_id;
74 base::FilePath local_file_path;
75 std::string title;
76 std::string content_type;
77 UploadNewFileOptions options;
78 UploadCompletionCallback callback;
79 google_apis::ProgressCallback progress_callback;
82 // Helper function to work around the arity limitation of base::Bind.
83 google_apis::CancelCallback RunUploadNewFile(
84 DriveUploaderInterface* uploader,
85 const UploadNewFileParams& params) {
86 return uploader->UploadNewFile(params.parent_resource_id,
87 params.local_file_path,
88 params.title,
89 params.content_type,
90 params.options,
91 params.callback,
92 params.progress_callback);
95 // Parameter struct for RunUploadExistingFile.
96 struct UploadExistingFileParams {
97 std::string resource_id;
98 base::FilePath local_file_path;
99 std::string content_type;
100 UploadExistingFileOptions options;
101 std::string etag;
102 UploadCompletionCallback callback;
103 google_apis::ProgressCallback progress_callback;
106 // Helper function to work around the arity limitation of base::Bind.
107 google_apis::CancelCallback RunUploadExistingFile(
108 DriveUploaderInterface* uploader,
109 const UploadExistingFileParams& params) {
110 return uploader->UploadExistingFile(params.resource_id,
111 params.local_file_path,
112 params.content_type,
113 params.options,
114 params.callback,
115 params.progress_callback);
118 // Parameter struct for RunResumeUploadFile.
119 struct ResumeUploadFileParams {
120 GURL upload_location;
121 base::FilePath local_file_path;
122 std::string content_type;
123 UploadCompletionCallback callback;
124 google_apis::ProgressCallback progress_callback;
127 // Helper function to adjust the return type.
128 google_apis::CancelCallback RunResumeUploadFile(
129 DriveUploaderInterface* uploader,
130 const ResumeUploadFileParams& params) {
131 return uploader->ResumeUploadFile(params.upload_location,
132 params.local_file_path,
133 params.content_type,
134 params.callback,
135 params.progress_callback);
138 // Collects information about sizes of files copied or moved from or to Drive
139 // Otherwise does nothing. Temporary for crbug.com/229650.
140 void CollectCopyHistogramSample(const std::string& histogram_name, int64 size) {
141 base::HistogramBase* const counter =
142 base::Histogram::FactoryGet(histogram_name,
144 1024 * 1024 /* 1 GB */,
146 base::Histogram::kUmaTargetedHistogramFlag);
147 counter->Add(size / 1024);
150 // Obtains file size to be uploaded for setting total bytes of JobInfo.
151 void GetFileSizeForJob(base::SequencedTaskRunner* blocking_task_runner,
152 const base::FilePath& local_file_path,
153 const base::Callback<void(int64* size)>& callback) {
154 int64* const size = new int64;
155 *size = -1;
156 blocking_task_runner->PostTaskAndReply(
157 FROM_HERE, base::Bind(base::IgnoreResult(&base::GetFileSize),
158 local_file_path, base::Unretained(size)),
159 base::Bind(callback, base::Owned(size)));
162 } // namespace
164 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
165 const int JobScheduler::kMaxJobCount[] = {
166 5, // METADATA_QUEUE
167 1, // FILE_QUEUE
170 JobScheduler::JobEntry::JobEntry(JobType type)
171 : job_info(type),
172 context(ClientContext(USER_INITIATED)),
173 retry_count(0) {
174 DCHECK_CURRENTLY_ON(BrowserThread::UI);
177 JobScheduler::JobEntry::~JobEntry() {
178 DCHECK_CURRENTLY_ON(BrowserThread::UI);
181 struct JobScheduler::ResumeUploadParams {
182 base::FilePath drive_file_path;
183 base::FilePath local_file_path;
184 std::string content_type;
187 JobScheduler::JobScheduler(PrefService* pref_service,
188 EventLogger* logger,
189 DriveServiceInterface* drive_service,
190 base::SequencedTaskRunner* blocking_task_runner)
191 : throttle_count_(0),
192 wait_until_(base::Time::Now()),
193 disable_throttling_(false),
194 logger_(logger),
195 drive_service_(drive_service),
196 blocking_task_runner_(blocking_task_runner),
197 uploader_(new DriveUploader(drive_service, blocking_task_runner)),
198 pref_service_(pref_service),
199 weak_ptr_factory_(this) {
200 DCHECK_CURRENTLY_ON(BrowserThread::UI);
202 for (int i = 0; i < NUM_QUEUES; ++i)
203 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
205 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
208 JobScheduler::~JobScheduler() {
209 DCHECK_CURRENTLY_ON(BrowserThread::UI);
211 size_t num_queued_jobs = 0;
212 for (int i = 0; i < NUM_QUEUES; ++i)
213 num_queued_jobs += queue_[i]->GetNumberOfJobs();
214 DCHECK_EQ(num_queued_jobs, job_map_.size());
216 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
219 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
220 std::vector<JobInfo> job_info_list;
221 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
222 job_info_list.push_back(iter.GetCurrentValue()->job_info);
223 return job_info_list;
226 void JobScheduler::AddObserver(JobListObserver* observer) {
227 DCHECK_CURRENTLY_ON(BrowserThread::UI);
228 observer_list_.AddObserver(observer);
231 void JobScheduler::RemoveObserver(JobListObserver* observer) {
232 DCHECK_CURRENTLY_ON(BrowserThread::UI);
233 observer_list_.RemoveObserver(observer);
236 void JobScheduler::CancelJob(JobID job_id) {
237 DCHECK_CURRENTLY_ON(BrowserThread::UI);
239 JobEntry* job = job_map_.Lookup(job_id);
240 if (job) {
241 if (job->job_info.state == STATE_RUNNING) {
242 // If the job is running an HTTP request, cancel it via |cancel_callback|
243 // returned from the request, and wait for termination in the normal
244 // callback handler, OnJobDone.
245 if (!job->cancel_callback.is_null())
246 job->cancel_callback.Run();
247 } else {
248 AbortNotRunningJob(job, google_apis::DRIVE_CANCELLED);
253 void JobScheduler::CancelAllJobs() {
254 DCHECK_CURRENTLY_ON(BrowserThread::UI);
256 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
257 // removable during iteration.
258 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
259 CancelJob(iter.GetCurrentKey());
262 void JobScheduler::GetAboutResource(
263 const google_apis::AboutResourceCallback& callback) {
264 DCHECK_CURRENTLY_ON(BrowserThread::UI);
265 DCHECK(!callback.is_null());
267 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
268 new_job->task = base::Bind(
269 &DriveServiceInterface::GetAboutResource,
270 base::Unretained(drive_service_),
271 base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
272 weak_ptr_factory_.GetWeakPtr(),
273 new_job->job_info.job_id,
274 callback));
275 new_job->abort_callback = CreateErrorRunCallback(callback);
276 StartJob(new_job);
279 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
280 DCHECK_CURRENTLY_ON(BrowserThread::UI);
281 DCHECK(!callback.is_null());
283 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
284 new_job->task = base::Bind(
285 &DriveServiceInterface::GetAppList,
286 base::Unretained(drive_service_),
287 base::Bind(&JobScheduler::OnGetAppListJobDone,
288 weak_ptr_factory_.GetWeakPtr(),
289 new_job->job_info.job_id,
290 callback));
291 new_job->abort_callback = CreateErrorRunCallback(callback);
292 StartJob(new_job);
295 void JobScheduler::GetAllFileList(
296 const google_apis::FileListCallback& callback) {
297 DCHECK_CURRENTLY_ON(BrowserThread::UI);
298 DCHECK(!callback.is_null());
300 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
301 new_job->task = base::Bind(
302 &DriveServiceInterface::GetAllFileList,
303 base::Unretained(drive_service_),
304 base::Bind(&JobScheduler::OnGetFileListJobDone,
305 weak_ptr_factory_.GetWeakPtr(),
306 new_job->job_info.job_id,
307 callback));
308 new_job->abort_callback = CreateErrorRunCallback(callback);
309 StartJob(new_job);
312 void JobScheduler::GetFileListInDirectory(
313 const std::string& directory_resource_id,
314 const google_apis::FileListCallback& callback) {
315 DCHECK_CURRENTLY_ON(BrowserThread::UI);
316 DCHECK(!callback.is_null());
318 JobEntry* new_job = CreateNewJob(
319 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
320 new_job->task = base::Bind(
321 &DriveServiceInterface::GetFileListInDirectory,
322 base::Unretained(drive_service_),
323 directory_resource_id,
324 base::Bind(&JobScheduler::OnGetFileListJobDone,
325 weak_ptr_factory_.GetWeakPtr(),
326 new_job->job_info.job_id,
327 callback));
328 new_job->abort_callback = CreateErrorRunCallback(callback);
329 StartJob(new_job);
332 void JobScheduler::Search(const std::string& search_query,
333 const google_apis::FileListCallback& callback) {
334 DCHECK_CURRENTLY_ON(BrowserThread::UI);
335 DCHECK(!callback.is_null());
337 JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
338 new_job->task = base::Bind(
339 &DriveServiceInterface::Search,
340 base::Unretained(drive_service_),
341 search_query,
342 base::Bind(&JobScheduler::OnGetFileListJobDone,
343 weak_ptr_factory_.GetWeakPtr(),
344 new_job->job_info.job_id,
345 callback));
346 new_job->abort_callback = CreateErrorRunCallback(callback);
347 StartJob(new_job);
350 void JobScheduler::GetChangeList(
351 int64 start_changestamp,
352 const google_apis::ChangeListCallback& callback) {
353 DCHECK_CURRENTLY_ON(BrowserThread::UI);
354 DCHECK(!callback.is_null());
356 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
357 new_job->task = base::Bind(
358 &DriveServiceInterface::GetChangeList,
359 base::Unretained(drive_service_),
360 start_changestamp,
361 base::Bind(&JobScheduler::OnGetChangeListJobDone,
362 weak_ptr_factory_.GetWeakPtr(),
363 new_job->job_info.job_id,
364 callback));
365 new_job->abort_callback = CreateErrorRunCallback(callback);
366 StartJob(new_job);
369 void JobScheduler::GetRemainingChangeList(
370 const GURL& next_link,
371 const google_apis::ChangeListCallback& callback) {
372 DCHECK_CURRENTLY_ON(BrowserThread::UI);
373 DCHECK(!callback.is_null());
375 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
376 new_job->task = base::Bind(
377 &DriveServiceInterface::GetRemainingChangeList,
378 base::Unretained(drive_service_),
379 next_link,
380 base::Bind(&JobScheduler::OnGetChangeListJobDone,
381 weak_ptr_factory_.GetWeakPtr(),
382 new_job->job_info.job_id,
383 callback));
384 new_job->abort_callback = CreateErrorRunCallback(callback);
385 StartJob(new_job);
388 void JobScheduler::GetRemainingFileList(
389 const GURL& next_link,
390 const google_apis::FileListCallback& callback) {
391 DCHECK_CURRENTLY_ON(BrowserThread::UI);
392 DCHECK(!callback.is_null());
394 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
395 new_job->task = base::Bind(
396 &DriveServiceInterface::GetRemainingFileList,
397 base::Unretained(drive_service_),
398 next_link,
399 base::Bind(&JobScheduler::OnGetFileListJobDone,
400 weak_ptr_factory_.GetWeakPtr(),
401 new_job->job_info.job_id,
402 callback));
403 new_job->abort_callback = CreateErrorRunCallback(callback);
404 StartJob(new_job);
407 void JobScheduler::GetFileResource(
408 const std::string& resource_id,
409 const ClientContext& context,
410 const google_apis::FileResourceCallback& callback) {
411 DCHECK_CURRENTLY_ON(BrowserThread::UI);
412 DCHECK(!callback.is_null());
414 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
415 new_job->context = context;
416 new_job->task = base::Bind(
417 &DriveServiceInterface::GetFileResource,
418 base::Unretained(drive_service_),
419 resource_id,
420 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
421 weak_ptr_factory_.GetWeakPtr(),
422 new_job->job_info.job_id,
423 callback));
424 new_job->abort_callback = CreateErrorRunCallback(callback);
425 StartJob(new_job);
428 void JobScheduler::GetShareUrl(
429 const std::string& resource_id,
430 const GURL& embed_origin,
431 const ClientContext& context,
432 const google_apis::GetShareUrlCallback& callback) {
433 DCHECK_CURRENTLY_ON(BrowserThread::UI);
434 DCHECK(!callback.is_null());
436 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
437 new_job->context = context;
438 new_job->task = base::Bind(
439 &DriveServiceInterface::GetShareUrl,
440 base::Unretained(drive_service_),
441 resource_id,
442 embed_origin,
443 base::Bind(&JobScheduler::OnGetShareUrlJobDone,
444 weak_ptr_factory_.GetWeakPtr(),
445 new_job->job_info.job_id,
446 callback));
447 new_job->abort_callback = CreateErrorRunCallback(callback);
448 StartJob(new_job);
451 void JobScheduler::TrashResource(
452 const std::string& resource_id,
453 const ClientContext& context,
454 const google_apis::EntryActionCallback& callback) {
455 DCHECK_CURRENTLY_ON(BrowserThread::UI);
456 DCHECK(!callback.is_null());
458 JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
459 new_job->context = context;
460 new_job->task = base::Bind(
461 &DriveServiceInterface::TrashResource,
462 base::Unretained(drive_service_),
463 resource_id,
464 base::Bind(&JobScheduler::OnEntryActionJobDone,
465 weak_ptr_factory_.GetWeakPtr(),
466 new_job->job_info.job_id,
467 callback));
468 new_job->abort_callback = callback;
469 StartJob(new_job);
472 void JobScheduler::CopyResource(
473 const std::string& resource_id,
474 const std::string& parent_resource_id,
475 const std::string& new_title,
476 const base::Time& last_modified,
477 const google_apis::FileResourceCallback& callback) {
478 DCHECK_CURRENTLY_ON(BrowserThread::UI);
479 DCHECK(!callback.is_null());
481 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
482 new_job->task = base::Bind(
483 &DriveServiceInterface::CopyResource,
484 base::Unretained(drive_service_),
485 resource_id,
486 parent_resource_id,
487 new_title,
488 last_modified,
489 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
490 weak_ptr_factory_.GetWeakPtr(),
491 new_job->job_info.job_id,
492 callback));
493 new_job->abort_callback = CreateErrorRunCallback(callback);
494 StartJob(new_job);
497 void JobScheduler::UpdateResource(
498 const std::string& resource_id,
499 const std::string& parent_resource_id,
500 const std::string& new_title,
501 const base::Time& last_modified,
502 const base::Time& last_viewed_by_me,
503 const google_apis::drive::Properties& properties,
504 const ClientContext& context,
505 const google_apis::FileResourceCallback& callback) {
506 DCHECK_CURRENTLY_ON(BrowserThread::UI);
507 DCHECK(!callback.is_null());
509 JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
510 new_job->context = context;
511 new_job->task = base::Bind(&DriveServiceInterface::UpdateResource,
512 base::Unretained(drive_service_), resource_id,
513 parent_resource_id, new_title, last_modified,
514 last_viewed_by_me, properties,
515 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
516 weak_ptr_factory_.GetWeakPtr(),
517 new_job->job_info.job_id, callback));
518 new_job->abort_callback = CreateErrorRunCallback(callback);
519 StartJob(new_job);
522 void JobScheduler::AddResourceToDirectory(
523 const std::string& parent_resource_id,
524 const std::string& resource_id,
525 const google_apis::EntryActionCallback& callback) {
526 DCHECK_CURRENTLY_ON(BrowserThread::UI);
527 DCHECK(!callback.is_null());
529 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
530 new_job->task = base::Bind(
531 &DriveServiceInterface::AddResourceToDirectory,
532 base::Unretained(drive_service_),
533 parent_resource_id,
534 resource_id,
535 base::Bind(&JobScheduler::OnEntryActionJobDone,
536 weak_ptr_factory_.GetWeakPtr(),
537 new_job->job_info.job_id,
538 callback));
539 new_job->abort_callback = callback;
540 StartJob(new_job);
543 void JobScheduler::RemoveResourceFromDirectory(
544 const std::string& parent_resource_id,
545 const std::string& resource_id,
546 const ClientContext& context,
547 const google_apis::EntryActionCallback& callback) {
548 DCHECK_CURRENTLY_ON(BrowserThread::UI);
550 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
551 new_job->context = context;
552 new_job->task = base::Bind(
553 &DriveServiceInterface::RemoveResourceFromDirectory,
554 base::Unretained(drive_service_),
555 parent_resource_id,
556 resource_id,
557 base::Bind(&JobScheduler::OnEntryActionJobDone,
558 weak_ptr_factory_.GetWeakPtr(),
559 new_job->job_info.job_id,
560 callback));
561 new_job->abort_callback = callback;
562 StartJob(new_job);
565 void JobScheduler::AddNewDirectory(
566 const std::string& parent_resource_id,
567 const std::string& directory_title,
568 const AddNewDirectoryOptions& options,
569 const ClientContext& context,
570 const google_apis::FileResourceCallback& callback) {
571 DCHECK_CURRENTLY_ON(BrowserThread::UI);
573 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
574 new_job->context = context;
575 new_job->task = base::Bind(
576 &DriveServiceInterface::AddNewDirectory,
577 base::Unretained(drive_service_),
578 parent_resource_id,
579 directory_title,
580 options,
581 base::Bind(&JobScheduler::OnGetFileResourceJobDone,
582 weak_ptr_factory_.GetWeakPtr(),
583 new_job->job_info.job_id,
584 callback));
585 new_job->abort_callback = CreateErrorRunCallback(callback);
586 StartJob(new_job);
589 JobID JobScheduler::DownloadFile(
590 const base::FilePath& virtual_path,
591 int64 expected_file_size,
592 const base::FilePath& local_cache_path,
593 const std::string& resource_id,
594 const ClientContext& context,
595 const google_apis::DownloadActionCallback& download_action_callback,
596 const google_apis::GetContentCallback& get_content_callback) {
597 DCHECK_CURRENTLY_ON(BrowserThread::UI);
599 // Temporary histogram for crbug.com/229650.
600 CollectCopyHistogramSample("Drive.DownloadFromDriveFileSize",
601 expected_file_size);
603 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
604 new_job->job_info.file_path = virtual_path;
605 new_job->job_info.num_total_bytes = expected_file_size;
606 new_job->context = context;
607 new_job->task = base::Bind(
608 &DriveServiceInterface::DownloadFile,
609 base::Unretained(drive_service_),
610 local_cache_path,
611 resource_id,
612 base::Bind(&JobScheduler::OnDownloadActionJobDone,
613 weak_ptr_factory_.GetWeakPtr(),
614 new_job->job_info.job_id,
615 download_action_callback),
616 get_content_callback,
617 base::Bind(&JobScheduler::UpdateProgress,
618 weak_ptr_factory_.GetWeakPtr(),
619 new_job->job_info.job_id));
620 new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
621 StartJob(new_job);
622 return new_job->job_info.job_id;
625 void JobScheduler::UploadNewFile(
626 const std::string& parent_resource_id,
627 const base::FilePath& drive_file_path,
628 const base::FilePath& local_file_path,
629 const std::string& title,
630 const std::string& content_type,
631 const UploadNewFileOptions& options,
632 const ClientContext& context,
633 const google_apis::FileResourceCallback& callback) {
634 DCHECK_CURRENTLY_ON(BrowserThread::UI);
636 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
637 new_job->job_info.file_path = drive_file_path;
638 new_job->context = context;
640 GetFileSizeForJob(
641 blocking_task_runner_, local_file_path,
642 base::Bind(&JobScheduler::OnGotFileSizeForJob,
643 weak_ptr_factory_.GetWeakPtr(), new_job->job_info.job_id,
644 "Drive.UploadToDriveFileSize"));
646 UploadNewFileParams params;
647 params.parent_resource_id = parent_resource_id;
648 params.local_file_path = local_file_path;
649 params.title = title;
650 params.content_type = content_type;
651 params.options = options;
653 ResumeUploadParams resume_params;
654 resume_params.local_file_path = params.local_file_path;
655 resume_params.content_type = params.content_type;
657 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
658 weak_ptr_factory_.GetWeakPtr(),
659 new_job->job_info.job_id,
660 resume_params,
661 callback);
662 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
663 weak_ptr_factory_.GetWeakPtr(),
664 new_job->job_info.job_id);
665 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
666 new_job->abort_callback = CreateErrorRunCallback(callback);
667 StartJob(new_job);
670 void JobScheduler::UploadExistingFile(
671 const std::string& resource_id,
672 const base::FilePath& drive_file_path,
673 const base::FilePath& local_file_path,
674 const std::string& content_type,
675 const UploadExistingFileOptions& options,
676 const ClientContext& context,
677 const google_apis::FileResourceCallback& callback) {
678 DCHECK_CURRENTLY_ON(BrowserThread::UI);
680 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
681 new_job->job_info.file_path = drive_file_path;
682 new_job->context = context;
684 GetFileSizeForJob(
685 blocking_task_runner_, local_file_path,
686 base::Bind(&JobScheduler::OnGotFileSizeForJob,
687 weak_ptr_factory_.GetWeakPtr(), new_job->job_info.job_id,
688 "Drive.UploadToDriveFileSize"));
690 UploadExistingFileParams params;
691 params.resource_id = resource_id;
692 params.local_file_path = local_file_path;
693 params.content_type = content_type;
694 params.options = options;
696 ResumeUploadParams resume_params;
697 resume_params.local_file_path = params.local_file_path;
698 resume_params.content_type = params.content_type;
700 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
701 weak_ptr_factory_.GetWeakPtr(),
702 new_job->job_info.job_id,
703 resume_params,
704 callback);
705 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
706 weak_ptr_factory_.GetWeakPtr(),
707 new_job->job_info.job_id);
708 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
709 new_job->abort_callback = CreateErrorRunCallback(callback);
710 StartJob(new_job);
713 void JobScheduler::AddPermission(
714 const std::string& resource_id,
715 const std::string& email,
716 google_apis::drive::PermissionRole role,
717 const google_apis::EntryActionCallback& callback) {
718 DCHECK_CURRENTLY_ON(BrowserThread::UI);
719 DCHECK(!callback.is_null());
721 JobEntry* new_job = CreateNewJob(TYPE_ADD_PERMISSION);
722 new_job->task = base::Bind(&DriveServiceInterface::AddPermission,
723 base::Unretained(drive_service_),
724 resource_id,
725 email,
726 role,
727 base::Bind(&JobScheduler::OnEntryActionJobDone,
728 weak_ptr_factory_.GetWeakPtr(),
729 new_job->job_info.job_id,
730 callback));
731 new_job->abort_callback = callback;
732 StartJob(new_job);
735 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
736 JobEntry* job = new JobEntry(type);
737 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|.
738 return job;
741 void JobScheduler::StartJob(JobEntry* job) {
742 DCHECK(!job->task.is_null());
744 QueueJob(job->job_info.job_id);
745 NotifyJobAdded(job->job_info);
746 DoJobLoop(GetJobQueueType(job->job_info.job_type));
749 void JobScheduler::QueueJob(JobID job_id) {
750 DCHECK_CURRENTLY_ON(BrowserThread::UI);
752 JobEntry* job_entry = job_map_.Lookup(job_id);
753 DCHECK(job_entry);
754 const JobInfo& job_info = job_entry->job_info;
756 const QueueType queue_type = GetJobQueueType(job_info.job_type);
757 queue_[queue_type]->Push(job_id, job_entry->context.type);
759 // Temporary histogram for crbug.com/229650.
760 if (job_info.job_type == TYPE_DOWNLOAD_FILE ||
761 job_info.job_type == TYPE_UPLOAD_EXISTING_FILE ||
762 job_info.job_type == TYPE_UPLOAD_NEW_FILE) {
763 std::vector<JobID> jobs_with_the_same_priority;
764 queue_[queue_type]->GetQueuedJobs(job_entry->context.type,
765 &jobs_with_the_same_priority);
766 DCHECK(!jobs_with_the_same_priority.empty());
768 const size_t blocking_jobs_count = jobs_with_the_same_priority.size() - 1;
769 UMA_HISTOGRAM_COUNTS_10000("Drive.TransferBlockedOnJobs",
770 blocking_jobs_count);
773 const std::string retry_prefix = job_entry->retry_count > 0 ?
774 base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
775 logger_->Log(logging::LOG_INFO,
776 "Job queued%s: %s - %s",
777 retry_prefix.c_str(),
778 job_info.ToString().c_str(),
779 GetQueueInfo(queue_type).c_str());
782 void JobScheduler::DoJobLoop(QueueType queue_type) {
783 DCHECK_CURRENTLY_ON(BrowserThread::UI);
785 const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
787 // Abort all USER_INITAITED jobs when not accepted.
788 if (accepted_priority < USER_INITIATED) {
789 std::vector<JobID> jobs;
790 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
791 for (size_t i = 0; i < jobs.size(); ++i) {
792 JobEntry* job = job_map_.Lookup(jobs[i]);
793 DCHECK(job);
794 AbortNotRunningJob(job, google_apis::DRIVE_NO_CONNECTION);
798 // Wait when throttled.
799 const base::Time now = base::Time::Now();
800 if (now < wait_until_) {
801 base::MessageLoopProxy::current()->PostDelayedTask(
802 FROM_HERE,
803 base::Bind(&JobScheduler::DoJobLoop,
804 weak_ptr_factory_.GetWeakPtr(),
805 queue_type),
806 wait_until_ - now);
807 return;
810 // Run the job with the highest priority in the queue.
811 JobID job_id = -1;
812 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
813 return;
815 JobEntry* entry = job_map_.Lookup(job_id);
816 DCHECK(entry);
818 JobInfo* job_info = &entry->job_info;
819 job_info->state = STATE_RUNNING;
820 job_info->start_time = now;
821 NotifyJobUpdated(*job_info);
823 entry->cancel_callback = entry->task.Run();
825 UpdateWait();
827 logger_->Log(logging::LOG_INFO,
828 "Job started: %s - %s",
829 job_info->ToString().c_str(),
830 GetQueueInfo(queue_type).c_str());
833 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
834 DCHECK_CURRENTLY_ON(BrowserThread::UI);
836 const int kNoJobShouldRun = -1;
838 // Should stop if Drive was disabled while running the fetch loop.
839 if (pref_service_->GetBoolean(prefs::kDisableDrive))
840 return kNoJobShouldRun;
842 // Should stop if the network is not online.
843 if (net::NetworkChangeNotifier::IsOffline())
844 return kNoJobShouldRun;
846 // For the file queue, if it is on cellular network, only user initiated
847 // operations are allowed to start.
848 if (queue_type == FILE_QUEUE &&
849 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
850 net::NetworkChangeNotifier::IsConnectionCellular(
851 net::NetworkChangeNotifier::GetConnectionType()))
852 return USER_INITIATED;
854 // Otherwise, every operations including background tasks are allowed.
855 return BACKGROUND;
858 void JobScheduler::UpdateWait() {
859 DCHECK_CURRENTLY_ON(BrowserThread::UI);
861 if (disable_throttling_ || throttle_count_ == 0)
862 return;
864 // Exponential backoff: https://developers.google.com/drive/handle-errors.
865 base::TimeDelta delay =
866 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
867 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
868 VLOG(1) << "Throttling for " << delay.InMillisecondsF();
870 wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
873 bool JobScheduler::OnJobDone(JobID job_id,
874 google_apis::DriveApiErrorCode error) {
875 DCHECK_CURRENTLY_ON(BrowserThread::UI);
877 JobEntry* job_entry = job_map_.Lookup(job_id);
878 DCHECK(job_entry);
879 JobInfo* job_info = &job_entry->job_info;
880 QueueType queue_type = GetJobQueueType(job_info->job_type);
881 queue_[queue_type]->MarkFinished(job_id);
883 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
884 bool success = (GDataToFileError(error) == FILE_ERROR_OK);
885 logger_->Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
886 "Job done: %s => %s (elapsed time: %sms) - %s",
887 job_info->ToString().c_str(),
888 DriveApiErrorCodeToString(error).c_str(),
889 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
890 GetQueueInfo(queue_type).c_str());
892 // Retry, depending on the error.
893 const bool is_server_error =
894 error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
895 error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
896 if (is_server_error) {
897 if (throttle_count_ < kMaxThrottleCount)
898 ++throttle_count_;
899 UpdateWait();
900 } else {
901 throttle_count_ = 0;
904 const bool should_retry =
905 is_server_error && job_entry->retry_count < kMaxRetryCount;
906 if (should_retry) {
907 job_entry->cancel_callback.Reset();
908 job_info->state = STATE_RETRY;
909 NotifyJobUpdated(*job_info);
911 ++job_entry->retry_count;
913 // Requeue the job.
914 QueueJob(job_id);
915 } else {
916 NotifyJobDone(*job_info, error);
917 // The job has finished, no retry will happen in the scheduler. Now we can
918 // remove the job info from the map.
919 job_map_.Remove(job_id);
922 // Post a task to continue the job loop. This allows us to finish handling
923 // the current job before starting the next one.
924 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
925 base::Bind(&JobScheduler::DoJobLoop,
926 weak_ptr_factory_.GetWeakPtr(),
927 queue_type));
928 return !should_retry;
931 void JobScheduler::OnGetFileListJobDone(
932 JobID job_id,
933 const google_apis::FileListCallback& callback,
934 google_apis::DriveApiErrorCode error,
935 scoped_ptr<google_apis::FileList> file_list) {
936 DCHECK_CURRENTLY_ON(BrowserThread::UI);
937 DCHECK(!callback.is_null());
939 if (OnJobDone(job_id, error))
940 callback.Run(error, file_list.Pass());
943 void JobScheduler::OnGetChangeListJobDone(
944 JobID job_id,
945 const google_apis::ChangeListCallback& callback,
946 google_apis::DriveApiErrorCode error,
947 scoped_ptr<google_apis::ChangeList> change_list) {
948 DCHECK_CURRENTLY_ON(BrowserThread::UI);
949 DCHECK(!callback.is_null());
951 if (OnJobDone(job_id, error))
952 callback.Run(error, change_list.Pass());
955 void JobScheduler::OnGetFileResourceJobDone(
956 JobID job_id,
957 const google_apis::FileResourceCallback& callback,
958 google_apis::DriveApiErrorCode error,
959 scoped_ptr<google_apis::FileResource> entry) {
960 DCHECK_CURRENTLY_ON(BrowserThread::UI);
961 DCHECK(!callback.is_null());
963 if (OnJobDone(job_id, error))
964 callback.Run(error, entry.Pass());
967 void JobScheduler::OnGetAboutResourceJobDone(
968 JobID job_id,
969 const google_apis::AboutResourceCallback& callback,
970 google_apis::DriveApiErrorCode error,
971 scoped_ptr<google_apis::AboutResource> about_resource) {
972 DCHECK_CURRENTLY_ON(BrowserThread::UI);
973 DCHECK(!callback.is_null());
975 if (OnJobDone(job_id, error))
976 callback.Run(error, about_resource.Pass());
979 void JobScheduler::OnGetShareUrlJobDone(
980 JobID job_id,
981 const google_apis::GetShareUrlCallback& callback,
982 google_apis::DriveApiErrorCode error,
983 const GURL& share_url) {
984 DCHECK_CURRENTLY_ON(BrowserThread::UI);
985 DCHECK(!callback.is_null());
987 if (OnJobDone(job_id, error))
988 callback.Run(error, share_url);
991 void JobScheduler::OnGetAppListJobDone(
992 JobID job_id,
993 const google_apis::AppListCallback& callback,
994 google_apis::DriveApiErrorCode error,
995 scoped_ptr<google_apis::AppList> app_list) {
996 DCHECK_CURRENTLY_ON(BrowserThread::UI);
997 DCHECK(!callback.is_null());
999 if (OnJobDone(job_id, error))
1000 callback.Run(error, app_list.Pass());
1003 void JobScheduler::OnEntryActionJobDone(
1004 JobID job_id,
1005 const google_apis::EntryActionCallback& callback,
1006 google_apis::DriveApiErrorCode error) {
1007 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1008 DCHECK(!callback.is_null());
1010 if (OnJobDone(job_id, error))
1011 callback.Run(error);
1014 void JobScheduler::OnDownloadActionJobDone(
1015 JobID job_id,
1016 const google_apis::DownloadActionCallback& callback,
1017 google_apis::DriveApiErrorCode error,
1018 const base::FilePath& temp_file) {
1019 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1020 DCHECK(!callback.is_null());
1022 if (OnJobDone(job_id, error))
1023 callback.Run(error, temp_file);
1026 void JobScheduler::OnUploadCompletionJobDone(
1027 JobID job_id,
1028 const ResumeUploadParams& resume_params,
1029 const google_apis::FileResourceCallback& callback,
1030 google_apis::DriveApiErrorCode error,
1031 const GURL& upload_location,
1032 scoped_ptr<google_apis::FileResource> entry) {
1033 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1034 DCHECK(!callback.is_null());
1036 if (!upload_location.is_empty()) {
1037 // If upload_location is available, update the task to resume the
1038 // upload process from the terminated point.
1039 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1040 // so OnJobDone called below will be in charge to re-queue the job.
1041 JobEntry* job_entry = job_map_.Lookup(job_id);
1042 DCHECK(job_entry);
1044 ResumeUploadFileParams params;
1045 params.upload_location = upload_location;
1046 params.local_file_path = resume_params.local_file_path;
1047 params.content_type = resume_params.content_type;
1048 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1049 weak_ptr_factory_.GetWeakPtr(),
1050 job_id,
1051 job_entry->task,
1052 callback);
1053 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1054 weak_ptr_factory_.GetWeakPtr(),
1055 job_id);
1056 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1059 if (OnJobDone(job_id, error))
1060 callback.Run(error, entry.Pass());
1063 void JobScheduler::OnResumeUploadFileDone(
1064 JobID job_id,
1065 const base::Callback<google_apis::CancelCallback()>& original_task,
1066 const google_apis::FileResourceCallback& callback,
1067 google_apis::DriveApiErrorCode error,
1068 const GURL& upload_location,
1069 scoped_ptr<google_apis::FileResource> entry) {
1070 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1071 DCHECK(!original_task.is_null());
1072 DCHECK(!callback.is_null());
1074 if (upload_location.is_empty()) {
1075 // If upload_location is not available, we should discard it and stop trying
1076 // to resume. Restore the original task.
1077 JobEntry* job_entry = job_map_.Lookup(job_id);
1078 DCHECK(job_entry);
1079 job_entry->task = original_task;
1082 if (OnJobDone(job_id, error))
1083 callback.Run(error, entry.Pass());
1086 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1087 JobEntry* job_entry = job_map_.Lookup(job_id);
1088 DCHECK(job_entry);
1090 job_entry->job_info.num_completed_bytes = progress;
1091 if (total != -1)
1092 job_entry->job_info.num_total_bytes = total;
1093 NotifyJobUpdated(job_entry->job_info);
1096 void JobScheduler::OnConnectionTypeChanged(
1097 net::NetworkChangeNotifier::ConnectionType type) {
1098 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1100 // Resume the job loop.
1101 // Note that we don't need to check the network connection status as it will
1102 // be checked in GetCurrentAcceptedPriority().
1103 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1104 DoJobLoop(static_cast<QueueType>(i));
1107 void JobScheduler::OnGotFileSizeForJob(JobID job_id,
1108 const std::string& histogram_name,
1109 int64* size) {
1110 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1111 if (*size == -1)
1112 return;
1114 // Temporary histogram for crbug.com/229650.
1115 CollectCopyHistogramSample(histogram_name, *size);
1117 JobEntry* const job_entry = job_map_.Lookup(job_id);
1118 if (!job_entry)
1119 return;
1121 job_entry->job_info.num_total_bytes = *size;
1122 NotifyJobUpdated(job_entry->job_info);
1125 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1126 switch (type) {
1127 case TYPE_GET_ABOUT_RESOURCE:
1128 case TYPE_GET_APP_LIST:
1129 case TYPE_GET_ALL_RESOURCE_LIST:
1130 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1131 case TYPE_SEARCH:
1132 case TYPE_GET_CHANGE_LIST:
1133 case TYPE_GET_REMAINING_CHANGE_LIST:
1134 case TYPE_GET_REMAINING_FILE_LIST:
1135 case TYPE_GET_RESOURCE_ENTRY:
1136 case TYPE_GET_SHARE_URL:
1137 case TYPE_TRASH_RESOURCE:
1138 case TYPE_COPY_RESOURCE:
1139 case TYPE_UPDATE_RESOURCE:
1140 case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1141 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1142 case TYPE_ADD_NEW_DIRECTORY:
1143 case TYPE_ADD_PERMISSION:
1144 return METADATA_QUEUE;
1146 case TYPE_DOWNLOAD_FILE:
1147 case TYPE_UPLOAD_NEW_FILE:
1148 case TYPE_UPLOAD_EXISTING_FILE:
1149 return FILE_QUEUE;
1151 NOTREACHED();
1152 return FILE_QUEUE;
1155 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1156 google_apis::DriveApiErrorCode error) {
1157 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1159 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1160 const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1161 logger_->Log(logging::LOG_INFO,
1162 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1163 job->job_info.ToString().c_str(),
1164 DriveApiErrorCodeToString(error).c_str(),
1165 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1166 GetQueueInfo(queue_type).c_str());
1168 base::Callback<void(google_apis::DriveApiErrorCode)> callback =
1169 job->abort_callback;
1170 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1171 NotifyJobDone(job->job_info, error);
1172 job_map_.Remove(job->job_info.job_id);
1173 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1174 base::Bind(callback, error));
1177 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1178 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1179 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1182 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1183 google_apis::DriveApiErrorCode error) {
1184 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1185 FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1186 OnJobDone(job_info, GDataToFileError(error)));
1189 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1190 DCHECK_CURRENTLY_ON(BrowserThread::UI);
1191 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1194 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1195 return QueueTypeToString(type) + " " + queue_[type]->ToString();
1198 // static
1199 std::string JobScheduler::QueueTypeToString(QueueType type) {
1200 switch (type) {
1201 case METADATA_QUEUE:
1202 return "METADATA_QUEUE";
1203 case FILE_QUEUE:
1204 return "FILE_QUEUE";
1205 case NUM_QUEUES:
1206 break; // This value is just a sentinel. Should never be used.
1208 NOTREACHED();
1209 return "";
1212 } // namespace drive