[SyncFS] Build indexes from FileTracker entries on disk.
[chromium-blink-merge.git] / sync / engine / sync_scheduler_impl.cc
blob4d38c21f87a8f68ba5cb34112ae4bf6521a4b461
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/sync_scheduler_impl.h"
7 #include <algorithm>
8 #include <cstring>
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h"
19 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h"
21 #include "sync/util/data_type_histogram.h"
22 #include "sync/util/logging.h"
24 using base::TimeDelta;
25 using base::TimeTicks;
27 namespace syncer {
29 using sessions::SyncSession;
30 using sessions::SyncSessionSnapshot;
31 using sync_pb::GetUpdatesCallerInfo;
33 namespace {
35 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
36 switch (error.error_type) {
37 case SYNC_SUCCESS:
38 case MIGRATION_DONE:
39 case THROTTLED:
40 case TRANSIENT_ERROR:
41 return false;
42 case NOT_MY_BIRTHDAY:
43 case CLEAR_PENDING:
44 case DISABLED_BY_ADMIN:
45 case USER_ROLLBACK:
46 // If we send terminate sync early then |sync_cycle_ended| notification
47 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
48 // notification wouldnt be sent either. Then the UI layer would be left
49 // waiting forever. So assert we would send something.
50 DCHECK_NE(error.action, UNKNOWN_ACTION);
51 return true;
52 case INVALID_CREDENTIAL:
53 // The notification for this is handled by PostAndProcessHeaders|.
54 // Server does no have to send any action for this.
55 return true;
56 // Make the default a NOTREACHED. So if a new error is introduced we
57 // think about its expected functionality.
58 default:
59 NOTREACHED();
60 return false;
64 bool IsActionableError(
65 const SyncProtocolError& error) {
66 return (error.action != UNKNOWN_ACTION);
68 } // namespace
70 ConfigurationParams::ConfigurationParams()
71 : source(GetUpdatesCallerInfo::UNKNOWN) {}
72 ConfigurationParams::ConfigurationParams(
73 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
74 ModelTypeSet types_to_download,
75 const ModelSafeRoutingInfo& routing_info,
76 const base::Closure& ready_task,
77 const base::Closure& retry_task)
78 : source(source),
79 types_to_download(types_to_download),
80 routing_info(routing_info),
81 ready_task(ready_task),
82 retry_task(retry_task) {
83 DCHECK(!ready_task.is_null());
84 DCHECK(!retry_task.is_null());
86 ConfigurationParams::~ConfigurationParams() {}
88 SyncSchedulerImpl::WaitInterval::WaitInterval()
89 : mode(UNKNOWN) {}
91 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
92 : mode(mode), length(length) {}
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
96 #define ENUM_CASE(x) case x: return #x; break;
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
99 switch (mode) {
100 ENUM_CASE(UNKNOWN);
101 ENUM_CASE(EXPONENTIAL_BACKOFF);
102 ENUM_CASE(THROTTLED);
104 NOTREACHED();
105 return "";
108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
109 NudgeSource source) {
110 switch (source) {
111 case NUDGE_SOURCE_NOTIFICATION:
112 return GetUpdatesCallerInfo::NOTIFICATION;
113 case NUDGE_SOURCE_LOCAL:
114 return GetUpdatesCallerInfo::LOCAL;
115 case NUDGE_SOURCE_LOCAL_REFRESH:
116 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
117 case NUDGE_SOURCE_UNKNOWN:
118 return GetUpdatesCallerInfo::UNKNOWN;
119 default:
120 NOTREACHED();
121 return GetUpdatesCallerInfo::UNKNOWN;
125 // Helper macros to log with the syncer thread name; useful when there
126 // are multiple syncer threads involved.
128 #define SLOG(severity) LOG(severity) << name_ << ": "
130 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
132 #define SDVLOG_LOC(from_here, verbose_level) \
133 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
135 namespace {
137 const int kDefaultSessionsCommitDelaySeconds = 10;
139 bool IsConfigRelatedUpdateSourceValue(
140 GetUpdatesCallerInfo::GetUpdatesSource source) {
141 switch (source) {
142 case GetUpdatesCallerInfo::RECONFIGURATION:
143 case GetUpdatesCallerInfo::MIGRATION:
144 case GetUpdatesCallerInfo::NEW_CLIENT:
145 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
146 return true;
147 default:
148 return false;
152 } // namespace
154 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
155 BackoffDelayProvider* delay_provider,
156 sessions::SyncSessionContext* context,
157 Syncer* syncer)
158 : name_(name),
159 started_(false),
160 syncer_short_poll_interval_seconds_(
161 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
162 syncer_long_poll_interval_seconds_(
163 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
164 sessions_commit_delay_(
165 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
166 mode_(NORMAL_MODE),
167 delay_provider_(delay_provider),
168 syncer_(syncer),
169 session_context_(context),
170 no_scheduling_allowed_(false),
171 do_poll_after_credentials_updated_(false),
172 next_sync_session_job_priority_(NORMAL_PRIORITY),
173 weak_ptr_factory_(this),
174 weak_ptr_factory_for_weak_handle_(this) {
175 weak_handle_this_ = MakeWeakHandle(
176 weak_ptr_factory_for_weak_handle_.GetWeakPtr());
179 SyncSchedulerImpl::~SyncSchedulerImpl() {
180 DCHECK(CalledOnValidThread());
181 Stop();
184 void SyncSchedulerImpl::OnCredentialsUpdated() {
185 DCHECK(CalledOnValidThread());
187 if (HttpResponse::SYNC_AUTH_ERROR ==
188 session_context_->connection_manager()->server_status()) {
189 OnServerConnectionErrorFixed();
193 void SyncSchedulerImpl::OnConnectionStatusChange() {
194 if (HttpResponse::CONNECTION_UNAVAILABLE ==
195 session_context_->connection_manager()->server_status()) {
196 // Optimistically assume that the connection is fixed and try
197 // connecting.
198 OnServerConnectionErrorFixed();
202 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
203 // There could be a pending nudge or configuration job in several cases:
205 // 1. We're in exponential backoff.
206 // 2. We're silenced / throttled.
207 // 3. A nudge was saved previously due to not having a valid auth token.
208 // 4. A nudge was scheduled + saved while in configuration mode.
210 // In all cases except (2), we want to retry contacting the server. We
211 // call TryCanaryJob to achieve this, and note that nothing -- not even a
212 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
213 // has the authority to do that is the Unthrottle timer.
214 TryCanaryJob();
217 void SyncSchedulerImpl::Start(Mode mode) {
218 DCHECK(CalledOnValidThread());
219 std::string thread_name = base::MessageLoop::current()->thread_name();
220 if (thread_name.empty())
221 thread_name = "<Main thread>";
222 SDVLOG(2) << "Start called from thread "
223 << thread_name << " with mode " << GetModeString(mode);
224 if (!started_) {
225 started_ = true;
226 SendInitialSnapshot();
229 DCHECK(!session_context_->account_name().empty());
230 DCHECK(syncer_.get());
231 Mode old_mode = mode_;
232 mode_ = mode;
233 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
235 if (old_mode != mode_ && mode_ == NORMAL_MODE) {
236 // We just got back to normal mode. Let's try to run the work that was
237 // queued up while we were configuring.
239 // Update our current time before checking IsRetryRequired().
240 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
241 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
242 TrySyncSessionJob();
247 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
248 ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
249 ModelTypeSet enabled_protocol_types =
250 Intersection(ProtocolTypes(), enabled_types);
251 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
252 return Difference(enabled_protocol_types, throttled_types);
255 void SyncSchedulerImpl::SendInitialSnapshot() {
256 DCHECK(CalledOnValidThread());
257 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
258 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
259 event.snapshot = dummy->TakeSnapshot();
260 FOR_EACH_OBSERVER(SyncEngineEventListener,
261 *session_context_->listeners(),
262 OnSyncCycleEvent(event));
265 namespace {
267 // Helper to extract the routing info corresponding to types in
268 // |types_to_download| from |current_routes|.
269 void BuildModelSafeParams(
270 ModelTypeSet types_to_download,
271 const ModelSafeRoutingInfo& current_routes,
272 ModelSafeRoutingInfo* result_routes) {
273 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
274 iter.Inc()) {
275 ModelType type = iter.Get();
276 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
277 DCHECK(route != current_routes.end());
278 ModelSafeGroup group = route->second;
279 (*result_routes)[type] = group;
283 } // namespace.
285 void SyncSchedulerImpl::ScheduleConfiguration(
286 const ConfigurationParams& params) {
287 DCHECK(CalledOnValidThread());
288 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
289 DCHECK_EQ(CONFIGURATION_MODE, mode_);
290 DCHECK(!params.ready_task.is_null());
291 CHECK(started_) << "Scheduler must be running to configure.";
292 SDVLOG(2) << "Reconfiguring syncer.";
294 // Only one configuration is allowed at a time. Verify we're not waiting
295 // for a pending configure job.
296 DCHECK(!pending_configure_params_);
298 ModelSafeRoutingInfo restricted_routes;
299 BuildModelSafeParams(params.types_to_download,
300 params.routing_info,
301 &restricted_routes);
302 session_context_->SetRoutingInfo(restricted_routes);
304 // Only reconfigure if we have types to download.
305 if (!params.types_to_download.Empty()) {
306 pending_configure_params_.reset(new ConfigurationParams(params));
307 TrySyncSessionJob();
308 } else {
309 SDVLOG(2) << "No change in routing info, calling ready task directly.";
310 params.ready_task.Run();
314 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
315 DCHECK(CalledOnValidThread());
316 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
317 SDVLOG(1) << "Unable to run a job because we're throttled.";
318 return false;
321 if (wait_interval_
322 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
323 && priority != CANARY_PRIORITY) {
324 SDVLOG(1) << "Unable to run a job because we're backing off.";
325 return false;
328 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
329 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
330 return false;
333 return true;
336 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
337 DCHECK(CalledOnValidThread());
339 if (!CanRunJobNow(priority)) {
340 SDVLOG(1) << "Unable to run a nudge job right now";
341 return false;
344 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
345 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
346 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
347 return false;
350 if (mode_ == CONFIGURATION_MODE) {
351 SDVLOG(1) << "Not running nudge because we're in configuration mode.";
352 return false;
355 return true;
358 void SyncSchedulerImpl::ScheduleLocalNudge(
359 const TimeDelta& desired_delay,
360 ModelTypeSet types,
361 const tracked_objects::Location& nudge_location) {
362 DCHECK(CalledOnValidThread());
363 DCHECK(!types.Empty());
365 SDVLOG_LOC(nudge_location, 2)
366 << "Scheduling sync because of local change to "
367 << ModelTypeSetToString(types);
368 UpdateNudgeTimeRecords(types);
369 nudge_tracker_.RecordLocalChange(types);
370 ScheduleNudgeImpl(desired_delay, nudge_location);
373 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
374 const TimeDelta& desired_delay,
375 ModelTypeSet types,
376 const tracked_objects::Location& nudge_location) {
377 DCHECK(CalledOnValidThread());
378 DCHECK(!types.Empty());
380 SDVLOG_LOC(nudge_location, 2)
381 << "Scheduling sync because of local refresh request for "
382 << ModelTypeSetToString(types);
383 nudge_tracker_.RecordLocalRefreshRequest(types);
384 ScheduleNudgeImpl(desired_delay, nudge_location);
387 void SyncSchedulerImpl::ScheduleInvalidationNudge(
388 const TimeDelta& desired_delay,
389 syncer::ModelType model_type,
390 scoped_ptr<InvalidationInterface> invalidation,
391 const tracked_objects::Location& nudge_location) {
392 DCHECK(CalledOnValidThread());
394 SDVLOG_LOC(nudge_location, 2)
395 << "Scheduling sync because we received invalidation for "
396 << ModelTypeToString(model_type);
397 nudge_tracker_.RecordRemoteInvalidation(model_type, invalidation.Pass());
398 ScheduleNudgeImpl(desired_delay, nudge_location);
401 // TODO(zea): Consider adding separate throttling/backoff for datatype
402 // refresh requests.
403 void SyncSchedulerImpl::ScheduleNudgeImpl(
404 const TimeDelta& delay,
405 const tracked_objects::Location& nudge_location) {
406 DCHECK(CalledOnValidThread());
408 if (no_scheduling_allowed_) {
409 NOTREACHED() << "Illegal to schedule job while session in progress.";
410 return;
413 if (!started_) {
414 SDVLOG_LOC(nudge_location, 2)
415 << "Dropping nudge, scheduler is not running.";
416 return;
419 SDVLOG_LOC(nudge_location, 2)
420 << "In ScheduleNudgeImpl with delay "
421 << delay.InMilliseconds() << " ms";
423 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
424 return;
426 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
427 if (!scheduled_nudge_time_.is_null() &&
428 (scheduled_nudge_time_ < incoming_run_time)) {
429 // Old job arrives sooner than this one. Don't reschedule it.
430 return;
433 // Either there is no existing nudge in flight or the incoming nudge should be
434 // made to arrive first (preempt) the existing nudge. We reschedule in either
435 // case.
436 SDVLOG_LOC(nudge_location, 2)
437 << "Scheduling a nudge with "
438 << delay.InMilliseconds() << " ms delay";
439 scheduled_nudge_time_ = incoming_run_time;
440 pending_wakeup_timer_.Start(
441 nudge_location,
442 delay,
443 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
444 weak_ptr_factory_.GetWeakPtr()));
447 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
448 switch (mode) {
449 ENUM_CASE(CONFIGURATION_MODE);
450 ENUM_CASE(NORMAL_MODE);
452 return "";
455 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
456 DCHECK(CalledOnValidThread());
457 DCHECK(CanRunNudgeJobNow(priority));
459 DVLOG(2) << "Will run normal mode sync cycle with types "
460 << ModelTypeSetToString(session_context_->GetEnabledTypes());
461 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
462 bool premature_exit = !syncer_->NormalSyncShare(
463 GetEnabledAndUnthrottledTypes(),
464 nudge_tracker_,
465 session.get());
466 AdjustPolling(FORCE_RESET);
467 // Don't run poll job till the next time poll timer fires.
468 do_poll_after_credentials_updated_ = false;
470 bool success = !premature_exit
471 && !sessions::HasSyncerError(
472 session->status_controller().model_neutral_state());
474 if (success) {
475 // That cycle took care of any outstanding work we had.
476 SDVLOG(2) << "Nudge succeeded.";
477 nudge_tracker_.RecordSuccessfulSyncCycle();
478 scheduled_nudge_time_ = base::TimeTicks();
480 // If we're here, then we successfully reached the server. End all backoff.
481 wait_interval_.reset();
482 NotifyRetryTime(base::Time());
483 } else {
484 HandleFailure(session->status_controller().model_neutral_state());
488 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
489 DCHECK(CalledOnValidThread());
490 DCHECK_EQ(mode_, CONFIGURATION_MODE);
491 DCHECK(pending_configure_params_ != NULL);
493 if (!CanRunJobNow(priority)) {
494 SDVLOG(2) << "Unable to run configure job right now.";
495 if (!pending_configure_params_->retry_task.is_null()) {
496 pending_configure_params_->retry_task.Run();
497 pending_configure_params_->retry_task.Reset();
499 return;
502 SDVLOG(2) << "Will run configure SyncShare with types "
503 << ModelTypeSetToString(session_context_->GetEnabledTypes());
504 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
505 bool premature_exit = !syncer_->ConfigureSyncShare(
506 pending_configure_params_->types_to_download,
507 pending_configure_params_->source,
508 session.get());
509 AdjustPolling(FORCE_RESET);
510 // Don't run poll job till the next time poll timer fires.
511 do_poll_after_credentials_updated_ = false;
513 bool success = !premature_exit
514 && !sessions::HasSyncerError(
515 session->status_controller().model_neutral_state());
517 if (success) {
518 SDVLOG(2) << "Configure succeeded.";
519 pending_configure_params_->ready_task.Run();
520 pending_configure_params_.reset();
522 // If we're here, then we successfully reached the server. End all backoff.
523 wait_interval_.reset();
524 NotifyRetryTime(base::Time());
525 } else {
526 HandleFailure(session->status_controller().model_neutral_state());
527 // Sync cycle might receive response from server that causes scheduler to
528 // stop and draws pending_configure_params_ invalid.
529 if (started_ && !pending_configure_params_->retry_task.is_null()) {
530 pending_configure_params_->retry_task.Run();
531 pending_configure_params_->retry_task.Reset();
536 void SyncSchedulerImpl::HandleFailure(
537 const sessions::ModelNeutralState& model_neutral_state) {
538 if (IsCurrentlyThrottled()) {
539 SDVLOG(2) << "Was throttled during previous sync cycle.";
540 RestartWaiting();
541 } else if (!IsBackingOff()) {
542 // Setup our backoff if this is our first such failure.
543 TimeDelta length = delay_provider_->GetDelay(
544 delay_provider_->GetInitialDelay(model_neutral_state));
545 wait_interval_.reset(
546 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
547 SDVLOG(2) << "Sync cycle failed. Will back off for "
548 << wait_interval_->length.InMilliseconds() << "ms.";
549 RestartWaiting();
553 void SyncSchedulerImpl::DoPollSyncSessionJob() {
554 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
556 SDVLOG(2) << "Polling with types "
557 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
558 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
559 syncer_->PollSyncShare(
560 GetEnabledAndUnthrottledTypes(),
561 session.get());
563 AdjustPolling(FORCE_RESET);
565 if (IsCurrentlyThrottled()) {
566 SDVLOG(2) << "Poll request got us throttled.";
567 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
568 // to do is start the timer.
569 RestartWaiting();
573 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
574 DCHECK(CalledOnValidThread());
575 base::TimeTicks now = TimeTicks::Now();
576 // Update timing information for how often datatypes are triggering nudges.
577 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
578 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
579 last_local_nudges_by_model_type_[iter.Get()] = now;
580 if (previous.is_null())
581 continue;
583 #define PER_DATA_TYPE_MACRO(type_str) \
584 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
585 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
586 #undef PER_DATA_TYPE_MACRO
590 TimeDelta SyncSchedulerImpl::GetPollInterval() {
591 return (!session_context_->notifications_enabled() ||
592 !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
593 syncer_short_poll_interval_seconds_ :
594 syncer_long_poll_interval_seconds_;
597 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
598 DCHECK(CalledOnValidThread());
600 TimeDelta poll = GetPollInterval();
601 bool rate_changed = !poll_timer_.IsRunning() ||
602 poll != poll_timer_.GetCurrentDelay();
604 if (type == FORCE_RESET) {
605 last_poll_reset_ = base::TimeTicks::Now();
606 if (!rate_changed)
607 poll_timer_.Reset();
610 if (!rate_changed)
611 return;
613 // Adjust poll rate.
614 poll_timer_.Stop();
615 poll_timer_.Start(FROM_HERE, poll, this,
616 &SyncSchedulerImpl::PollTimerCallback);
619 void SyncSchedulerImpl::RestartWaiting() {
620 CHECK(wait_interval_.get());
621 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
622 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
623 SDVLOG(2) << "Starting WaitInterval timer of length "
624 << wait_interval_->length.InMilliseconds() << "ms.";
625 if (wait_interval_->mode == WaitInterval::THROTTLED) {
626 pending_wakeup_timer_.Start(
627 FROM_HERE,
628 wait_interval_->length,
629 base::Bind(&SyncSchedulerImpl::Unthrottle,
630 weak_ptr_factory_.GetWeakPtr()));
631 } else {
632 pending_wakeup_timer_.Start(
633 FROM_HERE,
634 wait_interval_->length,
635 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
636 weak_ptr_factory_.GetWeakPtr()));
640 void SyncSchedulerImpl::Stop() {
641 DCHECK(CalledOnValidThread());
642 SDVLOG(2) << "Stop called";
644 // Kill any in-flight method calls.
645 weak_ptr_factory_.InvalidateWeakPtrs();
646 wait_interval_.reset();
647 NotifyRetryTime(base::Time());
648 poll_timer_.Stop();
649 pending_wakeup_timer_.Stop();
650 pending_configure_params_.reset();
651 if (started_)
652 started_ = false;
655 // This is the only place where we invoke DoSyncSessionJob with canary
656 // privileges. Everyone else should use NORMAL_PRIORITY.
657 void SyncSchedulerImpl::TryCanaryJob() {
658 next_sync_session_job_priority_ = CANARY_PRIORITY;
659 TrySyncSessionJob();
662 void SyncSchedulerImpl::TrySyncSessionJob() {
663 // Post call to TrySyncSessionJobImpl on current thread. Later request for
664 // access token will be here.
665 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
666 &SyncSchedulerImpl::TrySyncSessionJobImpl,
667 weak_ptr_factory_.GetWeakPtr()));
670 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
671 JobPriority priority = next_sync_session_job_priority_;
672 next_sync_session_job_priority_ = NORMAL_PRIORITY;
674 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
676 DCHECK(CalledOnValidThread());
677 if (mode_ == CONFIGURATION_MODE) {
678 if (pending_configure_params_) {
679 SDVLOG(2) << "Found pending configure job";
680 DoConfigurationSyncSessionJob(priority);
682 } else if (CanRunNudgeJobNow(priority)) {
683 if (nudge_tracker_.IsSyncRequired()) {
684 SDVLOG(2) << "Found pending nudge job";
685 DoNudgeSyncSessionJob(priority);
686 } else if (do_poll_after_credentials_updated_ ||
687 ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
688 DoPollSyncSessionJob();
689 // Poll timer fires infrequently. Usually by this time access token is
690 // already expired and poll job will fail with auth error. Set flag to
691 // retry poll once ProfileSyncService gets new access token, TryCanaryJob
692 // will be called after access token is retrieved.
693 if (HttpResponse::SYNC_AUTH_ERROR ==
694 session_context_->connection_manager()->server_status()) {
695 do_poll_after_credentials_updated_ = true;
700 if (priority == CANARY_PRIORITY) {
701 // If this is canary job then whatever result was don't run poll job till
702 // the next time poll timer fires.
703 do_poll_after_credentials_updated_ = false;
706 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
707 // If we succeeded, our wait interval would have been cleared. If it hasn't
708 // been cleared, then we should increase our backoff interval and schedule
709 // another retry.
710 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
711 wait_interval_.reset(
712 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
713 SDVLOG(2) << "Sync cycle failed. Will back off for "
714 << wait_interval_->length.InMilliseconds() << "ms.";
715 RestartWaiting();
719 void SyncSchedulerImpl::PollTimerCallback() {
720 DCHECK(CalledOnValidThread());
721 if (no_scheduling_allowed_) {
722 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
723 // functions that are called only on the sync thread. This function is also
724 // called only on the sync thread, and only when it is posted by an expiring
725 // timer. If we find that no_scheduling_allowed_ is set here, then
726 // something is very wrong. Maybe someone mistakenly called us directly, or
727 // mishandled the book-keeping for no_scheduling_allowed_.
728 NOTREACHED() << "Illegal to schedule job while session in progress.";
729 return;
732 TrySyncSessionJob();
735 void SyncSchedulerImpl::RetryTimerCallback() {
736 TrySyncSessionJob();
739 void SyncSchedulerImpl::Unthrottle() {
740 DCHECK(CalledOnValidThread());
741 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
743 // We're no longer throttled, so clear the wait interval.
744 wait_interval_.reset();
745 NotifyRetryTime(base::Time());
746 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
748 // We treat this as a 'canary' in the sense that it was originally scheduled
749 // to run some time ago, failed, and we now want to retry, versus a job that
750 // was just created (e.g via ScheduleNudgeImpl). The main implication is
751 // that we're careful to update routing info (etc) with such potentially
752 // stale canary jobs.
753 TryCanaryJob();
756 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
757 DCHECK(CalledOnValidThread());
758 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
759 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
761 if (nudge_tracker_.IsAnyTypeThrottled()) {
762 const base::TimeTicks now = base::TimeTicks::Now();
763 base::TimeDelta time_until_next_unthrottle =
764 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
765 type_unthrottle_timer_.Start(
766 FROM_HERE,
767 time_until_next_unthrottle,
768 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
769 weak_ptr_factory_.GetWeakPtr(),
770 now + time_until_next_unthrottle));
773 // Maybe this is a good time to run a nudge job. Let's try it.
774 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
775 TrySyncSessionJob();
778 void SyncSchedulerImpl::PerformDelayedNudge() {
779 // Circumstances may have changed since we scheduled this delayed nudge.
780 // We must check to see if it's OK to run the job before we do so.
781 if (CanRunNudgeJobNow(NORMAL_PRIORITY))
782 TrySyncSessionJob();
784 // We're not responsible for setting up any retries here. The functions that
785 // first put us into a state that prevents successful sync cycles (eg. global
786 // throttling, type throttling, network errors, transient errors) will also
787 // setup the appropriate retry logic (eg. retry after timeout, exponential
788 // backoff, retry when the network changes).
791 void SyncSchedulerImpl::ExponentialBackoffRetry() {
792 TryCanaryJob();
795 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
796 FOR_EACH_OBSERVER(SyncEngineEventListener,
797 *session_context_->listeners(),
798 OnRetryTimeChanged(retry_time));
801 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
802 FOR_EACH_OBSERVER(SyncEngineEventListener,
803 *session_context_->listeners(),
804 OnThrottledTypesChanged(types));
807 bool SyncSchedulerImpl::IsBackingOff() const {
808 DCHECK(CalledOnValidThread());
809 return wait_interval_.get() && wait_interval_->mode ==
810 WaitInterval::EXPONENTIAL_BACKOFF;
813 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
814 DCHECK(CalledOnValidThread());
815 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
816 throttle_duration));
817 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
818 NotifyThrottledTypesChanged(ModelTypeSet::All());
821 void SyncSchedulerImpl::OnTypesThrottled(
822 ModelTypeSet types,
823 const base::TimeDelta& throttle_duration) {
824 base::TimeTicks now = base::TimeTicks::Now();
826 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
827 base::TimeDelta time_until_next_unthrottle =
828 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
829 type_unthrottle_timer_.Start(
830 FROM_HERE,
831 time_until_next_unthrottle,
832 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
833 weak_ptr_factory_.GetWeakPtr(),
834 now + time_until_next_unthrottle));
835 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
838 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
839 DCHECK(CalledOnValidThread());
840 return wait_interval_.get() && wait_interval_->mode ==
841 WaitInterval::THROTTLED;
844 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
845 const base::TimeDelta& new_interval) {
846 DCHECK(CalledOnValidThread());
847 syncer_short_poll_interval_seconds_ = new_interval;
850 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
851 const base::TimeDelta& new_interval) {
852 DCHECK(CalledOnValidThread());
853 syncer_long_poll_interval_seconds_ = new_interval;
856 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
857 const base::TimeDelta& new_delay) {
858 DCHECK(CalledOnValidThread());
859 sessions_commit_delay_ = new_delay;
862 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
863 if (size > 0)
864 nudge_tracker_.SetHintBufferSize(size);
865 else
866 NOTREACHED() << "Hint buffer size should be > 0.";
869 void SyncSchedulerImpl::OnSyncProtocolError(
870 const SyncProtocolError& sync_protocol_error) {
871 DCHECK(CalledOnValidThread());
872 if (ShouldRequestEarlyExit(sync_protocol_error)) {
873 SDVLOG(2) << "Sync Scheduler requesting early exit.";
874 Stop();
876 if (IsActionableError(sync_protocol_error)) {
877 SDVLOG(2) << "OnActionableError";
878 FOR_EACH_OBSERVER(SyncEngineEventListener,
879 *session_context_->listeners(),
880 OnActionableError(sync_protocol_error));
884 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
885 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
886 retry_timer_.Start(FROM_HERE, delay, this,
887 &SyncSchedulerImpl::RetryTimerCallback);
890 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
891 FOR_EACH_OBSERVER(SyncEngineEventListener,
892 *session_context_->listeners(),
893 OnMigrationRequested(types));
896 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
897 DCHECK(CalledOnValidThread());
898 session_context_->set_notifications_enabled(notifications_enabled);
899 if (notifications_enabled)
900 nudge_tracker_.OnInvalidationsEnabled();
901 else
902 nudge_tracker_.OnInvalidationsDisabled();
905 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
906 DCHECK(CalledOnValidThread());
907 return sessions_commit_delay_;
910 #undef SDVLOG_LOC
912 #undef SDVLOG
914 #undef SLOG
916 #undef ENUM_CASE
918 } // namespace syncer