1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
9 #include "base/files/file_util.h"
10 #include "base/metrics/histogram.h"
11 #include "base/prefs/pref_service.h"
12 #include "base/rand_util.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/strings/stringprintf.h"
15 #include "base/thread_task_runner_handle.h"
16 #include "chrome/browser/chromeos/drive/drive_pref_names.h"
17 #include "components/drive/event_logger.h"
18 #include "google_apis/drive/drive_api_parser.h"
// Retry policy for jobs that fail because of throttling or a server error.
// A failed job is retried at most kMaxRetryCount times. The delay before a
// retry is shared among all jobs; it doubles on each failure until the
// throttle count reaches kMaxThrottleCount.
//
// According to the API documentation, kMaxRetryCount should be the same as
// kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
// It is currently twice as large so that upload related jobs are retried a
// sufficient number of times. crbug.com/269918
const int kMaxThrottleCount = 4;
const int kMaxRetryCount = kMaxThrottleCount * 2;

// Batch limits handed to every JobQueue created by the JobScheduler
// constructor: the batch count limit and the batch size limit.
const size_t kMaxBatchCount = 20;
const size_t kMaxBatchSize = 10 * 1024 * 1024;
// DefaultValueCreator<T>::GetDefaultValue() hands back a value-initialized
// instance of T (i.e. T()).
template <typename T>
struct DefaultValueCreator {
  typedef T ValueType;
  static ValueType GetDefaultValue() { return ValueType(); }
};

// Specialization for const-reference types: strips "const ...&" and returns
// a value-initialized T by value, so no reference to a temporary is produced.
template <typename T>
struct DefaultValueCreator<const T&> {
  typedef T ValueType;
  static ValueType GetDefaultValue() { return ValueType(); }
};
45 // Helper of CreateErrorRunCallback implementation.
47 // - ResultType; the type of the Callback which should be returned by
48 // CreateErrorRunCallback.
49 // - Run(): a static function which takes the original |callback| and |error|,
50 // and runs the |callback|.Run() with the error code and default values
51 // for remaining arguments.
// The primary template is only declared; the specialization below covers
// callbacks of the form void(google_apis::DriveApiErrorCode, P1) and fills
// the P1 argument with DefaultValueCreator<P1>::GetDefaultValue().
// NOTE(review): the template header introducing P1 and the line declaring
// Run() are not visible in this view -- confirm against the full file.
52 template<typename CallbackType
> struct CreateErrorRunCallbackHelper
;
54 // CreateErrorRunCallback with two arguments.
56 struct CreateErrorRunCallbackHelper
<void(google_apis::DriveApiErrorCode
, P1
)> {
58 const base::Callback
<void(google_apis::DriveApiErrorCode
, P1
)>& callback
,
59 google_apis::DriveApiErrorCode error
) {
60 callback
.Run(error
, DefaultValueCreator
<P1
>::GetDefaultValue());
64 // Returns a callback with the tail parameter bound to its default value.
65 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
66 template<typename CallbackType
>
67 base::Callback
<void(google_apis::DriveApiErrorCode
)>
68 CreateErrorRunCallback(const base::Callback
<CallbackType
>& callback
) {
69 return base::Bind(&CreateErrorRunCallbackHelper
<CallbackType
>::Run
, callback
);
72 // Parameter struct for RunUploadNewFile.
// Bundles the values RunUploadNewFile forwards to
// DriveUploaderInterface::UploadNewFile so they can be bound as one argument.
// NOTE(review): JobScheduler::UploadNewFile also assigns |params.title|, which
// is not among the members visible here -- confirm the member list is
// complete.
73 struct UploadNewFileParams
{
74 std::string parent_resource_id
;
75 base::FilePath local_file_path
;
77 std::string content_type
;
78 UploadNewFileOptions options
;
79 UploadCompletionCallback callback
;
80 google_apis::ProgressCallback progress_callback
;
83 // Helper function to work around the arity limitation of base::Bind.
// Calls |uploader|->UploadNewFile() with fields taken from |params| and
// returns the CancelCallback that call yields.
// NOTE(review): only parent_resource_id, local_file_path and
// progress_callback are visible in the argument list here; the remaining
// arguments should be checked against DriveUploaderInterface.
84 google_apis::CancelCallback
RunUploadNewFile(
85 DriveUploaderInterface
* uploader
,
86 const UploadNewFileParams
& params
) {
87 return uploader
->UploadNewFile(params
.parent_resource_id
,
88 params
.local_file_path
,
93 params
.progress_callback
);
96 // Parameter struct for RunUploadExistingFile.
// Bundles the values RunUploadExistingFile forwards to
// DriveUploaderInterface::UploadExistingFile.
// NOTE(review): the member list appears to have a gap in this view (between
// |options| and |callback|) -- confirm against the full file.
97 struct UploadExistingFileParams
{
98 std::string resource_id
;
99 base::FilePath local_file_path
;
100 std::string content_type
;
101 UploadExistingFileOptions options
;
103 UploadCompletionCallback callback
;
104 google_apis::ProgressCallback progress_callback
;
107 // Helper function to work around the arity limitation of base::Bind.
// Calls |uploader|->UploadExistingFile() with fields taken from |params| and
// returns the CancelCallback that call yields.
// NOTE(review): only resource_id, local_file_path and progress_callback are
// visible in the argument list here; confirm the rest against
// DriveUploaderInterface.
108 google_apis::CancelCallback
RunUploadExistingFile(
109 DriveUploaderInterface
* uploader
,
110 const UploadExistingFileParams
& params
) {
111 return uploader
->UploadExistingFile(params
.resource_id
,
112 params
.local_file_path
,
116 params
.progress_callback
);
119 // Parameter struct for RunResumeUploadFile.
120 struct ResumeUploadFileParams
{
121 GURL upload_location
;
122 base::FilePath local_file_path
;
123 std::string content_type
;
124 UploadCompletionCallback callback
;
125 google_apis::ProgressCallback progress_callback
;
128 // Helper function to adjust the return type.
// Calls |uploader|->ResumeUploadFile() with fields taken from |params| and
// returns the result as a google_apis::CancelCallback.
// NOTE(review): only upload_location, local_file_path and progress_callback
// are visible in the argument list here; confirm the rest against
// DriveUploaderInterface.
129 google_apis::CancelCallback
RunResumeUploadFile(
130 DriveUploaderInterface
* uploader
,
131 const ResumeUploadFileParams
& params
) {
132 return uploader
->ResumeUploadFile(params
.upload_location
,
133 params
.local_file_path
,
136 params
.progress_callback
);
139 // Collects information about sizes of files copied or moved from or to Drive.
140 // Otherwise does nothing. Temporary for crbug.com/229650.
// Obtains the histogram named |histogram_name| through
// base::Histogram::FactoryGet and records |size| / 1024 as the sample.
// Presumably |size| is in bytes, so samples are in KB and the 1024 * 1024
// bound corresponds to 1 GB -- TODO confirm with callers.
// NOTE(review): some FactoryGet arguments are not visible in this view.
141 void CollectCopyHistogramSample(const std::string
& histogram_name
, int64 size
) {
142 base::HistogramBase
* const counter
=
143 base::Histogram::FactoryGet(histogram_name
,
145 1024 * 1024 /* 1 GB */,
147 base::Histogram::kUmaTargetedHistogramFlag
);
148 counter
->Add(size
/ 1024);
153 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
// Indexed by queue: the JobScheduler constructor passes kMaxJobCount[i] to
// the JobQueue it creates for queue i.
// NOTE(review): the initializer values are not visible in this view.
154 const int JobScheduler::kMaxJobCount
[] = {
// Constructs a job entry for a job of the given |type|. |context| starts as
// ClientContext(USER_INITIATED).
// NOTE(review): the other member initializers are not visible in this view.
159 JobScheduler::JobEntry::JobEntry(JobType type
)
161 context(ClientContext(USER_INITIATED
)),
165 JobScheduler::JobEntry::~JobEntry() {
166 DCHECK(thread_checker_
.CalledOnValidThread());
169 struct JobScheduler::ResumeUploadParams
{
170 base::FilePath drive_file_path
;
171 base::FilePath local_file_path
;
172 std::string content_type
;
// Constructs the scheduler: stores the given services, creates a
// DriveUploader over |drive_service| and |blocking_task_runner|, creates one
// JobQueue per queue type (limits from kMaxJobCount, kMaxBatchCount and
// kMaxBatchSize), and registers itself as a network connection type observer.
// Throttling starts disabled-by-count: |throttle_count_| is 0 and
// |wait_until_| is the construction time.
// NOTE(review): one parameter and one member initializer are not visible in
// this view -- confirm against the full file.
175 JobScheduler::JobScheduler(PrefService
* pref_service
,
177 DriveServiceInterface
* drive_service
,
178 base::SequencedTaskRunner
* blocking_task_runner
)
179 : throttle_count_(0),
180 wait_until_(base::Time::Now()),
181 disable_throttling_(false),
183 drive_service_(drive_service
),
184 blocking_task_runner_(blocking_task_runner
),
185 uploader_(new DriveUploader(drive_service
, blocking_task_runner
)),
186 pref_service_(pref_service
),
187 weak_ptr_factory_(this) {
188 for (int i
= 0; i
< NUM_QUEUES
; ++i
)
189 queue_
[i
].reset(new JobQueue(kMaxJobCount
[i
], NUM_CONTEXT_TYPES
,
190 kMaxBatchCount
, kMaxBatchSize
));
192 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
195 JobScheduler::~JobScheduler() {
196 DCHECK(thread_checker_
.CalledOnValidThread());
198 size_t num_queued_jobs
= 0;
199 for (int i
= 0; i
< NUM_QUEUES
; ++i
)
200 num_queued_jobs
+= queue_
[i
]->GetNumberOfJobs();
201 DCHECK_EQ(num_queued_jobs
, job_map_
.size());
203 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
206 std::vector
<JobInfo
> JobScheduler::GetJobInfoList() {
207 std::vector
<JobInfo
> job_info_list
;
208 for (JobIDMap::iterator
iter(&job_map_
); !iter
.IsAtEnd(); iter
.Advance())
209 job_info_list
.push_back(iter
.GetCurrentValue()->job_info
);
210 return job_info_list
;
213 void JobScheduler::AddObserver(JobListObserver
* observer
) {
214 DCHECK(thread_checker_
.CalledOnValidThread());
215 observer_list_
.AddObserver(observer
);
218 void JobScheduler::RemoveObserver(JobListObserver
* observer
) {
219 DCHECK(thread_checker_
.CalledOnValidThread());
220 observer_list_
.RemoveObserver(observer
);
// Cancels the job identified by |job_id|. A job in STATE_RUNNING is cancelled
// through its |cancel_callback| (when one is set); AbortNotRunningJob is used
// with DRIVE_CANCELLED for the other case.
// NOTE(review): the lines joining the two branches (and any null check on the
// looked-up job) are not visible in this view -- confirm the exact control
// flow against the full file.
223 void JobScheduler::CancelJob(JobID job_id
) {
224 DCHECK(thread_checker_
.CalledOnValidThread());
226 JobEntry
* job
= job_map_
.Lookup(job_id
);
228 if (job
->job_info
.state
== STATE_RUNNING
) {
229 // If the job is running an HTTP request, cancel it via |cancel_callback|
230 // returned from the request, and wait for termination in the normal
231 // callback handler, OnJobDone.
232 if (!job
->cancel_callback
.is_null())
233 job
->cancel_callback
.Run();
235 AbortNotRunningJob(job
, google_apis::DRIVE_CANCELLED
);
240 void JobScheduler::CancelAllJobs() {
241 DCHECK(thread_checker_
.CalledOnValidThread());
243 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
244 // removable during iteration.
245 for (JobIDMap::iterator
iter(&job_map_
); !iter
.IsAtEnd(); iter
.Advance())
246 CancelJob(iter
.GetCurrentKey());
// Creates a TYPE_GET_ABOUT_RESOURCE job. Its task calls
// DriveServiceInterface::GetAboutResource on |drive_service_| with
// OnGetAboutResourceJobDone (bound to a weak pointer and the new job's id) as
// the completion handler. If the job is aborted, |callback| is run through
// CreateErrorRunCallback, i.e. with the error and a default result.
// NOTE(review): the end of the Bind argument list and any statements after
// the abort_callback assignment are not visible in this view.
249 void JobScheduler::GetAboutResource(
250 const google_apis::AboutResourceCallback
& callback
) {
251 DCHECK(thread_checker_
.CalledOnValidThread());
252 DCHECK(!callback
.is_null());
254 JobEntry
* new_job
= CreateNewJob(TYPE_GET_ABOUT_RESOURCE
);
255 new_job
->task
= base::Bind(
256 &DriveServiceInterface::GetAboutResource
,
257 base::Unretained(drive_service_
),
258 base::Bind(&JobScheduler::OnGetAboutResourceJobDone
,
259 weak_ptr_factory_
.GetWeakPtr(),
260 new_job
->job_info
.job_id
,
262 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_GET_APP_LIST job. Its task calls
// DriveServiceInterface::GetAppList on |drive_service_| with
// OnGetAppListJobDone (bound to a weak pointer and the new job's id) as the
// completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the end of the Bind argument list and any statements after
// the abort_callback assignment are not visible in this view.
266 void JobScheduler::GetAppList(const google_apis::AppListCallback
& callback
) {
267 DCHECK(thread_checker_
.CalledOnValidThread());
268 DCHECK(!callback
.is_null());
270 JobEntry
* new_job
= CreateNewJob(TYPE_GET_APP_LIST
);
271 new_job
->task
= base::Bind(
272 &DriveServiceInterface::GetAppList
,
273 base::Unretained(drive_service_
),
274 base::Bind(&JobScheduler::OnGetAppListJobDone
,
275 weak_ptr_factory_
.GetWeakPtr(),
276 new_job
->job_info
.job_id
,
278 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_GET_ALL_RESOURCE_LIST job. Its task calls
// DriveServiceInterface::GetAllFileList on |drive_service_| with
// OnGetFileListJobDone (bound to a weak pointer and the new job's id) as the
// completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the end of the Bind argument list and any statements after
// the abort_callback assignment are not visible in this view.
282 void JobScheduler::GetAllFileList(
283 const google_apis::FileListCallback
& callback
) {
284 DCHECK(thread_checker_
.CalledOnValidThread());
285 DCHECK(!callback
.is_null());
287 JobEntry
* new_job
= CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST
);
288 new_job
->task
= base::Bind(
289 &DriveServiceInterface::GetAllFileList
,
290 base::Unretained(drive_service_
),
291 base::Bind(&JobScheduler::OnGetFileListJobDone
,
292 weak_ptr_factory_
.GetWeakPtr(),
293 new_job
->job_info
.job_id
,
295 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_GET_RESOURCE_LIST_IN_DIRECTORY job. Its task calls
// DriveServiceInterface::GetFileListInDirectory on |drive_service_| with
// |directory_resource_id|, using OnGetFileListJobDone (bound to a weak
// pointer and the new job's id) as the completion handler. On abort,
// |callback| is run through CreateErrorRunCallback.
// NOTE(review): the end of the Bind argument list and any statements after
// the abort_callback assignment are not visible in this view.
299 void JobScheduler::GetFileListInDirectory(
300 const std::string
& directory_resource_id
,
301 const google_apis::FileListCallback
& callback
) {
302 DCHECK(thread_checker_
.CalledOnValidThread());
303 DCHECK(!callback
.is_null());
305 JobEntry
* new_job
= CreateNewJob(
306 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY
);
307 new_job
->task
= base::Bind(
308 &DriveServiceInterface::GetFileListInDirectory
,
309 base::Unretained(drive_service_
),
310 directory_resource_id
,
311 base::Bind(&JobScheduler::OnGetFileListJobDone
,
312 weak_ptr_factory_
.GetWeakPtr(),
313 new_job
->job_info
.job_id
,
315 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_SEARCH job. Its task calls DriveServiceInterface::Search on
// |drive_service_| with OnGetFileListJobDone (bound to a weak pointer and the
// new job's id) as the completion handler. On abort, |callback| is run
// through CreateErrorRunCallback.
// NOTE(review): the bound |search_query| argument, the end of the Bind
// argument list and any statements after the abort_callback assignment are
// not visible in this view.
319 void JobScheduler::Search(const std::string
& search_query
,
320 const google_apis::FileListCallback
& callback
) {
321 DCHECK(thread_checker_
.CalledOnValidThread());
322 DCHECK(!callback
.is_null());
324 JobEntry
* new_job
= CreateNewJob(TYPE_SEARCH
);
325 new_job
->task
= base::Bind(
326 &DriveServiceInterface::Search
,
327 base::Unretained(drive_service_
),
329 base::Bind(&JobScheduler::OnGetFileListJobDone
,
330 weak_ptr_factory_
.GetWeakPtr(),
331 new_job
->job_info
.job_id
,
333 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_GET_CHANGE_LIST job. Its task calls
// DriveServiceInterface::GetChangeList on |drive_service_| with
// OnGetChangeListJobDone (bound to a weak pointer and the new job's id) as
// the completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the bound |start_changestamp| argument, the end of the Bind
// argument list and any statements after the abort_callback assignment are
// not visible in this view.
337 void JobScheduler::GetChangeList(
338 int64 start_changestamp
,
339 const google_apis::ChangeListCallback
& callback
) {
340 DCHECK(thread_checker_
.CalledOnValidThread());
341 DCHECK(!callback
.is_null());
343 JobEntry
* new_job
= CreateNewJob(TYPE_GET_CHANGE_LIST
);
344 new_job
->task
= base::Bind(
345 &DriveServiceInterface::GetChangeList
,
346 base::Unretained(drive_service_
),
348 base::Bind(&JobScheduler::OnGetChangeListJobDone
,
349 weak_ptr_factory_
.GetWeakPtr(),
350 new_job
->job_info
.job_id
,
352 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_GET_REMAINING_CHANGE_LIST job. Its task calls
// DriveServiceInterface::GetRemainingChangeList on |drive_service_| with
// OnGetChangeListJobDone (bound to a weak pointer and the new job's id) as
// the completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the bound |next_link| argument, the end of the Bind argument
// list and any statements after the abort_callback assignment are not
// visible in this view.
356 void JobScheduler::GetRemainingChangeList(
357 const GURL
& next_link
,
358 const google_apis::ChangeListCallback
& callback
) {
359 DCHECK(thread_checker_
.CalledOnValidThread());
360 DCHECK(!callback
.is_null());
362 JobEntry
* new_job
= CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST
);
363 new_job
->task
= base::Bind(
364 &DriveServiceInterface::GetRemainingChangeList
,
365 base::Unretained(drive_service_
),
367 base::Bind(&JobScheduler::OnGetChangeListJobDone
,
368 weak_ptr_factory_
.GetWeakPtr(),
369 new_job
->job_info
.job_id
,
371 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_GET_REMAINING_FILE_LIST job. Its task calls
// DriveServiceInterface::GetRemainingFileList on |drive_service_| with
// OnGetFileListJobDone (bound to a weak pointer and the new job's id) as the
// completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the bound |next_link| argument, the end of the Bind argument
// list and any statements after the abort_callback assignment are not
// visible in this view.
375 void JobScheduler::GetRemainingFileList(
376 const GURL
& next_link
,
377 const google_apis::FileListCallback
& callback
) {
378 DCHECK(thread_checker_
.CalledOnValidThread());
379 DCHECK(!callback
.is_null());
381 JobEntry
* new_job
= CreateNewJob(TYPE_GET_REMAINING_FILE_LIST
);
382 new_job
->task
= base::Bind(
383 &DriveServiceInterface::GetRemainingFileList
,
384 base::Unretained(drive_service_
),
386 base::Bind(&JobScheduler::OnGetFileListJobDone
,
387 weak_ptr_factory_
.GetWeakPtr(),
388 new_job
->job_info
.job_id
,
390 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_GET_RESOURCE_ENTRY job running under |context|. Its task
// calls DriveServiceInterface::GetFileResource on |drive_service_| with
// OnGetFileResourceJobDone (bound to a weak pointer and the new job's id) as
// the completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the bound |resource_id| argument, the end of the Bind
// argument list and any statements after the abort_callback assignment are
// not visible in this view.
394 void JobScheduler::GetFileResource(
395 const std::string
& resource_id
,
396 const ClientContext
& context
,
397 const google_apis::FileResourceCallback
& callback
) {
398 DCHECK(thread_checker_
.CalledOnValidThread());
399 DCHECK(!callback
.is_null());
401 JobEntry
* new_job
= CreateNewJob(TYPE_GET_RESOURCE_ENTRY
);
402 new_job
->context
= context
;
403 new_job
->task
= base::Bind(
404 &DriveServiceInterface::GetFileResource
,
405 base::Unretained(drive_service_
),
407 base::Bind(&JobScheduler::OnGetFileResourceJobDone
,
408 weak_ptr_factory_
.GetWeakPtr(),
409 new_job
->job_info
.job_id
,
411 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_GET_SHARE_URL job running under |context|. Its task calls
// DriveServiceInterface::GetShareUrl on |drive_service_| with
// OnGetShareUrlJobDone (bound to a weak pointer and the new job's id) as the
// completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the bound |resource_id| / |embed_origin| arguments, the end
// of the Bind argument list and any statements after the abort_callback
// assignment are not visible in this view.
415 void JobScheduler::GetShareUrl(
416 const std::string
& resource_id
,
417 const GURL
& embed_origin
,
418 const ClientContext
& context
,
419 const google_apis::GetShareUrlCallback
& callback
) {
420 DCHECK(thread_checker_
.CalledOnValidThread());
421 DCHECK(!callback
.is_null());
423 JobEntry
* new_job
= CreateNewJob(TYPE_GET_SHARE_URL
);
424 new_job
->context
= context
;
425 new_job
->task
= base::Bind(
426 &DriveServiceInterface::GetShareUrl
,
427 base::Unretained(drive_service_
),
430 base::Bind(&JobScheduler::OnGetShareUrlJobDone
,
431 weak_ptr_factory_
.GetWeakPtr(),
432 new_job
->job_info
.job_id
,
434 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_TRASH_RESOURCE job running under |context|. Its task calls
// DriveServiceInterface::TrashResource on |drive_service_| with
// OnEntryActionJobDone (bound to a weak pointer and the new job's id) as the
// completion handler. |callback| only takes an error code, so it is used
// directly as the abort callback.
// NOTE(review): the bound |resource_id| argument, the end of the Bind
// argument list and any statements after the abort_callback assignment are
// not visible in this view.
438 void JobScheduler::TrashResource(
439 const std::string
& resource_id
,
440 const ClientContext
& context
,
441 const google_apis::EntryActionCallback
& callback
) {
442 DCHECK(thread_checker_
.CalledOnValidThread());
443 DCHECK(!callback
.is_null());
445 JobEntry
* new_job
= CreateNewJob(TYPE_TRASH_RESOURCE
);
446 new_job
->context
= context
;
447 new_job
->task
= base::Bind(
448 &DriveServiceInterface::TrashResource
,
449 base::Unretained(drive_service_
),
451 base::Bind(&JobScheduler::OnEntryActionJobDone
,
452 weak_ptr_factory_
.GetWeakPtr(),
453 new_job
->job_info
.job_id
,
455 new_job
->abort_callback
= callback
;
// Creates a TYPE_COPY_RESOURCE job. Its task calls
// DriveServiceInterface::CopyResource on |drive_service_| with
// OnGetFileResourceJobDone (bound to a weak pointer and the new job's id) as
// the completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the bound resource/title/time arguments, the end of the Bind
// argument list and any statements after the abort_callback assignment are
// not visible in this view.
459 void JobScheduler::CopyResource(
460 const std::string
& resource_id
,
461 const std::string
& parent_resource_id
,
462 const std::string
& new_title
,
463 const base::Time
& last_modified
,
464 const google_apis::FileResourceCallback
& callback
) {
465 DCHECK(thread_checker_
.CalledOnValidThread());
466 DCHECK(!callback
.is_null());
468 JobEntry
* new_job
= CreateNewJob(TYPE_COPY_RESOURCE
);
469 new_job
->task
= base::Bind(
470 &DriveServiceInterface::CopyResource
,
471 base::Unretained(drive_service_
),
476 base::Bind(&JobScheduler::OnGetFileResourceJobDone
,
477 weak_ptr_factory_
.GetWeakPtr(),
478 new_job
->job_info
.job_id
,
480 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_UPDATE_RESOURCE job running under |context|. Its task calls
// DriveServiceInterface::UpdateResource on |drive_service_| with
// |resource_id|, |parent_resource_id|, |new_title|, |last_modified|,
// |last_viewed_by_me| and |properties|; completion goes through
// OnGetFileResourceJobDone (bound to a weak pointer, the new job's id and
// |callback|). On abort, |callback| is run through CreateErrorRunCallback.
// NOTE(review): any statements after the abort_callback assignment are not
// visible in this view.
484 void JobScheduler::UpdateResource(
485 const std::string
& resource_id
,
486 const std::string
& parent_resource_id
,
487 const std::string
& new_title
,
488 const base::Time
& last_modified
,
489 const base::Time
& last_viewed_by_me
,
490 const google_apis::drive::Properties
& properties
,
491 const ClientContext
& context
,
492 const google_apis::FileResourceCallback
& callback
) {
493 DCHECK(thread_checker_
.CalledOnValidThread());
494 DCHECK(!callback
.is_null());
496 JobEntry
* new_job
= CreateNewJob(TYPE_UPDATE_RESOURCE
);
497 new_job
->context
= context
;
498 new_job
->task
= base::Bind(&DriveServiceInterface::UpdateResource
,
499 base::Unretained(drive_service_
), resource_id
,
500 parent_resource_id
, new_title
, last_modified
,
501 last_viewed_by_me
, properties
,
502 base::Bind(&JobScheduler::OnGetFileResourceJobDone
,
503 weak_ptr_factory_
.GetWeakPtr(),
504 new_job
->job_info
.job_id
, callback
));
505 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_ADD_RESOURCE_TO_DIRECTORY job. Its task calls
// DriveServiceInterface::AddResourceToDirectory on |drive_service_| with
// OnEntryActionJobDone (bound to a weak pointer and the new job's id) as the
// completion handler. |callback| is used directly as the abort callback.
// NOTE(review): the bound resource id arguments, the end of the Bind argument
// list and any statements after the abort_callback assignment are not
// visible in this view.
509 void JobScheduler::AddResourceToDirectory(
510 const std::string
& parent_resource_id
,
511 const std::string
& resource_id
,
512 const google_apis::EntryActionCallback
& callback
) {
513 DCHECK(thread_checker_
.CalledOnValidThread());
514 DCHECK(!callback
.is_null());
516 JobEntry
* new_job
= CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY
);
517 new_job
->task
= base::Bind(
518 &DriveServiceInterface::AddResourceToDirectory
,
519 base::Unretained(drive_service_
),
522 base::Bind(&JobScheduler::OnEntryActionJobDone
,
523 weak_ptr_factory_
.GetWeakPtr(),
524 new_job
->job_info
.job_id
,
526 new_job
->abort_callback
= callback
;
// Creates a TYPE_REMOVE_RESOURCE_FROM_DIRECTORY job running under |context|.
// Its task calls DriveServiceInterface::RemoveResourceFromDirectory on
// |drive_service_| with OnEntryActionJobDone (bound to a weak pointer and the
// new job's id) as the completion handler. |callback| is used directly as the
// abort callback.
// NOTE(review): the bound resource id arguments, the end of the Bind argument
// list and any statements after the abort_callback assignment are not
// visible in this view.
530 void JobScheduler::RemoveResourceFromDirectory(
531 const std::string
& parent_resource_id
,
532 const std::string
& resource_id
,
533 const ClientContext
& context
,
534 const google_apis::EntryActionCallback
& callback
) {
535 DCHECK(thread_checker_
.CalledOnValidThread());
537 JobEntry
* new_job
= CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY
);
538 new_job
->context
= context
;
539 new_job
->task
= base::Bind(
540 &DriveServiceInterface::RemoveResourceFromDirectory
,
541 base::Unretained(drive_service_
),
544 base::Bind(&JobScheduler::OnEntryActionJobDone
,
545 weak_ptr_factory_
.GetWeakPtr(),
546 new_job
->job_info
.job_id
,
548 new_job
->abort_callback
= callback
;
// Creates a TYPE_ADD_NEW_DIRECTORY job running under |context|. Its task
// calls DriveServiceInterface::AddNewDirectory on |drive_service_| with
// OnGetFileResourceJobDone (bound to a weak pointer and the new job's id) as
// the completion handler. On abort, |callback| is run through
// CreateErrorRunCallback.
// NOTE(review): the bound |parent_resource_id| / |directory_title| /
// |options| arguments, the end of the Bind argument list and any statements
// after the abort_callback assignment are not visible in this view.
552 void JobScheduler::AddNewDirectory(
553 const std::string
& parent_resource_id
,
554 const std::string
& directory_title
,
555 const AddNewDirectoryOptions
& options
,
556 const ClientContext
& context
,
557 const google_apis::FileResourceCallback
& callback
) {
558 DCHECK(thread_checker_
.CalledOnValidThread());
560 JobEntry
* new_job
= CreateNewJob(TYPE_ADD_NEW_DIRECTORY
);
561 new_job
->context
= context
;
562 new_job
->task
= base::Bind(
563 &DriveServiceInterface::AddNewDirectory
,
564 base::Unretained(drive_service_
),
568 base::Bind(&JobScheduler::OnGetFileResourceJobDone
,
569 weak_ptr_factory_
.GetWeakPtr(),
570 new_job
->job_info
.job_id
,
572 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_DOWNLOAD_FILE job running under |context| and returns its
// job id. The job's JobInfo records |virtual_path| as the file path and
// |expected_file_size| as the total byte count. Its task calls
// DriveServiceInterface::DownloadFile on |drive_service_|, completing through
// OnDownloadActionJobDone (with |download_action_callback|), streaming
// content to |get_content_callback| and reporting progress to UpdateProgress.
// On abort, |download_action_callback| is run through CreateErrorRunCallback.
// NOTE(review): the remaining arguments of the histogram call, some bound
// arguments and any statements between the abort_callback assignment and the
// return are not visible in this view.
576 JobID
JobScheduler::DownloadFile(
577 const base::FilePath
& virtual_path
,
578 int64 expected_file_size
,
579 const base::FilePath
& local_cache_path
,
580 const std::string
& resource_id
,
581 const ClientContext
& context
,
582 const google_apis::DownloadActionCallback
& download_action_callback
,
583 const google_apis::GetContentCallback
& get_content_callback
) {
584 DCHECK(thread_checker_
.CalledOnValidThread());
586 // Temporary histogram for crbug.com/229650.
587 CollectCopyHistogramSample("Drive.DownloadFromDriveFileSize",
590 JobEntry
* new_job
= CreateNewJob(TYPE_DOWNLOAD_FILE
);
591 new_job
->job_info
.file_path
= virtual_path
;
592 new_job
->job_info
.num_total_bytes
= expected_file_size
;
593 new_job
->context
= context
;
594 new_job
->task
= base::Bind(
595 &DriveServiceInterface::DownloadFile
,
596 base::Unretained(drive_service_
),
599 base::Bind(&JobScheduler::OnDownloadActionJobDone
,
600 weak_ptr_factory_
.GetWeakPtr(),
601 new_job
->job_info
.job_id
,
602 download_action_callback
),
603 get_content_callback
,
604 base::Bind(&JobScheduler::UpdateProgress
,
605 weak_ptr_factory_
.GetWeakPtr(),
606 new_job
->job_info
.job_id
));
607 new_job
->abort_callback
= CreateErrorRunCallback(download_action_callback
);
609 return new_job
->job_info
.job_id
;
// Creates a TYPE_UPLOAD_NEW_FILE job running under |context|. The job's
// JobInfo records |drive_file_path| and |expected_file_size|. The upload
// arguments are packed into an UploadNewFileParams and the task is
// RunUploadNewFile bound to |uploader_| and those params; completion goes
// through OnUploadCompletionJobDone and progress through UpdateProgress.
// A ResumeUploadParams is also filled with the local path and content type.
// On abort, |callback| is run through CreateErrorRunCallback.
// NOTE(review): the remaining arguments bound into |params.callback| (where
// |resume_params| is presumably consumed) and any statements after the
// abort_callback assignment are not visible in this view.
612 void JobScheduler::UploadNewFile(
613 const std::string
& parent_resource_id
,
614 int64 expected_file_size
,
615 const base::FilePath
& drive_file_path
,
616 const base::FilePath
& local_file_path
,
617 const std::string
& title
,
618 const std::string
& content_type
,
619 const UploadNewFileOptions
& options
,
620 const ClientContext
& context
,
621 const google_apis::FileResourceCallback
& callback
) {
622 DCHECK(thread_checker_
.CalledOnValidThread());
624 JobEntry
* new_job
= CreateNewJob(TYPE_UPLOAD_NEW_FILE
);
625 new_job
->job_info
.file_path
= drive_file_path
;
626 new_job
->job_info
.num_total_bytes
= expected_file_size
;
627 new_job
->context
= context
;
629 // Temporary histogram for crbug.com/229650.
630 CollectCopyHistogramSample("Drive.UploadToDriveFileSize", expected_file_size
);
632 UploadNewFileParams params
;
633 params
.parent_resource_id
= parent_resource_id
;
634 params
.local_file_path
= local_file_path
;
635 params
.title
= title
;
636 params
.content_type
= content_type
;
637 params
.options
= options
;
639 ResumeUploadParams resume_params
;
640 resume_params
.local_file_path
= params
.local_file_path
;
641 resume_params
.content_type
= params
.content_type
;
643 params
.callback
= base::Bind(&JobScheduler::OnUploadCompletionJobDone
,
644 weak_ptr_factory_
.GetWeakPtr(),
645 new_job
->job_info
.job_id
,
648 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
649 weak_ptr_factory_
.GetWeakPtr(),
650 new_job
->job_info
.job_id
);
651 new_job
->task
= base::Bind(&RunUploadNewFile
, uploader_
.get(), params
);
652 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_UPLOAD_EXISTING_FILE job running under |context|. The job's
// JobInfo records |drive_file_path| and |expected_file_size|. The upload
// arguments are packed into an UploadExistingFileParams and the task is
// RunUploadExistingFile bound to |uploader_| and those params; completion
// goes through OnUploadCompletionJobDone and progress through UpdateProgress.
// A ResumeUploadParams is also filled with the local path and content type.
// On abort, |callback| is run through CreateErrorRunCallback.
// NOTE(review): the remaining arguments bound into |params.callback| (where
// |resume_params| is presumably consumed) and any statements after the
// abort_callback assignment are not visible in this view.
656 void JobScheduler::UploadExistingFile(
657 const std::string
& resource_id
,
658 int64 expected_file_size
,
659 const base::FilePath
& drive_file_path
,
660 const base::FilePath
& local_file_path
,
661 const std::string
& content_type
,
662 const UploadExistingFileOptions
& options
,
663 const ClientContext
& context
,
664 const google_apis::FileResourceCallback
& callback
) {
665 DCHECK(thread_checker_
.CalledOnValidThread());
667 JobEntry
* new_job
= CreateNewJob(TYPE_UPLOAD_EXISTING_FILE
);
668 new_job
->job_info
.file_path
= drive_file_path
;
669 new_job
->job_info
.num_total_bytes
= expected_file_size
;
670 new_job
->context
= context
;
672 // Temporary histogram for crbug.com/229650.
673 CollectCopyHistogramSample("Drive.UploadToDriveFileSize", expected_file_size
);
675 UploadExistingFileParams params
;
676 params
.resource_id
= resource_id
;
677 params
.local_file_path
= local_file_path
;
678 params
.content_type
= content_type
;
679 params
.options
= options
;
681 ResumeUploadParams resume_params
;
682 resume_params
.local_file_path
= params
.local_file_path
;
683 resume_params
.content_type
= params
.content_type
;
685 params
.callback
= base::Bind(&JobScheduler::OnUploadCompletionJobDone
,
686 weak_ptr_factory_
.GetWeakPtr(),
687 new_job
->job_info
.job_id
,
690 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
691 weak_ptr_factory_
.GetWeakPtr(),
692 new_job
->job_info
.job_id
);
693 new_job
->task
= base::Bind(&RunUploadExistingFile
, uploader_
.get(), params
);
694 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
// Creates a TYPE_ADD_PERMISSION job. Its task calls
// DriveServiceInterface::AddPermission on |drive_service_| with
// OnEntryActionJobDone (bound to a weak pointer and the new job's id) as the
// completion handler. |callback| is used directly as the abort callback.
// NOTE(review): the bound |resource_id| / |email| / |role| arguments, the end
// of the Bind argument list and any statements after the abort_callback
// assignment are not visible in this view.
698 void JobScheduler::AddPermission(
699 const std::string
& resource_id
,
700 const std::string
& email
,
701 google_apis::drive::PermissionRole role
,
702 const google_apis::EntryActionCallback
& callback
) {
703 DCHECK(thread_checker_
.CalledOnValidThread());
704 DCHECK(!callback
.is_null());
706 JobEntry
* new_job
= CreateNewJob(TYPE_ADD_PERMISSION
);
707 new_job
->task
= base::Bind(&DriveServiceInterface::AddPermission
,
708 base::Unretained(drive_service_
),
712 base::Bind(&JobScheduler::OnEntryActionJobDone
,
713 weak_ptr_factory_
.GetWeakPtr(),
714 new_job
->job_info
.job_id
,
716 new_job
->abort_callback
= callback
;
// Allocates a JobEntry of the given |type|, registers it in |job_map_| (which
// takes ownership) and stores the id handed back by the map in the entry's
// JobInfo.
// NOTE(review): the tail of the function (the return statement) is not
// visible in this view.
720 JobScheduler::JobEntry
* JobScheduler::CreateNewJob(JobType type
) {
721 JobEntry
* job
= new JobEntry(type
);
722 job
->job_info
.job_id
= job_map_
.Add(job
); // Takes the ownership of |job|.
726 void JobScheduler::StartJob(JobEntry
* job
) {
727 DCHECK(!job
->task
.is_null());
729 QueueJob(job
->job_info
.job_id
);
730 NotifyJobAdded(job
->job_info
);
731 DoJobLoop(GetJobQueueType(job
->job_info
.job_type
));
// Pushes the job identified by |job_id| onto the queue for its job type,
// using the job's context type, whether it is batchable (only the two upload
// job types are) and its total byte count. For transfer jobs (download and
// both uploads) it also records how many other jobs with the same context
// type are queued, as a temporary UMA histogram. Finally logs the queueing,
// tagging the message with the retry count when it is non-zero.
// NOTE(review): some lines (e.g. any check on the looked-up entry) are not
// visible in this view.
734 void JobScheduler::QueueJob(JobID job_id
) {
735 DCHECK(thread_checker_
.CalledOnValidThread());
737 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
739 const JobInfo
& job_info
= job_entry
->job_info
;
741 const QueueType queue_type
= GetJobQueueType(job_info
.job_type
);
742 const bool batchable
= job_info
.job_type
== TYPE_UPLOAD_EXISTING_FILE
||
743 job_info
.job_type
== TYPE_UPLOAD_NEW_FILE
;
744 queue_
[queue_type
]->Push(job_id
, job_entry
->context
.type
, batchable
,
745 job_info
.num_total_bytes
);
747 // Temporary histogram for crbug.com/229650.
748 if (job_info
.job_type
== TYPE_DOWNLOAD_FILE
||
749 job_info
.job_type
== TYPE_UPLOAD_EXISTING_FILE
||
750 job_info
.job_type
== TYPE_UPLOAD_NEW_FILE
) {
751 std::vector
<JobID
> jobs_with_the_same_priority
;
752 queue_
[queue_type
]->GetQueuedJobs(job_entry
->context
.type
,
753 &jobs_with_the_same_priority
);
754 DCHECK(!jobs_with_the_same_priority
.empty());
756 const size_t blocking_jobs_count
= jobs_with_the_same_priority
.size() - 1;
757 UMA_HISTOGRAM_COUNTS_10000("Drive.TransferBlockedOnJobs",
758 blocking_jobs_count
);
761 const std::string retry_prefix
= job_entry
->retry_count
> 0 ?
762 base::StringPrintf(" (retry %d)", job_entry
->retry_count
) : "";
763 logger_
->Log(logging::LOG_INFO
,
764 "Job queued%s: %s - %s",
765 retry_prefix
.c_str(),
766 job_info
.ToString().c_str(),
767 GetQueueInfo(queue_type
).c_str());
// Runs the next runnable job(s) of |queue_type|. It first asks
// GetCurrentAcceptedPriority() what may run; when USER_INITIATED jobs are not
// accepted, the queued USER_INITIATED jobs are aborted with
// DRIVE_NO_CONNECTION. While throttled (now < |wait_until_|) it re-posts
// itself as a delayed task. Otherwise it pops jobs from the queue, marks each
// STATE_RUNNING, notifies observers, runs the job's task and stores the
// returned cancel callback. When more than one job was popped, the runs are
// wrapped in StartBatchProcessing()/StopBatchProcessing() on |uploader_|.
// NOTE(review): several lines (block ends, early returns, the delayed-task
// arguments) are not visible in this view -- confirm the exact control flow
// against the full file.
770 void JobScheduler::DoJobLoop(QueueType queue_type
) {
771 DCHECK(thread_checker_
.CalledOnValidThread());
773 const int accepted_priority
= GetCurrentAcceptedPriority(queue_type
);
775 // Abort all USER_INITIATED jobs when not accepted.
776 if (accepted_priority
< USER_INITIATED
) {
777 std::vector
<JobID
> jobs
;
778 queue_
[queue_type
]->GetQueuedJobs(USER_INITIATED
, &jobs
);
779 for (size_t i
= 0; i
< jobs
.size(); ++i
) {
780 JobEntry
* job
= job_map_
.Lookup(jobs
[i
]);
782 AbortNotRunningJob(job
, google_apis::DRIVE_NO_CONNECTION
);
786 // Wait when throttled.
787 const base::Time now
= base::Time::Now();
788 if (now
< wait_until_
) {
789 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
791 base::Bind(&JobScheduler::DoJobLoop
,
792 weak_ptr_factory_
.GetWeakPtr(),
798 // Run the job with the highest priority in the queue.
799 std::vector
<JobID
> job_ids
;
800 queue_
[queue_type
]->PopForRun(accepted_priority
, &job_ids
);
804 if (job_ids
.size() > 1)
805 uploader_
->StartBatchProcessing();
807 for (JobID job_id
: job_ids
) {
808 JobEntry
* entry
= job_map_
.Lookup(job_id
);
811 JobInfo
* job_info
= &entry
->job_info
;
812 job_info
->state
= STATE_RUNNING
;
813 job_info
->start_time
= now
;
814 NotifyJobUpdated(*job_info
);
816 entry
->cancel_callback
= entry
->task
.Run();
817 logger_
->Log(logging::LOG_INFO
, "Job started: %s - %s",
818 job_info
->ToString().c_str(),
819 GetQueueInfo(queue_type
).c_str());
822 if (job_ids
.size() > 1)
823 uploader_
->StopBatchProcessing();
// Returns the lowest-priority context type that is currently allowed to run
// on |queue_type|. Returns kNoJobShouldRun (-1) when Drive is disabled by
// pref or the network is offline, and USER_INITIATED for the file queue when
// the "disable Drive over cellular" pref is set and the connection is
// cellular.
// NOTE(review): the final return for the unrestricted case is not visible in
// this view.
828 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type
) {
829 DCHECK(thread_checker_
.CalledOnValidThread());
831 const int kNoJobShouldRun
= -1;
833 // Should stop if Drive was disabled while running the fetch loop.
834 if (pref_service_
->GetBoolean(prefs::kDisableDrive
))
835 return kNoJobShouldRun
;
837 // Should stop if the network is not online.
838 if (net::NetworkChangeNotifier::IsOffline())
839 return kNoJobShouldRun
;
841 // For the file queue, if it is on cellular network, only user initiated
842 // operations are allowed to start.
843 if (queue_type
== FILE_QUEUE
&&
844 pref_service_
->GetBoolean(prefs::kDisableDriveOverCellular
) &&
845 net::NetworkChangeNotifier::IsConnectionCellular(
846 net::NetworkChangeNotifier::GetConnectionType()))
847 return USER_INITIATED
;
849 // Otherwise, every operation, including background tasks, is allowed.
// Pushes |wait_until_| forward according to the current |throttle_count_|:
// the delay is 2^(throttle_count_ - 1) seconds plus a random 0-1000 ms of
// jitter, and |wait_until_| never moves backwards (std::max with the current
// value). Nothing is computed when throttling is disabled or the throttle
// count is zero.
// NOTE(review): the statement guarded by the first condition is not visible
// in this view (presumably an early return -- confirm).
853 void JobScheduler::UpdateWait() {
854 DCHECK(thread_checker_
.CalledOnValidThread());
856 if (disable_throttling_
|| throttle_count_
== 0)
859 // Exponential backoff: https://developers.google.com/drive/handle-errors.
860 base::TimeDelta delay
=
861 base::TimeDelta::FromSeconds(1 << (throttle_count_
- 1)) +
862 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
863 VLOG(1) << "Throttling for " << delay
.InMillisecondsF();
865 wait_until_
= std::max(wait_until_
, base::Time::Now() + delay
);
// Common completion handling for the job identified by |job_id|. Marks the
// job finished in its queue and logs the outcome with the elapsed time.
// HTTP_SERVICE_UNAVAILABLE and HTTP_INTERNAL_SERVER_ERROR count as server
// errors; a job is retried when it hit a server error and its retry count is
// still below kMaxRetryCount. The visible code sets STATE_RETRY and bumps the
// retry count for the retry case, and notifies observers and removes the job
// from |job_map_| for the finished case. The job loop is then continued via a
// posted task.
// Returns true when the job is finished (no retry), i.e. !should_retry.
// NOTE(review): the branch structure and throttle-count updates are only
// partially visible in this view -- confirm against the full file.
868 bool JobScheduler::OnJobDone(JobID job_id
,
869 google_apis::DriveApiErrorCode error
) {
870 DCHECK(thread_checker_
.CalledOnValidThread());
872 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
874 JobInfo
* job_info
= &job_entry
->job_info
;
875 QueueType queue_type
= GetJobQueueType(job_info
->job_type
);
876 queue_
[queue_type
]->MarkFinished(job_id
);
878 const base::TimeDelta elapsed
= base::Time::Now() - job_info
->start_time
;
879 bool success
= (GDataToFileError(error
) == FILE_ERROR_OK
);
880 logger_
->Log(success
? logging::LOG_INFO
: logging::LOG_WARNING
,
881 "Job done: %s => %s (elapsed time: %sms) - %s",
882 job_info
->ToString().c_str(),
883 DriveApiErrorCodeToString(error
).c_str(),
884 base::Int64ToString(elapsed
.InMilliseconds()).c_str(),
885 GetQueueInfo(queue_type
).c_str());
887 // Retry, depending on the error.
888 const bool is_server_error
=
889 error
== google_apis::HTTP_SERVICE_UNAVAILABLE
||
890 error
== google_apis::HTTP_INTERNAL_SERVER_ERROR
;
891 if (is_server_error
) {
892 if (throttle_count_
< kMaxThrottleCount
)
899 const bool should_retry
=
900 is_server_error
&& job_entry
->retry_count
< kMaxRetryCount
;
902 job_entry
->cancel_callback
.Reset();
903 job_info
->state
= STATE_RETRY
;
904 NotifyJobUpdated(*job_info
);
906 ++job_entry
->retry_count
;
911 NotifyJobDone(*job_info
, error
);
912 // The job has finished, no retry will happen in the scheduler. Now we can
913 // remove the job info from the map.
914 job_map_
.Remove(job_id
);
917 // Post a task to continue the job loop. This allows us to finish handling
918 // the current job before starting the next one.
919 base::ThreadTaskRunnerHandle::Get()->PostTask(
921 base::Bind(&JobScheduler::DoJobLoop
,
922 weak_ptr_factory_
.GetWeakPtr(),
924 return !should_retry
;
// Completion handler for file-list jobs. Runs |callback| with |error| and
// |file_list| only when OnJobDone() reports the job as finished (true); when
// the job is going to be retried the result is dropped.
// NOTE(review): |job_id| is used below but its parameter declaration is not
// visible in this view.
927 void JobScheduler::OnGetFileListJobDone(
929 const google_apis::FileListCallback
& callback
,
930 google_apis::DriveApiErrorCode error
,
931 scoped_ptr
<google_apis::FileList
> file_list
) {
932 DCHECK(thread_checker_
.CalledOnValidThread());
933 DCHECK(!callback
.is_null());
935 if (OnJobDone(job_id
, error
))
936 callback
.Run(error
, file_list
.Pass());
// Completion handler for change-list jobs. Runs |callback| with |error| and
// |change_list| only when OnJobDone() reports the job as finished (true);
// when the job is going to be retried the result is dropped.
// NOTE(review): |job_id| is used below but its parameter declaration is not
// visible in this view.
939 void JobScheduler::OnGetChangeListJobDone(
941 const google_apis::ChangeListCallback
& callback
,
942 google_apis::DriveApiErrorCode error
,
943 scoped_ptr
<google_apis::ChangeList
> change_list
) {
944 DCHECK(thread_checker_
.CalledOnValidThread());
945 DCHECK(!callback
.is_null());
947 if (OnJobDone(job_id
, error
))
948 callback
.Run(error
, change_list
.Pass());
// Completion handler for jobs that yield a FileResource. Runs |callback| with
// |error| and |entry| only when OnJobDone() reports the job as finished
// (true); when the job is going to be retried the result is dropped.
// NOTE(review): |job_id| is used below but its parameter declaration is not
// visible in this view.
951 void JobScheduler::OnGetFileResourceJobDone(
953 const google_apis::FileResourceCallback
& callback
,
954 google_apis::DriveApiErrorCode error
,
955 scoped_ptr
<google_apis::FileResource
> entry
) {
956 DCHECK(thread_checker_
.CalledOnValidThread());
957 DCHECK(!callback
.is_null());
959 if (OnJobDone(job_id
, error
))
960 callback
.Run(error
, entry
.Pass());
// Completion handler for about-resource jobs. Runs |callback| with |error|
// and |about_resource| only when OnJobDone() reports the job as finished
// (true); when the job is going to be retried the result is dropped.
// NOTE(review): |job_id| is used below but its parameter declaration is not
// visible in this view.
963 void JobScheduler::OnGetAboutResourceJobDone(
965 const google_apis::AboutResourceCallback
& callback
,
966 google_apis::DriveApiErrorCode error
,
967 scoped_ptr
<google_apis::AboutResource
> about_resource
) {
968 DCHECK(thread_checker_
.CalledOnValidThread());
969 DCHECK(!callback
.is_null());
971 if (OnJobDone(job_id
, error
))
972 callback
.Run(error
, about_resource
.Pass());
// Completion handler for share-URL jobs. Runs |callback| with |error| and
// |share_url| only when OnJobDone() reports the job as finished (true); when
// the job is going to be retried the result is dropped.
// NOTE(review): |job_id| is used below but its parameter declaration is not
// visible in this view.
975 void JobScheduler::OnGetShareUrlJobDone(
977 const google_apis::GetShareUrlCallback
& callback
,
978 google_apis::DriveApiErrorCode error
,
979 const GURL
& share_url
) {
980 DCHECK(thread_checker_
.CalledOnValidThread());
981 DCHECK(!callback
.is_null());
983 if (OnJobDone(job_id
, error
))
984 callback
.Run(error
, share_url
);
987 void JobScheduler::OnGetAppListJobDone(
989 const google_apis::AppListCallback
& callback
,
990 google_apis::DriveApiErrorCode error
,
991 scoped_ptr
<google_apis::AppList
> app_list
) {
992 DCHECK(thread_checker_
.CalledOnValidThread());
993 DCHECK(!callback
.is_null());
995 if (OnJobDone(job_id
, error
))
996 callback
.Run(error
, app_list
.Pass());
999 void JobScheduler::OnEntryActionJobDone(
1001 const google_apis::EntryActionCallback
& callback
,
1002 google_apis::DriveApiErrorCode error
) {
1003 DCHECK(thread_checker_
.CalledOnValidThread());
1004 DCHECK(!callback
.is_null());
1006 if (OnJobDone(job_id
, error
))
1007 callback
.Run(error
);
1010 void JobScheduler::OnDownloadActionJobDone(
1012 const google_apis::DownloadActionCallback
& callback
,
1013 google_apis::DriveApiErrorCode error
,
1014 const base::FilePath
& temp_file
) {
1015 DCHECK(thread_checker_
.CalledOnValidThread());
1016 DCHECK(!callback
.is_null());
1018 if (OnJobDone(job_id
, error
))
1019 callback
.Run(error
, temp_file
);
1022 void JobScheduler::OnUploadCompletionJobDone(
1024 const ResumeUploadParams
& resume_params
,
1025 const google_apis::FileResourceCallback
& callback
,
1026 google_apis::DriveApiErrorCode error
,
1027 const GURL
& upload_location
,
1028 scoped_ptr
<google_apis::FileResource
> entry
) {
1029 DCHECK(thread_checker_
.CalledOnValidThread());
1030 DCHECK(!callback
.is_null());
1032 if (!upload_location
.is_empty()) {
1033 // If upload_location is available, update the task to resume the
1034 // upload process from the terminated point.
1035 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1036 // so OnJobDone called below will be in charge to re-queue the job.
1037 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1040 ResumeUploadFileParams params
;
1041 params
.upload_location
= upload_location
;
1042 params
.local_file_path
= resume_params
.local_file_path
;
1043 params
.content_type
= resume_params
.content_type
;
1044 params
.callback
= base::Bind(&JobScheduler::OnResumeUploadFileDone
,
1045 weak_ptr_factory_
.GetWeakPtr(),
1049 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
1050 weak_ptr_factory_
.GetWeakPtr(),
1052 job_entry
->task
= base::Bind(&RunResumeUploadFile
, uploader_
.get(), params
);
1055 if (OnJobDone(job_id
, error
))
1056 callback
.Run(error
, entry
.Pass());
1059 void JobScheduler::OnResumeUploadFileDone(
1061 const base::Callback
<google_apis::CancelCallback()>& original_task
,
1062 const google_apis::FileResourceCallback
& callback
,
1063 google_apis::DriveApiErrorCode error
,
1064 const GURL
& upload_location
,
1065 scoped_ptr
<google_apis::FileResource
> entry
) {
1066 DCHECK(thread_checker_
.CalledOnValidThread());
1067 DCHECK(!original_task
.is_null());
1068 DCHECK(!callback
.is_null());
1070 if (upload_location
.is_empty()) {
1071 // If upload_location is not available, we should discard it and stop trying
1072 // to resume. Restore the original task.
1073 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1075 job_entry
->task
= original_task
;
1078 if (OnJobDone(job_id
, error
))
1079 callback
.Run(error
, entry
.Pass());
// Progress callback for a running job: looks up the entry for |job_id|,
// records |progress| as the number of completed bytes and |total| as the
// number of total bytes in its JobInfo, then notifies observers through
// NotifyJobUpdated().
// NOTE(review): the embedded line numbering skips a line right before the
// |total| assignment, so that store may be conditional - confirm against the
// complete file before relying on it being unconditional.
1082 void JobScheduler::UpdateProgress(JobID job_id
, int64 progress
, int64 total
) {
1083 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1086 job_entry
->job_info
.num_completed_bytes
= progress
;
1088 job_entry
->job_info
.num_total_bytes
= total
;
1089 NotifyJobUpdated(job_entry
->job_info
);
1092 void JobScheduler::OnConnectionTypeChanged(
1093 net::NetworkChangeNotifier::ConnectionType type
) {
1094 DCHECK(thread_checker_
.CalledOnValidThread());
1096 // Resume the job loop.
1097 // Note that we don't need to check the network connection status as it will
1098 // be checked in GetCurrentAcceptedPriority().
1099 for (int i
= METADATA_QUEUE
; i
< NUM_QUEUES
; ++i
)
1100 DoJobLoop(static_cast<QueueType
>(i
));
// Maps a job type to the queue that job is scheduled on.
// The job types in the first group below are served by METADATA_QUEUE.
1103 JobScheduler::QueueType
JobScheduler::GetJobQueueType(JobType type
) {
1105 case TYPE_GET_ABOUT_RESOURCE
:
1106 case TYPE_GET_APP_LIST
:
1107 case TYPE_GET_ALL_RESOURCE_LIST
:
1108 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY
:
1110 case TYPE_GET_CHANGE_LIST
:
1111 case TYPE_GET_REMAINING_CHANGE_LIST
:
1112 case TYPE_GET_REMAINING_FILE_LIST
:
1113 case TYPE_GET_RESOURCE_ENTRY
:
1114 case TYPE_GET_SHARE_URL
:
1115 case TYPE_TRASH_RESOURCE
:
1116 case TYPE_COPY_RESOURCE
:
1117 case TYPE_UPDATE_RESOURCE
:
1118 case TYPE_ADD_RESOURCE_TO_DIRECTORY
:
1119 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY
:
1120 case TYPE_ADD_NEW_DIRECTORY
:
1121 case TYPE_ADD_PERMISSION
:
1122 return METADATA_QUEUE
;
// Download and upload job types form a second group.
// NOTE(review): the value returned for this group is not visible in this
// part of the file - presumably FILE_QUEUE; confirm.
1124 case TYPE_DOWNLOAD_FILE
:
1125 case TYPE_UPLOAD_NEW_FILE
:
1126 case TYPE_UPLOAD_EXISTING_FILE
:
1133 void JobScheduler::AbortNotRunningJob(JobEntry
* job
,
1134 google_apis::DriveApiErrorCode error
) {
1135 DCHECK(thread_checker_
.CalledOnValidThread());
1137 const base::TimeDelta elapsed
= base::Time::Now() - job
->job_info
.start_time
;
1138 const QueueType queue_type
= GetJobQueueType(job
->job_info
.job_type
);
1139 logger_
->Log(logging::LOG_INFO
,
1140 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1141 job
->job_info
.ToString().c_str(),
1142 DriveApiErrorCodeToString(error
).c_str(),
1143 base::Int64ToString(elapsed
.InMilliseconds()).c_str(),
1144 GetQueueInfo(queue_type
).c_str());
1146 base::Callback
<void(google_apis::DriveApiErrorCode
)> callback
=
1147 job
->abort_callback
;
1148 queue_
[GetJobQueueType(job
->job_info
.job_type
)]->Remove(job
->job_info
.job_id
);
1149 NotifyJobDone(job
->job_info
, error
);
1150 job_map_
.Remove(job
->job_info
.job_id
);
1151 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE
,
1152 base::Bind(callback
, error
));
1155 void JobScheduler::NotifyJobAdded(const JobInfo
& job_info
) {
1156 DCHECK(thread_checker_
.CalledOnValidThread());
1157 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
, OnJobAdded(job_info
));
1160 void JobScheduler::NotifyJobDone(const JobInfo
& job_info
,
1161 google_apis::DriveApiErrorCode error
) {
1162 DCHECK(thread_checker_
.CalledOnValidThread());
1163 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
,
1164 OnJobDone(job_info
, GDataToFileError(error
)));
1167 void JobScheduler::NotifyJobUpdated(const JobInfo
& job_info
) {
1168 DCHECK(thread_checker_
.CalledOnValidThread());
1169 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
, OnJobUpdated(job_info
));
1172 std::string
JobScheduler::GetQueueInfo(QueueType type
) const {
1173 return QueueTypeToString(type
) + " " + queue_
[type
]->ToString();
// Returns the name of the queue |type| as a string; METADATA_QUEUE yields
// "METADATA_QUEUE".
// NOTE(review): the case labels in front of the "FILE_QUEUE" return and the
// sentinel |break| are not visible in this part of the file - presumably
// FILE_QUEUE and NUM_QUEUES respectively; confirm.
1177 std::string
JobScheduler::QueueTypeToString(QueueType type
) {
1179 case METADATA_QUEUE
:
1180 return "METADATA_QUEUE";
1182 return "FILE_QUEUE";
1184 break; // This value is just a sentinel. Should never be used.
1190 } // namespace drive