Update .DEPS.git
[chromium-blink-merge.git] / sync / engine / sync_scheduler_impl.cc
blobac169ec18f4c394994c8230fffb3a2b757801138
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/sync_scheduler_impl.h"
7 #include <algorithm>
8 #include <cstring>
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h"
19 #include "sync/notifier/object_id_invalidation_map.h"
20 #include "sync/protocol/proto_enum_conversions.h"
21 #include "sync/protocol/sync.pb.h"
22 #include "sync/util/data_type_histogram.h"
23 #include "sync/util/logging.h"
25 using base::TimeDelta;
26 using base::TimeTicks;
28 namespace syncer {
30 using sessions::SyncSession;
31 using sessions::SyncSessionSnapshot;
32 using sync_pb::GetUpdatesCallerInfo;
34 namespace {
36 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
37 switch (error.error_type) {
38 case SYNC_SUCCESS:
39 case MIGRATION_DONE:
40 case THROTTLED:
41 case TRANSIENT_ERROR:
42 return false;
43 case NOT_MY_BIRTHDAY:
44 case CLEAR_PENDING:
45 case DISABLED_BY_ADMIN:
46 // If we send terminate sync early then |sync_cycle_ended| notification
47 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
48 // notification wouldnt be sent either. Then the UI layer would be left
49 // waiting forever. So assert we would send something.
50 DCHECK_NE(error.action, UNKNOWN_ACTION);
51 return true;
52 case INVALID_CREDENTIAL:
53 // The notification for this is handled by PostAndProcessHeaders|.
54 // Server does no have to send any action for this.
55 return true;
56 // Make the default a NOTREACHED. So if a new error is introduced we
57 // think about its expected functionality.
58 default:
59 NOTREACHED();
60 return false;
64 bool IsActionableError(
65 const SyncProtocolError& error) {
66 return (error.action != UNKNOWN_ACTION);
68 } // namespace
70 ConfigurationParams::ConfigurationParams()
71 : source(GetUpdatesCallerInfo::UNKNOWN) {}
72 ConfigurationParams::ConfigurationParams(
73 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
74 ModelTypeSet types_to_download,
75 const ModelSafeRoutingInfo& routing_info,
76 const base::Closure& ready_task,
77 const base::Closure& retry_task)
78 : source(source),
79 types_to_download(types_to_download),
80 routing_info(routing_info),
81 ready_task(ready_task),
82 retry_task(retry_task) {
83 DCHECK(!ready_task.is_null());
84 DCHECK(!retry_task.is_null());
86 ConfigurationParams::~ConfigurationParams() {}
88 SyncSchedulerImpl::WaitInterval::WaitInterval()
89 : mode(UNKNOWN) {}
91 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
92 : mode(mode), length(length) {}
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
96 #define ENUM_CASE(x) case x: return #x; break;
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
99 switch (mode) {
100 ENUM_CASE(UNKNOWN);
101 ENUM_CASE(EXPONENTIAL_BACKOFF);
102 ENUM_CASE(THROTTLED);
104 NOTREACHED();
105 return "";
108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
109 NudgeSource source) {
110 switch (source) {
111 case NUDGE_SOURCE_NOTIFICATION:
112 return GetUpdatesCallerInfo::NOTIFICATION;
113 case NUDGE_SOURCE_LOCAL:
114 return GetUpdatesCallerInfo::LOCAL;
115 case NUDGE_SOURCE_LOCAL_REFRESH:
116 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
117 case NUDGE_SOURCE_UNKNOWN:
118 return GetUpdatesCallerInfo::UNKNOWN;
119 default:
120 NOTREACHED();
121 return GetUpdatesCallerInfo::UNKNOWN;
125 // Helper macros to log with the syncer thread name; useful when there
126 // are multiple syncer threads involved.
128 #define SLOG(severity) LOG(severity) << name_ << ": "
130 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
132 #define SDVLOG_LOC(from_here, verbose_level) \
133 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
135 namespace {
137 const int kDefaultSessionsCommitDelaySeconds = 10;
139 bool IsConfigRelatedUpdateSourceValue(
140 GetUpdatesCallerInfo::GetUpdatesSource source) {
141 switch (source) {
142 case GetUpdatesCallerInfo::RECONFIGURATION:
143 case GetUpdatesCallerInfo::MIGRATION:
144 case GetUpdatesCallerInfo::NEW_CLIENT:
145 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
146 return true;
147 default:
148 return false;
152 } // namespace
154 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
155 BackoffDelayProvider* delay_provider,
156 sessions::SyncSessionContext* context,
157 Syncer* syncer)
158 : name_(name),
159 started_(false),
160 syncer_short_poll_interval_seconds_(
161 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
162 syncer_long_poll_interval_seconds_(
163 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
164 sessions_commit_delay_(
165 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
166 mode_(NORMAL_MODE),
167 delay_provider_(delay_provider),
168 syncer_(syncer),
169 session_context_(context),
170 no_scheduling_allowed_(false),
171 do_poll_after_credentials_updated_(false),
172 next_sync_session_job_priority_(NORMAL_PRIORITY),
173 weak_ptr_factory_(this),
174 weak_ptr_factory_for_weak_handle_(this) {
175 weak_handle_this_ = MakeWeakHandle(
176 weak_ptr_factory_for_weak_handle_.GetWeakPtr());
179 SyncSchedulerImpl::~SyncSchedulerImpl() {
180 DCHECK(CalledOnValidThread());
181 Stop();
184 void SyncSchedulerImpl::OnCredentialsUpdated() {
185 DCHECK(CalledOnValidThread());
187 if (HttpResponse::SYNC_AUTH_ERROR ==
188 session_context_->connection_manager()->server_status()) {
189 OnServerConnectionErrorFixed();
193 void SyncSchedulerImpl::OnConnectionStatusChange() {
194 if (HttpResponse::CONNECTION_UNAVAILABLE ==
195 session_context_->connection_manager()->server_status()) {
196 // Optimistically assume that the connection is fixed and try
197 // connecting.
198 OnServerConnectionErrorFixed();
202 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
203 // There could be a pending nudge or configuration job in several cases:
205 // 1. We're in exponential backoff.
206 // 2. We're silenced / throttled.
207 // 3. A nudge was saved previously due to not having a valid auth token.
208 // 4. A nudge was scheduled + saved while in configuration mode.
210 // In all cases except (2), we want to retry contacting the server. We
211 // call TryCanaryJob to achieve this, and note that nothing -- not even a
212 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
213 // has the authority to do that is the Unthrottle timer.
214 TryCanaryJob();
217 void SyncSchedulerImpl::Start(Mode mode) {
218 DCHECK(CalledOnValidThread());
219 std::string thread_name = base::MessageLoop::current()->thread_name();
220 if (thread_name.empty())
221 thread_name = "<Main thread>";
222 SDVLOG(2) << "Start called from thread "
223 << thread_name << " with mode " << GetModeString(mode);
224 if (!started_) {
225 started_ = true;
226 SendInitialSnapshot();
229 DCHECK(!session_context_->account_name().empty());
230 DCHECK(syncer_.get());
231 Mode old_mode = mode_;
232 mode_ = mode;
233 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
235 if (old_mode != mode_ &&
236 mode_ == NORMAL_MODE &&
237 (nudge_tracker_.IsSyncRequired() ||
238 nudge_tracker_.IsRetryRequired(base::TimeTicks::Now())) &&
239 CanRunNudgeJobNow(NORMAL_PRIORITY)) {
240 // We just got back to normal mode. Let's try to run the work that was
241 // queued up while we were configuring.
242 TrySyncSessionJob();
246 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
247 ModelTypeSet enabled_types = session_context_->enabled_types();
248 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
249 return Difference(enabled_types, throttled_types);
252 void SyncSchedulerImpl::SendInitialSnapshot() {
253 DCHECK(CalledOnValidThread());
254 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
255 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
256 event.snapshot = dummy->TakeSnapshot();
257 session_context_->NotifyListeners(event);
260 namespace {
262 // Helper to extract the routing info corresponding to types in
263 // |types_to_download| from |current_routes|.
264 void BuildModelSafeParams(
265 ModelTypeSet types_to_download,
266 const ModelSafeRoutingInfo& current_routes,
267 ModelSafeRoutingInfo* result_routes) {
268 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
269 iter.Inc()) {
270 ModelType type = iter.Get();
271 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
272 DCHECK(route != current_routes.end());
273 ModelSafeGroup group = route->second;
274 (*result_routes)[type] = group;
278 } // namespace.
280 void SyncSchedulerImpl::ScheduleConfiguration(
281 const ConfigurationParams& params) {
282 DCHECK(CalledOnValidThread());
283 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
284 DCHECK_EQ(CONFIGURATION_MODE, mode_);
285 DCHECK(!params.ready_task.is_null());
286 CHECK(started_) << "Scheduler must be running to configure.";
287 SDVLOG(2) << "Reconfiguring syncer.";
289 // Only one configuration is allowed at a time. Verify we're not waiting
290 // for a pending configure job.
291 DCHECK(!pending_configure_params_);
293 ModelSafeRoutingInfo restricted_routes;
294 BuildModelSafeParams(params.types_to_download,
295 params.routing_info,
296 &restricted_routes);
297 session_context_->SetRoutingInfo(restricted_routes);
299 // Only reconfigure if we have types to download.
300 if (!params.types_to_download.Empty()) {
301 pending_configure_params_.reset(new ConfigurationParams(params));
302 TrySyncSessionJob();
303 } else {
304 SDVLOG(2) << "No change in routing info, calling ready task directly.";
305 params.ready_task.Run();
309 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
310 DCHECK(CalledOnValidThread());
311 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
312 SDVLOG(1) << "Unable to run a job because we're throttled.";
313 return false;
316 if (wait_interval_
317 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
318 && priority != CANARY_PRIORITY) {
319 SDVLOG(1) << "Unable to run a job because we're backing off.";
320 return false;
323 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
324 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
325 return false;
328 return true;
331 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
332 DCHECK(CalledOnValidThread());
334 if (!CanRunJobNow(priority)) {
335 SDVLOG(1) << "Unable to run a nudge job right now";
336 return false;
339 const ModelTypeSet enabled_types = session_context_->enabled_types();
340 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
341 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
342 return false;
345 if (mode_ == CONFIGURATION_MODE) {
346 SDVLOG(1) << "Not running nudge because we're in configuration mode.";
347 return false;
350 return true;
353 void SyncSchedulerImpl::ScheduleLocalNudge(
354 const TimeDelta& desired_delay,
355 ModelTypeSet types,
356 const tracked_objects::Location& nudge_location) {
357 DCHECK(CalledOnValidThread());
358 DCHECK(!types.Empty());
360 SDVLOG_LOC(nudge_location, 2)
361 << "Scheduling sync because of local change to "
362 << ModelTypeSetToString(types);
363 UpdateNudgeTimeRecords(types);
364 nudge_tracker_.RecordLocalChange(types);
365 ScheduleNudgeImpl(desired_delay, nudge_location);
368 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
369 const TimeDelta& desired_delay,
370 ModelTypeSet types,
371 const tracked_objects::Location& nudge_location) {
372 DCHECK(CalledOnValidThread());
373 DCHECK(!types.Empty());
375 SDVLOG_LOC(nudge_location, 2)
376 << "Scheduling sync because of local refresh request for "
377 << ModelTypeSetToString(types);
378 nudge_tracker_.RecordLocalRefreshRequest(types);
379 ScheduleNudgeImpl(desired_delay, nudge_location);
382 void SyncSchedulerImpl::ScheduleInvalidationNudge(
383 const TimeDelta& desired_delay,
384 const ObjectIdInvalidationMap& invalidation_map,
385 const tracked_objects::Location& nudge_location) {
386 DCHECK(CalledOnValidThread());
387 DCHECK(!invalidation_map.Empty());
389 SDVLOG_LOC(nudge_location, 2)
390 << "Scheduling sync because we received invalidation for "
391 << ModelTypeSetToString(
392 ObjectIdSetToModelTypeSet(invalidation_map.GetObjectIds()));
393 nudge_tracker_.RecordRemoteInvalidation(invalidation_map);
394 ScheduleNudgeImpl(desired_delay, nudge_location);
397 // TODO(zea): Consider adding separate throttling/backoff for datatype
398 // refresh requests.
399 void SyncSchedulerImpl::ScheduleNudgeImpl(
400 const TimeDelta& delay,
401 const tracked_objects::Location& nudge_location) {
402 DCHECK(CalledOnValidThread());
404 if (no_scheduling_allowed_) {
405 NOTREACHED() << "Illegal to schedule job while session in progress.";
406 return;
409 if (!started_) {
410 SDVLOG_LOC(nudge_location, 2)
411 << "Dropping nudge, scheduler is not running.";
412 return;
415 SDVLOG_LOC(nudge_location, 2)
416 << "In ScheduleNudgeImpl with delay "
417 << delay.InMilliseconds() << " ms";
419 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
420 return;
422 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
423 if (!scheduled_nudge_time_.is_null() &&
424 (scheduled_nudge_time_ < incoming_run_time)) {
425 // Old job arrives sooner than this one. Don't reschedule it.
426 return;
429 // Either there is no existing nudge in flight or the incoming nudge should be
430 // made to arrive first (preempt) the existing nudge. We reschedule in either
431 // case.
432 SDVLOG_LOC(nudge_location, 2)
433 << "Scheduling a nudge with "
434 << delay.InMilliseconds() << " ms delay";
435 scheduled_nudge_time_ = incoming_run_time;
436 pending_wakeup_timer_.Start(
437 nudge_location,
438 delay,
439 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
440 weak_ptr_factory_.GetWeakPtr()));
443 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
444 switch (mode) {
445 ENUM_CASE(CONFIGURATION_MODE);
446 ENUM_CASE(NORMAL_MODE);
448 return "";
451 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
452 DCHECK(CalledOnValidThread());
453 DCHECK(CanRunNudgeJobNow(priority));
455 DVLOG(2) << "Will run normal mode sync cycle with types "
456 << ModelTypeSetToString(session_context_->enabled_types());
457 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
458 bool premature_exit = !syncer_->NormalSyncShare(
459 GetEnabledAndUnthrottledTypes(),
460 nudge_tracker_,
461 session.get());
462 AdjustPolling(FORCE_RESET);
463 // Don't run poll job till the next time poll timer fires.
464 do_poll_after_credentials_updated_ = false;
466 bool success = !premature_exit
467 && !sessions::HasSyncerError(
468 session->status_controller().model_neutral_state());
470 if (success) {
471 // That cycle took care of any outstanding work we had.
472 SDVLOG(2) << "Nudge succeeded.";
473 nudge_tracker_.RecordSuccessfulSyncCycle(base::TimeTicks::Now());
474 scheduled_nudge_time_ = base::TimeTicks();
476 // If we're here, then we successfully reached the server. End all backoff.
477 wait_interval_.reset();
478 NotifyRetryTime(base::Time());
479 return;
480 } else {
481 HandleFailure(session->status_controller().model_neutral_state());
485 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
486 DCHECK(CalledOnValidThread());
487 DCHECK_EQ(mode_, CONFIGURATION_MODE);
488 DCHECK(pending_configure_params_ != NULL);
490 if (!CanRunJobNow(priority)) {
491 SDVLOG(2) << "Unable to run configure job right now.";
492 if (!pending_configure_params_->retry_task.is_null()) {
493 pending_configure_params_->retry_task.Run();
494 pending_configure_params_->retry_task.Reset();
496 return;
499 SDVLOG(2) << "Will run configure SyncShare with types "
500 << ModelTypeSetToString(session_context_->enabled_types());
501 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
502 bool premature_exit = !syncer_->ConfigureSyncShare(
503 session_context_->enabled_types(),
504 pending_configure_params_->source,
505 session.get());
506 AdjustPolling(FORCE_RESET);
507 // Don't run poll job till the next time poll timer fires.
508 do_poll_after_credentials_updated_ = false;
510 bool success = !premature_exit
511 && !sessions::HasSyncerError(
512 session->status_controller().model_neutral_state());
514 if (success) {
515 SDVLOG(2) << "Configure succeeded.";
516 pending_configure_params_->ready_task.Run();
517 pending_configure_params_.reset();
519 // If we're here, then we successfully reached the server. End all backoff.
520 wait_interval_.reset();
521 NotifyRetryTime(base::Time());
522 } else {
523 HandleFailure(session->status_controller().model_neutral_state());
524 // Sync cycle might receive response from server that causes scheduler to
525 // stop and draws pending_configure_params_ invalid.
526 if (started_ && !pending_configure_params_->retry_task.is_null()) {
527 pending_configure_params_->retry_task.Run();
528 pending_configure_params_->retry_task.Reset();
533 void SyncSchedulerImpl::HandleFailure(
534 const sessions::ModelNeutralState& model_neutral_state) {
535 if (IsCurrentlyThrottled()) {
536 SDVLOG(2) << "Was throttled during previous sync cycle.";
537 RestartWaiting();
538 } else if (!IsBackingOff()) {
539 // Setup our backoff if this is our first such failure.
540 TimeDelta length = delay_provider_->GetDelay(
541 delay_provider_->GetInitialDelay(model_neutral_state));
542 wait_interval_.reset(
543 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
544 SDVLOG(2) << "Sync cycle failed. Will back off for "
545 << wait_interval_->length.InMilliseconds() << "ms.";
546 RestartWaiting();
550 void SyncSchedulerImpl::DoPollSyncSessionJob() {
551 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
553 SDVLOG(2) << "Polling with types "
554 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
555 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
556 syncer_->PollSyncShare(
557 GetEnabledAndUnthrottledTypes(),
558 session.get());
560 AdjustPolling(FORCE_RESET);
562 if (IsCurrentlyThrottled()) {
563 SDVLOG(2) << "Poll request got us throttled.";
564 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
565 // to do is start the timer.
566 RestartWaiting();
570 void SyncSchedulerImpl::DoRetrySyncSessionJob() {
571 DCHECK(CalledOnValidThread());
572 DCHECK_EQ(mode_, NORMAL_MODE);
574 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
576 SDVLOG(2) << "Retrying with types "
577 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
578 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
579 if (syncer_->RetrySyncShare(GetEnabledAndUnthrottledTypes(),
580 session.get()) &&
581 !sessions::HasSyncerError(
582 session->status_controller().model_neutral_state())) {
583 nudge_tracker_.RecordSuccessfulSyncCycle(base::TimeTicks::Now());
584 } else {
585 HandleFailure(session->status_controller().model_neutral_state());
589 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
590 DCHECK(CalledOnValidThread());
591 base::TimeTicks now = TimeTicks::Now();
592 // Update timing information for how often datatypes are triggering nudges.
593 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
594 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
595 last_local_nudges_by_model_type_[iter.Get()] = now;
596 if (previous.is_null())
597 continue;
599 #define PER_DATA_TYPE_MACRO(type_str) \
600 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
601 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
602 #undef PER_DATA_TYPE_MACRO
606 TimeDelta SyncSchedulerImpl::GetPollInterval() {
607 return (!session_context_->notifications_enabled() ||
608 !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
609 syncer_short_poll_interval_seconds_ :
610 syncer_long_poll_interval_seconds_;
613 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
614 DCHECK(CalledOnValidThread());
616 TimeDelta poll = GetPollInterval();
617 bool rate_changed = !poll_timer_.IsRunning() ||
618 poll != poll_timer_.GetCurrentDelay();
620 if (type == FORCE_RESET) {
621 last_poll_reset_ = base::TimeTicks::Now();
622 if (!rate_changed)
623 poll_timer_.Reset();
626 if (!rate_changed)
627 return;
629 // Adjust poll rate.
630 poll_timer_.Stop();
631 poll_timer_.Start(FROM_HERE, poll, this,
632 &SyncSchedulerImpl::PollTimerCallback);
635 void SyncSchedulerImpl::RestartWaiting() {
636 CHECK(wait_interval_.get());
637 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
638 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
639 SDVLOG(2) << "Starting WaitInterval timer of length "
640 << wait_interval_->length.InMilliseconds() << "ms.";
641 if (wait_interval_->mode == WaitInterval::THROTTLED) {
642 pending_wakeup_timer_.Start(
643 FROM_HERE,
644 wait_interval_->length,
645 base::Bind(&SyncSchedulerImpl::Unthrottle,
646 weak_ptr_factory_.GetWeakPtr()));
647 } else {
648 pending_wakeup_timer_.Start(
649 FROM_HERE,
650 wait_interval_->length,
651 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
652 weak_ptr_factory_.GetWeakPtr()));
656 void SyncSchedulerImpl::Stop() {
657 DCHECK(CalledOnValidThread());
658 SDVLOG(2) << "Stop called";
660 // Kill any in-flight method calls.
661 weak_ptr_factory_.InvalidateWeakPtrs();
662 wait_interval_.reset();
663 NotifyRetryTime(base::Time());
664 poll_timer_.Stop();
665 pending_wakeup_timer_.Stop();
666 pending_configure_params_.reset();
667 if (started_)
668 started_ = false;
671 // This is the only place where we invoke DoSyncSessionJob with canary
672 // privileges. Everyone else should use NORMAL_PRIORITY.
673 void SyncSchedulerImpl::TryCanaryJob() {
674 next_sync_session_job_priority_ = CANARY_PRIORITY;
675 TrySyncSessionJob();
678 void SyncSchedulerImpl::TrySyncSessionJob() {
679 // Post call to TrySyncSessionJobImpl on current thread. Later request for
680 // access token will be here.
681 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
682 &SyncSchedulerImpl::TrySyncSessionJobImpl,
683 weak_ptr_factory_.GetWeakPtr()));
686 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
687 JobPriority priority = next_sync_session_job_priority_;
688 next_sync_session_job_priority_ = NORMAL_PRIORITY;
690 DCHECK(CalledOnValidThread());
691 if (mode_ == CONFIGURATION_MODE) {
692 if (pending_configure_params_) {
693 SDVLOG(2) << "Found pending configure job";
694 DoConfigurationSyncSessionJob(priority);
696 } else if (CanRunNudgeJobNow(priority)) {
697 if (nudge_tracker_.IsSyncRequired()) {
698 SDVLOG(2) << "Found pending nudge job";
699 DoNudgeSyncSessionJob(priority);
700 } else if (nudge_tracker_.IsRetryRequired(base::TimeTicks::Now())) {
701 DoRetrySyncSessionJob();
702 } else if (do_poll_after_credentials_updated_ ||
703 ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
704 DoPollSyncSessionJob();
705 // Poll timer fires infrequently. Usually by this time access token is
706 // already expired and poll job will fail with auth error. Set flag to
707 // retry poll once ProfileSyncService gets new access token, TryCanaryJob
708 // will be called after access token is retrieved.
709 if (HttpResponse::SYNC_AUTH_ERROR ==
710 session_context_->connection_manager()->server_status()) {
711 do_poll_after_credentials_updated_ = true;
716 if (priority == CANARY_PRIORITY) {
717 // If this is canary job then whatever result was don't run poll job till
718 // the next time poll timer fires.
719 do_poll_after_credentials_updated_ = false;
722 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
723 // If we succeeded, our wait interval would have been cleared. If it hasn't
724 // been cleared, then we should increase our backoff interval and schedule
725 // another retry.
726 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
727 wait_interval_.reset(
728 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
729 SDVLOG(2) << "Sync cycle failed. Will back off for "
730 << wait_interval_->length.InMilliseconds() << "ms.";
731 RestartWaiting();
735 void SyncSchedulerImpl::PollTimerCallback() {
736 DCHECK(CalledOnValidThread());
737 if (no_scheduling_allowed_) {
738 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
739 // functions that are called only on the sync thread. This function is also
740 // called only on the sync thread, and only when it is posted by an expiring
741 // timer. If we find that no_scheduling_allowed_ is set here, then
742 // something is very wrong. Maybe someone mistakenly called us directly, or
743 // mishandled the book-keeping for no_scheduling_allowed_.
744 NOTREACHED() << "Illegal to schedule job while session in progress.";
745 return;
748 TrySyncSessionJob();
751 void SyncSchedulerImpl::RetryTimerCallback() {
752 TrySyncSessionJob();
755 void SyncSchedulerImpl::Unthrottle() {
756 DCHECK(CalledOnValidThread());
757 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
759 // We're no longer throttled, so clear the wait interval.
760 wait_interval_.reset();
761 NotifyRetryTime(base::Time());
763 // We treat this as a 'canary' in the sense that it was originally scheduled
764 // to run some time ago, failed, and we now want to retry, versus a job that
765 // was just created (e.g via ScheduleNudgeImpl). The main implication is
766 // that we're careful to update routing info (etc) with such potentially
767 // stale canary jobs.
768 TryCanaryJob();
771 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
772 DCHECK(CalledOnValidThread());
773 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
774 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
776 if (nudge_tracker_.IsAnyTypeThrottled()) {
777 base::TimeDelta time_until_next_unthrottle =
778 nudge_tracker_.GetTimeUntilNextUnthrottle(unthrottle_time);
779 type_unthrottle_timer_.Start(
780 FROM_HERE,
781 time_until_next_unthrottle,
782 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
783 weak_ptr_factory_.GetWeakPtr(),
784 unthrottle_time + time_until_next_unthrottle));
787 // Maybe this is a good time to run a nudge job. Let's try it.
788 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
789 TrySyncSessionJob();
792 void SyncSchedulerImpl::PerformDelayedNudge() {
793 // Circumstances may have changed since we scheduled this delayed nudge.
794 // We must check to see if it's OK to run the job before we do so.
795 if (CanRunNudgeJobNow(NORMAL_PRIORITY))
796 TrySyncSessionJob();
798 // We're not responsible for setting up any retries here. The functions that
799 // first put us into a state that prevents successful sync cycles (eg. global
800 // throttling, type throttling, network errors, transient errors) will also
801 // setup the appropriate retry logic (eg. retry after timeout, exponential
802 // backoff, retry when the network changes).
805 void SyncSchedulerImpl::ExponentialBackoffRetry() {
806 TryCanaryJob();
809 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
810 DCHECK(CalledOnValidThread());
811 session_context_->NotifyListeners(SyncEngineEvent(cause));
814 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
815 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED);
816 event.retry_time = retry_time;
817 session_context_->NotifyListeners(event);
820 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
821 SyncEngineEvent event(SyncEngineEvent::THROTTLED_TYPES_CHANGED);
822 event.throttled_types = types;
823 session_context_->NotifyListeners(event);
826 bool SyncSchedulerImpl::IsBackingOff() const {
827 DCHECK(CalledOnValidThread());
828 return wait_interval_.get() && wait_interval_->mode ==
829 WaitInterval::EXPONENTIAL_BACKOFF;
832 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
833 DCHECK(CalledOnValidThread());
834 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
835 throttle_duration));
836 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
839 void SyncSchedulerImpl::OnTypesThrottled(
840 ModelTypeSet types,
841 const base::TimeDelta& throttle_duration) {
842 base::TimeTicks now = base::TimeTicks::Now();
844 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
845 base::TimeDelta time_until_next_unthrottle =
846 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
847 type_unthrottle_timer_.Start(
848 FROM_HERE,
849 time_until_next_unthrottle,
850 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
851 weak_ptr_factory_.GetWeakPtr(),
852 now + time_until_next_unthrottle));
853 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
856 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
857 DCHECK(CalledOnValidThread());
858 return wait_interval_.get() && wait_interval_->mode ==
859 WaitInterval::THROTTLED;
862 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
863 const base::TimeDelta& new_interval) {
864 DCHECK(CalledOnValidThread());
865 syncer_short_poll_interval_seconds_ = new_interval;
868 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
869 const base::TimeDelta& new_interval) {
870 DCHECK(CalledOnValidThread());
871 syncer_long_poll_interval_seconds_ = new_interval;
874 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
875 const base::TimeDelta& new_delay) {
876 DCHECK(CalledOnValidThread());
877 sessions_commit_delay_ = new_delay;
880 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
881 if (size > 0)
882 nudge_tracker_.SetHintBufferSize(size);
883 else
884 NOTREACHED() << "Hint buffer size should be > 0.";
887 void SyncSchedulerImpl::OnActionableError(
888 const sessions::SyncSessionSnapshot& snap) {
889 DCHECK(CalledOnValidThread());
890 SDVLOG(2) << "OnActionableError";
891 SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR);
892 event.snapshot = snap;
893 session_context_->NotifyListeners(event);
896 void SyncSchedulerImpl::OnSyncProtocolError(
897 const sessions::SyncSessionSnapshot& snapshot) {
898 DCHECK(CalledOnValidThread());
899 if (ShouldRequestEarlyExit(
900 snapshot.model_neutral_state().sync_protocol_error)) {
901 SDVLOG(2) << "Sync Scheduler requesting early exit.";
902 Stop();
904 if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error))
905 OnActionableError(snapshot);
908 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
909 nudge_tracker_.set_next_retry_time(base::TimeTicks::Now() + delay);
910 retry_timer_.Start(FROM_HERE, delay, this,
911 &SyncSchedulerImpl::RetryTimerCallback);
914 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
915 DCHECK(CalledOnValidThread());
916 session_context_->set_notifications_enabled(notifications_enabled);
917 if (notifications_enabled)
918 nudge_tracker_.OnInvalidationsEnabled();
919 else
920 nudge_tracker_.OnInvalidationsDisabled();
923 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
924 DCHECK(CalledOnValidThread());
925 return sessions_commit_delay_;
928 #undef SDVLOG_LOC
930 #undef SDVLOG
932 #undef SLOG
934 #undef ENUM_CASE
936 } // namespace syncer