1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/chromeos/drive/file_system_util.h"
13 #include "chrome/browser/chromeos/drive/logging.h"
14 #include "chrome/common/pref_names.h"
15 #include "content/public/browser/browser_thread.h"
16 #include "google_apis/drive/drive_api_parser.h"
18 using content::BrowserThread
;
24 // All jobs are retried at maximum of kMaxRetryCount when they fail due to
25 // throttling or server error. The delay before retrying a job is shared among
26 // jobs. It doubles in length on each failure, upto 2^kMaxThrottleCount seconds.
28 // According to the API documentation, kMaxRetryCount should be the same as
29 // kMaxThrottleCount (https://developers.google.com/drive/handle-errors).
30 // But currently multiplied by 2 to ensure upload related jobs retried for a
31 // sufficient number of times. crbug.com/269918
32 const int kMaxThrottleCount
= 4;
33 const int kMaxRetryCount
= 2 * kMaxThrottleCount
;
35 // GetDefaultValue returns a value constructed by the default constructor.
36 template<typename T
> struct DefaultValueCreator
{
37 static T
GetDefaultValue() { return T(); }
39 template<typename T
> struct DefaultValueCreator
<const T
&> {
40 static T
GetDefaultValue() { return T(); }
43 // Helper of CreateErrorRunCallback implementation.
45 // - ResultType; the type of the Callback which should be returned by
46 // CreateErrorRunCallback.
47 // - Run(): a static function which takes the original |callback| and |error|,
48 // and runs the |callback|.Run() with the error code and default values
49 // for remaining arguments.
50 template<typename CallbackType
> struct CreateErrorRunCallbackHelper
;
52 // CreateErrorRunCallback with two arguments.
54 struct CreateErrorRunCallbackHelper
<void(google_apis::GDataErrorCode
, P1
)> {
56 const base::Callback
<void(google_apis::GDataErrorCode
, P1
)>& callback
,
57 google_apis::GDataErrorCode error
) {
58 callback
.Run(error
, DefaultValueCreator
<P1
>::GetDefaultValue());
62 // Returns a callback with the tail parameter bound to its default value.
63 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
64 template<typename CallbackType
>
65 base::Callback
<void(google_apis::GDataErrorCode
)>
66 CreateErrorRunCallback(const base::Callback
<CallbackType
>& callback
) {
67 return base::Bind(&CreateErrorRunCallbackHelper
<CallbackType
>::Run
, callback
);
70 // Parameter struct for RunUploadNewFile.
71 struct UploadNewFileParams
{
72 std::string parent_resource_id
;
73 base::FilePath local_file_path
;
75 std::string content_type
;
76 UploadCompletionCallback callback
;
77 google_apis::ProgressCallback progress_callback
;
80 // Helper function to work around the arity limitation of base::Bind.
81 google_apis::CancelCallback
RunUploadNewFile(
82 DriveUploaderInterface
* uploader
,
83 const UploadNewFileParams
& params
) {
84 return uploader
->UploadNewFile(params
.parent_resource_id
,
85 params
.local_file_path
,
89 params
.progress_callback
);
92 // Parameter struct for RunUploadExistingFile.
93 struct UploadExistingFileParams
{
94 std::string resource_id
;
95 base::FilePath local_file_path
;
96 std::string content_type
;
98 UploadCompletionCallback callback
;
99 google_apis::ProgressCallback progress_callback
;
102 // Helper function to work around the arity limitation of base::Bind.
103 google_apis::CancelCallback
RunUploadExistingFile(
104 DriveUploaderInterface
* uploader
,
105 const UploadExistingFileParams
& params
) {
106 return uploader
->UploadExistingFile(params
.resource_id
,
107 params
.local_file_path
,
111 params
.progress_callback
);
114 // Parameter struct for RunResumeUploadFile.
115 struct ResumeUploadFileParams
{
116 GURL upload_location
;
117 base::FilePath local_file_path
;
118 std::string content_type
;
119 UploadCompletionCallback callback
;
120 google_apis::ProgressCallback progress_callback
;
123 // Helper function to adjust the return type.
124 google_apis::CancelCallback
RunResumeUploadFile(
125 DriveUploaderInterface
* uploader
,
126 const ResumeUploadFileParams
& params
) {
127 return uploader
->ResumeUploadFile(params
.upload_location
,
128 params
.local_file_path
,
131 params
.progress_callback
);
136 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
137 const int JobScheduler::kMaxJobCount
[] = {
142 JobScheduler::JobEntry::JobEntry(JobType type
)
144 context(ClientContext(USER_INITIATED
)),
146 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
149 JobScheduler::JobEntry::~JobEntry() {
150 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
153 struct JobScheduler::ResumeUploadParams
{
154 base::FilePath drive_file_path
;
155 base::FilePath local_file_path
;
156 std::string content_type
;
159 JobScheduler::JobScheduler(
160 PrefService
* pref_service
,
161 DriveServiceInterface
* drive_service
,
162 base::SequencedTaskRunner
* blocking_task_runner
)
163 : throttle_count_(0),
164 wait_until_(base::Time::Now()),
165 disable_throttling_(false),
166 drive_service_(drive_service
),
167 uploader_(new DriveUploader(drive_service
, blocking_task_runner
)),
168 pref_service_(pref_service
),
169 weak_ptr_factory_(this) {
170 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
172 for (int i
= 0; i
< NUM_QUEUES
; ++i
)
173 queue_
[i
].reset(new JobQueue(kMaxJobCount
[i
], NUM_CONTEXT_TYPES
));
175 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
178 JobScheduler::~JobScheduler() {
179 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
181 size_t num_queued_jobs
= 0;
182 for (int i
= 0; i
< NUM_QUEUES
; ++i
)
183 num_queued_jobs
+= queue_
[i
]->GetNumberOfJobs();
184 DCHECK_EQ(num_queued_jobs
, job_map_
.size());
186 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
189 std::vector
<JobInfo
> JobScheduler::GetJobInfoList() {
190 std::vector
<JobInfo
> job_info_list
;
191 for (JobIDMap::iterator
iter(&job_map_
); !iter
.IsAtEnd(); iter
.Advance())
192 job_info_list
.push_back(iter
.GetCurrentValue()->job_info
);
193 return job_info_list
;
196 void JobScheduler::AddObserver(JobListObserver
* observer
) {
197 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
198 observer_list_
.AddObserver(observer
);
201 void JobScheduler::RemoveObserver(JobListObserver
* observer
) {
202 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
203 observer_list_
.RemoveObserver(observer
);
206 void JobScheduler::CancelJob(JobID job_id
) {
207 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
209 JobEntry
* job
= job_map_
.Lookup(job_id
);
211 if (job
->job_info
.state
== STATE_RUNNING
) {
212 // If the job is running an HTTP request, cancel it via |cancel_callback|
213 // returned from the request, and wait for termination in the normal
214 // callback handler, OnJobDone.
215 if (!job
->cancel_callback
.is_null())
216 job
->cancel_callback
.Run();
218 AbortNotRunningJob(job
, google_apis::GDATA_CANCELLED
);
223 void JobScheduler::CancelAllJobs() {
224 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
226 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
227 // removable during iteration.
228 for (JobIDMap::iterator
iter(&job_map_
); !iter
.IsAtEnd(); iter
.Advance())
229 CancelJob(iter
.GetCurrentKey());
232 void JobScheduler::GetAboutResource(
233 const google_apis::AboutResourceCallback
& callback
) {
234 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
235 DCHECK(!callback
.is_null());
237 JobEntry
* new_job
= CreateNewJob(TYPE_GET_ABOUT_RESOURCE
);
238 new_job
->task
= base::Bind(
239 &DriveServiceInterface::GetAboutResource
,
240 base::Unretained(drive_service_
),
241 base::Bind(&JobScheduler::OnGetAboutResourceJobDone
,
242 weak_ptr_factory_
.GetWeakPtr(),
243 new_job
->job_info
.job_id
,
245 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
249 void JobScheduler::GetAppList(const google_apis::AppListCallback
& callback
) {
250 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
251 DCHECK(!callback
.is_null());
253 JobEntry
* new_job
= CreateNewJob(TYPE_GET_APP_LIST
);
254 new_job
->task
= base::Bind(
255 &DriveServiceInterface::GetAppList
,
256 base::Unretained(drive_service_
),
257 base::Bind(&JobScheduler::OnGetAppListJobDone
,
258 weak_ptr_factory_
.GetWeakPtr(),
259 new_job
->job_info
.job_id
,
261 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
265 void JobScheduler::GetAllResourceList(
266 const google_apis::GetResourceListCallback
& callback
) {
267 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
268 DCHECK(!callback
.is_null());
270 JobEntry
* new_job
= CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST
);
271 new_job
->task
= base::Bind(
272 &DriveServiceInterface::GetAllResourceList
,
273 base::Unretained(drive_service_
),
274 base::Bind(&JobScheduler::OnGetResourceListJobDone
,
275 weak_ptr_factory_
.GetWeakPtr(),
276 new_job
->job_info
.job_id
,
278 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
282 void JobScheduler::GetResourceListInDirectory(
283 const std::string
& directory_resource_id
,
284 const google_apis::GetResourceListCallback
& callback
) {
285 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
286 DCHECK(!callback
.is_null());
288 JobEntry
* new_job
= CreateNewJob(
289 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY
);
290 new_job
->task
= base::Bind(
291 &DriveServiceInterface::GetResourceListInDirectory
,
292 base::Unretained(drive_service_
),
293 directory_resource_id
,
294 base::Bind(&JobScheduler::OnGetResourceListJobDone
,
295 weak_ptr_factory_
.GetWeakPtr(),
296 new_job
->job_info
.job_id
,
298 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
302 void JobScheduler::Search(
303 const std::string
& search_query
,
304 const google_apis::GetResourceListCallback
& callback
) {
305 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
306 DCHECK(!callback
.is_null());
308 JobEntry
* new_job
= CreateNewJob(TYPE_SEARCH
);
309 new_job
->task
= base::Bind(
310 &DriveServiceInterface::Search
,
311 base::Unretained(drive_service_
),
313 base::Bind(&JobScheduler::OnGetResourceListJobDone
,
314 weak_ptr_factory_
.GetWeakPtr(),
315 new_job
->job_info
.job_id
,
317 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
321 void JobScheduler::GetChangeList(
322 int64 start_changestamp
,
323 const google_apis::GetResourceListCallback
& callback
) {
324 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
325 DCHECK(!callback
.is_null());
327 JobEntry
* new_job
= CreateNewJob(TYPE_GET_CHANGE_LIST
);
328 new_job
->task
= base::Bind(
329 &DriveServiceInterface::GetChangeList
,
330 base::Unretained(drive_service_
),
332 base::Bind(&JobScheduler::OnGetResourceListJobDone
,
333 weak_ptr_factory_
.GetWeakPtr(),
334 new_job
->job_info
.job_id
,
336 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
340 void JobScheduler::GetRemainingChangeList(
341 const GURL
& next_link
,
342 const google_apis::GetResourceListCallback
& callback
) {
343 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
344 DCHECK(!callback
.is_null());
346 JobEntry
* new_job
= CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST
);
347 new_job
->task
= base::Bind(
348 &DriveServiceInterface::GetRemainingChangeList
,
349 base::Unretained(drive_service_
),
351 base::Bind(&JobScheduler::OnGetResourceListJobDone
,
352 weak_ptr_factory_
.GetWeakPtr(),
353 new_job
->job_info
.job_id
,
355 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
359 void JobScheduler::GetRemainingFileList(
360 const GURL
& next_link
,
361 const google_apis::GetResourceListCallback
& callback
) {
362 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
363 DCHECK(!callback
.is_null());
365 JobEntry
* new_job
= CreateNewJob(TYPE_GET_REMAINING_FILE_LIST
);
366 new_job
->task
= base::Bind(
367 &DriveServiceInterface::GetRemainingFileList
,
368 base::Unretained(drive_service_
),
370 base::Bind(&JobScheduler::OnGetResourceListJobDone
,
371 weak_ptr_factory_
.GetWeakPtr(),
372 new_job
->job_info
.job_id
,
374 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
378 void JobScheduler::GetResourceEntry(
379 const std::string
& resource_id
,
380 const ClientContext
& context
,
381 const google_apis::GetResourceEntryCallback
& callback
) {
382 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
383 DCHECK(!callback
.is_null());
385 JobEntry
* new_job
= CreateNewJob(TYPE_GET_RESOURCE_ENTRY
);
386 new_job
->context
= context
;
387 new_job
->task
= base::Bind(
388 &DriveServiceInterface::GetResourceEntry
,
389 base::Unretained(drive_service_
),
391 base::Bind(&JobScheduler::OnGetResourceEntryJobDone
,
392 weak_ptr_factory_
.GetWeakPtr(),
393 new_job
->job_info
.job_id
,
395 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
399 void JobScheduler::GetShareUrl(
400 const std::string
& resource_id
,
401 const GURL
& embed_origin
,
402 const ClientContext
& context
,
403 const google_apis::GetShareUrlCallback
& callback
) {
404 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
405 DCHECK(!callback
.is_null());
407 JobEntry
* new_job
= CreateNewJob(TYPE_GET_SHARE_URL
);
408 new_job
->context
= context
;
409 new_job
->task
= base::Bind(
410 &DriveServiceInterface::GetShareUrl
,
411 base::Unretained(drive_service_
),
414 base::Bind(&JobScheduler::OnGetShareUrlJobDone
,
415 weak_ptr_factory_
.GetWeakPtr(),
416 new_job
->job_info
.job_id
,
418 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
422 void JobScheduler::TrashResource(
423 const std::string
& resource_id
,
424 const ClientContext
& context
,
425 const google_apis::EntryActionCallback
& callback
) {
426 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
427 DCHECK(!callback
.is_null());
429 JobEntry
* new_job
= CreateNewJob(TYPE_TRASH_RESOURCE
);
430 new_job
->context
= context
;
431 new_job
->task
= base::Bind(
432 &DriveServiceInterface::TrashResource
,
433 base::Unretained(drive_service_
),
435 base::Bind(&JobScheduler::OnEntryActionJobDone
,
436 weak_ptr_factory_
.GetWeakPtr(),
437 new_job
->job_info
.job_id
,
439 new_job
->abort_callback
= callback
;
443 void JobScheduler::CopyResource(
444 const std::string
& resource_id
,
445 const std::string
& parent_resource_id
,
446 const std::string
& new_title
,
447 const base::Time
& last_modified
,
448 const google_apis::GetResourceEntryCallback
& callback
) {
449 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
450 DCHECK(!callback
.is_null());
452 JobEntry
* new_job
= CreateNewJob(TYPE_COPY_RESOURCE
);
453 new_job
->task
= base::Bind(
454 &DriveServiceInterface::CopyResource
,
455 base::Unretained(drive_service_
),
460 base::Bind(&JobScheduler::OnGetResourceEntryJobDone
,
461 weak_ptr_factory_
.GetWeakPtr(),
462 new_job
->job_info
.job_id
,
464 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
468 void JobScheduler::UpdateResource(
469 const std::string
& resource_id
,
470 const std::string
& parent_resource_id
,
471 const std::string
& new_title
,
472 const base::Time
& last_modified
,
473 const base::Time
& last_viewed_by_me
,
474 const ClientContext
& context
,
475 const google_apis::GetResourceEntryCallback
& callback
) {
476 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
477 DCHECK(!callback
.is_null());
479 JobEntry
* new_job
= CreateNewJob(TYPE_UPDATE_RESOURCE
);
480 new_job
->context
= context
;
481 new_job
->task
= base::Bind(
482 &DriveServiceInterface::UpdateResource
,
483 base::Unretained(drive_service_
),
489 base::Bind(&JobScheduler::OnGetResourceEntryJobDone
,
490 weak_ptr_factory_
.GetWeakPtr(),
491 new_job
->job_info
.job_id
,
493 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
497 void JobScheduler::RenameResource(
498 const std::string
& resource_id
,
499 const std::string
& new_title
,
500 const google_apis::EntryActionCallback
& callback
) {
501 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
502 DCHECK(!callback
.is_null());
504 JobEntry
* new_job
= CreateNewJob(TYPE_RENAME_RESOURCE
);
505 new_job
->task
= base::Bind(
506 &DriveServiceInterface::RenameResource
,
507 base::Unretained(drive_service_
),
510 base::Bind(&JobScheduler::OnEntryActionJobDone
,
511 weak_ptr_factory_
.GetWeakPtr(),
512 new_job
->job_info
.job_id
,
514 new_job
->abort_callback
= callback
;
518 void JobScheduler::AddResourceToDirectory(
519 const std::string
& parent_resource_id
,
520 const std::string
& resource_id
,
521 const google_apis::EntryActionCallback
& callback
) {
522 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
523 DCHECK(!callback
.is_null());
525 JobEntry
* new_job
= CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY
);
526 new_job
->task
= base::Bind(
527 &DriveServiceInterface::AddResourceToDirectory
,
528 base::Unretained(drive_service_
),
531 base::Bind(&JobScheduler::OnEntryActionJobDone
,
532 weak_ptr_factory_
.GetWeakPtr(),
533 new_job
->job_info
.job_id
,
535 new_job
->abort_callback
= callback
;
539 void JobScheduler::RemoveResourceFromDirectory(
540 const std::string
& parent_resource_id
,
541 const std::string
& resource_id
,
542 const ClientContext
& context
,
543 const google_apis::EntryActionCallback
& callback
) {
544 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
546 JobEntry
* new_job
= CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY
);
547 new_job
->context
= context
;
548 new_job
->task
= base::Bind(
549 &DriveServiceInterface::RemoveResourceFromDirectory
,
550 base::Unretained(drive_service_
),
553 base::Bind(&JobScheduler::OnEntryActionJobDone
,
554 weak_ptr_factory_
.GetWeakPtr(),
555 new_job
->job_info
.job_id
,
557 new_job
->abort_callback
= callback
;
561 void JobScheduler::AddNewDirectory(
562 const std::string
& parent_resource_id
,
563 const std::string
& directory_title
,
564 const google_apis::GetResourceEntryCallback
& callback
) {
565 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
567 JobEntry
* new_job
= CreateNewJob(TYPE_ADD_NEW_DIRECTORY
);
568 new_job
->task
= base::Bind(
569 &DriveServiceInterface::AddNewDirectory
,
570 base::Unretained(drive_service_
),
573 base::Bind(&JobScheduler::OnGetResourceEntryJobDone
,
574 weak_ptr_factory_
.GetWeakPtr(),
575 new_job
->job_info
.job_id
,
577 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
581 JobID
JobScheduler::DownloadFile(
582 const base::FilePath
& virtual_path
,
583 int64 expected_file_size
,
584 const base::FilePath
& local_cache_path
,
585 const std::string
& resource_id
,
586 const ClientContext
& context
,
587 const google_apis::DownloadActionCallback
& download_action_callback
,
588 const google_apis::GetContentCallback
& get_content_callback
) {
589 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
591 JobEntry
* new_job
= CreateNewJob(TYPE_DOWNLOAD_FILE
);
592 new_job
->job_info
.file_path
= virtual_path
;
593 new_job
->job_info
.num_total_bytes
= expected_file_size
;
594 new_job
->context
= context
;
595 new_job
->task
= base::Bind(
596 &DriveServiceInterface::DownloadFile
,
597 base::Unretained(drive_service_
),
600 base::Bind(&JobScheduler::OnDownloadActionJobDone
,
601 weak_ptr_factory_
.GetWeakPtr(),
602 new_job
->job_info
.job_id
,
603 download_action_callback
),
604 get_content_callback
,
605 base::Bind(&JobScheduler::UpdateProgress
,
606 weak_ptr_factory_
.GetWeakPtr(),
607 new_job
->job_info
.job_id
));
608 new_job
->abort_callback
= CreateErrorRunCallback(download_action_callback
);
610 return new_job
->job_info
.job_id
;
613 void JobScheduler::UploadNewFile(
614 const std::string
& parent_resource_id
,
615 const base::FilePath
& drive_file_path
,
616 const base::FilePath
& local_file_path
,
617 const std::string
& title
,
618 const std::string
& content_type
,
619 const ClientContext
& context
,
620 const google_apis::GetResourceEntryCallback
& callback
) {
621 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
623 JobEntry
* new_job
= CreateNewJob(TYPE_UPLOAD_NEW_FILE
);
624 new_job
->job_info
.file_path
= drive_file_path
;
625 new_job
->context
= context
;
627 UploadNewFileParams params
;
628 params
.parent_resource_id
= parent_resource_id
;
629 params
.local_file_path
= local_file_path
;
630 params
.title
= title
;
631 params
.content_type
= content_type
;
633 ResumeUploadParams resume_params
;
634 resume_params
.local_file_path
= params
.local_file_path
;
635 resume_params
.content_type
= params
.content_type
;
637 params
.callback
= base::Bind(&JobScheduler::OnUploadCompletionJobDone
,
638 weak_ptr_factory_
.GetWeakPtr(),
639 new_job
->job_info
.job_id
,
642 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
643 weak_ptr_factory_
.GetWeakPtr(),
644 new_job
->job_info
.job_id
);
645 new_job
->task
= base::Bind(&RunUploadNewFile
, uploader_
.get(), params
);
646 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
650 void JobScheduler::UploadExistingFile(
651 const std::string
& resource_id
,
652 const base::FilePath
& drive_file_path
,
653 const base::FilePath
& local_file_path
,
654 const std::string
& content_type
,
655 const std::string
& etag
,
656 const ClientContext
& context
,
657 const google_apis::GetResourceEntryCallback
& callback
) {
658 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
660 JobEntry
* new_job
= CreateNewJob(TYPE_UPLOAD_EXISTING_FILE
);
661 new_job
->job_info
.file_path
= drive_file_path
;
662 new_job
->context
= context
;
664 UploadExistingFileParams params
;
665 params
.resource_id
= resource_id
;
666 params
.local_file_path
= local_file_path
;
667 params
.content_type
= content_type
;
670 ResumeUploadParams resume_params
;
671 resume_params
.local_file_path
= params
.local_file_path
;
672 resume_params
.content_type
= params
.content_type
;
674 params
.callback
= base::Bind(&JobScheduler::OnUploadCompletionJobDone
,
675 weak_ptr_factory_
.GetWeakPtr(),
676 new_job
->job_info
.job_id
,
679 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
680 weak_ptr_factory_
.GetWeakPtr(),
681 new_job
->job_info
.job_id
);
682 new_job
->task
= base::Bind(&RunUploadExistingFile
, uploader_
.get(), params
);
683 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
687 void JobScheduler::CreateFile(
688 const std::string
& parent_resource_id
,
689 const base::FilePath
& drive_file_path
,
690 const std::string
& title
,
691 const std::string
& content_type
,
692 const ClientContext
& context
,
693 const google_apis::GetResourceEntryCallback
& callback
) {
694 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
696 const base::FilePath
kDevNull(FILE_PATH_LITERAL("/dev/null"));
698 JobEntry
* new_job
= CreateNewJob(TYPE_CREATE_FILE
);
699 new_job
->job_info
.file_path
= drive_file_path
;
700 new_job
->context
= context
;
702 UploadNewFileParams params
;
703 params
.parent_resource_id
= parent_resource_id
;
704 params
.local_file_path
= kDevNull
; // Upload an empty file.
705 params
.title
= title
;
706 params
.content_type
= content_type
;
708 ResumeUploadParams resume_params
;
709 resume_params
.local_file_path
= params
.local_file_path
;
710 resume_params
.content_type
= params
.content_type
;
712 params
.callback
= base::Bind(&JobScheduler::OnUploadCompletionJobDone
,
713 weak_ptr_factory_
.GetWeakPtr(),
714 new_job
->job_info
.job_id
,
717 params
.progress_callback
= google_apis::ProgressCallback();
719 new_job
->task
= base::Bind(&RunUploadNewFile
, uploader_
.get(), params
);
720 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
724 void JobScheduler::GetResourceListInDirectoryByWapi(
725 const std::string
& directory_resource_id
,
726 const google_apis::GetResourceListCallback
& callback
) {
727 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
728 DCHECK(!callback
.is_null());
730 JobEntry
* new_job
= CreateNewJob(
731 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI
);
732 new_job
->task
= base::Bind(
733 &DriveServiceInterface::GetResourceListInDirectoryByWapi
,
734 base::Unretained(drive_service_
),
735 directory_resource_id
,
736 base::Bind(&JobScheduler::OnGetResourceListJobDone
,
737 weak_ptr_factory_
.GetWeakPtr(),
738 new_job
->job_info
.job_id
,
740 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
744 void JobScheduler::GetRemainingResourceList(
745 const GURL
& next_link
,
746 const google_apis::GetResourceListCallback
& callback
) {
747 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
748 DCHECK(!callback
.is_null());
750 JobEntry
* new_job
= CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST
);
751 new_job
->task
= base::Bind(
752 &DriveServiceInterface::GetRemainingResourceList
,
753 base::Unretained(drive_service_
),
755 base::Bind(&JobScheduler::OnGetResourceListJobDone
,
756 weak_ptr_factory_
.GetWeakPtr(),
757 new_job
->job_info
.job_id
,
759 new_job
->abort_callback
= CreateErrorRunCallback(callback
);
763 JobScheduler::JobEntry
* JobScheduler::CreateNewJob(JobType type
) {
764 JobEntry
* job
= new JobEntry(type
);
765 job
->job_info
.job_id
= job_map_
.Add(job
); // Takes the ownership of |job|.
769 void JobScheduler::StartJob(JobEntry
* job
) {
770 DCHECK(!job
->task
.is_null());
772 QueueJob(job
->job_info
.job_id
);
773 NotifyJobAdded(job
->job_info
);
774 DoJobLoop(GetJobQueueType(job
->job_info
.job_type
));
777 void JobScheduler::QueueJob(JobID job_id
) {
778 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
780 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
782 const JobInfo
& job_info
= job_entry
->job_info
;
784 QueueType queue_type
= GetJobQueueType(job_info
.job_type
);
785 queue_
[queue_type
]->Push(job_id
, job_entry
->context
.type
);
787 const std::string retry_prefix
= job_entry
->retry_count
> 0 ?
788 base::StringPrintf(" (retry %d)", job_entry
->retry_count
) : "";
789 util::Log(logging::LOG_INFO
,
790 "Job queued%s: %s - %s",
791 retry_prefix
.c_str(),
792 job_info
.ToString().c_str(),
793 GetQueueInfo(queue_type
).c_str());
796 void JobScheduler::DoJobLoop(QueueType queue_type
) {
797 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
799 const int accepted_priority
= GetCurrentAcceptedPriority(queue_type
);
801 // Abort all USER_INITAITED jobs when not accepted.
802 if (accepted_priority
< USER_INITIATED
) {
803 std::vector
<JobID
> jobs
;
804 queue_
[queue_type
]->GetQueuedJobs(USER_INITIATED
, &jobs
);
805 for (size_t i
= 0; i
< jobs
.size(); ++i
) {
806 JobEntry
* job
= job_map_
.Lookup(jobs
[i
]);
808 AbortNotRunningJob(job
, google_apis::GDATA_NO_CONNECTION
);
812 // Wait when throttled.
813 const base::Time now
= base::Time::Now();
814 if (now
< wait_until_
) {
815 base::MessageLoopProxy::current()->PostDelayedTask(
817 base::Bind(&JobScheduler::DoJobLoop
,
818 weak_ptr_factory_
.GetWeakPtr(),
824 // Run the job with the highest priority in the queue.
826 if (!queue_
[queue_type
]->PopForRun(accepted_priority
, &job_id
))
829 JobEntry
* entry
= job_map_
.Lookup(job_id
);
832 JobInfo
* job_info
= &entry
->job_info
;
833 job_info
->state
= STATE_RUNNING
;
834 job_info
->start_time
= now
;
835 NotifyJobUpdated(*job_info
);
837 entry
->cancel_callback
= entry
->task
.Run();
841 util::Log(logging::LOG_INFO
,
842 "Job started: %s - %s",
843 job_info
->ToString().c_str(),
844 GetQueueInfo(queue_type
).c_str());
847 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type
) {
848 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
850 const int kNoJobShouldRun
= -1;
852 // Should stop if Drive was disabled while running the fetch loop.
853 if (pref_service_
->GetBoolean(prefs::kDisableDrive
))
854 return kNoJobShouldRun
;
856 // Should stop if the network is not online.
857 if (net::NetworkChangeNotifier::IsOffline())
858 return kNoJobShouldRun
;
860 // For the file queue, if it is on cellular network, only user initiated
861 // operations are allowed to start.
862 if (queue_type
== FILE_QUEUE
&&
863 pref_service_
->GetBoolean(prefs::kDisableDriveOverCellular
) &&
864 net::NetworkChangeNotifier::IsConnectionCellular(
865 net::NetworkChangeNotifier::GetConnectionType()))
866 return USER_INITIATED
;
868 // Otherwise, every operations including background tasks are allowed.
872 void JobScheduler::UpdateWait() {
873 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
875 if (disable_throttling_
|| throttle_count_
== 0)
878 // Exponential backoff: https://developers.google.com/drive/handle-errors.
879 base::TimeDelta delay
=
880 base::TimeDelta::FromSeconds(1 << (throttle_count_
- 1)) +
881 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
882 VLOG(1) << "Throttling for " << delay
.InMillisecondsF();
884 wait_until_
= std::max(wait_until_
, base::Time::Now() + delay
);
887 bool JobScheduler::OnJobDone(JobID job_id
, google_apis::GDataErrorCode error
) {
888 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
890 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
892 JobInfo
* job_info
= &job_entry
->job_info
;
893 QueueType queue_type
= GetJobQueueType(job_info
->job_type
);
894 queue_
[queue_type
]->MarkFinished(job_id
);
896 const base::TimeDelta elapsed
= base::Time::Now() - job_info
->start_time
;
897 bool success
= (GDataToFileError(error
) == FILE_ERROR_OK
);
898 util::Log(success
? logging::LOG_INFO
: logging::LOG_WARNING
,
899 "Job done: %s => %s (elapsed time: %sms) - %s",
900 job_info
->ToString().c_str(),
901 GDataErrorCodeToString(error
).c_str(),
902 base::Int64ToString(elapsed
.InMilliseconds()).c_str(),
903 GetQueueInfo(queue_type
).c_str());
905 // Retry, depending on the error.
906 const bool is_server_error
=
907 error
== google_apis::HTTP_SERVICE_UNAVAILABLE
||
908 error
== google_apis::HTTP_INTERNAL_SERVER_ERROR
;
909 if (is_server_error
) {
910 if (throttle_count_
< kMaxThrottleCount
)
917 const bool should_retry
=
918 is_server_error
&& job_entry
->retry_count
< kMaxRetryCount
;
920 job_entry
->cancel_callback
.Reset();
921 job_info
->state
= STATE_RETRY
;
922 NotifyJobUpdated(*job_info
);
924 ++job_entry
->retry_count
;
929 NotifyJobDone(*job_info
, error
);
930 // The job has finished, no retry will happen in the scheduler. Now we can
931 // remove the job info from the map.
932 job_map_
.Remove(job_id
);
935 // Post a task to continue the job loop. This allows us to finish handling
936 // the current job before starting the next one.
937 base::MessageLoopProxy::current()->PostTask(FROM_HERE
,
938 base::Bind(&JobScheduler::DoJobLoop
,
939 weak_ptr_factory_
.GetWeakPtr(),
941 return !should_retry
;
944 void JobScheduler::OnGetResourceListJobDone(
946 const google_apis::GetResourceListCallback
& callback
,
947 google_apis::GDataErrorCode error
,
948 scoped_ptr
<google_apis::ResourceList
> resource_list
) {
949 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
950 DCHECK(!callback
.is_null());
952 if (OnJobDone(job_id
, error
))
953 callback
.Run(error
, resource_list
.Pass());
956 void JobScheduler::OnGetResourceEntryJobDone(
958 const google_apis::GetResourceEntryCallback
& callback
,
959 google_apis::GDataErrorCode error
,
960 scoped_ptr
<google_apis::ResourceEntry
> entry
) {
961 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
962 DCHECK(!callback
.is_null());
964 if (OnJobDone(job_id
, error
))
965 callback
.Run(error
, entry
.Pass());
968 void JobScheduler::OnGetAboutResourceJobDone(
970 const google_apis::AboutResourceCallback
& callback
,
971 google_apis::GDataErrorCode error
,
972 scoped_ptr
<google_apis::AboutResource
> about_resource
) {
973 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
974 DCHECK(!callback
.is_null());
976 if (OnJobDone(job_id
, error
))
977 callback
.Run(error
, about_resource
.Pass());
980 void JobScheduler::OnGetShareUrlJobDone(
982 const google_apis::GetShareUrlCallback
& callback
,
983 google_apis::GDataErrorCode error
,
984 const GURL
& share_url
) {
985 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
986 DCHECK(!callback
.is_null());
988 if (OnJobDone(job_id
, error
))
989 callback
.Run(error
, share_url
);
992 void JobScheduler::OnGetAppListJobDone(
994 const google_apis::AppListCallback
& callback
,
995 google_apis::GDataErrorCode error
,
996 scoped_ptr
<google_apis::AppList
> app_list
) {
997 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
998 DCHECK(!callback
.is_null());
1000 if (OnJobDone(job_id
, error
))
1001 callback
.Run(error
, app_list
.Pass());
1004 void JobScheduler::OnEntryActionJobDone(
1006 const google_apis::EntryActionCallback
& callback
,
1007 google_apis::GDataErrorCode error
) {
1008 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1009 DCHECK(!callback
.is_null());
1011 if (OnJobDone(job_id
, error
))
1012 callback
.Run(error
);
1015 void JobScheduler::OnDownloadActionJobDone(
1017 const google_apis::DownloadActionCallback
& callback
,
1018 google_apis::GDataErrorCode error
,
1019 const base::FilePath
& temp_file
) {
1020 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1021 DCHECK(!callback
.is_null());
1023 if (OnJobDone(job_id
, error
))
1024 callback
.Run(error
, temp_file
);
1027 void JobScheduler::OnUploadCompletionJobDone(
1029 const ResumeUploadParams
& resume_params
,
1030 const google_apis::GetResourceEntryCallback
& callback
,
1031 google_apis::GDataErrorCode error
,
1032 const GURL
& upload_location
,
1033 scoped_ptr
<google_apis::ResourceEntry
> resource_entry
) {
1034 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1035 DCHECK(!callback
.is_null());
1037 if (!upload_location
.is_empty()) {
1038 // If upload_location is available, update the task to resume the
1039 // upload process from the terminated point.
1040 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1041 // so OnJobDone called below will be in charge to re-queue the job.
1042 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1045 ResumeUploadFileParams params
;
1046 params
.upload_location
= upload_location
;
1047 params
.local_file_path
= resume_params
.local_file_path
;
1048 params
.content_type
= resume_params
.content_type
;
1049 params
.callback
= base::Bind(&JobScheduler::OnResumeUploadFileDone
,
1050 weak_ptr_factory_
.GetWeakPtr(),
1054 params
.progress_callback
= base::Bind(&JobScheduler::UpdateProgress
,
1055 weak_ptr_factory_
.GetWeakPtr(),
1057 job_entry
->task
= base::Bind(&RunResumeUploadFile
, uploader_
.get(), params
);
1060 if (OnJobDone(job_id
, error
))
1061 callback
.Run(error
, resource_entry
.Pass());
1064 void JobScheduler::OnResumeUploadFileDone(
1066 const base::Callback
<google_apis::CancelCallback()>& original_task
,
1067 const google_apis::GetResourceEntryCallback
& callback
,
1068 google_apis::GDataErrorCode error
,
1069 const GURL
& upload_location
,
1070 scoped_ptr
<google_apis::ResourceEntry
> resource_entry
) {
1071 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1072 DCHECK(!original_task
.is_null());
1073 DCHECK(!callback
.is_null());
1075 if (upload_location
.is_empty()) {
1076 // If upload_location is not available, we should discard it and stop trying
1077 // to resume. Restore the original task.
1078 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1080 job_entry
->task
= original_task
;
1083 if (OnJobDone(job_id
, error
))
1084 callback
.Run(error
, resource_entry
.Pass());
1087 void JobScheduler::UpdateProgress(JobID job_id
, int64 progress
, int64 total
) {
1088 JobEntry
* job_entry
= job_map_
.Lookup(job_id
);
1091 job_entry
->job_info
.num_completed_bytes
= progress
;
1093 job_entry
->job_info
.num_total_bytes
= total
;
1094 NotifyJobUpdated(job_entry
->job_info
);
1097 void JobScheduler::OnConnectionTypeChanged(
1098 net::NetworkChangeNotifier::ConnectionType type
) {
1099 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1101 // Resume the job loop.
1102 // Note that we don't need to check the network connection status as it will
1103 // be checked in GetCurrentAcceptedPriority().
1104 for (int i
= METADATA_QUEUE
; i
< NUM_QUEUES
; ++i
)
1105 DoJobLoop(static_cast<QueueType
>(i
));
1108 JobScheduler::QueueType
JobScheduler::GetJobQueueType(JobType type
) {
1110 case TYPE_GET_ABOUT_RESOURCE
:
1111 case TYPE_GET_APP_LIST
:
1112 case TYPE_GET_ALL_RESOURCE_LIST
:
1113 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY
:
1115 case TYPE_GET_CHANGE_LIST
:
1116 case TYPE_GET_REMAINING_CHANGE_LIST
:
1117 case TYPE_GET_REMAINING_FILE_LIST
:
1118 case TYPE_GET_RESOURCE_ENTRY
:
1119 case TYPE_GET_SHARE_URL
:
1120 case TYPE_TRASH_RESOURCE
:
1121 case TYPE_COPY_RESOURCE
:
1122 case TYPE_UPDATE_RESOURCE
:
1123 case TYPE_RENAME_RESOURCE
:
1124 case TYPE_ADD_RESOURCE_TO_DIRECTORY
:
1125 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY
:
1126 case TYPE_ADD_NEW_DIRECTORY
:
1127 case TYPE_CREATE_FILE
:
1128 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI
:
1129 case TYPE_GET_REMAINING_RESOURCE_LIST
:
1130 return METADATA_QUEUE
;
1132 case TYPE_DOWNLOAD_FILE
:
1133 case TYPE_UPLOAD_NEW_FILE
:
1134 case TYPE_UPLOAD_EXISTING_FILE
:
1141 void JobScheduler::AbortNotRunningJob(JobEntry
* job
,
1142 google_apis::GDataErrorCode error
) {
1143 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1145 const base::TimeDelta elapsed
= base::Time::Now() - job
->job_info
.start_time
;
1146 const QueueType queue_type
= GetJobQueueType(job
->job_info
.job_type
);
1147 util::Log(logging::LOG_INFO
,
1148 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1149 job
->job_info
.ToString().c_str(),
1150 GDataErrorCodeToString(error
).c_str(),
1151 base::Int64ToString(elapsed
.InMilliseconds()).c_str(),
1152 GetQueueInfo(queue_type
).c_str());
1154 base::Callback
<void(google_apis::GDataErrorCode
)> callback
=
1155 job
->abort_callback
;
1156 queue_
[GetJobQueueType(job
->job_info
.job_type
)]->Remove(job
->job_info
.job_id
);
1157 NotifyJobDone(job
->job_info
, error
);
1158 job_map_
.Remove(job
->job_info
.job_id
);
1159 base::MessageLoopProxy::current()->PostTask(FROM_HERE
,
1160 base::Bind(callback
, error
));
1163 void JobScheduler::NotifyJobAdded(const JobInfo
& job_info
) {
1164 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1165 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
, OnJobAdded(job_info
));
1168 void JobScheduler::NotifyJobDone(const JobInfo
& job_info
,
1169 google_apis::GDataErrorCode error
) {
1170 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1171 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
,
1172 OnJobDone(job_info
, GDataToFileError(error
)));
1175 void JobScheduler::NotifyJobUpdated(const JobInfo
& job_info
) {
1176 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI
));
1177 FOR_EACH_OBSERVER(JobListObserver
, observer_list_
, OnJobUpdated(job_info
));
1180 std::string
JobScheduler::GetQueueInfo(QueueType type
) const {
1181 return QueueTypeToString(type
) + " " + queue_
[type
]->ToString();
1185 std::string
JobScheduler::QueueTypeToString(QueueType type
) {
1187 case METADATA_QUEUE
:
1188 return "METADATA_QUEUE";
1190 return "FILE_QUEUE";
1192 break; // This value is just a sentinel. Should never be used.
1198 } // namespace drive