1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
7 #include "base/files/file_util.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h"
10 #include "base/prefs/pref_service.h"
11 #include "base/rand_util.h"
12 #include "base/strings/string_number_conversions.h"
13 #include "base/strings/stringprintf.h"
14 #include "chrome/browser/drive/event_logger.h"
15 #include "chrome/common/pref_names.h"
16 #include "content/public/browser/browser_thread.h"
17 #include "google_apis/drive/drive_api_parser.h"
19 using content::BrowserThread
;
25 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
26 // throttling or server error. The delay before retrying a job is shared among
27 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
29 // According to the API documentation, kMaxRetryCount should be the same as
30 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
31 // But currently multiplied by 2 to ensure upload related jobs retried for a
32 // sufficient number of times. crbug.com/269918
33 const int kMaxThrottleCount
= 4;
34 const int kMaxRetryCount
= 2 * kMaxThrottleCount
;
36 // GetDefaultValue returns a value constructed by the default constructor.
37 template<typename T
> struct DefaultValueCreator
{
38 static T
GetDefaultValue() { return T(); }
40 template<typename T
> struct DefaultValueCreator
<const T
&> {
41 static T
GetDefaultValue() { return T(); }
44 // Helper of CreateErrorRunCallback implementation.
46 // - ResultType; the type of the Callback which should be returned by
47 // CreateErrorRunCallback.
48 // - Run(): a static function which takes the original |callback| and |error|,
49 // and runs the |callback|.Run() with the error code and default values
50 // for remaining arguments.
51 template<typename CallbackType
> struct CreateErrorRunCallbackHelper
;
53 // CreateErrorRunCallback with two arguments.
55 struct CreateErrorRunCallbackHelper
<void(google_apis::DriveApiErrorCode
, P1
)> {
57 const base::Callback
<void(google_apis::DriveApiErrorCode
, P1
)>& callback
,
58 google_apis::DriveApiErrorCode error
) {
59 callback
.Run(error
, DefaultValueCreator
<P1
>::GetDefaultValue());
63 // Returns a callback with the tail parameter bound to its default value.
64 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
65 template<typename CallbackType
>
66 base::Callback
<void(google_apis::DriveApiErrorCode
)>
67 CreateErrorRunCallback(const base::Callback
<CallbackType
>& callback
) {
68 return base::Bind(&CreateErrorRunCallbackHelper
<CallbackType
>::Run
, callback
);
71 // Parameter struct for RunUploadNewFile.
72 struct UploadNewFileParams
{
73 std::string parent_resource_id
;
74 base::FilePath local_file_path
;
76 std::string content_type
;
77 UploadNewFileOptions options
;
78 UploadCompletionCallback callback
;
79 google_apis::ProgressCallback progress_callback
;
82 // Helper function to work around the arity limitation of base::Bind.
83 google_apis::CancelCallback
RunUploadNewFile(
84 DriveUploaderInterface
* uploader
,
85 const UploadNewFileParams
& params
) {
86 return uploader
->UploadNewFile(params
.parent_resource_id
,
87 params
.local_file_path
,
92 params
.progress_callback
);
95 // Parameter struct for RunUploadExistingFile.
96 struct UploadExistingFileParams
{
97 std::string resource_id
;
98 base::FilePath local_file_path
;
99 std::string content_type
;
100 UploadExistingFileOptions options
;
102 UploadCompletionCallback callback
;
103 google_apis::ProgressCallback progress_callback
;
106 // Helper function to work around the arity limitation of base::Bind.
107 google_apis::CancelCallback
RunUploadExistingFile(
108 DriveUploaderInterface
* uploader
,
109 const UploadExistingFileParams
& params
) {
110 return uploader
->UploadExistingFile(params
.resource_id
,
111 params
.local_file_path
,
115 params
.progress_callback
);
118 // Parameter struct for RunResumeUploadFile.
119 struct ResumeUploadFileParams
{
120 GURL upload_location
;
121 base::FilePath local_file_path
;
122 std::string content_type
;
123 UploadCompletionCallback callback
;
124 google_apis::ProgressCallback progress_callback
;
127 // Helper function to adjust the return type.
128 google_apis::CancelCallback
RunResumeUploadFile(
129 DriveUploaderInterface
* uploader
,
130 const ResumeUploadFileParams
& params
) {
131 return uploader
->ResumeUploadFile(params
.upload_location
,
132 params
.local_file_path
,
135 params
.progress_callback
);
138 // Collects information about sizes of files copied or moved from or to Drive
139 // Otherwise does nothing. Temporary for crbug.com/229650.
140 void CollectCopyHistogramSample(const std::string
& histogram_name
, int64 size
) {
141 base::HistogramBase
* const counter
=
142 base::Histogram::FactoryGet(histogram_name
,
144 1024 * 1024 /* 1 GB */,
146 base::Histogram::kUmaTargetedHistogramFlag
);
147 counter
->Add(size
/ 1024);
150 // Obtains file size to be uploaded for setting total bytes of JobInfo.
151 void GetFileSizeForJob(base::SequencedTaskRunner
* blocking_task_runner
,
152 const base::FilePath
& local_file_path
,
153 const base::Callback
<void(int64
* size
)>& callback
) {
154 int64
* const size
= new int64
;
156 blocking_task_runner
->PostTaskAndReply(
157 FROM_HERE
, base::Bind(base::IgnoreResult(&base::GetFileSize
),
158 local_file_path
, base::Unretained(size
)),
159 base::Bind(callback
, base::Owned(size
)));
164 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
165 const int JobScheduler::kMaxJobCount
[] = {
170 JobScheduler::JobEntry::JobEntry(JobType type
)
172 context(ClientContext(USER_INITIATED
)),
174 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
177 JobScheduler::JobEntry::~JobEntry() {
178 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
181 struct JobScheduler::ResumeUploadParams
{
182 base::FilePath drive_file_path
;
183 base::FilePath local_file_path
;
184 std::string content_type
;
187 JobScheduler::JobScheduler(PrefService
* pref_service
,
189 DriveServiceInterface
* drive_service
,
190 base::SequencedTaskRunner
* blocking_task_runner
)
191 : throttle_count_(0),
192 wait_until_(base::Time::Now()),
193 disable_throttling_(false),
195 drive_service_(drive_service
),
196 blocking_task_runner_(blocking_task_runner
),
197 uploader_(new DriveUploader(drive_service
, blocking_task_runner
)),
198 pref_service_(pref_service
),
199 weak_ptr_factory_(this) {
200 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
202 for (int i
= 0; i
< NUM_QUEUES
; ++i
)
203 queue_
[i
].reset(new JobQueue(kMaxJobCount
[i
], NUM_CONTEXT_TYPES
));
205 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
208 JobScheduler::~JobScheduler() {
209 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
211 size_t num_queued_jobs
= 0;
212 for (int i
= 0; i
< NUM_QUEUES
; ++i
)
213 num_queued_jobs
+= queue_
[i
]->GetNumberOfJobs();
214 DCHECK_EQ(num_queued_jobs
, job_map_
.size());
216 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
219 std::vector
<JobInfo
> JobScheduler::GetJobInfoList() {
220 std::vector
<JobInfo
> job_info_list
;
221 for (JobIDMap::iterator
iter(&job_map_
); !iter
.IsAtEnd(); iter
.Advance())
222 job_info_list
.push_back(iter
.GetCurrentValue()->job_info
);
223 return job_info_list
;
226 void JobScheduler::AddObserver(JobListObserver
* observer
) {
227 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
228 observer_list_
.AddObserver(observer
);
231 void JobScheduler::RemoveObserver(JobListObserver
* observer
) {
232 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
233 observer_list_
.RemoveObserver(observer
);
236 void JobScheduler::CancelJob(JobID job_id
) {
237 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
239 JobEntry
* job
= job_map_
.Lookup(job_id
);
241 if (job
->job_info
.state
== STATE_RUNNING
) {
242 // If the job is running an HTTP request, cancel it via |cancel_callback|
243 // returned from the request, and wait for termination in the normal
244 // callback handler, OnJobDone.
245 if (!job
->cancel_callback
.is_null())
246 job
->cancel_callback
.Run();
248 AbortNotRunningJob(job
, google_apis::DRIVE_CANCELLED
);
253 void JobScheduler::CancelAllJobs() {
254 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
256 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
257 // removable during iteration.
258 for (JobIDMap::iterator
iter(&job_map_
); !iter
.IsAtEnd(); iter
.Advance())
259 CancelJob(iter
.GetCurrentKey());
262 void JobScheduler::GetAboutResource(
263 const google_apis::AboutResourceCallback
& callback
) {
264 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
265 DCHECK(!callback
.is_null());
267 JobEntry
* new_job
= CreateNewJob(TYPE_GET_ABOUT_RESOURCE
);
268 new_job
->task
= base::Bind(
269 &DriveServiceInterface::GetAboutResource
,
270 base::Unretained(drive_service_
),
271 base::Bind(&JobScheduler::OnGetAboutResourceJobDone
,
272 weak_ptr_factory_
.GetWeakPtr(),
273 new_job
->job_info
.job_id
,
275 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
279 void JobScheduler::GetAppList(const google_apis::AppListCallback
& callback
) {
280 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
281 DCHECK(!callback
.is_null());
283 JobEntry
* new_job
= CreateNewJob(TYPE_GET_APP_LIST
);
284 new_job
->task
= base::Bind(
285 &DriveServiceInterface::GetAppList
,
286 base::Unretained(drive_service_
),
287 base::Bind(&JobScheduler::OnGetAppListJobDone
,
288 weak_ptr_factory_
.GetWeakPtr(),
289 new_job
->job_info
.job_id
,
291 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
295 void JobScheduler::GetAllFileList(
296 const google_apis::FileListCallback
& callback
) {
297 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
298 DCHECK(!callback
.is_null());
300 JobEntry
* new_job
= CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST
);
301 new_job
->task
= base::Bind(
302 &DriveServiceInterface::GetAllFileList
,
303 base::Unretained(drive_service_
),
304 base::Bind(&JobScheduler::OnGetFileListJobDone
,
305 weak_ptr_factory_
.GetWeakPtr(),
306 new_job
->job_info
.job_id
,
308 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
312 void JobScheduler::GetFileListInDirectory(
313 const std::string
& directory_resource_id
,
314 const google_apis::FileListCallback
& callback
) {
315 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
316 DCHECK(!callback
.is_null());
318 JobEntry
* new_job
= CreateNewJob(
319 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY
);
320 new_job
->task
= base::Bind(
321 &DriveServiceInterface::GetFileListInDirectory
,
322 base::Unretained(drive_service_
),
323 directory_resource_id
,
324 base::Bind(&JobScheduler::OnGetFileListJobDone
,
325 weak_ptr_factory_
.GetWeakPtr(),
326 new_job
->job_info
.job_id
,
328 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
332 void JobScheduler::Search(const std::string
& search_query
,
333 const google_apis::FileListCallback
& callback
) {
334 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
335 DCHECK(!callback
.is_null());
337 JobEntry
* new_job
= CreateNewJob(TYPE_SEARCH
);
338 new_job
->task
= base::Bind(
339 &DriveServiceInterface::Search
,
340 base::Unretained(drive_service_
),
342 base::Bind(&JobScheduler::OnGetFileListJobDone
,
343 weak_ptr_factory_
.GetWeakPtr(),
344 new_job
->job_info
.job_id
,
346 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
350 void JobScheduler::GetChangeList(
351 int64 start_changestamp
,
352 const google_apis::ChangeListCallback
& callback
) {
353 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
354 DCHECK(!callback
.is_null());
356 JobEntry
* new_job
= CreateNewJob(TYPE_GET_CHANGE_LIST
);
357 new_job
->task
= base::Bind(
358 &DriveServiceInterface::GetChangeList
,
359 base::Unretained(drive_service_
),
361 base::Bind(&JobScheduler::OnGetChangeListJobDone
,
362 weak_ptr_factory_
.GetWeakPtr(),
363 new_job
->job_info
.job_id
,
365 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
369 void JobScheduler::GetRemainingChangeList(
370 const GURL
& next_link
,
371 const google_apis::ChangeListCallback
& callback
) {
372 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
373 DCHECK(!callback
.is_null());
375 JobEntry
* new_job
= CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST
);
376 new_job
->task
= base::Bind(
377 &DriveServiceInterface::GetRemainingChangeList
,
378 base::Unretained(drive_service_
),
380 base::Bind(&JobScheduler::OnGetChangeListJobDone
,
381 weak_ptr_factory_
.GetWeakPtr(),
382 new_job
->job_info
.job_id
,
384 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
388 void JobScheduler::GetRemainingFileList(
389 const GURL
& next_link
,
390 const google_apis::FileListCallback
& callback
) {
391 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
392 DCHECK(!callback
.is_null());
394 JobEntry
* new_job
= CreateNewJob(TYPE_GET_REMAINING_FILE_LIST
);
395 new_job
->task
= base::Bind(
396 &DriveServiceInterface::GetRemainingFileList
,
397 base::Unretained(drive_service_
),
399 base::Bind(&JobScheduler::OnGetFileListJobDone
,
400 weak_ptr_factory_
.GetWeakPtr(),
401 new_job
->job_info
.job_id
,
403 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
407 void JobScheduler::GetFileResource(
408 const std::string
& resource_id
,
409 const ClientContext
& context
,
410 const google_apis::FileResourceCallback
& callback
) {
411 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
412 DCHECK(!callback
.is_null());
414 JobEntry
* new_job
= CreateNewJob(TYPE_GET_RESOURCE_ENTRY
);
415 new_job
->context
= context
;
416 new_job
->task
= base::Bind(
417 &DriveServiceInterface::GetFileResource
,
418 base::Unretained(drive_service_
),
420 base::Bind(&JobScheduler::OnGetFileResourceJobDone
,
421 weak_ptr_factory_
.GetWeakPtr(),
422 new_job
->job_info
.job_id
,
424 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
428 void JobScheduler::GetShareUrl(
429 const std::string
& resource_id
,
430 const GURL
& embed_origin
,
431 const ClientContext
& context
,
432 const google_apis::GetShareUrlCallback
& callback
) {
433 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
434 DCHECK(!callback
.is_null());
436 JobEntry
* new_job
= CreateNewJob(TYPE_GET_SHARE_URL
);
437 new_job
->context
= context
;
438 new_job
->task
= base::Bind(
439 &DriveServiceInterface::GetShareUrl
,
440 base::Unretained(drive_service_
),
443 base::Bind(&JobScheduler::OnGetShareUrlJobDone
,
444 weak_ptr_factory_
.GetWeakPtr(),
445 new_job
->job_info
.job_id
,
447 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
451 void JobScheduler::TrashResource(
452 const std::string
& resource_id
,
453 const ClientContext
& context
,
454 const google_apis::EntryActionCallback
& callback
) {
455 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
456 DCHECK(!callback
.is_null());
458 JobEntry
* new_job
= CreateNewJob(TYPE_TRASH_RESOURCE
);
459 new_job
->context
= context
;
460 new_job
->task
= base::Bind(
461 &DriveServiceInterface::TrashResource
,
462 base::Unretained(drive_service_
),
464 base::Bind(&JobScheduler::OnEntryActionJobDone
,
465 weak_ptr_factory_
.GetWeakPtr(),
466 new_job
->job_info
.job_id
,
468 new_job
->abort_callback
= callback
;
472 void JobScheduler::CopyResource(
473 const std::string
& resource_id
,
474 const std::string
& parent_resource_id
,
475 const std::string
& new_title
,
476 const base::Time
& last_modified
,
477 const google_apis::FileResourceCallback
& callback
) {
478 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
479 DCHECK(!callback
.is_null());
481 JobEntry
* new_job
= CreateNewJob(TYPE_COPY_RESOURCE
);
482 new_job
->task
= base::Bind(
483 &DriveServiceInterface::CopyResource
,
484 base::Unretained(drive_service_
),
489 base::Bind(&JobScheduler::OnGetFileResourceJobDone
,
490 weak_ptr_factory_
.GetWeakPtr(),
491 new_job
->job_info
.job_id
,
493 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
497 void JobScheduler::UpdateResource(
498 const std::string
& resource_id
,
499 const std::string
& parent_resource_id
,
500 const std::string
& new_title
,
501 const base::Time
& last_modified
,
502 const base::Time
& last_viewed_by_me
,
503 const google_apis::drive::Properties
& properties
,
504 const ClientContext
& context
,
505 const google_apis::FileResourceCallback
& callback
) {
506 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
507 DCHECK(!callback
.is_null());
509 JobEntry
* new_job
= CreateNewJob(TYPE_UPDATE_RESOURCE
);
510 new_job
->context
= context
;
511 new_job
->task
= base::Bind(&DriveServiceInterface::UpdateResource
,
512 base::Unretained(drive_service_
), resource_id
,
513 parent_resource_id
, new_title
, last_modified
,
514 last_viewed_by_me
, properties
,
515 base::Bind(&JobScheduler::OnGetFileResourceJobDone
,
516 weak_ptr_factory_
.GetWeakPtr(),
517 new_job
->job_info
.job_id
, callback
));
518 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
522 void JobScheduler::AddResourceToDirectory(
523 const std::string
& parent_resource_id
,
524 const std::string
& resource_id
,
525 const google_apis::EntryActionCallback
& callback
) {
526 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
527 DCHECK(!callback
.is_null());
529 JobEntry
* new_job
= CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY
);
530 new_job
->task
= base::Bind(
531 &DriveServiceInterface::AddResourceToDirectory
,
532 base::Unretained(drive_service_
),
535 base::Bind(&JobScheduler::OnEntryActionJobDone
,
536 weak_ptr_factory_
.GetWeakPtr(),
537 new_job
->job_info
.job_id
,
539 new_job
->abort_callback
= callback
;
543 void JobScheduler::RemoveResourceFromDirectory(
544 const std::string
& parent_resource_id
,
545 const std::string
& resource_id
,
546 const ClientContext
& context
,
547 const google_apis::EntryActionCallback
& callback
) {
548 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
550 JobEntry
* new_job
= CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY
);
551 new_job
->context
= context
;
552 new_job
->task
= base::Bind(
553 &DriveServiceInterface::RemoveResourceFromDirectory
,
554 base::Unretained(drive_service_
),
557 base::Bind(&JobScheduler::OnEntryActionJobDone
,
558 weak_ptr_factory_
.GetWeakPtr(),
559 new_job
->job_info
.job_id
,
561 new_job
->abort_callback
= callback
;
565 void JobScheduler::AddNewDirectory(
566 const std::string
& parent_resource_id
,
567 const std::string
& directory_title
,
568 const AddNewDirectoryOptions
& options
,
569 const ClientContext
& context
,
570 const google_apis::FileResourceCallback
& callback
) {
571 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
573 JobEntry
* new_job
= CreateNewJob(TYPE_ADD_NEW_DIRECTORY
);
574 new_job
->context
= context
;
575 new_job
->task
= base::Bind(
576 &DriveServiceInterface::AddNewDirectory
,
577 base::Unretained(drive_service_
),
581 base::Bind(&JobScheduler::OnGetFileResourceJobDone
,
582 weak_ptr_factory_
.GetWeakPtr(),
583 new_job
->job_info
.job_id
,
585 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
589 JobID
JobScheduler::DownloadFile(
590 const base::FilePath
& virtual_path
,
591 int64 expected_file_size
,
592 const base::FilePath
& local_cache_path
,
593 const std::string
& resource_id
,
594 const ClientContext
& context
,
595 const google_apis::DownloadActionCallback
& download_action_callback
,
596 const google_apis::GetContentCallback
& get_content_callback
) {
597 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
599 // Temporary histogram for crbug.com/229650.
600 CollectCopyHistogramSample("Drive.DownloadFromDriveFileSize",
603 JobEntry
* new_job
= CreateNewJob(TYPE_DOWNLOAD_FILE
);
604 new_job
->job_info
.file_path
= virtual_path
;
605 new_job
->job_info
.num_total_bytes
= expected_file_size
;
606 new_job
->context
= context
;
607 new_job
->task
= base::Bind(
608 &DriveServiceInterface::DownloadFile
,
609 base::Unretained(drive_service_
),
612 base::Bind(&JobScheduler::OnDownloadActionJobDone
,
613 weak_ptr_factory_
.GetWeakPtr(),
614 new_job
->job_info
.job_id
,
615 download_action_callback
),
616 get_content_callback
,
617 base::Bind(&JobScheduler::UpdateProgress
,
618 weak_ptr_factory_
.GetWeakPtr(),
619 new_job
->job_info
.job_id
));
620 new_job
->abort_callback
= CreateErrorRunCallback(download_action_callback
);
622 return new_job
->job_info
.job_id
;
625 void JobScheduler::UploadNewFile(
626 const std::string
& parent_resource_id
,
627 const base::FilePath
& drive_file_path
,
628 const base::FilePath
& local_file_path
,
629 const std::string
& title
,
630 const std::string
& content_type
,
631 const UploadNewFileOptions
& options
,
632 const ClientContext
& context
,
633 const google_apis::FileResourceCallback
& callback
) {
634 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
636 JobEntry
* new_job
= CreateNewJob(TYPE_UPLOAD_NEW_FILE
);
637 new_job
->job_info
.file_path
= drive_file_path
;
638 new_job
->context
= context
;
641 blocking_task_runner_
, local_file_path
,
642 base::Bind(&JobScheduler::OnGotFileSizeForJob
,
643 weak_ptr_factory_
.GetWeakPtr(), new_job
->job_info
.job_id
,
644 "Drive.UploadToDriveFileSize"));
646 UploadNewFileParams params
;
647 params
.parent_resource_id
= parent_resource_id
;
648 params
.local_file_path
= local_file_path
;
649 params
.title
= title
;
650 params
.content_type
= content_type
;
651 params
.options
= options
;
653 ResumeUploadParams resume_params
;
654 resume_params
.local_file_path
= params
.local_file_path
;
655 resume_params
.content_type
= params
.content_type
;
657 params
.callback
= base::Bind(&JobScheduler::OnUploadCompletionJobDone
,
658 weak_ptr_factory_
.GetWeakPtr(),
659 new_job
->job_info
.job_id
,
662 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
663 weak_ptr_factory_
.GetWeakPtr(),
664 new_job
->job_info
.job_id
);
665 new_job
->task
= base::Bind(&RunUploadNewFile
, uploader_
.get(), params
);
666 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
670 void JobScheduler::UploadExistingFile(
671 const std::string
& resource_id
,
672 const base::FilePath
& drive_file_path
,
673 const base::FilePath
& local_file_path
,
674 const std::string
& content_type
,
675 const UploadExistingFileOptions
& options
,
676 const ClientContext
& context
,
677 const google_apis::FileResourceCallback
& callback
) {
678 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
680 JobEntry
* new_job
= CreateNewJob(TYPE_UPLOAD_EXISTING_FILE
);
681 new_job
->job_info
.file_path
= drive_file_path
;
682 new_job
->context
= context
;
685 blocking_task_runner_
, local_file_path
,
686 base::Bind(&JobScheduler::OnGotFileSizeForJob
,
687 weak_ptr_factory_
.GetWeakPtr(), new_job
->job_info
.job_id
,
688 "Drive.UploadToDriveFileSize"));
690 UploadExistingFileParams params
;
691 params
.resource_id
= resource_id
;
692 params
.local_file_path
= local_file_path
;
693 params
.content_type
= content_type
;
694 params
.options
= options
;
696 ResumeUploadParams resume_params
;
697 resume_params
.local_file_path
= params
.local_file_path
;
698 resume_params
.content_type
= params
.content_type
;
700 params
.callback
= base::Bind(&JobScheduler::OnUploadCompletionJobDone
,
701 weak_ptr_factory_
.GetWeakPtr(),
702 new_job
->job_info
.job_id
,
705 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
706 weak_ptr_factory_
.GetWeakPtr(),
707 new_job
->job_info
.job_id
);
708 new_job
->task
= base::Bind(&RunUploadExistingFile
, uploader_
.get(), params
);
709 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
713 void JobScheduler::AddPermission(
714 const std::string
& resource_id
,
715 const std::string
& email
,
716 google_apis::drive::PermissionRole role
,
717 const google_apis::EntryActionCallback
& callback
) {
718 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
719 DCHECK(!callback
.is_null());
721 JobEntry
* new_job
= CreateNewJob(TYPE_ADD_PERMISSION
);
722 new_job
->task
= base::Bind(&DriveServiceInterface::AddPermission
,
723 base::Unretained(drive_service_
),
727 base::Bind(&JobScheduler::OnEntryActionJobDone
,
728 weak_ptr_factory_
.GetWeakPtr(),
729 new_job
->job_info
.job_id
,
731 new_job
->abort_callback
= callback
;
735 JobScheduler::JobEntry
* JobScheduler::CreateNewJob(JobType type
) {
736 JobEntry
* job
= new JobEntry(type
);
737 job
->job_info
.job_id
= job_map_
.Add(job
); // Takes the ownership of |job|.
741 void JobScheduler::StartJob(JobEntry
* job
) {
742 DCHECK(!job
->task
.is_null());
744 QueueJob(job
->job_info
.job_id
);
745 NotifyJobAdded(job
->job_info
);
746 DoJobLoop(GetJobQueueType(job
->job_info
.job_type
));
749 void JobScheduler::QueueJob(JobID job_id
) {
750 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
752 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
754 const JobInfo
& job_info
= job_entry
->job_info
;
756 const QueueType queue_type
= GetJobQueueType(job_info
.job_type
);
757 queue_
[queue_type
]->Push(job_id
, job_entry
->context
.type
);
759 // Temporary histogram for crbug.com/229650.
760 if (job_info
.job_type
== TYPE_DOWNLOAD_FILE
||
761 job_info
.job_type
== TYPE_UPLOAD_EXISTING_FILE
||
762 job_info
.job_type
== TYPE_UPLOAD_NEW_FILE
) {
763 std::vector
<JobID
> jobs_with_the_same_priority
;
764 queue_
[queue_type
]->GetQueuedJobs(job_entry
->context
.type
,
765 &jobs_with_the_same_priority
);
766 DCHECK(!jobs_with_the_same_priority
.empty());
768 const size_t blocking_jobs_count
= jobs_with_the_same_priority
.size() - 1;
769 UMA_HISTOGRAM_COUNTS_10000("Drive.TransferBlockedOnJobs",
770 blocking_jobs_count
);
773 const std::string retry_prefix
= job_entry
->retry_count
> 0 ?
774 base::StringPrintf(" (retry %d)", job_entry
->retry_count
) : "";
775 logger_
->Log(logging::LOG_INFO
,
776 "Job queued%s: %s - %s",
777 retry_prefix
.c_str(),
778 job_info
.ToString().c_str(),
779 GetQueueInfo(queue_type
).c_str());
782 void JobScheduler::DoJobLoop(QueueType queue_type
) {
783 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
785 const int accepted_priority
= GetCurrentAcceptedPriority(queue_type
);
787 // Abort all USER_INITAITED jobs when not accepted.
788 if (accepted_priority
< USER_INITIATED
) {
789 std::vector
<JobID
> jobs
;
790 queue_
[queue_type
]->GetQueuedJobs(USER_INITIATED
, &jobs
);
791 for (size_t i
= 0; i
< jobs
.size(); ++i
) {
792 JobEntry
* job
= job_map_
.Lookup(jobs
[i
]);
794 AbortNotRunningJob(job
, google_apis::DRIVE_NO_CONNECTION
);
798 // Wait when throttled.
799 const base::Time now
= base::Time::Now();
800 if (now
< wait_until_
) {
801 base::MessageLoopProxy::current()->PostDelayedTask(
803 base::Bind(&JobScheduler::DoJobLoop
,
804 weak_ptr_factory_
.GetWeakPtr(),
810 // Run the job with the highest priority in the queue.
812 if (!queue_
[queue_type
]->PopForRun(accepted_priority
, &job_id
))
815 JobEntry
* entry
= job_map_
.Lookup(job_id
);
818 JobInfo
* job_info
= &entry
->job_info
;
819 job_info
->state
= STATE_RUNNING
;
820 job_info
->start_time
= now
;
821 NotifyJobUpdated(*job_info
);
823 entry
->cancel_callback
= entry
->task
.Run();
827 logger_
->Log(logging::LOG_INFO
,
828 "Job started: %s - %s",
829 job_info
->ToString().c_str(),
830 GetQueueInfo(queue_type
).c_str());
833 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type
) {
834 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
836 const int kNoJobShouldRun
= -1;
838 // Should stop if Drive was disabled while running the fetch loop.
839 if (pref_service_
->GetBoolean(prefs::kDisableDrive
))
840 return kNoJobShouldRun
;
842 // Should stop if the network is not online.
843 if (net::NetworkChangeNotifier::IsOffline())
844 return kNoJobShouldRun
;
846 // For the file queue, if it is on cellular network, only user initiated
847 // operations are allowed to start.
848 if (queue_type
== FILE_QUEUE
&&
849 pref_service_
->GetBoolean(prefs::kDisableDriveOverCellular
) &&
850 net::NetworkChangeNotifier::IsConnectionCellular(
851 net::NetworkChangeNotifier::GetConnectionType()))
852 return USER_INITIATED
;
854 // Otherwise, every operations including background tasks are allowed.
858 void JobScheduler::UpdateWait() {
859 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
861 if (disable_throttling_
|| throttle_count_
== 0)
864 // Exponential backoff: https://developers.google.com/drive/handle-errors.
865 base::TimeDelta delay
=
866 base::TimeDelta::FromSeconds(1 << (throttle_count_
- 1)) +
867 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
868 VLOG(1) << "Throttling for " << delay
.InMillisecondsF();
870 wait_until_
= std::max(wait_until_
, base::Time::Now() + delay
);
873 bool JobScheduler::OnJobDone(JobID job_id
,
874 google_apis::DriveApiErrorCode error
) {
875 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
877 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
879 JobInfo
* job_info
= &job_entry
->job_info
;
880 QueueType queue_type
= GetJobQueueType(job_info
->job_type
);
881 queue_
[queue_type
]->MarkFinished(job_id
);
883 const base::TimeDelta elapsed
= base::Time::Now() - job_info
->start_time
;
884 bool success
= (GDataToFileError(error
) == FILE_ERROR_OK
);
885 logger_
->Log(success
? logging::LOG_INFO
: logging::LOG_WARNING
,
886 "Job done: %s => %s (elapsed time: %sms) - %s",
887 job_info
->ToString().c_str(),
888 DriveApiErrorCodeToString(error
).c_str(),
889 base::Int64ToString(elapsed
.InMilliseconds()).c_str(),
890 GetQueueInfo(queue_type
).c_str());
892 // Retry, depending on the error.
893 const bool is_server_error
=
894 error
== google_apis::HTTP_SERVICE_UNAVAILABLE
||
895 error
== google_apis::HTTP_INTERNAL_SERVER_ERROR
;
896 if (is_server_error
) {
897 if (throttle_count_
< kMaxThrottleCount
)
904 const bool should_retry
=
905 is_server_error
&& job_entry
->retry_count
< kMaxRetryCount
;
907 job_entry
->cancel_callback
.Reset();
908 job_info
->state
= STATE_RETRY
;
909 NotifyJobUpdated(*job_info
);
911 ++job_entry
->retry_count
;
916 NotifyJobDone(*job_info
, error
);
917 // The job has finished, no retry will happen in the scheduler. Now we can
918 // remove the job info from the map.
919 job_map_
.Remove(job_id
);
922 // Post a task to continue the job loop. This allows us to finish handling
923 // the current job before starting the next one.
924 base::MessageLoopProxy::current()->PostTask(FROM_HERE
,
925 base::Bind(&JobScheduler::DoJobLoop
,
926 weak_ptr_factory_
.GetWeakPtr(),
928 return !should_retry
;
931 void JobScheduler::OnGetFileListJobDone(
933 const google_apis::FileListCallback
& callback
,
934 google_apis::DriveApiErrorCode error
,
935 scoped_ptr
<google_apis::FileList
> file_list
) {
936 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
937 DCHECK(!callback
.is_null());
939 if (OnJobDone(job_id
, error
))
940 callback
.Run(error
, file_list
.Pass());
943 void JobScheduler::OnGetChangeListJobDone(
945 const google_apis::ChangeListCallback
& callback
,
946 google_apis::DriveApiErrorCode error
,
947 scoped_ptr
<google_apis::ChangeList
> change_list
) {
948 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
949 DCHECK(!callback
.is_null());
951 if (OnJobDone(job_id
, error
))
952 callback
.Run(error
, change_list
.Pass());
955 void JobScheduler::OnGetFileResourceJobDone(
957 const google_apis::FileResourceCallback
& callback
,
958 google_apis::DriveApiErrorCode error
,
959 scoped_ptr
<google_apis::FileResource
> entry
) {
960 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
961 DCHECK(!callback
.is_null());
963 if (OnJobDone(job_id
, error
))
964 callback
.Run(error
, entry
.Pass());
967 void JobScheduler::OnGetAboutResourceJobDone(
969 const google_apis::AboutResourceCallback
& callback
,
970 google_apis::DriveApiErrorCode error
,
971 scoped_ptr
<google_apis::AboutResource
> about_resource
) {
972 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
973 DCHECK(!callback
.is_null());
975 if (OnJobDone(job_id
, error
))
976 callback
.Run(error
, about_resource
.Pass());
979 void JobScheduler::OnGetShareUrlJobDone(
981 const google_apis::GetShareUrlCallback
& callback
,
982 google_apis::DriveApiErrorCode error
,
983 const GURL
& share_url
) {
984 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
985 DCHECK(!callback
.is_null());
987 if (OnJobDone(job_id
, error
))
988 callback
.Run(error
, share_url
);
991 void JobScheduler::OnGetAppListJobDone(
993 const google_apis::AppListCallback
& callback
,
994 google_apis::DriveApiErrorCode error
,
995 scoped_ptr
<google_apis::AppList
> app_list
) {
996 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
997 DCHECK(!callback
.is_null());
999 if (OnJobDone(job_id
, error
))
1000 callback
.Run(error
, app_list
.Pass());
1003 void JobScheduler::OnEntryActionJobDone(
1005 const google_apis::EntryActionCallback
& callback
,
1006 google_apis::DriveApiErrorCode error
) {
1007 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1008 DCHECK(!callback
.is_null());
1010 if (OnJobDone(job_id
, error
))
1011 callback
.Run(error
);
1014 void JobScheduler::OnDownloadActionJobDone(
1016 const google_apis::DownloadActionCallback
& callback
,
1017 google_apis::DriveApiErrorCode error
,
1018 const base::FilePath
& temp_file
) {
1019 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1020 DCHECK(!callback
.is_null());
1022 if (OnJobDone(job_id
, error
))
1023 callback
.Run(error
, temp_file
);
1026 void JobScheduler::OnUploadCompletionJobDone(
1028 const ResumeUploadParams
& resume_params
,
1029 const google_apis::FileResourceCallback
& callback
,
1030 google_apis::DriveApiErrorCode error
,
1031 const GURL
& upload_location
,
1032 scoped_ptr
<google_apis::FileResource
> entry
) {
1033 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1034 DCHECK(!callback
.is_null());
1036 if (!upload_location
.is_empty()) {
1037 // If upload_location is available, update the task to resume the
1038 // upload process from the terminated point.
1039 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1040 // so OnJobDone called below will be in charge to re-queue the job.
1041 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1044 ResumeUploadFileParams params
;
1045 params
.upload_location
= upload_location
;
1046 params
.local_file_path
= resume_params
.local_file_path
;
1047 params
.content_type
= resume_params
.content_type
;
1048 params
.callback
= base::Bind(&JobScheduler::OnResumeUploadFileDone
,
1049 weak_ptr_factory_
.GetWeakPtr(),
1053 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
1054 weak_ptr_factory_
.GetWeakPtr(),
1056 job_entry
->task
= base::Bind(&RunResumeUploadFile
, uploader_
.get(), params
);
1059 if (OnJobDone(job_id
, error
))
1060 callback
.Run(error
, entry
.Pass());
1063 void JobScheduler::OnResumeUploadFileDone(
1065 const base::Callback
<google_apis::CancelCallback()>& original_task
,
1066 const google_apis::FileResourceCallback
& callback
,
1067 google_apis::DriveApiErrorCode error
,
1068 const GURL
& upload_location
,
1069 scoped_ptr
<google_apis::FileResource
> entry
) {
1070 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1071 DCHECK(!original_task
.is_null());
1072 DCHECK(!callback
.is_null());
1074 if (upload_location
.is_empty()) {
1075 // If upload_location is not available, we should discard it and stop trying
1076 // to resume. Restore the original task.
1077 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1079 job_entry
->task
= original_task
;
1082 if (OnJobDone(job_id
, error
))
1083 callback
.Run(error
, entry
.Pass());
1086 void JobScheduler::UpdateProgress(JobID job_id
, int64 progress
, int64 total
) {
1087 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1090 job_entry
->job_info
.num_completed_bytes
= progress
;
1092 job_entry
->job_info
.num_total_bytes
= total
;
1093 NotifyJobUpdated(job_entry
->job_info
);
1096 void JobScheduler::OnConnectionTypeChanged(
1097 net::NetworkChangeNotifier::ConnectionType type
) {
1098 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1100 // Resume the job loop.
1101 // Note that we don't need to check the network connection status as it will
1102 // be checked in GetCurrentAcceptedPriority().
1103 for (int i
= METADATA_QUEUE
; i
< NUM_QUEUES
; ++i
)
1104 DoJobLoop(static_cast<QueueType
>(i
));
1107 void JobScheduler::OnGotFileSizeForJob(JobID job_id
,
1108 const std::string
& histogram_name
,
1110 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1114 // Temporary histogram for crbug.com/229650.
1115 CollectCopyHistogramSample(histogram_name
, *size
);
1117 JobEntry
* const job_entry
= job_map_
.Lookup(job_id
);
1121 job_entry
->job_info
.num_total_bytes
= *size
;
1122 NotifyJobUpdated(job_entry
->job_info
);
1125 JobScheduler::QueueType
JobScheduler::GetJobQueueType(JobType type
) {
1127 case TYPE_GET_ABOUT_RESOURCE
:
1128 case TYPE_GET_APP_LIST
:
1129 case TYPE_GET_ALL_RESOURCE_LIST
:
1130 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY
:
1132 case TYPE_GET_CHANGE_LIST
:
1133 case TYPE_GET_REMAINING_CHANGE_LIST
:
1134 case TYPE_GET_REMAINING_FILE_LIST
:
1135 case TYPE_GET_RESOURCE_ENTRY
:
1136 case TYPE_GET_SHARE_URL
:
1137 case TYPE_TRASH_RESOURCE
:
1138 case TYPE_COPY_RESOURCE
:
1139 case TYPE_UPDATE_RESOURCE
:
1140 case TYPE_ADD_RESOURCE_TO_DIRECTORY
:
1141 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY
:
1142 case TYPE_ADD_NEW_DIRECTORY
:
1143 case TYPE_ADD_PERMISSION
:
1144 return METADATA_QUEUE
;
1146 case TYPE_DOWNLOAD_FILE
:
1147 case TYPE_UPLOAD_NEW_FILE
:
1148 case TYPE_UPLOAD_EXISTING_FILE
:
1155 void JobScheduler::AbortNotRunningJob(JobEntry
* job
,
1156 google_apis::DriveApiErrorCode error
) {
1157 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1159 const base::TimeDelta elapsed
= base::Time::Now() - job
->job_info
.start_time
;
1160 const QueueType queue_type
= GetJobQueueType(job
->job_info
.job_type
);
1161 logger_
->Log(logging::LOG_INFO
,
1162 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1163 job
->job_info
.ToString().c_str(),
1164 DriveApiErrorCodeToString(error
).c_str(),
1165 base::Int64ToString(elapsed
.InMilliseconds()).c_str(),
1166 GetQueueInfo(queue_type
).c_str());
1168 base::Callback
<void(google_apis::DriveApiErrorCode
)> callback
=
1169 job
->abort_callback
;
1170 queue_
[GetJobQueueType(job
->job_info
.job_type
)]->Remove(job
->job_info
.job_id
);
1171 NotifyJobDone(job
->job_info
, error
);
1172 job_map_
.Remove(job
->job_info
.job_id
);
1173 base::MessageLoopProxy::current()->PostTask(FROM_HERE
,
1174 base::Bind(callback
, error
));
1177 void JobScheduler::NotifyJobAdded(const JobInfo
& job_info
) {
1178 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1179 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
, OnJobAdded(job_info
));
1182 void JobScheduler::NotifyJobDone(const JobInfo
& job_info
,
1183 google_apis::DriveApiErrorCode error
) {
1184 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1185 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
,
1186 OnJobDone(job_info
, GDataToFileError(error
)));
1189 void JobScheduler::NotifyJobUpdated(const JobInfo
& job_info
) {
1190 DCHECK_CURRENTLY_ON(BrowserThread::UI
);
1191 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
, OnJobUpdated(job_info
));
1194 std::string
JobScheduler::GetQueueInfo(QueueType type
) const {
1195 return QueueTypeToString(type
) + " " + queue_
[type
]->ToString();
1199 std::string
JobScheduler::QueueTypeToString(QueueType type
) {
1201 case METADATA_QUEUE
:
1202 return "METADATA_QUEUE";
1204 return "FILE_QUEUE";
1206 break; // This value is just a sentinel. Should never be used.
1212 } // namespace drive