Updating trunk VERSION from 2139.0 to 2140.0
[chromium-blink-merge.git] / sync / engine / sync_scheduler_impl.cc
blob22dc5e0ebcbbb8100be9f16f025330afeb90ac83
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/sync_scheduler_impl.h"
7 #include <algorithm>
8 #include <cstring>
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h"
19 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h"
21 #include "sync/util/data_type_histogram.h"
22 #include "sync/util/logging.h"
24 using base::TimeDelta;
25 using base::TimeTicks;
27 namespace syncer {
29 using sessions::SyncSession;
30 using sessions::SyncSessionSnapshot;
31 using sync_pb::GetUpdatesCallerInfo;
33 namespace {
35 bool IsConfigRelatedUpdateSourceValue(
36 GetUpdatesCallerInfo::GetUpdatesSource source) {
37 switch (source) {
38 case GetUpdatesCallerInfo::RECONFIGURATION:
39 case GetUpdatesCallerInfo::MIGRATION:
40 case GetUpdatesCallerInfo::NEW_CLIENT:
41 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
42 case GetUpdatesCallerInfo::PROGRAMMATIC:
43 return true;
44 default:
45 return false;
49 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
50 switch (error.error_type) {
51 case SYNC_SUCCESS:
52 case MIGRATION_DONE:
53 case THROTTLED:
54 case TRANSIENT_ERROR:
55 return false;
56 case NOT_MY_BIRTHDAY:
57 case CLEAR_PENDING:
58 case DISABLED_BY_ADMIN:
59 case USER_ROLLBACK:
60 // If we send terminate sync early then |sync_cycle_ended| notification
61 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
62 // notification wouldnt be sent either. Then the UI layer would be left
63 // waiting forever. So assert we would send something.
64 DCHECK_NE(error.action, UNKNOWN_ACTION);
65 return true;
66 case INVALID_CREDENTIAL:
67 // The notification for this is handled by PostAndProcessHeaders|.
68 // Server does no have to send any action for this.
69 return true;
70 // Make the default a NOTREACHED. So if a new error is introduced we
71 // think about its expected functionality.
72 default:
73 NOTREACHED();
74 return false;
78 bool IsActionableError(
79 const SyncProtocolError& error) {
80 return (error.action != UNKNOWN_ACTION);
83 } // namespace
85 ConfigurationParams::ConfigurationParams()
86 : source(GetUpdatesCallerInfo::UNKNOWN) {}
87 ConfigurationParams::ConfigurationParams(
88 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
89 ModelTypeSet types_to_download,
90 const ModelSafeRoutingInfo& routing_info,
91 const base::Closure& ready_task,
92 const base::Closure& retry_task)
93 : source(source),
94 types_to_download(types_to_download),
95 routing_info(routing_info),
96 ready_task(ready_task),
97 retry_task(retry_task) {
98 DCHECK(!ready_task.is_null());
99 DCHECK(!retry_task.is_null());
101 ConfigurationParams::~ConfigurationParams() {}
103 SyncSchedulerImpl::WaitInterval::WaitInterval()
104 : mode(UNKNOWN) {}
106 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
107 : mode(mode), length(length) {}
109 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
// Expands to a switch case for enumerator |x| that returns the enumerator's
// name as a string literal. Used by the GetModeString() helpers in this file.
#define ENUM_CASE(x) case x: return #x; break;
113 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
114 switch (mode) {
115 ENUM_CASE(UNKNOWN);
116 ENUM_CASE(EXPONENTIAL_BACKOFF);
117 ENUM_CASE(THROTTLED);
119 NOTREACHED();
120 return "";
123 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
124 NudgeSource source) {
125 switch (source) {
126 case NUDGE_SOURCE_NOTIFICATION:
127 return GetUpdatesCallerInfo::NOTIFICATION;
128 case NUDGE_SOURCE_LOCAL:
129 return GetUpdatesCallerInfo::LOCAL;
130 case NUDGE_SOURCE_LOCAL_REFRESH:
131 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
132 case NUDGE_SOURCE_UNKNOWN:
133 return GetUpdatesCallerInfo::UNKNOWN;
134 default:
135 NOTREACHED();
136 return GetUpdatesCallerInfo::UNKNOWN;
// Helper macros to log with the syncer thread name; useful when there
// are multiple syncer threads involved. Each one prefixes the message with
// |name_| followed by ": ", so they can only be used where |name_| is in
// scope.

#define SLOG(severity) LOG(severity) << name_ << ": "

#define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "

#define SDVLOG_LOC(from_here, verbose_level)             \
  DVLOG_LOC(from_here, verbose_level) << name_ << ": "
150 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
151 BackoffDelayProvider* delay_provider,
152 sessions::SyncSessionContext* context,
153 Syncer* syncer)
154 : name_(name),
155 started_(false),
156 syncer_short_poll_interval_seconds_(
157 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
158 syncer_long_poll_interval_seconds_(
159 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
160 mode_(NORMAL_MODE),
161 delay_provider_(delay_provider),
162 syncer_(syncer),
163 session_context_(context),
164 no_scheduling_allowed_(false),
165 do_poll_after_credentials_updated_(false),
166 next_sync_session_job_priority_(NORMAL_PRIORITY),
167 weak_ptr_factory_(this),
168 weak_ptr_factory_for_weak_handle_(this) {
169 weak_handle_this_ = MakeWeakHandle(
170 weak_ptr_factory_for_weak_handle_.GetWeakPtr());
173 SyncSchedulerImpl::~SyncSchedulerImpl() {
174 DCHECK(CalledOnValidThread());
175 Stop();
178 void SyncSchedulerImpl::OnCredentialsUpdated() {
179 DCHECK(CalledOnValidThread());
181 if (HttpResponse::SYNC_AUTH_ERROR ==
182 session_context_->connection_manager()->server_status()) {
183 OnServerConnectionErrorFixed();
187 void SyncSchedulerImpl::OnConnectionStatusChange() {
188 if (HttpResponse::CONNECTION_UNAVAILABLE ==
189 session_context_->connection_manager()->server_status()) {
190 // Optimistically assume that the connection is fixed and try
191 // connecting.
192 OnServerConnectionErrorFixed();
196 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
197 // There could be a pending nudge or configuration job in several cases:
199 // 1. We're in exponential backoff.
200 // 2. We're silenced / throttled.
201 // 3. A nudge was saved previously due to not having a valid auth token.
202 // 4. A nudge was scheduled + saved while in configuration mode.
204 // In all cases except (2), we want to retry contacting the server. We
205 // call TryCanaryJob to achieve this, and note that nothing -- not even a
206 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
207 // has the authority to do that is the Unthrottle timer.
208 TryCanaryJob();
211 void SyncSchedulerImpl::Start(Mode mode) {
212 DCHECK(CalledOnValidThread());
213 std::string thread_name = base::MessageLoop::current()->thread_name();
214 if (thread_name.empty())
215 thread_name = "<Main thread>";
216 SDVLOG(2) << "Start called from thread "
217 << thread_name << " with mode " << GetModeString(mode);
218 if (!started_) {
219 started_ = true;
220 SendInitialSnapshot();
223 DCHECK(!session_context_->account_name().empty());
224 DCHECK(syncer_.get());
225 Mode old_mode = mode_;
226 mode_ = mode;
227 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
229 if (old_mode != mode_ && mode_ == NORMAL_MODE) {
230 // We just got back to normal mode. Let's try to run the work that was
231 // queued up while we were configuring.
233 // Update our current time before checking IsRetryRequired().
234 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
235 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
236 TrySyncSessionJob();
241 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
242 ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
243 ModelTypeSet enabled_protocol_types =
244 Intersection(ProtocolTypes(), enabled_types);
245 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
246 return Difference(enabled_protocol_types, throttled_types);
249 void SyncSchedulerImpl::SendInitialSnapshot() {
250 DCHECK(CalledOnValidThread());
251 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
252 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
253 event.snapshot = dummy->TakeSnapshot();
254 FOR_EACH_OBSERVER(SyncEngineEventListener,
255 *session_context_->listeners(),
256 OnSyncCycleEvent(event));
259 namespace {
261 // Helper to extract the routing info corresponding to types in
262 // |types_to_download| from |current_routes|.
263 void BuildModelSafeParams(
264 ModelTypeSet types_to_download,
265 const ModelSafeRoutingInfo& current_routes,
266 ModelSafeRoutingInfo* result_routes) {
267 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
268 iter.Inc()) {
269 ModelType type = iter.Get();
270 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
271 DCHECK(route != current_routes.end());
272 ModelSafeGroup group = route->second;
273 (*result_routes)[type] = group;
277 } // namespace.
279 void SyncSchedulerImpl::ScheduleConfiguration(
280 const ConfigurationParams& params) {
281 DCHECK(CalledOnValidThread());
282 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
283 DCHECK_EQ(CONFIGURATION_MODE, mode_);
284 DCHECK(!params.ready_task.is_null());
285 CHECK(started_) << "Scheduler must be running to configure.";
286 SDVLOG(2) << "Reconfiguring syncer.";
288 // Only one configuration is allowed at a time. Verify we're not waiting
289 // for a pending configure job.
290 DCHECK(!pending_configure_params_);
292 ModelSafeRoutingInfo restricted_routes;
293 BuildModelSafeParams(params.types_to_download,
294 params.routing_info,
295 &restricted_routes);
296 session_context_->SetRoutingInfo(restricted_routes);
298 // Only reconfigure if we have types to download.
299 if (!params.types_to_download.Empty()) {
300 pending_configure_params_.reset(new ConfigurationParams(params));
301 TrySyncSessionJob();
302 } else {
303 SDVLOG(2) << "No change in routing info, calling ready task directly.";
304 params.ready_task.Run();
308 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
309 DCHECK(CalledOnValidThread());
310 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
311 SDVLOG(1) << "Unable to run a job because we're throttled.";
312 return false;
315 if (wait_interval_
316 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
317 && priority != CANARY_PRIORITY) {
318 SDVLOG(1) << "Unable to run a job because we're backing off.";
319 return false;
322 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
323 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
324 return false;
327 return true;
330 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
331 DCHECK(CalledOnValidThread());
333 if (!CanRunJobNow(priority)) {
334 SDVLOG(1) << "Unable to run a nudge job right now";
335 return false;
338 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
339 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
340 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
341 return false;
344 if (mode_ == CONFIGURATION_MODE) {
345 SDVLOG(1) << "Not running nudge because we're in configuration mode.";
346 return false;
349 return true;
352 void SyncSchedulerImpl::ScheduleLocalNudge(
353 ModelTypeSet types,
354 const tracked_objects::Location& nudge_location) {
355 DCHECK(CalledOnValidThread());
356 DCHECK(!types.Empty());
358 SDVLOG_LOC(nudge_location, 2)
359 << "Scheduling sync because of local change to "
360 << ModelTypeSetToString(types);
361 UpdateNudgeTimeRecords(types);
362 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types);
363 ScheduleNudgeImpl(nudge_delay, nudge_location);
366 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
367 ModelTypeSet types,
368 const tracked_objects::Location& nudge_location) {
369 DCHECK(CalledOnValidThread());
370 DCHECK(!types.Empty());
372 SDVLOG_LOC(nudge_location, 2)
373 << "Scheduling sync because of local refresh request for "
374 << ModelTypeSetToString(types);
375 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types);
376 ScheduleNudgeImpl(nudge_delay, nudge_location);
379 void SyncSchedulerImpl::ScheduleInvalidationNudge(
380 syncer::ModelType model_type,
381 scoped_ptr<InvalidationInterface> invalidation,
382 const tracked_objects::Location& nudge_location) {
383 DCHECK(CalledOnValidThread());
385 SDVLOG_LOC(nudge_location, 2)
386 << "Scheduling sync because we received invalidation for "
387 << ModelTypeToString(model_type);
388 base::TimeDelta nudge_delay =
389 nudge_tracker_.RecordRemoteInvalidation(model_type, invalidation.Pass());
390 ScheduleNudgeImpl(nudge_delay, nudge_location);
393 void SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type) {
394 DCHECK(CalledOnValidThread());
396 SDVLOG(2) << "Scheduling non-blocking initial sync for "
397 << ModelTypeToString(model_type);
398 nudge_tracker_.RecordInitialSyncRequired(model_type);
399 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE);
402 // TODO(zea): Consider adding separate throttling/backoff for datatype
403 // refresh requests.
404 void SyncSchedulerImpl::ScheduleNudgeImpl(
405 const TimeDelta& delay,
406 const tracked_objects::Location& nudge_location) {
407 DCHECK(CalledOnValidThread());
409 if (no_scheduling_allowed_) {
410 NOTREACHED() << "Illegal to schedule job while session in progress.";
411 return;
414 if (!started_) {
415 SDVLOG_LOC(nudge_location, 2)
416 << "Dropping nudge, scheduler is not running.";
417 return;
420 SDVLOG_LOC(nudge_location, 2)
421 << "In ScheduleNudgeImpl with delay "
422 << delay.InMilliseconds() << " ms";
424 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
425 return;
427 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
428 if (!scheduled_nudge_time_.is_null() &&
429 (scheduled_nudge_time_ < incoming_run_time)) {
430 // Old job arrives sooner than this one. Don't reschedule it.
431 return;
434 // Either there is no existing nudge in flight or the incoming nudge should be
435 // made to arrive first (preempt) the existing nudge. We reschedule in either
436 // case.
437 SDVLOG_LOC(nudge_location, 2)
438 << "Scheduling a nudge with "
439 << delay.InMilliseconds() << " ms delay";
440 scheduled_nudge_time_ = incoming_run_time;
441 pending_wakeup_timer_.Start(
442 nudge_location,
443 delay,
444 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
445 weak_ptr_factory_.GetWeakPtr()));
448 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
449 switch (mode) {
450 ENUM_CASE(CONFIGURATION_MODE);
451 ENUM_CASE(NORMAL_MODE);
453 return "";
456 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) {
457 DCHECK(CalledOnValidThread());
458 nudge_tracker_.SetDefaultNudgeDelay(delay_ms);
461 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
462 DCHECK(CalledOnValidThread());
463 DCHECK(CanRunNudgeJobNow(priority));
465 DVLOG(2) << "Will run normal mode sync cycle with types "
466 << ModelTypeSetToString(session_context_->GetEnabledTypes());
467 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
468 bool premature_exit = !syncer_->NormalSyncShare(
469 GetEnabledAndUnthrottledTypes(),
470 nudge_tracker_,
471 session.get());
472 AdjustPolling(FORCE_RESET);
473 // Don't run poll job till the next time poll timer fires.
474 do_poll_after_credentials_updated_ = false;
476 bool success = !premature_exit
477 && !sessions::HasSyncerError(
478 session->status_controller().model_neutral_state());
480 if (success) {
481 // That cycle took care of any outstanding work we had.
482 SDVLOG(2) << "Nudge succeeded.";
483 nudge_tracker_.RecordSuccessfulSyncCycle();
484 scheduled_nudge_time_ = base::TimeTicks();
486 // If we're here, then we successfully reached the server. End all backoff.
487 wait_interval_.reset();
488 NotifyRetryTime(base::Time());
489 } else {
490 HandleFailure(session->status_controller().model_neutral_state());
494 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
495 DCHECK(CalledOnValidThread());
496 DCHECK_EQ(mode_, CONFIGURATION_MODE);
497 DCHECK(pending_configure_params_ != NULL);
499 if (!CanRunJobNow(priority)) {
500 SDVLOG(2) << "Unable to run configure job right now.";
501 if (!pending_configure_params_->retry_task.is_null()) {
502 pending_configure_params_->retry_task.Run();
503 pending_configure_params_->retry_task.Reset();
505 return;
508 SDVLOG(2) << "Will run configure SyncShare with types "
509 << ModelTypeSetToString(session_context_->GetEnabledTypes());
510 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
511 bool premature_exit = !syncer_->ConfigureSyncShare(
512 pending_configure_params_->types_to_download,
513 pending_configure_params_->source,
514 session.get());
515 AdjustPolling(FORCE_RESET);
516 // Don't run poll job till the next time poll timer fires.
517 do_poll_after_credentials_updated_ = false;
519 bool success = !premature_exit
520 && !sessions::HasSyncerError(
521 session->status_controller().model_neutral_state());
523 if (success) {
524 SDVLOG(2) << "Configure succeeded.";
525 pending_configure_params_->ready_task.Run();
526 pending_configure_params_.reset();
528 // If we're here, then we successfully reached the server. End all backoff.
529 wait_interval_.reset();
530 NotifyRetryTime(base::Time());
531 } else {
532 HandleFailure(session->status_controller().model_neutral_state());
533 // Sync cycle might receive response from server that causes scheduler to
534 // stop and draws pending_configure_params_ invalid.
535 if (started_ && !pending_configure_params_->retry_task.is_null()) {
536 pending_configure_params_->retry_task.Run();
537 pending_configure_params_->retry_task.Reset();
542 void SyncSchedulerImpl::HandleFailure(
543 const sessions::ModelNeutralState& model_neutral_state) {
544 if (IsCurrentlyThrottled()) {
545 SDVLOG(2) << "Was throttled during previous sync cycle.";
546 RestartWaiting();
547 } else if (!IsBackingOff()) {
548 // Setup our backoff if this is our first such failure.
549 TimeDelta length = delay_provider_->GetDelay(
550 delay_provider_->GetInitialDelay(model_neutral_state));
551 wait_interval_.reset(
552 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
553 SDVLOG(2) << "Sync cycle failed. Will back off for "
554 << wait_interval_->length.InMilliseconds() << "ms.";
555 RestartWaiting();
559 void SyncSchedulerImpl::DoPollSyncSessionJob() {
560 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
562 SDVLOG(2) << "Polling with types "
563 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
564 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
565 syncer_->PollSyncShare(
566 GetEnabledAndUnthrottledTypes(),
567 session.get());
569 AdjustPolling(FORCE_RESET);
571 if (IsCurrentlyThrottled()) {
572 SDVLOG(2) << "Poll request got us throttled.";
573 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
574 // to do is start the timer.
575 RestartWaiting();
579 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
580 DCHECK(CalledOnValidThread());
581 base::TimeTicks now = TimeTicks::Now();
582 // Update timing information for how often datatypes are triggering nudges.
583 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
584 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
585 last_local_nudges_by_model_type_[iter.Get()] = now;
586 if (previous.is_null())
587 continue;
589 #define PER_DATA_TYPE_MACRO(type_str) \
590 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
591 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
592 #undef PER_DATA_TYPE_MACRO
596 TimeDelta SyncSchedulerImpl::GetPollInterval() {
597 return (!session_context_->notifications_enabled() ||
598 !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
599 syncer_short_poll_interval_seconds_ :
600 syncer_long_poll_interval_seconds_;
603 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
604 DCHECK(CalledOnValidThread());
606 TimeDelta poll = GetPollInterval();
607 bool rate_changed = !poll_timer_.IsRunning() ||
608 poll != poll_timer_.GetCurrentDelay();
610 if (type == FORCE_RESET) {
611 last_poll_reset_ = base::TimeTicks::Now();
612 if (!rate_changed)
613 poll_timer_.Reset();
616 if (!rate_changed)
617 return;
619 // Adjust poll rate.
620 poll_timer_.Stop();
621 poll_timer_.Start(FROM_HERE, poll, this,
622 &SyncSchedulerImpl::PollTimerCallback);
625 void SyncSchedulerImpl::RestartWaiting() {
626 CHECK(wait_interval_.get());
627 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
628 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
629 SDVLOG(2) << "Starting WaitInterval timer of length "
630 << wait_interval_->length.InMilliseconds() << "ms.";
631 if (wait_interval_->mode == WaitInterval::THROTTLED) {
632 pending_wakeup_timer_.Start(
633 FROM_HERE,
634 wait_interval_->length,
635 base::Bind(&SyncSchedulerImpl::Unthrottle,
636 weak_ptr_factory_.GetWeakPtr()));
637 } else {
638 pending_wakeup_timer_.Start(
639 FROM_HERE,
640 wait_interval_->length,
641 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
642 weak_ptr_factory_.GetWeakPtr()));
646 void SyncSchedulerImpl::Stop() {
647 DCHECK(CalledOnValidThread());
648 SDVLOG(2) << "Stop called";
650 // Kill any in-flight method calls.
651 weak_ptr_factory_.InvalidateWeakPtrs();
652 wait_interval_.reset();
653 NotifyRetryTime(base::Time());
654 poll_timer_.Stop();
655 pending_wakeup_timer_.Stop();
656 pending_configure_params_.reset();
657 if (started_)
658 started_ = false;
661 // This is the only place where we invoke DoSyncSessionJob with canary
662 // privileges. Everyone else should use NORMAL_PRIORITY.
663 void SyncSchedulerImpl::TryCanaryJob() {
664 next_sync_session_job_priority_ = CANARY_PRIORITY;
665 TrySyncSessionJob();
668 void SyncSchedulerImpl::TrySyncSessionJob() {
669 // Post call to TrySyncSessionJobImpl on current thread. Later request for
670 // access token will be here.
671 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
672 &SyncSchedulerImpl::TrySyncSessionJobImpl,
673 weak_ptr_factory_.GetWeakPtr()));
676 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
677 JobPriority priority = next_sync_session_job_priority_;
678 next_sync_session_job_priority_ = NORMAL_PRIORITY;
680 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
682 DCHECK(CalledOnValidThread());
683 if (mode_ == CONFIGURATION_MODE) {
684 if (pending_configure_params_) {
685 SDVLOG(2) << "Found pending configure job";
686 DoConfigurationSyncSessionJob(priority);
688 } else if (CanRunNudgeJobNow(priority)) {
689 if (nudge_tracker_.IsSyncRequired()) {
690 SDVLOG(2) << "Found pending nudge job";
691 DoNudgeSyncSessionJob(priority);
692 } else if (do_poll_after_credentials_updated_ ||
693 ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
694 DoPollSyncSessionJob();
695 // Poll timer fires infrequently. Usually by this time access token is
696 // already expired and poll job will fail with auth error. Set flag to
697 // retry poll once ProfileSyncService gets new access token, TryCanaryJob
698 // will be called after access token is retrieved.
699 if (HttpResponse::SYNC_AUTH_ERROR ==
700 session_context_->connection_manager()->server_status()) {
701 do_poll_after_credentials_updated_ = true;
706 if (priority == CANARY_PRIORITY) {
707 // If this is canary job then whatever result was don't run poll job till
708 // the next time poll timer fires.
709 do_poll_after_credentials_updated_ = false;
712 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
713 // If we succeeded, our wait interval would have been cleared. If it hasn't
714 // been cleared, then we should increase our backoff interval and schedule
715 // another retry.
716 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
717 wait_interval_.reset(
718 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
719 SDVLOG(2) << "Sync cycle failed. Will back off for "
720 << wait_interval_->length.InMilliseconds() << "ms.";
721 RestartWaiting();
725 void SyncSchedulerImpl::PollTimerCallback() {
726 DCHECK(CalledOnValidThread());
727 if (no_scheduling_allowed_) {
728 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
729 // functions that are called only on the sync thread. This function is also
730 // called only on the sync thread, and only when it is posted by an expiring
731 // timer. If we find that no_scheduling_allowed_ is set here, then
732 // something is very wrong. Maybe someone mistakenly called us directly, or
733 // mishandled the book-keeping for no_scheduling_allowed_.
734 NOTREACHED() << "Illegal to schedule job while session in progress.";
735 return;
738 TrySyncSessionJob();
741 void SyncSchedulerImpl::RetryTimerCallback() {
742 TrySyncSessionJob();
745 void SyncSchedulerImpl::Unthrottle() {
746 DCHECK(CalledOnValidThread());
747 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
749 // We're no longer throttled, so clear the wait interval.
750 wait_interval_.reset();
751 NotifyRetryTime(base::Time());
752 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
754 // We treat this as a 'canary' in the sense that it was originally scheduled
755 // to run some time ago, failed, and we now want to retry, versus a job that
756 // was just created (e.g via ScheduleNudgeImpl). The main implication is
757 // that we're careful to update routing info (etc) with such potentially
758 // stale canary jobs.
759 TryCanaryJob();
762 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
763 DCHECK(CalledOnValidThread());
764 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
765 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
767 if (nudge_tracker_.IsAnyTypeThrottled()) {
768 const base::TimeTicks now = base::TimeTicks::Now();
769 base::TimeDelta time_until_next_unthrottle =
770 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
771 type_unthrottle_timer_.Start(
772 FROM_HERE,
773 time_until_next_unthrottle,
774 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
775 weak_ptr_factory_.GetWeakPtr(),
776 now + time_until_next_unthrottle));
779 // Maybe this is a good time to run a nudge job. Let's try it.
780 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
781 TrySyncSessionJob();
784 void SyncSchedulerImpl::PerformDelayedNudge() {
785 // Circumstances may have changed since we scheduled this delayed nudge.
786 // We must check to see if it's OK to run the job before we do so.
787 if (CanRunNudgeJobNow(NORMAL_PRIORITY))
788 TrySyncSessionJob();
790 // We're not responsible for setting up any retries here. The functions that
791 // first put us into a state that prevents successful sync cycles (eg. global
792 // throttling, type throttling, network errors, transient errors) will also
793 // setup the appropriate retry logic (eg. retry after timeout, exponential
794 // backoff, retry when the network changes).
797 void SyncSchedulerImpl::ExponentialBackoffRetry() {
798 TryCanaryJob();
801 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
802 FOR_EACH_OBSERVER(SyncEngineEventListener,
803 *session_context_->listeners(),
804 OnRetryTimeChanged(retry_time));
807 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
808 FOR_EACH_OBSERVER(SyncEngineEventListener,
809 *session_context_->listeners(),
810 OnThrottledTypesChanged(types));
813 bool SyncSchedulerImpl::IsBackingOff() const {
814 DCHECK(CalledOnValidThread());
815 return wait_interval_.get() && wait_interval_->mode ==
816 WaitInterval::EXPONENTIAL_BACKOFF;
819 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
820 DCHECK(CalledOnValidThread());
821 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
822 throttle_duration));
823 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
824 NotifyThrottledTypesChanged(ModelTypeSet::All());
827 void SyncSchedulerImpl::OnTypesThrottled(
828 ModelTypeSet types,
829 const base::TimeDelta& throttle_duration) {
830 base::TimeTicks now = base::TimeTicks::Now();
832 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
833 base::TimeDelta time_until_next_unthrottle =
834 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
835 type_unthrottle_timer_.Start(
836 FROM_HERE,
837 time_until_next_unthrottle,
838 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
839 weak_ptr_factory_.GetWeakPtr(),
840 now + time_until_next_unthrottle));
841 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
844 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
845 DCHECK(CalledOnValidThread());
846 return wait_interval_.get() && wait_interval_->mode ==
847 WaitInterval::THROTTLED;
850 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
851 const base::TimeDelta& new_interval) {
852 DCHECK(CalledOnValidThread());
853 syncer_short_poll_interval_seconds_ = new_interval;
856 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
857 const base::TimeDelta& new_interval) {
858 DCHECK(CalledOnValidThread());
859 syncer_long_poll_interval_seconds_ = new_interval;
862 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
863 const std::map<ModelType, base::TimeDelta>& nudge_delays) {
864 DCHECK(CalledOnValidThread());
865 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays);
868 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
869 if (size > 0)
870 nudge_tracker_.SetHintBufferSize(size);
871 else
872 NOTREACHED() << "Hint buffer size should be > 0.";
875 void SyncSchedulerImpl::OnSyncProtocolError(
876 const SyncProtocolError& sync_protocol_error) {
877 DCHECK(CalledOnValidThread());
878 if (ShouldRequestEarlyExit(sync_protocol_error)) {
879 SDVLOG(2) << "Sync Scheduler requesting early exit.";
880 Stop();
882 if (IsActionableError(sync_protocol_error)) {
883 SDVLOG(2) << "OnActionableError";
884 FOR_EACH_OBSERVER(SyncEngineEventListener,
885 *session_context_->listeners(),
886 OnActionableError(sync_protocol_error));
890 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
891 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
892 retry_timer_.Start(FROM_HERE, delay, this,
893 &SyncSchedulerImpl::RetryTimerCallback);
896 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
897 FOR_EACH_OBSERVER(SyncEngineEventListener,
898 *session_context_->listeners(),
899 OnMigrationRequested(types));
902 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
903 DCHECK(CalledOnValidThread());
904 session_context_->set_notifications_enabled(notifications_enabled);
905 if (notifications_enabled)
906 nudge_tracker_.OnInvalidationsEnabled();
907 else
908 nudge_tracker_.OnInvalidationsDisabled();
911 #undef SDVLOG_LOC
913 #undef SDVLOG
915 #undef SLOG
917 #undef ENUM_CASE
919 } // namespace syncer