1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/sync_scheduler_impl.h"
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h"
19 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h"
21 #include "sync/util/data_type_histogram.h"
22 #include "sync/util/logging.h"
24 using base::TimeDelta
;
25 using base::TimeTicks
;
29 using sessions::SyncSession
;
30 using sessions::SyncSessionSnapshot
;
31 using sync_pb::GetUpdatesCallerInfo
;
35 bool IsConfigRelatedUpdateSourceValue(
36 GetUpdatesCallerInfo::GetUpdatesSource source
) {
38 case GetUpdatesCallerInfo::RECONFIGURATION
:
39 case GetUpdatesCallerInfo::MIGRATION
:
40 case GetUpdatesCallerInfo::NEW_CLIENT
:
41 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE
:
42 case GetUpdatesCallerInfo::PROGRAMMATIC
:
49 bool ShouldRequestEarlyExit(const SyncProtocolError
& error
) {
50 switch (error
.error_type
) {
58 case DISABLED_BY_ADMIN
:
60 // If we send terminate sync early then |sync_cycle_ended| notification
61 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
62 // notification wouldnt be sent either. Then the UI layer would be left
63 // waiting forever. So assert we would send something.
64 DCHECK_NE(error
.action
, UNKNOWN_ACTION
);
66 case INVALID_CREDENTIAL
:
67 // The notification for this is handled by PostAndProcessHeaders|.
68 // Server does no have to send any action for this.
70 // Make the default a NOTREACHED. So if a new error is introduced we
71 // think about its expected functionality.
78 bool IsActionableError(
79 const SyncProtocolError
& error
) {
80 return (error
.action
!= UNKNOWN_ACTION
);
85 ConfigurationParams::ConfigurationParams()
86 : source(GetUpdatesCallerInfo::UNKNOWN
) {}
87 ConfigurationParams::ConfigurationParams(
88 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource
& source
,
89 ModelTypeSet types_to_download
,
90 const ModelSafeRoutingInfo
& routing_info
,
91 const base::Closure
& ready_task
,
92 const base::Closure
& retry_task
)
94 types_to_download(types_to_download
),
95 routing_info(routing_info
),
96 ready_task(ready_task
),
97 retry_task(retry_task
) {
98 DCHECK(!ready_task
.is_null());
100 ConfigurationParams::~ConfigurationParams() {}
102 SyncSchedulerImpl::WaitInterval::WaitInterval()
105 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode
, TimeDelta length
)
106 : mode(mode
), length(length
) {}
108 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
110 #define ENUM_CASE(x) case x: return #x; break;
112 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode
) {
115 ENUM_CASE(EXPONENTIAL_BACKOFF
);
116 ENUM_CASE(THROTTLED
);
122 GetUpdatesCallerInfo::GetUpdatesSource
GetUpdatesFromNudgeSource(
123 NudgeSource source
) {
125 case NUDGE_SOURCE_NOTIFICATION
:
126 return GetUpdatesCallerInfo::NOTIFICATION
;
127 case NUDGE_SOURCE_LOCAL
:
128 return GetUpdatesCallerInfo::LOCAL
;
129 case NUDGE_SOURCE_LOCAL_REFRESH
:
130 return GetUpdatesCallerInfo::DATATYPE_REFRESH
;
131 case NUDGE_SOURCE_UNKNOWN
:
132 return GetUpdatesCallerInfo::UNKNOWN
;
135 return GetUpdatesCallerInfo::UNKNOWN
;
139 // Helper macros to log with the syncer thread name; useful when there
140 // are multiple syncer threads involved.
142 #define SLOG(severity) LOG(severity) << name_ << ": "
144 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
146 #define SDVLOG_LOC(from_here, verbose_level) \
147 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
149 SyncSchedulerImpl::SyncSchedulerImpl(const std::string
& name
,
150 BackoffDelayProvider
* delay_provider
,
151 sessions::SyncSessionContext
* context
,
155 syncer_short_poll_interval_seconds_(
156 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds
)),
157 syncer_long_poll_interval_seconds_(
158 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds
)),
160 delay_provider_(delay_provider
),
162 session_context_(context
),
163 no_scheduling_allowed_(false),
164 do_poll_after_credentials_updated_(false),
165 next_sync_session_job_priority_(NORMAL_PRIORITY
),
166 weak_ptr_factory_(this),
167 weak_ptr_factory_for_weak_handle_(this) {
168 weak_handle_this_
= MakeWeakHandle(
169 weak_ptr_factory_for_weak_handle_
.GetWeakPtr());
172 SyncSchedulerImpl::~SyncSchedulerImpl() {
173 DCHECK(CalledOnValidThread());
177 void SyncSchedulerImpl::OnCredentialsUpdated() {
178 DCHECK(CalledOnValidThread());
180 if (HttpResponse::SYNC_AUTH_ERROR
==
181 session_context_
->connection_manager()->server_status()) {
182 OnServerConnectionErrorFixed();
186 void SyncSchedulerImpl::OnConnectionStatusChange() {
187 if (HttpResponse::CONNECTION_UNAVAILABLE
==
188 session_context_
->connection_manager()->server_status()) {
189 // Optimistically assume that the connection is fixed and try
191 OnServerConnectionErrorFixed();
195 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
196 // There could be a pending nudge or configuration job in several cases:
198 // 1. We're in exponential backoff.
199 // 2. We're silenced / throttled.
200 // 3. A nudge was saved previously due to not having a valid auth token.
201 // 4. A nudge was scheduled + saved while in configuration mode.
203 // In all cases except (2), we want to retry contacting the server. We
204 // call TryCanaryJob to achieve this, and note that nothing -- not even a
205 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
206 // has the authority to do that is the Unthrottle timer.
210 void SyncSchedulerImpl::Start(Mode mode
) {
211 DCHECK(CalledOnValidThread());
212 std::string thread_name
= base::MessageLoop::current()->thread_name();
213 if (thread_name
.empty())
214 thread_name
= "<Main thread>";
215 SDVLOG(2) << "Start called from thread "
216 << thread_name
<< " with mode " << GetModeString(mode
);
219 SendInitialSnapshot();
222 DCHECK(!session_context_
->account_name().empty());
223 DCHECK(syncer_
.get());
224 Mode old_mode
= mode_
;
226 AdjustPolling(UPDATE_INTERVAL
); // Will kick start poll timer if needed.
228 if (old_mode
!= mode_
&& mode_
== NORMAL_MODE
) {
229 // We just got back to normal mode. Let's try to run the work that was
230 // queued up while we were configuring.
232 // Update our current time before checking IsRetryRequired().
233 nudge_tracker_
.SetSyncCycleStartTime(base::TimeTicks::Now());
234 if (nudge_tracker_
.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY
)) {
240 ModelTypeSet
SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
241 ModelTypeSet enabled_types
= session_context_
->GetEnabledTypes();
242 ModelTypeSet enabled_protocol_types
=
243 Intersection(ProtocolTypes(), enabled_types
);
244 ModelTypeSet throttled_types
= nudge_tracker_
.GetThrottledTypes();
245 return Difference(enabled_protocol_types
, throttled_types
);
248 void SyncSchedulerImpl::SendInitialSnapshot() {
249 DCHECK(CalledOnValidThread());
250 scoped_ptr
<SyncSession
> dummy(SyncSession::Build(session_context_
, this));
251 SyncCycleEvent
event(SyncCycleEvent::STATUS_CHANGED
);
252 event
.snapshot
= dummy
->TakeSnapshot();
253 FOR_EACH_OBSERVER(SyncEngineEventListener
,
254 *session_context_
->listeners(),
255 OnSyncCycleEvent(event
));
260 // Helper to extract the routing info corresponding to types in
261 // |types_to_download| from |current_routes|.
262 void BuildModelSafeParams(
263 ModelTypeSet types_to_download
,
264 const ModelSafeRoutingInfo
& current_routes
,
265 ModelSafeRoutingInfo
* result_routes
) {
266 for (ModelTypeSet::Iterator iter
= types_to_download
.First(); iter
.Good();
268 ModelType type
= iter
.Get();
269 ModelSafeRoutingInfo::const_iterator route
= current_routes
.find(type
);
270 DCHECK(route
!= current_routes
.end());
271 ModelSafeGroup group
= route
->second
;
272 (*result_routes
)[type
] = group
;
278 void SyncSchedulerImpl::ScheduleConfiguration(
279 const ConfigurationParams
& params
) {
280 DCHECK(CalledOnValidThread());
281 DCHECK(IsConfigRelatedUpdateSourceValue(params
.source
));
282 DCHECK_EQ(CONFIGURATION_MODE
, mode_
);
283 DCHECK(!params
.ready_task
.is_null());
284 CHECK(started_
) << "Scheduler must be running to configure.";
285 SDVLOG(2) << "Reconfiguring syncer.";
287 // Only one configuration is allowed at a time. Verify we're not waiting
288 // for a pending configure job.
289 DCHECK(!pending_configure_params_
);
291 ModelSafeRoutingInfo restricted_routes
;
292 BuildModelSafeParams(params
.types_to_download
,
295 session_context_
->SetRoutingInfo(restricted_routes
);
297 // Only reconfigure if we have types to download.
298 if (!params
.types_to_download
.Empty()) {
299 pending_configure_params_
.reset(new ConfigurationParams(params
));
302 SDVLOG(2) << "No change in routing info, calling ready task directly.";
303 params
.ready_task
.Run();
307 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority
) {
308 DCHECK(CalledOnValidThread());
309 if (wait_interval_
&& wait_interval_
->mode
== WaitInterval::THROTTLED
) {
310 SDVLOG(1) << "Unable to run a job because we're throttled.";
315 && wait_interval_
->mode
== WaitInterval::EXPONENTIAL_BACKOFF
316 && priority
!= CANARY_PRIORITY
) {
317 SDVLOG(1) << "Unable to run a job because we're backing off.";
321 if (session_context_
->connection_manager()->HasInvalidAuthToken()) {
322 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
329 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority
) {
330 DCHECK(CalledOnValidThread());
332 if (!CanRunJobNow(priority
)) {
333 SDVLOG(1) << "Unable to run a nudge job right now";
337 const ModelTypeSet enabled_types
= session_context_
->GetEnabledTypes();
338 if (nudge_tracker_
.GetThrottledTypes().HasAll(enabled_types
)) {
339 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
343 if (mode_
== CONFIGURATION_MODE
) {
344 SDVLOG(1) << "Not running nudge because we're in configuration mode.";
351 void SyncSchedulerImpl::ScheduleLocalNudge(
353 const tracked_objects::Location
& nudge_location
) {
354 DCHECK(CalledOnValidThread());
355 DCHECK(!types
.Empty());
357 SDVLOG_LOC(nudge_location
, 2)
358 << "Scheduling sync because of local change to "
359 << ModelTypeSetToString(types
);
360 UpdateNudgeTimeRecords(types
);
361 base::TimeDelta nudge_delay
= nudge_tracker_
.RecordLocalChange(types
);
362 ScheduleNudgeImpl(nudge_delay
, nudge_location
);
365 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
367 const tracked_objects::Location
& nudge_location
) {
368 DCHECK(CalledOnValidThread());
369 DCHECK(!types
.Empty());
371 SDVLOG_LOC(nudge_location
, 2)
372 << "Scheduling sync because of local refresh request for "
373 << ModelTypeSetToString(types
);
374 base::TimeDelta nudge_delay
= nudge_tracker_
.RecordLocalRefreshRequest(types
);
375 ScheduleNudgeImpl(nudge_delay
, nudge_location
);
378 void SyncSchedulerImpl::ScheduleInvalidationNudge(
379 syncer::ModelType model_type
,
380 scoped_ptr
<InvalidationInterface
> invalidation
,
381 const tracked_objects::Location
& nudge_location
) {
382 DCHECK(CalledOnValidThread());
384 SDVLOG_LOC(nudge_location
, 2)
385 << "Scheduling sync because we received invalidation for "
386 << ModelTypeToString(model_type
);
387 base::TimeDelta nudge_delay
=
388 nudge_tracker_
.RecordRemoteInvalidation(model_type
, invalidation
.Pass());
389 ScheduleNudgeImpl(nudge_delay
, nudge_location
);
392 void SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type
) {
393 DCHECK(CalledOnValidThread());
395 SDVLOG(2) << "Scheduling non-blocking initial sync for "
396 << ModelTypeToString(model_type
);
397 nudge_tracker_
.RecordInitialSyncRequired(model_type
);
398 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE
);
401 // TODO(zea): Consider adding separate throttling/backoff for datatype
403 void SyncSchedulerImpl::ScheduleNudgeImpl(
404 const TimeDelta
& delay
,
405 const tracked_objects::Location
& nudge_location
) {
406 DCHECK(CalledOnValidThread());
408 if (no_scheduling_allowed_
) {
409 NOTREACHED() << "Illegal to schedule job while session in progress.";
414 SDVLOG_LOC(nudge_location
, 2)
415 << "Dropping nudge, scheduler is not running.";
419 SDVLOG_LOC(nudge_location
, 2)
420 << "In ScheduleNudgeImpl with delay "
421 << delay
.InMilliseconds() << " ms";
423 if (!CanRunNudgeJobNow(NORMAL_PRIORITY
))
426 TimeTicks incoming_run_time
= TimeTicks::Now() + delay
;
427 if (!scheduled_nudge_time_
.is_null() &&
428 (scheduled_nudge_time_
< incoming_run_time
)) {
429 // Old job arrives sooner than this one. Don't reschedule it.
433 // Either there is no existing nudge in flight or the incoming nudge should be
434 // made to arrive first (preempt) the existing nudge. We reschedule in either
436 SDVLOG_LOC(nudge_location
, 2)
437 << "Scheduling a nudge with "
438 << delay
.InMilliseconds() << " ms delay";
439 scheduled_nudge_time_
= incoming_run_time
;
440 pending_wakeup_timer_
.Start(
443 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge
,
444 weak_ptr_factory_
.GetWeakPtr()));
447 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode
) {
449 ENUM_CASE(CONFIGURATION_MODE
);
450 ENUM_CASE(NORMAL_MODE
);
455 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms
) {
456 DCHECK(CalledOnValidThread());
457 nudge_tracker_
.SetDefaultNudgeDelay(delay_ms
);
460 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority
) {
461 DCHECK(CalledOnValidThread());
462 DCHECK(CanRunNudgeJobNow(priority
));
464 DVLOG(2) << "Will run normal mode sync cycle with types "
465 << ModelTypeSetToString(session_context_
->GetEnabledTypes());
466 scoped_ptr
<SyncSession
> session(SyncSession::Build(session_context_
, this));
467 bool premature_exit
= !syncer_
->NormalSyncShare(
468 GetEnabledAndUnthrottledTypes(),
471 AdjustPolling(FORCE_RESET
);
472 // Don't run poll job till the next time poll timer fires.
473 do_poll_after_credentials_updated_
= false;
475 bool success
= !premature_exit
476 && !sessions::HasSyncerError(
477 session
->status_controller().model_neutral_state());
480 // That cycle took care of any outstanding work we had.
481 SDVLOG(2) << "Nudge succeeded.";
482 nudge_tracker_
.RecordSuccessfulSyncCycle();
483 scheduled_nudge_time_
= base::TimeTicks();
485 // If we're here, then we successfully reached the server. End all backoff.
486 wait_interval_
.reset();
487 NotifyRetryTime(base::Time());
489 HandleFailure(session
->status_controller().model_neutral_state());
493 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority
) {
494 DCHECK(CalledOnValidThread());
495 DCHECK_EQ(mode_
, CONFIGURATION_MODE
);
496 DCHECK(pending_configure_params_
!= NULL
);
498 if (!CanRunJobNow(priority
)) {
499 SDVLOG(2) << "Unable to run configure job right now.";
500 if (!pending_configure_params_
->retry_task
.is_null()) {
501 pending_configure_params_
->retry_task
.Run();
502 pending_configure_params_
->retry_task
.Reset();
507 SDVLOG(2) << "Will run configure SyncShare with types "
508 << ModelTypeSetToString(session_context_
->GetEnabledTypes());
509 scoped_ptr
<SyncSession
> session(SyncSession::Build(session_context_
, this));
510 bool premature_exit
= !syncer_
->ConfigureSyncShare(
511 pending_configure_params_
->types_to_download
,
512 pending_configure_params_
->source
,
514 AdjustPolling(FORCE_RESET
);
515 // Don't run poll job till the next time poll timer fires.
516 do_poll_after_credentials_updated_
= false;
518 bool success
= !premature_exit
519 && !sessions::HasSyncerError(
520 session
->status_controller().model_neutral_state());
523 SDVLOG(2) << "Configure succeeded.";
524 pending_configure_params_
->ready_task
.Run();
525 pending_configure_params_
.reset();
527 // If we're here, then we successfully reached the server. End all backoff.
528 wait_interval_
.reset();
529 NotifyRetryTime(base::Time());
531 HandleFailure(session
->status_controller().model_neutral_state());
532 // Sync cycle might receive response from server that causes scheduler to
533 // stop and draws pending_configure_params_ invalid.
534 if (started_
&& !pending_configure_params_
->retry_task
.is_null()) {
535 pending_configure_params_
->retry_task
.Run();
536 pending_configure_params_
->retry_task
.Reset();
541 void SyncSchedulerImpl::HandleFailure(
542 const sessions::ModelNeutralState
& model_neutral_state
) {
543 if (IsCurrentlyThrottled()) {
544 SDVLOG(2) << "Was throttled during previous sync cycle.";
546 } else if (!IsBackingOff()) {
547 // Setup our backoff if this is our first such failure.
548 TimeDelta length
= delay_provider_
->GetDelay(
549 delay_provider_
->GetInitialDelay(model_neutral_state
));
550 wait_interval_
.reset(
551 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF
, length
));
552 SDVLOG(2) << "Sync cycle failed. Will back off for "
553 << wait_interval_
->length
.InMilliseconds() << "ms.";
558 void SyncSchedulerImpl::DoPollSyncSessionJob() {
559 base::AutoReset
<bool> protector(&no_scheduling_allowed_
, true);
561 SDVLOG(2) << "Polling with types "
562 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
563 scoped_ptr
<SyncSession
> session(SyncSession::Build(session_context_
, this));
564 syncer_
->PollSyncShare(
565 GetEnabledAndUnthrottledTypes(),
568 AdjustPolling(FORCE_RESET
);
570 if (IsCurrentlyThrottled()) {
571 SDVLOG(2) << "Poll request got us throttled.";
572 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
573 // to do is start the timer.
578 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types
) {
579 DCHECK(CalledOnValidThread());
580 base::TimeTicks now
= TimeTicks::Now();
581 // Update timing information for how often datatypes are triggering nudges.
582 for (ModelTypeSet::Iterator iter
= types
.First(); iter
.Good(); iter
.Inc()) {
583 base::TimeTicks previous
= last_local_nudges_by_model_type_
[iter
.Get()];
584 last_local_nudges_by_model_type_
[iter
.Get()] = now
;
585 if (previous
.is_null())
588 #define PER_DATA_TYPE_MACRO(type_str) \
589 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
590 SYNC_DATA_TYPE_HISTOGRAM(iter
.Get());
591 #undef PER_DATA_TYPE_MACRO
595 TimeDelta
SyncSchedulerImpl::GetPollInterval() {
596 return (!session_context_
->notifications_enabled() ||
597 !session_context_
->ShouldFetchUpdatesBeforeCommit()) ?
598 syncer_short_poll_interval_seconds_
:
599 syncer_long_poll_interval_seconds_
;
602 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type
) {
603 DCHECK(CalledOnValidThread());
605 TimeDelta poll
= GetPollInterval();
606 bool rate_changed
= !poll_timer_
.IsRunning() ||
607 poll
!= poll_timer_
.GetCurrentDelay();
609 if (type
== FORCE_RESET
) {
610 last_poll_reset_
= base::TimeTicks::Now();
620 poll_timer_
.Start(FROM_HERE
, poll
, this,
621 &SyncSchedulerImpl::PollTimerCallback
);
624 void SyncSchedulerImpl::RestartWaiting() {
625 CHECK(wait_interval_
.get());
626 DCHECK(wait_interval_
->length
>= TimeDelta::FromSeconds(0));
627 NotifyRetryTime(base::Time::Now() + wait_interval_
->length
);
628 SDVLOG(2) << "Starting WaitInterval timer of length "
629 << wait_interval_
->length
.InMilliseconds() << "ms.";
630 if (wait_interval_
->mode
== WaitInterval::THROTTLED
) {
631 pending_wakeup_timer_
.Start(
633 wait_interval_
->length
,
634 base::Bind(&SyncSchedulerImpl::Unthrottle
,
635 weak_ptr_factory_
.GetWeakPtr()));
637 pending_wakeup_timer_
.Start(
639 wait_interval_
->length
,
640 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry
,
641 weak_ptr_factory_
.GetWeakPtr()));
645 void SyncSchedulerImpl::Stop() {
646 DCHECK(CalledOnValidThread());
647 SDVLOG(2) << "Stop called";
649 // Kill any in-flight method calls.
650 weak_ptr_factory_
.InvalidateWeakPtrs();
651 wait_interval_
.reset();
652 NotifyRetryTime(base::Time());
654 pending_wakeup_timer_
.Stop();
655 pending_configure_params_
.reset();
660 // This is the only place where we invoke DoSyncSessionJob with canary
661 // privileges. Everyone else should use NORMAL_PRIORITY.
662 void SyncSchedulerImpl::TryCanaryJob() {
663 next_sync_session_job_priority_
= CANARY_PRIORITY
;
667 void SyncSchedulerImpl::TrySyncSessionJob() {
668 // Post call to TrySyncSessionJobImpl on current thread. Later request for
669 // access token will be here.
670 base::MessageLoop::current()->PostTask(FROM_HERE
, base::Bind(
671 &SyncSchedulerImpl::TrySyncSessionJobImpl
,
672 weak_ptr_factory_
.GetWeakPtr()));
675 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
676 JobPriority priority
= next_sync_session_job_priority_
;
677 next_sync_session_job_priority_
= NORMAL_PRIORITY
;
679 nudge_tracker_
.SetSyncCycleStartTime(base::TimeTicks::Now());
681 DCHECK(CalledOnValidThread());
682 if (mode_
== CONFIGURATION_MODE
) {
683 if (pending_configure_params_
) {
684 SDVLOG(2) << "Found pending configure job";
685 DoConfigurationSyncSessionJob(priority
);
687 } else if (CanRunNudgeJobNow(priority
)) {
688 if (nudge_tracker_
.IsSyncRequired()) {
689 SDVLOG(2) << "Found pending nudge job";
690 DoNudgeSyncSessionJob(priority
);
691 } else if (do_poll_after_credentials_updated_
||
692 ((base::TimeTicks::Now() - last_poll_reset_
) >= GetPollInterval())) {
693 DoPollSyncSessionJob();
694 // Poll timer fires infrequently. Usually by this time access token is
695 // already expired and poll job will fail with auth error. Set flag to
696 // retry poll once ProfileSyncService gets new access token, TryCanaryJob
697 // will be called after access token is retrieved.
698 if (HttpResponse::SYNC_AUTH_ERROR
==
699 session_context_
->connection_manager()->server_status()) {
700 do_poll_after_credentials_updated_
= true;
705 if (priority
== CANARY_PRIORITY
) {
706 // If this is canary job then whatever result was don't run poll job till
707 // the next time poll timer fires.
708 do_poll_after_credentials_updated_
= false;
711 if (IsBackingOff() && !pending_wakeup_timer_
.IsRunning()) {
712 // If we succeeded, our wait interval would have been cleared. If it hasn't
713 // been cleared, then we should increase our backoff interval and schedule
715 TimeDelta length
= delay_provider_
->GetDelay(wait_interval_
->length
);
716 wait_interval_
.reset(
717 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF
, length
));
718 SDVLOG(2) << "Sync cycle failed. Will back off for "
719 << wait_interval_
->length
.InMilliseconds() << "ms.";
724 void SyncSchedulerImpl::PollTimerCallback() {
725 DCHECK(CalledOnValidThread());
726 if (no_scheduling_allowed_
) {
727 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
728 // functions that are called only on the sync thread. This function is also
729 // called only on the sync thread, and only when it is posted by an expiring
730 // timer. If we find that no_scheduling_allowed_ is set here, then
731 // something is very wrong. Maybe someone mistakenly called us directly, or
732 // mishandled the book-keeping for no_scheduling_allowed_.
733 NOTREACHED() << "Illegal to schedule job while session in progress.";
740 void SyncSchedulerImpl::RetryTimerCallback() {
744 void SyncSchedulerImpl::Unthrottle() {
745 DCHECK(CalledOnValidThread());
746 DCHECK_EQ(WaitInterval::THROTTLED
, wait_interval_
->mode
);
748 // We're no longer throttled, so clear the wait interval.
749 wait_interval_
.reset();
750 NotifyRetryTime(base::Time());
751 NotifyThrottledTypesChanged(nudge_tracker_
.GetThrottledTypes());
753 // We treat this as a 'canary' in the sense that it was originally scheduled
754 // to run some time ago, failed, and we now want to retry, versus a job that
755 // was just created (e.g via ScheduleNudgeImpl). The main implication is
756 // that we're careful to update routing info (etc) with such potentially
757 // stale canary jobs.
761 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time
) {
762 DCHECK(CalledOnValidThread());
763 nudge_tracker_
.UpdateTypeThrottlingState(unthrottle_time
);
764 NotifyThrottledTypesChanged(nudge_tracker_
.GetThrottledTypes());
766 if (nudge_tracker_
.IsAnyTypeThrottled()) {
767 const base::TimeTicks now
= base::TimeTicks::Now();
768 base::TimeDelta time_until_next_unthrottle
=
769 nudge_tracker_
.GetTimeUntilNextUnthrottle(now
);
770 type_unthrottle_timer_
.Start(
772 time_until_next_unthrottle
,
773 base::Bind(&SyncSchedulerImpl::TypeUnthrottle
,
774 weak_ptr_factory_
.GetWeakPtr(),
775 now
+ time_until_next_unthrottle
));
778 // Maybe this is a good time to run a nudge job. Let's try it.
779 if (nudge_tracker_
.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY
))
783 void SyncSchedulerImpl::PerformDelayedNudge() {
784 // Circumstances may have changed since we scheduled this delayed nudge.
785 // We must check to see if it's OK to run the job before we do so.
786 if (CanRunNudgeJobNow(NORMAL_PRIORITY
))
789 // We're not responsible for setting up any retries here. The functions that
790 // first put us into a state that prevents successful sync cycles (eg. global
791 // throttling, type throttling, network errors, transient errors) will also
792 // setup the appropriate retry logic (eg. retry after timeout, exponential
793 // backoff, retry when the network changes).
796 void SyncSchedulerImpl::ExponentialBackoffRetry() {
800 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time
) {
801 FOR_EACH_OBSERVER(SyncEngineEventListener
,
802 *session_context_
->listeners(),
803 OnRetryTimeChanged(retry_time
));
806 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types
) {
807 FOR_EACH_OBSERVER(SyncEngineEventListener
,
808 *session_context_
->listeners(),
809 OnThrottledTypesChanged(types
));
812 bool SyncSchedulerImpl::IsBackingOff() const {
813 DCHECK(CalledOnValidThread());
814 return wait_interval_
.get() && wait_interval_
->mode
==
815 WaitInterval::EXPONENTIAL_BACKOFF
;
818 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta
& throttle_duration
) {
819 DCHECK(CalledOnValidThread());
820 wait_interval_
.reset(new WaitInterval(WaitInterval::THROTTLED
,
822 NotifyRetryTime(base::Time::Now() + wait_interval_
->length
);
823 NotifyThrottledTypesChanged(ModelTypeSet::All());
826 void SyncSchedulerImpl::OnTypesThrottled(
828 const base::TimeDelta
& throttle_duration
) {
829 base::TimeTicks now
= base::TimeTicks::Now();
831 nudge_tracker_
.SetTypesThrottledUntil(types
, throttle_duration
, now
);
832 base::TimeDelta time_until_next_unthrottle
=
833 nudge_tracker_
.GetTimeUntilNextUnthrottle(now
);
834 type_unthrottle_timer_
.Start(
836 time_until_next_unthrottle
,
837 base::Bind(&SyncSchedulerImpl::TypeUnthrottle
,
838 weak_ptr_factory_
.GetWeakPtr(),
839 now
+ time_until_next_unthrottle
));
840 NotifyThrottledTypesChanged(nudge_tracker_
.GetThrottledTypes());
843 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
844 DCHECK(CalledOnValidThread());
845 return wait_interval_
.get() && wait_interval_
->mode
==
846 WaitInterval::THROTTLED
;
849 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
850 const base::TimeDelta
& new_interval
) {
851 DCHECK(CalledOnValidThread());
852 syncer_short_poll_interval_seconds_
= new_interval
;
855 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
856 const base::TimeDelta
& new_interval
) {
857 DCHECK(CalledOnValidThread());
858 syncer_long_poll_interval_seconds_
= new_interval
;
861 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
862 const std::map
<ModelType
, base::TimeDelta
>& nudge_delays
) {
863 DCHECK(CalledOnValidThread());
864 nudge_tracker_
.OnReceivedCustomNudgeDelays(nudge_delays
);
867 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size
) {
869 nudge_tracker_
.SetHintBufferSize(size
);
871 NOTREACHED() << "Hint buffer size should be > 0.";
874 void SyncSchedulerImpl::OnSyncProtocolError(
875 const SyncProtocolError
& sync_protocol_error
) {
876 DCHECK(CalledOnValidThread());
877 if (ShouldRequestEarlyExit(sync_protocol_error
)) {
878 SDVLOG(2) << "Sync Scheduler requesting early exit.";
881 if (IsActionableError(sync_protocol_error
)) {
882 SDVLOG(2) << "OnActionableError";
883 FOR_EACH_OBSERVER(SyncEngineEventListener
,
884 *session_context_
->listeners(),
885 OnActionableError(sync_protocol_error
));
889 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta
& delay
) {
890 nudge_tracker_
.SetNextRetryTime(TimeTicks::Now() + delay
);
891 retry_timer_
.Start(FROM_HERE
, delay
, this,
892 &SyncSchedulerImpl::RetryTimerCallback
);
895 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types
) {
896 FOR_EACH_OBSERVER(SyncEngineEventListener
,
897 *session_context_
->listeners(),
898 OnMigrationRequested(types
));
901 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled
) {
902 DCHECK(CalledOnValidThread());
903 session_context_
->set_notifications_enabled(notifications_enabled
);
904 if (notifications_enabled
)
905 nudge_tracker_
.OnInvalidationsEnabled();
907 nudge_tracker_
.OnInvalidationsDisabled();
918 } // namespace syncer