Roll src/third_party/WebKit d9c6159:8139f33 (svn 201974:201975)
[chromium-blink-merge.git] / sync / engine / sync_scheduler_impl.cc
blob3e03114fe57e8a2458a3f422cd2eba1833e44fee
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/sync_scheduler_impl.h"
7 #include <algorithm>
8 #include <cstring>
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/compiler_specific.h"
13 #include "base/location.h"
14 #include "base/logging.h"
15 #include "base/message_loop/message_loop.h"
16 #include "sync/engine/backoff_delay_provider.h"
17 #include "sync/engine/syncer.h"
18 #include "sync/protocol/proto_enum_conversions.h"
19 #include "sync/protocol/sync.pb.h"
20 #include "sync/util/data_type_histogram.h"
21 #include "sync/util/logging.h"
23 using base::TimeDelta;
24 using base::TimeTicks;
26 namespace syncer {
28 using sessions::SyncSession;
29 using sessions::SyncSessionSnapshot;
30 using sync_pb::GetUpdatesCallerInfo;
32 namespace {
34 bool IsConfigRelatedUpdateSourceValue(
35 GetUpdatesCallerInfo::GetUpdatesSource source) {
36 switch (source) {
37 case GetUpdatesCallerInfo::RECONFIGURATION:
38 case GetUpdatesCallerInfo::MIGRATION:
39 case GetUpdatesCallerInfo::NEW_CLIENT:
40 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
41 case GetUpdatesCallerInfo::PROGRAMMATIC:
42 return true;
43 default:
44 return false;
48 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
49 switch (error.error_type) {
50 case SYNC_SUCCESS:
51 case MIGRATION_DONE:
52 case THROTTLED:
53 case TRANSIENT_ERROR:
54 return false;
55 case NOT_MY_BIRTHDAY:
56 case CLEAR_PENDING:
57 case DISABLED_BY_ADMIN:
58 case USER_ROLLBACK:
59 // If we send terminate sync early then |sync_cycle_ended| notification
60 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
61 // notification wouldnt be sent either. Then the UI layer would be left
62 // waiting forever. So assert we would send something.
63 DCHECK_NE(error.action, UNKNOWN_ACTION);
64 return true;
65 case INVALID_CREDENTIAL:
66 // The notification for this is handled by PostAndProcessHeaders|.
67 // Server does no have to send any action for this.
68 return true;
69 // Make the default a NOTREACHED. So if a new error is introduced we
70 // think about its expected functionality.
71 default:
72 NOTREACHED();
73 return false;
77 bool IsActionableError(
78 const SyncProtocolError& error) {
79 return (error.action != UNKNOWN_ACTION);
82 void RunAndReset(base::Closure* task) {
83 DCHECK(task);
84 if (task->is_null())
85 return;
86 task->Run();
87 task->Reset();
90 } // namespace
92 ConfigurationParams::ConfigurationParams()
93 : source(GetUpdatesCallerInfo::UNKNOWN) {}
94 ConfigurationParams::ConfigurationParams(
95 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
96 ModelTypeSet types_to_download,
97 const ModelSafeRoutingInfo& routing_info,
98 const base::Closure& ready_task,
99 const base::Closure& retry_task)
100 : source(source),
101 types_to_download(types_to_download),
102 routing_info(routing_info),
103 ready_task(ready_task),
104 retry_task(retry_task) {
105 DCHECK(!ready_task.is_null());
107 ConfigurationParams::~ConfigurationParams() {}
109 ClearParams::ClearParams(const base::Closure& report_success_task)
110 : report_success_task(report_success_task) {
111 DCHECK(!report_success_task.is_null());
113 ClearParams::~ClearParams() {}
115 SyncSchedulerImpl::WaitInterval::WaitInterval()
116 : mode(UNKNOWN) {}
118 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
119 : mode(mode), length(length) {}
121 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
123 #define ENUM_CASE(x) case x: return #x; break;
125 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
126 switch (mode) {
127 ENUM_CASE(UNKNOWN);
128 ENUM_CASE(EXPONENTIAL_BACKOFF);
129 ENUM_CASE(THROTTLED);
131 NOTREACHED();
132 return "";
135 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
136 NudgeSource source) {
137 switch (source) {
138 case NUDGE_SOURCE_NOTIFICATION:
139 return GetUpdatesCallerInfo::NOTIFICATION;
140 case NUDGE_SOURCE_LOCAL:
141 return GetUpdatesCallerInfo::LOCAL;
142 case NUDGE_SOURCE_LOCAL_REFRESH:
143 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
144 case NUDGE_SOURCE_UNKNOWN:
145 return GetUpdatesCallerInfo::UNKNOWN;
146 default:
147 NOTREACHED();
148 return GetUpdatesCallerInfo::UNKNOWN;
152 // Helper macros to log with the syncer thread name; useful when there
153 // are multiple syncer threads involved.
155 #define SLOG(severity) LOG(severity) << name_ << ": "
157 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
159 #define SDVLOG_LOC(from_here, verbose_level) \
160 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
162 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
163 BackoffDelayProvider* delay_provider,
164 sessions::SyncSessionContext* context,
165 Syncer* syncer)
166 : name_(name),
167 started_(false),
168 syncer_short_poll_interval_seconds_(
169 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
170 syncer_long_poll_interval_seconds_(
171 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
172 mode_(CONFIGURATION_MODE),
173 delay_provider_(delay_provider),
174 syncer_(syncer),
175 session_context_(context),
176 next_sync_session_job_priority_(NORMAL_PRIORITY),
177 weak_ptr_factory_(this),
178 weak_ptr_factory_for_weak_handle_(this) {
179 weak_handle_this_ = MakeWeakHandle(
180 weak_ptr_factory_for_weak_handle_.GetWeakPtr());
183 SyncSchedulerImpl::~SyncSchedulerImpl() {
184 DCHECK(CalledOnValidThread());
185 Stop();
188 void SyncSchedulerImpl::OnCredentialsUpdated() {
189 DCHECK(CalledOnValidThread());
191 if (HttpResponse::SYNC_AUTH_ERROR ==
192 session_context_->connection_manager()->server_status()) {
193 OnServerConnectionErrorFixed();
197 void SyncSchedulerImpl::OnConnectionStatusChange() {
198 if (HttpResponse::CONNECTION_UNAVAILABLE ==
199 session_context_->connection_manager()->server_status()) {
200 // Optimistically assume that the connection is fixed and try
201 // connecting.
202 OnServerConnectionErrorFixed();
206 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
207 // There could be a pending nudge or configuration job in several cases:
209 // 1. We're in exponential backoff.
210 // 2. We're silenced / throttled.
211 // 3. A nudge was saved previously due to not having a valid auth token.
212 // 4. A nudge was scheduled + saved while in configuration mode.
214 // In all cases except (2), we want to retry contacting the server. We
215 // call TryCanaryJob to achieve this, and note that nothing -- not even a
216 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
217 // has the authority to do that is the Unthrottle timer.
218 TryCanaryJob();
221 void SyncSchedulerImpl::Start(Mode mode, base::Time last_poll_time) {
222 DCHECK(CalledOnValidThread());
223 std::string thread_name = base::MessageLoop::current()->thread_name();
224 if (thread_name.empty())
225 thread_name = "<Main thread>";
226 SDVLOG(2) << "Start called from thread "
227 << thread_name << " with mode " << GetModeString(mode);
228 if (!started_) {
229 started_ = true;
230 SendInitialSnapshot();
233 DCHECK(!session_context_->account_name().empty());
234 DCHECK(syncer_.get());
236 if (mode == CLEAR_SERVER_DATA_MODE) {
237 DCHECK_EQ(mode_, CONFIGURATION_MODE);
239 Mode old_mode = mode_;
240 mode_ = mode;
241 // Only adjust the poll reset time if it was valid and in the past.
242 if (!last_poll_time.is_null() && last_poll_time < base::Time::Now()) {
243 // Convert from base::Time to base::TimeTicks. The reason we use Time
244 // for persisting is that TimeTicks can stop making forward progress when
245 // the machine is suspended. This implies that on resume the client might
246 // actually have miss the real poll, unless the client is restarted. Fixing
247 // that would require using an AlarmTimer though, which is only supported
248 // on certain platforms.
249 last_poll_reset_ =
250 base::TimeTicks::Now() - (base::Time::Now() - last_poll_time);
253 if (old_mode != mode_ && mode_ == NORMAL_MODE) {
254 // We just got back to normal mode. Let's try to run the work that was
255 // queued up while we were configuring.
257 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
259 // Update our current time before checking IsRetryRequired().
260 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
261 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
262 TrySyncSessionJob();
267 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
268 ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
269 ModelTypeSet enabled_protocol_types =
270 Intersection(ProtocolTypes(), enabled_types);
271 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
272 return Difference(enabled_protocol_types, throttled_types);
275 void SyncSchedulerImpl::SendInitialSnapshot() {
276 DCHECK(CalledOnValidThread());
277 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
278 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
279 event.snapshot = dummy->TakeSnapshot();
280 FOR_EACH_OBSERVER(SyncEngineEventListener,
281 *session_context_->listeners(),
282 OnSyncCycleEvent(event));
285 namespace {
287 // Helper to extract the routing info corresponding to types in
288 // |types_to_download| from |current_routes|.
289 void BuildModelSafeParams(
290 ModelTypeSet types_to_download,
291 const ModelSafeRoutingInfo& current_routes,
292 ModelSafeRoutingInfo* result_routes) {
293 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
294 iter.Inc()) {
295 ModelType type = iter.Get();
296 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
297 DCHECK(route != current_routes.end());
298 ModelSafeGroup group = route->second;
299 (*result_routes)[type] = group;
303 } // namespace.
305 void SyncSchedulerImpl::ScheduleConfiguration(
306 const ConfigurationParams& params) {
307 DCHECK(CalledOnValidThread());
308 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
309 DCHECK_EQ(CONFIGURATION_MODE, mode_);
310 DCHECK(!params.ready_task.is_null());
311 CHECK(started_) << "Scheduler must be running to configure.";
312 SDVLOG(2) << "Reconfiguring syncer.";
314 // Only one configuration is allowed at a time. Verify we're not waiting
315 // for a pending configure job.
316 DCHECK(!pending_configure_params_);
318 ModelSafeRoutingInfo restricted_routes;
319 BuildModelSafeParams(params.types_to_download,
320 params.routing_info,
321 &restricted_routes);
322 session_context_->SetRoutingInfo(restricted_routes);
324 // Only reconfigure if we have types to download.
325 if (!params.types_to_download.Empty()) {
326 pending_configure_params_.reset(new ConfigurationParams(params));
327 TrySyncSessionJob();
328 } else {
329 SDVLOG(2) << "No change in routing info, calling ready task directly.";
330 params.ready_task.Run();
334 void SyncSchedulerImpl::ScheduleClearServerData(const ClearParams& params) {
335 DCHECK(CalledOnValidThread());
336 DCHECK_EQ(CLEAR_SERVER_DATA_MODE, mode_);
337 DCHECK(!pending_configure_params_);
338 DCHECK(!params.report_success_task.is_null());
339 CHECK(started_) << "Scheduler must be running to clear.";
340 pending_clear_params_.reset(new ClearParams(params));
341 TrySyncSessionJob();
344 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
345 DCHECK(CalledOnValidThread());
346 if (IsCurrentlyThrottled()) {
347 SDVLOG(1) << "Unable to run a job because we're throttled.";
348 return false;
351 if (IsBackingOff() && priority != CANARY_PRIORITY) {
352 SDVLOG(1) << "Unable to run a job because we're backing off.";
353 return false;
356 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
357 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
358 return false;
361 return true;
364 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
365 DCHECK(CalledOnValidThread());
367 if (!CanRunJobNow(priority)) {
368 SDVLOG(1) << "Unable to run a nudge job right now";
369 return false;
372 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
373 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
374 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
375 return false;
378 if (mode_ != NORMAL_MODE) {
379 SDVLOG(1) << "Not running nudge because we're not in normal mode.";
380 return false;
383 return true;
386 void SyncSchedulerImpl::ScheduleLocalNudge(
387 ModelTypeSet types,
388 const tracked_objects::Location& nudge_location) {
389 DCHECK(CalledOnValidThread());
390 DCHECK(!types.Empty());
392 SDVLOG_LOC(nudge_location, 2)
393 << "Scheduling sync because of local change to "
394 << ModelTypeSetToString(types);
395 UpdateNudgeTimeRecords(types);
396 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types);
397 ScheduleNudgeImpl(nudge_delay, nudge_location);
400 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
401 ModelTypeSet types,
402 const tracked_objects::Location& nudge_location) {
403 DCHECK(CalledOnValidThread());
404 DCHECK(!types.Empty());
406 SDVLOG_LOC(nudge_location, 2)
407 << "Scheduling sync because of local refresh request for "
408 << ModelTypeSetToString(types);
409 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types);
410 ScheduleNudgeImpl(nudge_delay, nudge_location);
413 void SyncSchedulerImpl::ScheduleInvalidationNudge(
414 syncer::ModelType model_type,
415 scoped_ptr<InvalidationInterface> invalidation,
416 const tracked_objects::Location& nudge_location) {
417 DCHECK(CalledOnValidThread());
419 SDVLOG_LOC(nudge_location, 2)
420 << "Scheduling sync because we received invalidation for "
421 << ModelTypeToString(model_type);
422 base::TimeDelta nudge_delay =
423 nudge_tracker_.RecordRemoteInvalidation(model_type, invalidation.Pass());
424 ScheduleNudgeImpl(nudge_delay, nudge_location);
427 void SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type) {
428 DCHECK(CalledOnValidThread());
430 SDVLOG(2) << "Scheduling non-blocking initial sync for "
431 << ModelTypeToString(model_type);
432 nudge_tracker_.RecordInitialSyncRequired(model_type);
433 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE);
436 // TODO(zea): Consider adding separate throttling/backoff for datatype
437 // refresh requests.
438 void SyncSchedulerImpl::ScheduleNudgeImpl(
439 const TimeDelta& delay,
440 const tracked_objects::Location& nudge_location) {
441 DCHECK(CalledOnValidThread());
442 CHECK(!syncer_->IsSyncing());
444 if (!started_) {
445 SDVLOG_LOC(nudge_location, 2)
446 << "Dropping nudge, scheduler is not running.";
447 return;
450 SDVLOG_LOC(nudge_location, 2)
451 << "In ScheduleNudgeImpl with delay "
452 << delay.InMilliseconds() << " ms";
454 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
455 return;
457 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
458 if (!scheduled_nudge_time_.is_null() &&
459 (scheduled_nudge_time_ < incoming_run_time)) {
460 // Old job arrives sooner than this one. Don't reschedule it.
461 return;
464 // Either there is no existing nudge in flight or the incoming nudge should be
465 // made to arrive first (preempt) the existing nudge. We reschedule in either
466 // case.
467 SDVLOG_LOC(nudge_location, 2)
468 << "Scheduling a nudge with "
469 << delay.InMilliseconds() << " ms delay";
470 scheduled_nudge_time_ = incoming_run_time;
471 pending_wakeup_timer_.Start(
472 nudge_location,
473 delay,
474 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
475 weak_ptr_factory_.GetWeakPtr()));
478 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
479 switch (mode) {
480 ENUM_CASE(CONFIGURATION_MODE);
481 ENUM_CASE(CLEAR_SERVER_DATA_MODE);
482 ENUM_CASE(NORMAL_MODE);
484 return "";
487 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) {
488 DCHECK(CalledOnValidThread());
489 nudge_tracker_.SetDefaultNudgeDelay(delay_ms);
492 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
493 DCHECK(CalledOnValidThread());
494 DCHECK(CanRunNudgeJobNow(priority));
496 DVLOG(2) << "Will run normal mode sync cycle with types "
497 << ModelTypeSetToString(session_context_->GetEnabledTypes());
498 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
499 bool success = syncer_->NormalSyncShare(
500 GetEnabledAndUnthrottledTypes(), &nudge_tracker_, session.get());
502 if (success) {
503 // That cycle took care of any outstanding work we had.
504 SDVLOG(2) << "Nudge succeeded.";
505 nudge_tracker_.RecordSuccessfulSyncCycle();
506 scheduled_nudge_time_ = base::TimeTicks();
507 HandleSuccess();
509 // If this was a canary, we may need to restart the poll timer (the poll
510 // timer may have fired while the scheduler was in an error state, ignoring
511 // the poll).
512 if (!poll_timer_.IsRunning()) {
513 SDVLOG(1) << "Canary succeeded, restarting polling.";
514 AdjustPolling(UPDATE_INTERVAL);
516 } else {
517 HandleFailure(session->status_controller().model_neutral_state());
521 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
522 DCHECK(CalledOnValidThread());
523 DCHECK_EQ(mode_, CONFIGURATION_MODE);
524 DCHECK(pending_configure_params_ != NULL);
526 if (!CanRunJobNow(priority)) {
527 SDVLOG(2) << "Unable to run configure job right now.";
528 RunAndReset(&pending_configure_params_->retry_task);
529 return;
532 SDVLOG(2) << "Will run configure SyncShare with types "
533 << ModelTypeSetToString(session_context_->GetEnabledTypes());
534 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
535 bool success = syncer_->ConfigureSyncShare(
536 pending_configure_params_->types_to_download,
537 pending_configure_params_->source,
538 session.get());
540 if (success) {
541 SDVLOG(2) << "Configure succeeded.";
542 pending_configure_params_->ready_task.Run();
543 pending_configure_params_.reset();
544 HandleSuccess();
545 } else {
546 HandleFailure(session->status_controller().model_neutral_state());
547 // Sync cycle might receive response from server that causes scheduler to
548 // stop and draws pending_configure_params_ invalid.
549 if (started_)
550 RunAndReset(&pending_configure_params_->retry_task);
554 void SyncSchedulerImpl::DoClearServerDataSyncSessionJob(JobPriority priority) {
555 DCHECK(CalledOnValidThread());
556 DCHECK_EQ(mode_, CLEAR_SERVER_DATA_MODE);
558 if (!CanRunJobNow(priority)) {
559 SDVLOG(2) << "Unable to run clear server data job right now.";
560 RunAndReset(&pending_configure_params_->retry_task);
561 return;
564 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
565 const bool success = syncer_->PostClearServerData(session.get());
566 if (!success) {
567 HandleFailure(session->status_controller().model_neutral_state());
568 return;
571 SDVLOG(2) << "Clear succeeded.";
572 pending_clear_params_->report_success_task.Run();
573 pending_clear_params_.reset();
574 HandleSuccess();
577 void SyncSchedulerImpl::HandleSuccess() {
578 // If we're here, then we successfully reached the server. End all backoff.
579 wait_interval_.reset();
580 NotifyRetryTime(base::Time());
583 void SyncSchedulerImpl::HandleFailure(
584 const sessions::ModelNeutralState& model_neutral_state) {
585 if (IsCurrentlyThrottled()) {
586 SDVLOG(2) << "Was throttled during previous sync cycle.";
587 } else if (!IsBackingOff()) {
588 // Setup our backoff if this is our first such failure.
589 TimeDelta length = delay_provider_->GetDelay(
590 delay_provider_->GetInitialDelay(model_neutral_state));
591 wait_interval_.reset(
592 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
593 SDVLOG(2) << "Sync cycle failed. Will back off for "
594 << wait_interval_->length.InMilliseconds() << "ms.";
595 } else {
596 // Increase our backoff interval and schedule another retry.
597 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
598 wait_interval_.reset(
599 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
600 SDVLOG(2) << "Sync cycle failed. Will back off for "
601 << wait_interval_->length.InMilliseconds() << "ms.";
603 RestartWaiting();
606 void SyncSchedulerImpl::DoPollSyncSessionJob() {
607 SDVLOG(2) << "Polling with types "
608 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
609 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
610 bool success = syncer_->PollSyncShare(
611 GetEnabledAndUnthrottledTypes(),
612 session.get());
614 // Only restart the timer if the poll succeeded. Otherwise rely on normal
615 // failure handling to retry with backoff.
616 if (success) {
617 AdjustPolling(FORCE_RESET);
618 HandleSuccess();
619 } else {
620 HandleFailure(session->status_controller().model_neutral_state());
624 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
625 DCHECK(CalledOnValidThread());
626 base::TimeTicks now = TimeTicks::Now();
627 // Update timing information for how often datatypes are triggering nudges.
628 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
629 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
630 last_local_nudges_by_model_type_[iter.Get()] = now;
631 if (previous.is_null())
632 continue;
634 #define PER_DATA_TYPE_MACRO(type_str) \
635 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
636 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
637 #undef PER_DATA_TYPE_MACRO
641 TimeDelta SyncSchedulerImpl::GetPollInterval() {
642 return (!session_context_->notifications_enabled() ||
643 !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
644 syncer_short_poll_interval_seconds_ :
645 syncer_long_poll_interval_seconds_;
648 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
649 DCHECK(CalledOnValidThread());
650 if (!started_)
651 return;
653 TimeDelta poll_interval = GetPollInterval();
654 TimeDelta poll_delay = poll_interval;
655 const TimeTicks now = TimeTicks::Now();
657 if (type == UPDATE_INTERVAL) {
658 if (!last_poll_reset_.is_null()) {
659 // Override the delay based on the last successful poll time (if it was
660 // set).
661 TimeTicks new_poll_time = poll_interval + last_poll_reset_;
662 poll_delay = new_poll_time - TimeTicks::Now();
664 if (poll_delay < TimeDelta()) {
665 // The desired poll time was in the past, so trigger a poll now (the
666 // timer will post the task asynchronously, so re-entrancy isn't an
667 // issue).
668 poll_delay = TimeDelta();
670 } else {
671 // There was no previous poll. Keep the delay set to the normal interval,
672 // as if we had just completed a poll.
673 DCHECK_EQ(GetPollInterval(), poll_delay);
674 last_poll_reset_ = now;
676 } else {
677 // Otherwise just restart the timer.
678 DCHECK_EQ(FORCE_RESET, type);
679 DCHECK_EQ(GetPollInterval(), poll_delay);
680 last_poll_reset_ = now;
683 SDVLOG(1) << "Updating polling delay to " << poll_delay.InMinutes()
684 << " minutes.";
686 // Adjust poll rate. Start will reset the timer if it was already running.
687 poll_timer_.Start(FROM_HERE, poll_delay, this,
688 &SyncSchedulerImpl::PollTimerCallback);
691 void SyncSchedulerImpl::RestartWaiting() {
692 CHECK(wait_interval_.get());
693 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
694 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
695 SDVLOG(2) << "Starting WaitInterval timer of length "
696 << wait_interval_->length.InMilliseconds() << "ms.";
697 if (wait_interval_->mode == WaitInterval::THROTTLED) {
698 pending_wakeup_timer_.Start(
699 FROM_HERE,
700 wait_interval_->length,
701 base::Bind(&SyncSchedulerImpl::Unthrottle,
702 weak_ptr_factory_.GetWeakPtr()));
703 } else {
704 pending_wakeup_timer_.Start(
705 FROM_HERE,
706 wait_interval_->length,
707 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
708 weak_ptr_factory_.GetWeakPtr()));
712 void SyncSchedulerImpl::Stop() {
713 DCHECK(CalledOnValidThread());
714 SDVLOG(2) << "Stop called";
716 // Kill any in-flight method calls.
717 weak_ptr_factory_.InvalidateWeakPtrs();
718 wait_interval_.reset();
719 NotifyRetryTime(base::Time());
720 poll_timer_.Stop();
721 pending_wakeup_timer_.Stop();
722 pending_configure_params_.reset();
723 pending_clear_params_.reset();
724 if (started_)
725 started_ = false;
728 // This is the only place where we invoke DoSyncSessionJob with canary
729 // privileges. Everyone else should use NORMAL_PRIORITY.
730 void SyncSchedulerImpl::TryCanaryJob() {
731 next_sync_session_job_priority_ = CANARY_PRIORITY;
732 SDVLOG(2) << "Attempting canary job";
733 TrySyncSessionJob();
736 void SyncSchedulerImpl::TrySyncSessionJob() {
737 // Post call to TrySyncSessionJobImpl on current thread. Later request for
738 // access token will be here.
739 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
740 &SyncSchedulerImpl::TrySyncSessionJobImpl,
741 weak_ptr_factory_.GetWeakPtr()));
744 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
745 JobPriority priority = next_sync_session_job_priority_;
746 next_sync_session_job_priority_ = NORMAL_PRIORITY;
748 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
750 DCHECK(CalledOnValidThread());
751 if (mode_ == CONFIGURATION_MODE) {
752 if (pending_configure_params_) {
753 SDVLOG(2) << "Found pending configure job";
754 DoConfigurationSyncSessionJob(priority);
756 } else if (mode_ == CLEAR_SERVER_DATA_MODE) {
757 if (pending_clear_params_) {
758 DoClearServerDataSyncSessionJob(priority);
760 } else if (CanRunNudgeJobNow(priority)) {
761 if (nudge_tracker_.IsSyncRequired()) {
762 SDVLOG(2) << "Found pending nudge job";
763 DoNudgeSyncSessionJob(priority);
764 } else if (((base::TimeTicks::Now() - last_poll_reset_) >=
765 GetPollInterval())) {
766 SDVLOG(2) << "Found pending poll";
767 DoPollSyncSessionJob();
769 } else {
770 // We must be in an error state. Transitioning out of each of these
771 // error states should trigger a canary job.
772 DCHECK(IsCurrentlyThrottled() || IsBackingOff() ||
773 session_context_->connection_manager()->HasInvalidAuthToken());
776 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
777 // If we succeeded, our wait interval would have been cleared. If it hasn't
778 // been cleared, then we should increase our backoff interval and schedule
779 // another retry.
780 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
781 wait_interval_.reset(
782 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
783 SDVLOG(2) << "Sync cycle failed. Will back off for "
784 << wait_interval_->length.InMilliseconds() << "ms.";
785 RestartWaiting();
789 void SyncSchedulerImpl::PollTimerCallback() {
790 DCHECK(CalledOnValidThread());
791 CHECK(!syncer_->IsSyncing());
793 TrySyncSessionJob();
796 void SyncSchedulerImpl::RetryTimerCallback() {
797 TrySyncSessionJob();
800 void SyncSchedulerImpl::Unthrottle() {
801 DCHECK(CalledOnValidThread());
802 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
804 // We're no longer throttled, so clear the wait interval.
805 wait_interval_.reset();
806 NotifyRetryTime(base::Time());
807 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
809 // We treat this as a 'canary' in the sense that it was originally scheduled
810 // to run some time ago, failed, and we now want to retry, versus a job that
811 // was just created (e.g via ScheduleNudgeImpl). The main implication is
812 // that we're careful to update routing info (etc) with such potentially
813 // stale canary jobs.
814 TryCanaryJob();
817 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
818 DCHECK(CalledOnValidThread());
819 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
820 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
822 if (nudge_tracker_.IsAnyTypeThrottled()) {
823 const base::TimeTicks now = base::TimeTicks::Now();
824 base::TimeDelta time_until_next_unthrottle =
825 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
826 type_unthrottle_timer_.Start(
827 FROM_HERE,
828 time_until_next_unthrottle,
829 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
830 weak_ptr_factory_.GetWeakPtr(),
831 now + time_until_next_unthrottle));
834 // Maybe this is a good time to run a nudge job. Let's try it.
835 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
836 TrySyncSessionJob();
839 void SyncSchedulerImpl::PerformDelayedNudge() {
840 // Circumstances may have changed since we scheduled this delayed nudge.
841 // We must check to see if it's OK to run the job before we do so.
842 if (CanRunNudgeJobNow(NORMAL_PRIORITY))
843 TrySyncSessionJob();
845 // We're not responsible for setting up any retries here. The functions that
846 // first put us into a state that prevents successful sync cycles (eg. global
847 // throttling, type throttling, network errors, transient errors) will also
848 // setup the appropriate retry logic (eg. retry after timeout, exponential
849 // backoff, retry when the network changes).
852 void SyncSchedulerImpl::ExponentialBackoffRetry() {
853 TryCanaryJob();
856 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
857 FOR_EACH_OBSERVER(SyncEngineEventListener,
858 *session_context_->listeners(),
859 OnRetryTimeChanged(retry_time));
862 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
863 FOR_EACH_OBSERVER(SyncEngineEventListener,
864 *session_context_->listeners(),
865 OnThrottledTypesChanged(types));
868 bool SyncSchedulerImpl::IsBackingOff() const {
869 DCHECK(CalledOnValidThread());
870 return wait_interval_.get() && wait_interval_->mode ==
871 WaitInterval::EXPONENTIAL_BACKOFF;
874 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
875 DCHECK(CalledOnValidThread());
876 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
877 throttle_duration));
878 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
879 NotifyThrottledTypesChanged(ModelTypeSet::All());
882 void SyncSchedulerImpl::OnTypesThrottled(
883 ModelTypeSet types,
884 const base::TimeDelta& throttle_duration) {
885 base::TimeTicks now = base::TimeTicks::Now();
887 SDVLOG(1) << "Throttling " << ModelTypeSetToString(types) << " for "
888 << throttle_duration.InMinutes() << " minutes.";
890 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
891 base::TimeDelta time_until_next_unthrottle =
892 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
893 type_unthrottle_timer_.Start(
894 FROM_HERE,
895 time_until_next_unthrottle,
896 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
897 weak_ptr_factory_.GetWeakPtr(),
898 now + time_until_next_unthrottle));
899 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
902 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
903 DCHECK(CalledOnValidThread());
904 return wait_interval_.get() && wait_interval_->mode ==
905 WaitInterval::THROTTLED;
908 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
909 const base::TimeDelta& new_interval) {
910 DCHECK(CalledOnValidThread());
911 if (new_interval == syncer_short_poll_interval_seconds_)
912 return;
913 SDVLOG(1) << "Updating short poll interval to " << new_interval.InMinutes()
914 << " minutes.";
915 syncer_short_poll_interval_seconds_ = new_interval;
916 AdjustPolling(UPDATE_INTERVAL);
919 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
920 const base::TimeDelta& new_interval) {
921 DCHECK(CalledOnValidThread());
922 if (new_interval == syncer_long_poll_interval_seconds_)
923 return;
924 SDVLOG(1) << "Updating long poll interval to " << new_interval.InMinutes()
925 << " minutes.";
926 syncer_long_poll_interval_seconds_ = new_interval;
927 AdjustPolling(UPDATE_INTERVAL);
930 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
931 const std::map<ModelType, base::TimeDelta>& nudge_delays) {
932 DCHECK(CalledOnValidThread());
933 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays);
936 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
937 if (size > 0)
938 nudge_tracker_.SetHintBufferSize(size);
939 else
940 NOTREACHED() << "Hint buffer size should be > 0.";
943 void SyncSchedulerImpl::OnSyncProtocolError(
944 const SyncProtocolError& sync_protocol_error) {
945 DCHECK(CalledOnValidThread());
946 if (ShouldRequestEarlyExit(sync_protocol_error)) {
947 SDVLOG(2) << "Sync Scheduler requesting early exit.";
948 Stop();
950 if (IsActionableError(sync_protocol_error)) {
951 SDVLOG(2) << "OnActionableError";
952 FOR_EACH_OBSERVER(SyncEngineEventListener,
953 *session_context_->listeners(),
954 OnActionableError(sync_protocol_error));
958 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
959 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
960 retry_timer_.Start(FROM_HERE, delay, this,
961 &SyncSchedulerImpl::RetryTimerCallback);
964 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
965 FOR_EACH_OBSERVER(SyncEngineEventListener,
966 *session_context_->listeners(),
967 OnMigrationRequested(types));
970 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
971 DCHECK(CalledOnValidThread());
972 session_context_->set_notifications_enabled(notifications_enabled);
973 if (notifications_enabled)
974 nudge_tracker_.OnInvalidationsEnabled();
975 else
976 nudge_tracker_.OnInvalidationsDisabled();
979 #undef SDVLOG_LOC
981 #undef SDVLOG
983 #undef SLOG
985 #undef ENUM_CASE
987 } // namespace syncer