Disable view source for Developer Tools.
[chromium-blink-merge.git] / chrome / browser / chromeos / drive / job_scheduler.cc
blob723cc918fd2a896d325c4c680d8623ecd0c26a3d
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/job_scheduler.h"
7 #include "base/message_loop/message_loop.h"
8 #include "base/prefs/pref_service.h"
9 #include "base/rand_util.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/strings/stringprintf.h"
12 #include "chrome/browser/chromeos/drive/file_system_util.h"
13 #include "chrome/browser/chromeos/drive/logging.h"
14 #include "chrome/common/pref_names.h"
15 #include "content/public/browser/browser_thread.h"
16 #include "google_apis/drive/drive_api_parser.h"
18 using content::BrowserThread;
20 namespace drive {
22 namespace {
// Retry and throttling policy.
//
// A job that fails with a server error is retried at most kMaxRetryCount
// times. The delay inserted before running jobs is shared among all jobs. The
// shared throttle level is raised by one on each such failure, capped at
// kMaxThrottleCount; the delay is 2^(level - 1) seconds plus up to one second
// of random jitter.
//
// The API documentation (https://developers.google.com/drive/handle-errors)
// suggests that kMaxRetryCount should equal kMaxThrottleCount, but the retry
// limit is currently doubled so that upload related jobs are retried a
// sufficient number of times. crbug.com/269918
const int kMaxThrottleCount = 4;
const int kMaxRetryCount = kMaxThrottleCount * 2;
// DefaultValueCreator<T>::GetDefaultValue() produces T(), i.e. a
// value-initialized instance of T.
template <typename ValueType>
struct DefaultValueCreator {
  static ValueType GetDefaultValue() { return ValueType(); }
};

// Specialization for "const T&" parameter types. The reference and const
// qualifiers are stripped so that the default value is returned by value.
template <typename ValueType>
struct DefaultValueCreator<const ValueType&> {
  static ValueType GetDefaultValue() { return ValueType(); }
};
43 // Helper of CreateErrorRunCallback implementation.
44 // Provides:
45 // - ResultType; the type of the Callback which should be returned by
46 // CreateErrorRunCallback.
47 // - Run(): a static function which takes the original |callback| and |error|,
48 // and runs the |callback|.Run() with the error code and default values
49 // for remaining arguments.
50 template<typename CallbackType> struct CreateErrorRunCallbackHelper;
52 // CreateErrorRunCallback with two arguments.
53 template<typename P1>
54 struct CreateErrorRunCallbackHelper<void(google_apis::GDataErrorCode, P1)> {
55 static void Run(
56 const base::Callback<void(google_apis::GDataErrorCode, P1)>& callback,
57 google_apis::GDataErrorCode error) {
58 callback.Run(error, DefaultValueCreator<P1>::GetDefaultValue());
62 // Returns a callback with the tail parameter bound to its default value.
63 // In other words, returned_callback.Run(error) runs callback.Run(error, T()).
64 template<typename CallbackType>
65 base::Callback<void(google_apis::GDataErrorCode)>
66 CreateErrorRunCallback(const base::Callback<CallbackType>& callback) {
67 return base::Bind(&CreateErrorRunCallbackHelper<CallbackType>::Run, callback);
70 // Parameter struct for RunUploadNewFile.
71 struct UploadNewFileParams {
72 std::string parent_resource_id;
73 base::FilePath local_file_path;
74 std::string title;
75 std::string content_type;
76 UploadCompletionCallback callback;
77 google_apis::ProgressCallback progress_callback;
80 // Helper function to work around the arity limitation of base::Bind.
81 google_apis::CancelCallback RunUploadNewFile(
82 DriveUploaderInterface* uploader,
83 const UploadNewFileParams& params) {
84 return uploader->UploadNewFile(params.parent_resource_id,
85 params.local_file_path,
86 params.title,
87 params.content_type,
88 params.callback,
89 params.progress_callback);
92 // Parameter struct for RunUploadExistingFile.
93 struct UploadExistingFileParams {
94 std::string resource_id;
95 base::FilePath local_file_path;
96 std::string content_type;
97 std::string etag;
98 UploadCompletionCallback callback;
99 google_apis::ProgressCallback progress_callback;
102 // Helper function to work around the arity limitation of base::Bind.
103 google_apis::CancelCallback RunUploadExistingFile(
104 DriveUploaderInterface* uploader,
105 const UploadExistingFileParams& params) {
106 return uploader->UploadExistingFile(params.resource_id,
107 params.local_file_path,
108 params.content_type,
109 params.etag,
110 params.callback,
111 params.progress_callback);
114 // Parameter struct for RunResumeUploadFile.
115 struct ResumeUploadFileParams {
116 GURL upload_location;
117 base::FilePath local_file_path;
118 std::string content_type;
119 UploadCompletionCallback callback;
120 google_apis::ProgressCallback progress_callback;
123 // Helper function to adjust the return type.
124 google_apis::CancelCallback RunResumeUploadFile(
125 DriveUploaderInterface* uploader,
126 const ResumeUploadFileParams& params) {
127 return uploader->ResumeUploadFile(params.upload_location,
128 params.local_file_path,
129 params.content_type,
130 params.callback,
131 params.progress_callback);
134 } // namespace
136 // Metadata jobs are cheap, so we run them concurrently. File jobs run serially.
137 const int JobScheduler::kMaxJobCount[] = {
138 5, // METADATA_QUEUE
139 1, // FILE_QUEUE
142 JobScheduler::JobEntry::JobEntry(JobType type)
143 : job_info(type),
144 context(ClientContext(USER_INITIATED)),
145 retry_count(0) {
146 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
149 JobScheduler::JobEntry::~JobEntry() {
150 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
153 struct JobScheduler::ResumeUploadParams {
154 base::FilePath drive_file_path;
155 base::FilePath local_file_path;
156 std::string content_type;
159 JobScheduler::JobScheduler(
160 PrefService* pref_service,
161 DriveServiceInterface* drive_service,
162 base::SequencedTaskRunner* blocking_task_runner)
163 : throttle_count_(0),
164 wait_until_(base::Time::Now()),
165 disable_throttling_(false),
166 drive_service_(drive_service),
167 uploader_(new DriveUploader(drive_service, blocking_task_runner)),
168 pref_service_(pref_service),
169 weak_ptr_factory_(this) {
170 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
172 for (int i = 0; i < NUM_QUEUES; ++i)
173 queue_[i].reset(new JobQueue(kMaxJobCount[i], NUM_CONTEXT_TYPES));
175 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
178 JobScheduler::~JobScheduler() {
179 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
181 size_t num_queued_jobs = 0;
182 for (int i = 0; i < NUM_QUEUES; ++i)
183 num_queued_jobs += queue_[i]->GetNumberOfJobs();
184 DCHECK_EQ(num_queued_jobs, job_map_.size());
186 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
189 std::vector<JobInfo> JobScheduler::GetJobInfoList() {
190 std::vector<JobInfo> job_info_list;
191 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
192 job_info_list.push_back(iter.GetCurrentValue()->job_info);
193 return job_info_list;
196 void JobScheduler::AddObserver(JobListObserver* observer) {
197 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
198 observer_list_.AddObserver(observer);
201 void JobScheduler::RemoveObserver(JobListObserver* observer) {
202 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
203 observer_list_.RemoveObserver(observer);
206 void JobScheduler::CancelJob(JobID job_id) {
207 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
209 JobEntry* job = job_map_.Lookup(job_id);
210 if (job) {
211 if (job->job_info.state == STATE_RUNNING) {
212 // If the job is running an HTTP request, cancel it via |cancel_callback|
213 // returned from the request, and wait for termination in the normal
214 // callback handler, OnJobDone.
215 if (!job->cancel_callback.is_null())
216 job->cancel_callback.Run();
217 } else {
218 AbortNotRunningJob(job, google_apis::GDATA_CANCELLED);
223 void JobScheduler::CancelAllJobs() {
224 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
226 // CancelJob may remove the entry from |job_map_|. That's OK. IDMap supports
227 // removable during iteration.
228 for (JobIDMap::iterator iter(&job_map_); !iter.IsAtEnd(); iter.Advance())
229 CancelJob(iter.GetCurrentKey());
232 void JobScheduler::GetAboutResource(
233 const google_apis::AboutResourceCallback& callback) {
234 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
235 DCHECK(!callback.is_null());
237 JobEntry* new_job = CreateNewJob(TYPE_GET_ABOUT_RESOURCE);
238 new_job->task = base::Bind(
239 &DriveServiceInterface::GetAboutResource,
240 base::Unretained(drive_service_),
241 base::Bind(&JobScheduler::OnGetAboutResourceJobDone,
242 weak_ptr_factory_.GetWeakPtr(),
243 new_job->job_info.job_id,
244 callback));
245 new_job->abort_callback = CreateErrorRunCallback(callback);
246 StartJob(new_job);
249 void JobScheduler::GetAppList(const google_apis::AppListCallback& callback) {
250 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
251 DCHECK(!callback.is_null());
253 JobEntry* new_job = CreateNewJob(TYPE_GET_APP_LIST);
254 new_job->task = base::Bind(
255 &DriveServiceInterface::GetAppList,
256 base::Unretained(drive_service_),
257 base::Bind(&JobScheduler::OnGetAppListJobDone,
258 weak_ptr_factory_.GetWeakPtr(),
259 new_job->job_info.job_id,
260 callback));
261 new_job->abort_callback = CreateErrorRunCallback(callback);
262 StartJob(new_job);
265 void JobScheduler::GetAllResourceList(
266 const google_apis::GetResourceListCallback& callback) {
267 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
268 DCHECK(!callback.is_null());
270 JobEntry* new_job = CreateNewJob(TYPE_GET_ALL_RESOURCE_LIST);
271 new_job->task = base::Bind(
272 &DriveServiceInterface::GetAllResourceList,
273 base::Unretained(drive_service_),
274 base::Bind(&JobScheduler::OnGetResourceListJobDone,
275 weak_ptr_factory_.GetWeakPtr(),
276 new_job->job_info.job_id,
277 callback));
278 new_job->abort_callback = CreateErrorRunCallback(callback);
279 StartJob(new_job);
282 void JobScheduler::GetResourceListInDirectory(
283 const std::string& directory_resource_id,
284 const google_apis::GetResourceListCallback& callback) {
285 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
286 DCHECK(!callback.is_null());
288 JobEntry* new_job = CreateNewJob(
289 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY);
290 new_job->task = base::Bind(
291 &DriveServiceInterface::GetResourceListInDirectory,
292 base::Unretained(drive_service_),
293 directory_resource_id,
294 base::Bind(&JobScheduler::OnGetResourceListJobDone,
295 weak_ptr_factory_.GetWeakPtr(),
296 new_job->job_info.job_id,
297 callback));
298 new_job->abort_callback = CreateErrorRunCallback(callback);
299 StartJob(new_job);
302 void JobScheduler::Search(
303 const std::string& search_query,
304 const google_apis::GetResourceListCallback& callback) {
305 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
306 DCHECK(!callback.is_null());
308 JobEntry* new_job = CreateNewJob(TYPE_SEARCH);
309 new_job->task = base::Bind(
310 &DriveServiceInterface::Search,
311 base::Unretained(drive_service_),
312 search_query,
313 base::Bind(&JobScheduler::OnGetResourceListJobDone,
314 weak_ptr_factory_.GetWeakPtr(),
315 new_job->job_info.job_id,
316 callback));
317 new_job->abort_callback = CreateErrorRunCallback(callback);
318 StartJob(new_job);
321 void JobScheduler::GetChangeList(
322 int64 start_changestamp,
323 const google_apis::GetResourceListCallback& callback) {
324 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
325 DCHECK(!callback.is_null());
327 JobEntry* new_job = CreateNewJob(TYPE_GET_CHANGE_LIST);
328 new_job->task = base::Bind(
329 &DriveServiceInterface::GetChangeList,
330 base::Unretained(drive_service_),
331 start_changestamp,
332 base::Bind(&JobScheduler::OnGetResourceListJobDone,
333 weak_ptr_factory_.GetWeakPtr(),
334 new_job->job_info.job_id,
335 callback));
336 new_job->abort_callback = CreateErrorRunCallback(callback);
337 StartJob(new_job);
340 void JobScheduler::GetRemainingChangeList(
341 const GURL& next_link,
342 const google_apis::GetResourceListCallback& callback) {
343 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
344 DCHECK(!callback.is_null());
346 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_CHANGE_LIST);
347 new_job->task = base::Bind(
348 &DriveServiceInterface::GetRemainingChangeList,
349 base::Unretained(drive_service_),
350 next_link,
351 base::Bind(&JobScheduler::OnGetResourceListJobDone,
352 weak_ptr_factory_.GetWeakPtr(),
353 new_job->job_info.job_id,
354 callback));
355 new_job->abort_callback = CreateErrorRunCallback(callback);
356 StartJob(new_job);
359 void JobScheduler::GetRemainingFileList(
360 const GURL& next_link,
361 const google_apis::GetResourceListCallback& callback) {
362 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
363 DCHECK(!callback.is_null());
365 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_FILE_LIST);
366 new_job->task = base::Bind(
367 &DriveServiceInterface::GetRemainingFileList,
368 base::Unretained(drive_service_),
369 next_link,
370 base::Bind(&JobScheduler::OnGetResourceListJobDone,
371 weak_ptr_factory_.GetWeakPtr(),
372 new_job->job_info.job_id,
373 callback));
374 new_job->abort_callback = CreateErrorRunCallback(callback);
375 StartJob(new_job);
378 void JobScheduler::GetResourceEntry(
379 const std::string& resource_id,
380 const ClientContext& context,
381 const google_apis::GetResourceEntryCallback& callback) {
382 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
383 DCHECK(!callback.is_null());
385 JobEntry* new_job = CreateNewJob(TYPE_GET_RESOURCE_ENTRY);
386 new_job->context = context;
387 new_job->task = base::Bind(
388 &DriveServiceInterface::GetResourceEntry,
389 base::Unretained(drive_service_),
390 resource_id,
391 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
392 weak_ptr_factory_.GetWeakPtr(),
393 new_job->job_info.job_id,
394 callback));
395 new_job->abort_callback = CreateErrorRunCallback(callback);
396 StartJob(new_job);
399 void JobScheduler::GetShareUrl(
400 const std::string& resource_id,
401 const GURL& embed_origin,
402 const ClientContext& context,
403 const google_apis::GetShareUrlCallback& callback) {
404 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
405 DCHECK(!callback.is_null());
407 JobEntry* new_job = CreateNewJob(TYPE_GET_SHARE_URL);
408 new_job->context = context;
409 new_job->task = base::Bind(
410 &DriveServiceInterface::GetShareUrl,
411 base::Unretained(drive_service_),
412 resource_id,
413 embed_origin,
414 base::Bind(&JobScheduler::OnGetShareUrlJobDone,
415 weak_ptr_factory_.GetWeakPtr(),
416 new_job->job_info.job_id,
417 callback));
418 new_job->abort_callback = CreateErrorRunCallback(callback);
419 StartJob(new_job);
422 void JobScheduler::TrashResource(
423 const std::string& resource_id,
424 const ClientContext& context,
425 const google_apis::EntryActionCallback& callback) {
426 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
427 DCHECK(!callback.is_null());
429 JobEntry* new_job = CreateNewJob(TYPE_TRASH_RESOURCE);
430 new_job->context = context;
431 new_job->task = base::Bind(
432 &DriveServiceInterface::TrashResource,
433 base::Unretained(drive_service_),
434 resource_id,
435 base::Bind(&JobScheduler::OnEntryActionJobDone,
436 weak_ptr_factory_.GetWeakPtr(),
437 new_job->job_info.job_id,
438 callback));
439 new_job->abort_callback = callback;
440 StartJob(new_job);
443 void JobScheduler::CopyResource(
444 const std::string& resource_id,
445 const std::string& parent_resource_id,
446 const std::string& new_title,
447 const base::Time& last_modified,
448 const google_apis::GetResourceEntryCallback& callback) {
449 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
450 DCHECK(!callback.is_null());
452 JobEntry* new_job = CreateNewJob(TYPE_COPY_RESOURCE);
453 new_job->task = base::Bind(
454 &DriveServiceInterface::CopyResource,
455 base::Unretained(drive_service_),
456 resource_id,
457 parent_resource_id,
458 new_title,
459 last_modified,
460 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
461 weak_ptr_factory_.GetWeakPtr(),
462 new_job->job_info.job_id,
463 callback));
464 new_job->abort_callback = CreateErrorRunCallback(callback);
465 StartJob(new_job);
468 void JobScheduler::UpdateResource(
469 const std::string& resource_id,
470 const std::string& parent_resource_id,
471 const std::string& new_title,
472 const base::Time& last_modified,
473 const base::Time& last_viewed_by_me,
474 const ClientContext& context,
475 const google_apis::GetResourceEntryCallback& callback) {
476 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
477 DCHECK(!callback.is_null());
479 JobEntry* new_job = CreateNewJob(TYPE_UPDATE_RESOURCE);
480 new_job->context = context;
481 new_job->task = base::Bind(
482 &DriveServiceInterface::UpdateResource,
483 base::Unretained(drive_service_),
484 resource_id,
485 parent_resource_id,
486 new_title,
487 last_modified,
488 last_viewed_by_me,
489 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
490 weak_ptr_factory_.GetWeakPtr(),
491 new_job->job_info.job_id,
492 callback));
493 new_job->abort_callback = CreateErrorRunCallback(callback);
494 StartJob(new_job);
497 void JobScheduler::RenameResource(
498 const std::string& resource_id,
499 const std::string& new_title,
500 const google_apis::EntryActionCallback& callback) {
501 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
502 DCHECK(!callback.is_null());
504 JobEntry* new_job = CreateNewJob(TYPE_RENAME_RESOURCE);
505 new_job->task = base::Bind(
506 &DriveServiceInterface::RenameResource,
507 base::Unretained(drive_service_),
508 resource_id,
509 new_title,
510 base::Bind(&JobScheduler::OnEntryActionJobDone,
511 weak_ptr_factory_.GetWeakPtr(),
512 new_job->job_info.job_id,
513 callback));
514 new_job->abort_callback = callback;
515 StartJob(new_job);
518 void JobScheduler::AddResourceToDirectory(
519 const std::string& parent_resource_id,
520 const std::string& resource_id,
521 const google_apis::EntryActionCallback& callback) {
522 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
523 DCHECK(!callback.is_null());
525 JobEntry* new_job = CreateNewJob(TYPE_ADD_RESOURCE_TO_DIRECTORY);
526 new_job->task = base::Bind(
527 &DriveServiceInterface::AddResourceToDirectory,
528 base::Unretained(drive_service_),
529 parent_resource_id,
530 resource_id,
531 base::Bind(&JobScheduler::OnEntryActionJobDone,
532 weak_ptr_factory_.GetWeakPtr(),
533 new_job->job_info.job_id,
534 callback));
535 new_job->abort_callback = callback;
536 StartJob(new_job);
539 void JobScheduler::RemoveResourceFromDirectory(
540 const std::string& parent_resource_id,
541 const std::string& resource_id,
542 const ClientContext& context,
543 const google_apis::EntryActionCallback& callback) {
544 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
546 JobEntry* new_job = CreateNewJob(TYPE_REMOVE_RESOURCE_FROM_DIRECTORY);
547 new_job->context = context;
548 new_job->task = base::Bind(
549 &DriveServiceInterface::RemoveResourceFromDirectory,
550 base::Unretained(drive_service_),
551 parent_resource_id,
552 resource_id,
553 base::Bind(&JobScheduler::OnEntryActionJobDone,
554 weak_ptr_factory_.GetWeakPtr(),
555 new_job->job_info.job_id,
556 callback));
557 new_job->abort_callback = callback;
558 StartJob(new_job);
561 void JobScheduler::AddNewDirectory(
562 const std::string& parent_resource_id,
563 const std::string& directory_title,
564 const google_apis::GetResourceEntryCallback& callback) {
565 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
567 JobEntry* new_job = CreateNewJob(TYPE_ADD_NEW_DIRECTORY);
568 new_job->task = base::Bind(
569 &DriveServiceInterface::AddNewDirectory,
570 base::Unretained(drive_service_),
571 parent_resource_id,
572 directory_title,
573 base::Bind(&JobScheduler::OnGetResourceEntryJobDone,
574 weak_ptr_factory_.GetWeakPtr(),
575 new_job->job_info.job_id,
576 callback));
577 new_job->abort_callback = CreateErrorRunCallback(callback);
578 StartJob(new_job);
581 JobID JobScheduler::DownloadFile(
582 const base::FilePath& virtual_path,
583 int64 expected_file_size,
584 const base::FilePath& local_cache_path,
585 const std::string& resource_id,
586 const ClientContext& context,
587 const google_apis::DownloadActionCallback& download_action_callback,
588 const google_apis::GetContentCallback& get_content_callback) {
589 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
591 JobEntry* new_job = CreateNewJob(TYPE_DOWNLOAD_FILE);
592 new_job->job_info.file_path = virtual_path;
593 new_job->job_info.num_total_bytes = expected_file_size;
594 new_job->context = context;
595 new_job->task = base::Bind(
596 &DriveServiceInterface::DownloadFile,
597 base::Unretained(drive_service_),
598 local_cache_path,
599 resource_id,
600 base::Bind(&JobScheduler::OnDownloadActionJobDone,
601 weak_ptr_factory_.GetWeakPtr(),
602 new_job->job_info.job_id,
603 download_action_callback),
604 get_content_callback,
605 base::Bind(&JobScheduler::UpdateProgress,
606 weak_ptr_factory_.GetWeakPtr(),
607 new_job->job_info.job_id));
608 new_job->abort_callback = CreateErrorRunCallback(download_action_callback);
609 StartJob(new_job);
610 return new_job->job_info.job_id;
613 void JobScheduler::UploadNewFile(
614 const std::string& parent_resource_id,
615 const base::FilePath& drive_file_path,
616 const base::FilePath& local_file_path,
617 const std::string& title,
618 const std::string& content_type,
619 const ClientContext& context,
620 const google_apis::GetResourceEntryCallback& callback) {
621 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
623 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_NEW_FILE);
624 new_job->job_info.file_path = drive_file_path;
625 new_job->context = context;
627 UploadNewFileParams params;
628 params.parent_resource_id = parent_resource_id;
629 params.local_file_path = local_file_path;
630 params.title = title;
631 params.content_type = content_type;
633 ResumeUploadParams resume_params;
634 resume_params.local_file_path = params.local_file_path;
635 resume_params.content_type = params.content_type;
637 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
638 weak_ptr_factory_.GetWeakPtr(),
639 new_job->job_info.job_id,
640 resume_params,
641 callback);
642 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
643 weak_ptr_factory_.GetWeakPtr(),
644 new_job->job_info.job_id);
645 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
646 new_job->abort_callback = CreateErrorRunCallback(callback);
647 StartJob(new_job);
650 void JobScheduler::UploadExistingFile(
651 const std::string& resource_id,
652 const base::FilePath& drive_file_path,
653 const base::FilePath& local_file_path,
654 const std::string& content_type,
655 const std::string& etag,
656 const ClientContext& context,
657 const google_apis::GetResourceEntryCallback& callback) {
658 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
660 JobEntry* new_job = CreateNewJob(TYPE_UPLOAD_EXISTING_FILE);
661 new_job->job_info.file_path = drive_file_path;
662 new_job->context = context;
664 UploadExistingFileParams params;
665 params.resource_id = resource_id;
666 params.local_file_path = local_file_path;
667 params.content_type = content_type;
668 params.etag = etag;
670 ResumeUploadParams resume_params;
671 resume_params.local_file_path = params.local_file_path;
672 resume_params.content_type = params.content_type;
674 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
675 weak_ptr_factory_.GetWeakPtr(),
676 new_job->job_info.job_id,
677 resume_params,
678 callback);
679 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
680 weak_ptr_factory_.GetWeakPtr(),
681 new_job->job_info.job_id);
682 new_job->task = base::Bind(&RunUploadExistingFile, uploader_.get(), params);
683 new_job->abort_callback = CreateErrorRunCallback(callback);
684 StartJob(new_job);
687 void JobScheduler::CreateFile(
688 const std::string& parent_resource_id,
689 const base::FilePath& drive_file_path,
690 const std::string& title,
691 const std::string& content_type,
692 const ClientContext& context,
693 const google_apis::GetResourceEntryCallback& callback) {
694 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
696 const base::FilePath kDevNull(FILE_PATH_LITERAL("/dev/null"));
698 JobEntry* new_job = CreateNewJob(TYPE_CREATE_FILE);
699 new_job->job_info.file_path = drive_file_path;
700 new_job->context = context;
702 UploadNewFileParams params;
703 params.parent_resource_id = parent_resource_id;
704 params.local_file_path = kDevNull; // Upload an empty file.
705 params.title = title;
706 params.content_type = content_type;
708 ResumeUploadParams resume_params;
709 resume_params.local_file_path = params.local_file_path;
710 resume_params.content_type = params.content_type;
712 params.callback = base::Bind(&JobScheduler::OnUploadCompletionJobDone,
713 weak_ptr_factory_.GetWeakPtr(),
714 new_job->job_info.job_id,
715 resume_params,
716 callback);
717 params.progress_callback = google_apis::ProgressCallback();
719 new_job->task = base::Bind(&RunUploadNewFile, uploader_.get(), params);
720 new_job->abort_callback = CreateErrorRunCallback(callback);
721 StartJob(new_job);
724 void JobScheduler::GetResourceListInDirectoryByWapi(
725 const std::string& directory_resource_id,
726 const google_apis::GetResourceListCallback& callback) {
727 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
728 DCHECK(!callback.is_null());
730 JobEntry* new_job = CreateNewJob(
731 TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI);
732 new_job->task = base::Bind(
733 &DriveServiceInterface::GetResourceListInDirectoryByWapi,
734 base::Unretained(drive_service_),
735 directory_resource_id,
736 base::Bind(&JobScheduler::OnGetResourceListJobDone,
737 weak_ptr_factory_.GetWeakPtr(),
738 new_job->job_info.job_id,
739 callback));
740 new_job->abort_callback = CreateErrorRunCallback(callback);
741 StartJob(new_job);
744 void JobScheduler::GetRemainingResourceList(
745 const GURL& next_link,
746 const google_apis::GetResourceListCallback& callback) {
747 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
748 DCHECK(!callback.is_null());
750 JobEntry* new_job = CreateNewJob(TYPE_GET_REMAINING_RESOURCE_LIST);
751 new_job->task = base::Bind(
752 &DriveServiceInterface::GetRemainingResourceList,
753 base::Unretained(drive_service_),
754 next_link,
755 base::Bind(&JobScheduler::OnGetResourceListJobDone,
756 weak_ptr_factory_.GetWeakPtr(),
757 new_job->job_info.job_id,
758 callback));
759 new_job->abort_callback = CreateErrorRunCallback(callback);
760 StartJob(new_job);
763 JobScheduler::JobEntry* JobScheduler::CreateNewJob(JobType type) {
764 JobEntry* job = new JobEntry(type);
765 job->job_info.job_id = job_map_.Add(job); // Takes the ownership of |job|.
766 return job;
769 void JobScheduler::StartJob(JobEntry* job) {
770 DCHECK(!job->task.is_null());
772 QueueJob(job->job_info.job_id);
773 NotifyJobAdded(job->job_info);
774 DoJobLoop(GetJobQueueType(job->job_info.job_type));
777 void JobScheduler::QueueJob(JobID job_id) {
778 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
780 JobEntry* job_entry = job_map_.Lookup(job_id);
781 DCHECK(job_entry);
782 const JobInfo& job_info = job_entry->job_info;
784 QueueType queue_type = GetJobQueueType(job_info.job_type);
785 queue_[queue_type]->Push(job_id, job_entry->context.type);
787 const std::string retry_prefix = job_entry->retry_count > 0 ?
788 base::StringPrintf(" (retry %d)", job_entry->retry_count) : "";
789 util::Log(logging::LOG_INFO,
790 "Job queued%s: %s - %s",
791 retry_prefix.c_str(),
792 job_info.ToString().c_str(),
793 GetQueueInfo(queue_type).c_str());
796 void JobScheduler::DoJobLoop(QueueType queue_type) {
797 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
799 const int accepted_priority = GetCurrentAcceptedPriority(queue_type);
801 // Abort all USER_INITAITED jobs when not accepted.
802 if (accepted_priority < USER_INITIATED) {
803 std::vector<JobID> jobs;
804 queue_[queue_type]->GetQueuedJobs(USER_INITIATED, &jobs);
805 for (size_t i = 0; i < jobs.size(); ++i) {
806 JobEntry* job = job_map_.Lookup(jobs[i]);
807 DCHECK(job);
808 AbortNotRunningJob(job, google_apis::GDATA_NO_CONNECTION);
812 // Wait when throttled.
813 const base::Time now = base::Time::Now();
814 if (now < wait_until_) {
815 base::MessageLoopProxy::current()->PostDelayedTask(
816 FROM_HERE,
817 base::Bind(&JobScheduler::DoJobLoop,
818 weak_ptr_factory_.GetWeakPtr(),
819 queue_type),
820 wait_until_ - now);
821 return;
824 // Run the job with the highest priority in the queue.
825 JobID job_id = -1;
826 if (!queue_[queue_type]->PopForRun(accepted_priority, &job_id))
827 return;
829 JobEntry* entry = job_map_.Lookup(job_id);
830 DCHECK(entry);
832 JobInfo* job_info = &entry->job_info;
833 job_info->state = STATE_RUNNING;
834 job_info->start_time = now;
835 NotifyJobUpdated(*job_info);
837 entry->cancel_callback = entry->task.Run();
839 UpdateWait();
841 util::Log(logging::LOG_INFO,
842 "Job started: %s - %s",
843 job_info->ToString().c_str(),
844 GetQueueInfo(queue_type).c_str());
847 int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
848 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
850 const int kNoJobShouldRun = -1;
852 // Should stop if Drive was disabled while running the fetch loop.
853 if (pref_service_->GetBoolean(prefs::kDisableDrive))
854 return kNoJobShouldRun;
856 // Should stop if the network is not online.
857 if (net::NetworkChangeNotifier::IsOffline())
858 return kNoJobShouldRun;
860 // For the file queue, if it is on cellular network, only user initiated
861 // operations are allowed to start.
862 if (queue_type == FILE_QUEUE &&
863 pref_service_->GetBoolean(prefs::kDisableDriveOverCellular) &&
864 net::NetworkChangeNotifier::IsConnectionCellular(
865 net::NetworkChangeNotifier::GetConnectionType()))
866 return USER_INITIATED;
868 // Otherwise, every operations including background tasks are allowed.
869 return BACKGROUND;
872 void JobScheduler::UpdateWait() {
873 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
875 if (disable_throttling_ || throttle_count_ == 0)
876 return;
878 // Exponential backoff: https://developers.google.com/drive/handle-errors.
879 base::TimeDelta delay =
880 base::TimeDelta::FromSeconds(1 << (throttle_count_ - 1)) +
881 base::TimeDelta::FromMilliseconds(base::RandInt(0, 1000));
882 VLOG(1) << "Throttling for " << delay.InMillisecondsF();
884 wait_until_ = std::max(wait_until_, base::Time::Now() + delay);
887 bool JobScheduler::OnJobDone(JobID job_id, google_apis::GDataErrorCode error) {
888 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
890 JobEntry* job_entry = job_map_.Lookup(job_id);
891 DCHECK(job_entry);
892 JobInfo* job_info = &job_entry->job_info;
893 QueueType queue_type = GetJobQueueType(job_info->job_type);
894 queue_[queue_type]->MarkFinished(job_id);
896 const base::TimeDelta elapsed = base::Time::Now() - job_info->start_time;
897 bool success = (GDataToFileError(error) == FILE_ERROR_OK);
898 util::Log(success ? logging::LOG_INFO : logging::LOG_WARNING,
899 "Job done: %s => %s (elapsed time: %sms) - %s",
900 job_info->ToString().c_str(),
901 GDataErrorCodeToString(error).c_str(),
902 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
903 GetQueueInfo(queue_type).c_str());
905 // Retry, depending on the error.
906 const bool is_server_error =
907 error == google_apis::HTTP_SERVICE_UNAVAILABLE ||
908 error == google_apis::HTTP_INTERNAL_SERVER_ERROR;
909 if (is_server_error) {
910 if (throttle_count_ < kMaxThrottleCount)
911 ++throttle_count_;
912 UpdateWait();
913 } else {
914 throttle_count_ = 0;
917 const bool should_retry =
918 is_server_error && job_entry->retry_count < kMaxRetryCount;
919 if (should_retry) {
920 job_entry->cancel_callback.Reset();
921 job_info->state = STATE_RETRY;
922 NotifyJobUpdated(*job_info);
924 ++job_entry->retry_count;
926 // Requeue the job.
927 QueueJob(job_id);
928 } else {
929 NotifyJobDone(*job_info, error);
930 // The job has finished, no retry will happen in the scheduler. Now we can
931 // remove the job info from the map.
932 job_map_.Remove(job_id);
935 // Post a task to continue the job loop. This allows us to finish handling
936 // the current job before starting the next one.
937 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
938 base::Bind(&JobScheduler::DoJobLoop,
939 weak_ptr_factory_.GetWeakPtr(),
940 queue_type));
941 return !should_retry;
944 void JobScheduler::OnGetResourceListJobDone(
945 JobID job_id,
946 const google_apis::GetResourceListCallback& callback,
947 google_apis::GDataErrorCode error,
948 scoped_ptr<google_apis::ResourceList> resource_list) {
949 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
950 DCHECK(!callback.is_null());
952 if (OnJobDone(job_id, error))
953 callback.Run(error, resource_list.Pass());
956 void JobScheduler::OnGetResourceEntryJobDone(
957 JobID job_id,
958 const google_apis::GetResourceEntryCallback& callback,
959 google_apis::GDataErrorCode error,
960 scoped_ptr<google_apis::ResourceEntry> entry) {
961 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
962 DCHECK(!callback.is_null());
964 if (OnJobDone(job_id, error))
965 callback.Run(error, entry.Pass());
968 void JobScheduler::OnGetAboutResourceJobDone(
969 JobID job_id,
970 const google_apis::AboutResourceCallback& callback,
971 google_apis::GDataErrorCode error,
972 scoped_ptr<google_apis::AboutResource> about_resource) {
973 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
974 DCHECK(!callback.is_null());
976 if (OnJobDone(job_id, error))
977 callback.Run(error, about_resource.Pass());
980 void JobScheduler::OnGetShareUrlJobDone(
981 JobID job_id,
982 const google_apis::GetShareUrlCallback& callback,
983 google_apis::GDataErrorCode error,
984 const GURL& share_url) {
985 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
986 DCHECK(!callback.is_null());
988 if (OnJobDone(job_id, error))
989 callback.Run(error, share_url);
992 void JobScheduler::OnGetAppListJobDone(
993 JobID job_id,
994 const google_apis::AppListCallback& callback,
995 google_apis::GDataErrorCode error,
996 scoped_ptr<google_apis::AppList> app_list) {
997 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
998 DCHECK(!callback.is_null());
1000 if (OnJobDone(job_id, error))
1001 callback.Run(error, app_list.Pass());
1004 void JobScheduler::OnEntryActionJobDone(
1005 JobID job_id,
1006 const google_apis::EntryActionCallback& callback,
1007 google_apis::GDataErrorCode error) {
1008 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1009 DCHECK(!callback.is_null());
1011 if (OnJobDone(job_id, error))
1012 callback.Run(error);
1015 void JobScheduler::OnDownloadActionJobDone(
1016 JobID job_id,
1017 const google_apis::DownloadActionCallback& callback,
1018 google_apis::GDataErrorCode error,
1019 const base::FilePath& temp_file) {
1020 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1021 DCHECK(!callback.is_null());
1023 if (OnJobDone(job_id, error))
1024 callback.Run(error, temp_file);
1027 void JobScheduler::OnUploadCompletionJobDone(
1028 JobID job_id,
1029 const ResumeUploadParams& resume_params,
1030 const google_apis::GetResourceEntryCallback& callback,
1031 google_apis::GDataErrorCode error,
1032 const GURL& upload_location,
1033 scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1034 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1035 DCHECK(!callback.is_null());
1037 if (!upload_location.is_empty()) {
1038 // If upload_location is available, update the task to resume the
1039 // upload process from the terminated point.
1040 // When we need to retry, the error code should be HTTP_SERVICE_UNAVAILABLE
1041 // so OnJobDone called below will be in charge to re-queue the job.
1042 JobEntry* job_entry = job_map_.Lookup(job_id);
1043 DCHECK(job_entry);
1045 ResumeUploadFileParams params;
1046 params.upload_location = upload_location;
1047 params.local_file_path = resume_params.local_file_path;
1048 params.content_type = resume_params.content_type;
1049 params.callback = base::Bind(&JobScheduler::OnResumeUploadFileDone,
1050 weak_ptr_factory_.GetWeakPtr(),
1051 job_id,
1052 job_entry->task,
1053 callback);
1054 params.progress_callback = base::Bind(&JobScheduler::UpdateProgress,
1055 weak_ptr_factory_.GetWeakPtr(),
1056 job_id);
1057 job_entry->task = base::Bind(&RunResumeUploadFile, uploader_.get(), params);
1060 if (OnJobDone(job_id, error))
1061 callback.Run(error, resource_entry.Pass());
1064 void JobScheduler::OnResumeUploadFileDone(
1065 JobID job_id,
1066 const base::Callback<google_apis::CancelCallback()>& original_task,
1067 const google_apis::GetResourceEntryCallback& callback,
1068 google_apis::GDataErrorCode error,
1069 const GURL& upload_location,
1070 scoped_ptr<google_apis::ResourceEntry> resource_entry) {
1071 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1072 DCHECK(!original_task.is_null());
1073 DCHECK(!callback.is_null());
1075 if (upload_location.is_empty()) {
1076 // If upload_location is not available, we should discard it and stop trying
1077 // to resume. Restore the original task.
1078 JobEntry* job_entry = job_map_.Lookup(job_id);
1079 DCHECK(job_entry);
1080 job_entry->task = original_task;
1083 if (OnJobDone(job_id, error))
1084 callback.Run(error, resource_entry.Pass());
1087 void JobScheduler::UpdateProgress(JobID job_id, int64 progress, int64 total) {
1088 JobEntry* job_entry = job_map_.Lookup(job_id);
1089 DCHECK(job_entry);
1091 job_entry->job_info.num_completed_bytes = progress;
1092 if (total != -1)
1093 job_entry->job_info.num_total_bytes = total;
1094 NotifyJobUpdated(job_entry->job_info);
1097 void JobScheduler::OnConnectionTypeChanged(
1098 net::NetworkChangeNotifier::ConnectionType type) {
1099 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1101 // Resume the job loop.
1102 // Note that we don't need to check the network connection status as it will
1103 // be checked in GetCurrentAcceptedPriority().
1104 for (int i = METADATA_QUEUE; i < NUM_QUEUES; ++i)
1105 DoJobLoop(static_cast<QueueType>(i));
1108 JobScheduler::QueueType JobScheduler::GetJobQueueType(JobType type) {
1109 switch (type) {
1110 case TYPE_GET_ABOUT_RESOURCE:
1111 case TYPE_GET_APP_LIST:
1112 case TYPE_GET_ALL_RESOURCE_LIST:
1113 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY:
1114 case TYPE_SEARCH:
1115 case TYPE_GET_CHANGE_LIST:
1116 case TYPE_GET_REMAINING_CHANGE_LIST:
1117 case TYPE_GET_REMAINING_FILE_LIST:
1118 case TYPE_GET_RESOURCE_ENTRY:
1119 case TYPE_GET_SHARE_URL:
1120 case TYPE_TRASH_RESOURCE:
1121 case TYPE_COPY_RESOURCE:
1122 case TYPE_UPDATE_RESOURCE:
1123 case TYPE_RENAME_RESOURCE:
1124 case TYPE_ADD_RESOURCE_TO_DIRECTORY:
1125 case TYPE_REMOVE_RESOURCE_FROM_DIRECTORY:
1126 case TYPE_ADD_NEW_DIRECTORY:
1127 case TYPE_CREATE_FILE:
1128 case TYPE_GET_RESOURCE_LIST_IN_DIRECTORY_BY_WAPI:
1129 case TYPE_GET_REMAINING_RESOURCE_LIST:
1130 return METADATA_QUEUE;
1132 case TYPE_DOWNLOAD_FILE:
1133 case TYPE_UPLOAD_NEW_FILE:
1134 case TYPE_UPLOAD_EXISTING_FILE:
1135 return FILE_QUEUE;
1137 NOTREACHED();
1138 return FILE_QUEUE;
1141 void JobScheduler::AbortNotRunningJob(JobEntry* job,
1142 google_apis::GDataErrorCode error) {
1143 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1145 const base::TimeDelta elapsed = base::Time::Now() - job->job_info.start_time;
1146 const QueueType queue_type = GetJobQueueType(job->job_info.job_type);
1147 util::Log(logging::LOG_INFO,
1148 "Job aborted: %s => %s (elapsed time: %sms) - %s",
1149 job->job_info.ToString().c_str(),
1150 GDataErrorCodeToString(error).c_str(),
1151 base::Int64ToString(elapsed.InMilliseconds()).c_str(),
1152 GetQueueInfo(queue_type).c_str());
1154 base::Callback<void(google_apis::GDataErrorCode)> callback =
1155 job->abort_callback;
1156 queue_[GetJobQueueType(job->job_info.job_type)]->Remove(job->job_info.job_id);
1157 NotifyJobDone(job->job_info, error);
1158 job_map_.Remove(job->job_info.job_id);
1159 base::MessageLoopProxy::current()->PostTask(FROM_HERE,
1160 base::Bind(callback, error));
1163 void JobScheduler::NotifyJobAdded(const JobInfo& job_info) {
1164 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1165 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobAdded(job_info));
1168 void JobScheduler::NotifyJobDone(const JobInfo& job_info,
1169 google_apis::GDataErrorCode error) {
1170 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1171 FOR_EACH_OBSERVER(JobListObserver, observer_list_,
1172 OnJobDone(job_info, GDataToFileError(error)));
1175 void JobScheduler::NotifyJobUpdated(const JobInfo& job_info) {
1176 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
1177 FOR_EACH_OBSERVER(JobListObserver, observer_list_, OnJobUpdated(job_info));
1180 std::string JobScheduler::GetQueueInfo(QueueType type) const {
1181 return QueueTypeToString(type) + " " + queue_[type]->ToString();
1184 // static
1185 std::string JobScheduler::QueueTypeToString(QueueType type) {
1186 switch (type) {
1187 case METADATA_QUEUE:
1188 return "METADATA_QUEUE";
1189 case FILE_QUEUE:
1190 return "FILE_QUEUE";
1191 case NUM_QUEUES:
1192 break; // This value is just a sentinel. Should never be used.
1194 NOTREACHED();
1195 return "";
1198 } // namespace drive