Revert of Add a chromium based simple QUIC client. (patchset #10 id:180001 of https...
[chromium-blink-merge.git] / sync / engine / sync_scheduler_impl.cc
blob6f76be0918cf930c80ff57493fb3070e352c7321
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/sync_scheduler_impl.h"
7 #include <algorithm>
8 #include <cstring>
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h"
19 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h"
21 #include "sync/util/data_type_histogram.h"
22 #include "sync/util/logging.h"
24 using base::TimeDelta;
25 using base::TimeTicks;
27 namespace syncer {
29 using sessions::SyncSession;
30 using sessions::SyncSessionSnapshot;
31 using sync_pb::GetUpdatesCallerInfo;
33 namespace {
35 bool IsConfigRelatedUpdateSourceValue(
36 GetUpdatesCallerInfo::GetUpdatesSource source) {
37 switch (source) {
38 case GetUpdatesCallerInfo::RECONFIGURATION:
39 case GetUpdatesCallerInfo::MIGRATION:
40 case GetUpdatesCallerInfo::NEW_CLIENT:
41 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
42 case GetUpdatesCallerInfo::PROGRAMMATIC:
43 return true;
44 default:
45 return false;
49 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
50 switch (error.error_type) {
51 case SYNC_SUCCESS:
52 case MIGRATION_DONE:
53 case THROTTLED:
54 case TRANSIENT_ERROR:
55 return false;
56 case NOT_MY_BIRTHDAY:
57 case CLEAR_PENDING:
58 case DISABLED_BY_ADMIN:
59 case USER_ROLLBACK:
60 // If we send terminate sync early then |sync_cycle_ended| notification
61 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
62 // notification wouldnt be sent either. Then the UI layer would be left
63 // waiting forever. So assert we would send something.
64 DCHECK_NE(error.action, UNKNOWN_ACTION);
65 return true;
66 case INVALID_CREDENTIAL:
67 // The notification for this is handled by PostAndProcessHeaders|.
68 // Server does no have to send any action for this.
69 return true;
70 // Make the default a NOTREACHED. So if a new error is introduced we
71 // think about its expected functionality.
72 default:
73 NOTREACHED();
74 return false;
78 bool IsActionableError(
79 const SyncProtocolError& error) {
80 return (error.action != UNKNOWN_ACTION);
83 } // namespace
85 ConfigurationParams::ConfigurationParams()
86 : source(GetUpdatesCallerInfo::UNKNOWN) {}
87 ConfigurationParams::ConfigurationParams(
88 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
89 ModelTypeSet types_to_download,
90 const ModelSafeRoutingInfo& routing_info,
91 const base::Closure& ready_task,
92 const base::Closure& retry_task)
93 : source(source),
94 types_to_download(types_to_download),
95 routing_info(routing_info),
96 ready_task(ready_task),
97 retry_task(retry_task) {
98 DCHECK(!ready_task.is_null());
100 ConfigurationParams::~ConfigurationParams() {}
102 SyncSchedulerImpl::WaitInterval::WaitInterval()
103 : mode(UNKNOWN) {}
105 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
106 : mode(mode), length(length) {}
108 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
// Expands to a switch case that returns the enumerator's spelling as a string
// literal. The case already ends in a return, so no (unreachable) break is
// emitted. The trailing semicolon is part of the expansion, so call sites may
// be written with or without their own semicolon.
#define ENUM_CASE(x) case x: return #x;
112 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
113 switch (mode) {
114 ENUM_CASE(UNKNOWN);
115 ENUM_CASE(EXPONENTIAL_BACKOFF);
116 ENUM_CASE(THROTTLED);
118 NOTREACHED();
119 return "";
122 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
123 NudgeSource source) {
124 switch (source) {
125 case NUDGE_SOURCE_NOTIFICATION:
126 return GetUpdatesCallerInfo::NOTIFICATION;
127 case NUDGE_SOURCE_LOCAL:
128 return GetUpdatesCallerInfo::LOCAL;
129 case NUDGE_SOURCE_LOCAL_REFRESH:
130 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
131 case NUDGE_SOURCE_UNKNOWN:
132 return GetUpdatesCallerInfo::UNKNOWN;
133 default:
134 NOTREACHED();
135 return GetUpdatesCallerInfo::UNKNOWN;
// Logging helpers that prefix every message with |name_|, which makes the
// output attributable when several syncer threads are logging at once. They
// are only usable where |name_| is in scope.

// LOG() at the given severity, prefixed with the scheduler name.
#define SLOG(severity) LOG(severity) << name_ << ": "

// DVLOG() at the given verbosity, prefixed with the scheduler name.
#define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "

// DVLOG_LOC() for |from_here| at the given verbosity, prefixed with the
// scheduler name.
#define SDVLOG_LOC(from_here, verbose_level) \
  DVLOG_LOC(from_here, verbose_level) << name_ << ": "
149 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
150 BackoffDelayProvider* delay_provider,
151 sessions::SyncSessionContext* context,
152 Syncer* syncer)
153 : name_(name),
154 started_(false),
155 syncer_short_poll_interval_seconds_(
156 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
157 syncer_long_poll_interval_seconds_(
158 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
159 mode_(NORMAL_MODE),
160 delay_provider_(delay_provider),
161 syncer_(syncer),
162 session_context_(context),
163 no_scheduling_allowed_(false),
164 do_poll_after_credentials_updated_(false),
165 next_sync_session_job_priority_(NORMAL_PRIORITY),
166 weak_ptr_factory_(this),
167 weak_ptr_factory_for_weak_handle_(this) {
168 weak_handle_this_ = MakeWeakHandle(
169 weak_ptr_factory_for_weak_handle_.GetWeakPtr());
172 SyncSchedulerImpl::~SyncSchedulerImpl() {
173 DCHECK(CalledOnValidThread());
174 Stop();
177 void SyncSchedulerImpl::OnCredentialsUpdated() {
178 DCHECK(CalledOnValidThread());
180 if (HttpResponse::SYNC_AUTH_ERROR ==
181 session_context_->connection_manager()->server_status()) {
182 OnServerConnectionErrorFixed();
186 void SyncSchedulerImpl::OnConnectionStatusChange() {
187 if (HttpResponse::CONNECTION_UNAVAILABLE ==
188 session_context_->connection_manager()->server_status()) {
189 // Optimistically assume that the connection is fixed and try
190 // connecting.
191 OnServerConnectionErrorFixed();
195 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
196 // There could be a pending nudge or configuration job in several cases:
198 // 1. We're in exponential backoff.
199 // 2. We're silenced / throttled.
200 // 3. A nudge was saved previously due to not having a valid auth token.
201 // 4. A nudge was scheduled + saved while in configuration mode.
203 // In all cases except (2), we want to retry contacting the server. We
204 // call TryCanaryJob to achieve this, and note that nothing -- not even a
205 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
206 // has the authority to do that is the Unthrottle timer.
207 TryCanaryJob();
210 void SyncSchedulerImpl::Start(Mode mode) {
211 DCHECK(CalledOnValidThread());
212 std::string thread_name = base::MessageLoop::current()->thread_name();
213 if (thread_name.empty())
214 thread_name = "<Main thread>";
215 SDVLOG(2) << "Start called from thread "
216 << thread_name << " with mode " << GetModeString(mode);
217 if (!started_) {
218 started_ = true;
219 SendInitialSnapshot();
222 DCHECK(!session_context_->account_name().empty());
223 DCHECK(syncer_.get());
224 Mode old_mode = mode_;
225 mode_ = mode;
226 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
228 if (old_mode != mode_ && mode_ == NORMAL_MODE) {
229 // We just got back to normal mode. Let's try to run the work that was
230 // queued up while we were configuring.
232 // Update our current time before checking IsRetryRequired().
233 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
234 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
235 TrySyncSessionJob();
240 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
241 ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
242 ModelTypeSet enabled_protocol_types =
243 Intersection(ProtocolTypes(), enabled_types);
244 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
245 return Difference(enabled_protocol_types, throttled_types);
248 void SyncSchedulerImpl::SendInitialSnapshot() {
249 DCHECK(CalledOnValidThread());
250 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
251 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
252 event.snapshot = dummy->TakeSnapshot();
253 FOR_EACH_OBSERVER(SyncEngineEventListener,
254 *session_context_->listeners(),
255 OnSyncCycleEvent(event));
258 namespace {
260 // Helper to extract the routing info corresponding to types in
261 // |types_to_download| from |current_routes|.
262 void BuildModelSafeParams(
263 ModelTypeSet types_to_download,
264 const ModelSafeRoutingInfo& current_routes,
265 ModelSafeRoutingInfo* result_routes) {
266 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
267 iter.Inc()) {
268 ModelType type = iter.Get();
269 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
270 DCHECK(route != current_routes.end());
271 ModelSafeGroup group = route->second;
272 (*result_routes)[type] = group;
276 } // namespace.
278 void SyncSchedulerImpl::ScheduleConfiguration(
279 const ConfigurationParams& params) {
280 DCHECK(CalledOnValidThread());
281 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
282 DCHECK_EQ(CONFIGURATION_MODE, mode_);
283 DCHECK(!params.ready_task.is_null());
284 CHECK(started_) << "Scheduler must be running to configure.";
285 SDVLOG(2) << "Reconfiguring syncer.";
287 // Only one configuration is allowed at a time. Verify we're not waiting
288 // for a pending configure job.
289 DCHECK(!pending_configure_params_);
291 ModelSafeRoutingInfo restricted_routes;
292 BuildModelSafeParams(params.types_to_download,
293 params.routing_info,
294 &restricted_routes);
295 session_context_->SetRoutingInfo(restricted_routes);
297 // Only reconfigure if we have types to download.
298 if (!params.types_to_download.Empty()) {
299 pending_configure_params_.reset(new ConfigurationParams(params));
300 TrySyncSessionJob();
301 } else {
302 SDVLOG(2) << "No change in routing info, calling ready task directly.";
303 params.ready_task.Run();
307 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
308 DCHECK(CalledOnValidThread());
309 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
310 SDVLOG(1) << "Unable to run a job because we're throttled.";
311 return false;
314 if (wait_interval_
315 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
316 && priority != CANARY_PRIORITY) {
317 SDVLOG(1) << "Unable to run a job because we're backing off.";
318 return false;
321 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
322 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
323 return false;
326 return true;
329 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
330 DCHECK(CalledOnValidThread());
332 if (!CanRunJobNow(priority)) {
333 SDVLOG(1) << "Unable to run a nudge job right now";
334 return false;
337 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
338 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
339 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
340 return false;
343 if (mode_ == CONFIGURATION_MODE) {
344 SDVLOG(1) << "Not running nudge because we're in configuration mode.";
345 return false;
348 return true;
351 void SyncSchedulerImpl::ScheduleLocalNudge(
352 ModelTypeSet types,
353 const tracked_objects::Location& nudge_location) {
354 DCHECK(CalledOnValidThread());
355 DCHECK(!types.Empty());
357 SDVLOG_LOC(nudge_location, 2)
358 << "Scheduling sync because of local change to "
359 << ModelTypeSetToString(types);
360 UpdateNudgeTimeRecords(types);
361 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types);
362 ScheduleNudgeImpl(nudge_delay, nudge_location);
365 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
366 ModelTypeSet types,
367 const tracked_objects::Location& nudge_location) {
368 DCHECK(CalledOnValidThread());
369 DCHECK(!types.Empty());
371 SDVLOG_LOC(nudge_location, 2)
372 << "Scheduling sync because of local refresh request for "
373 << ModelTypeSetToString(types);
374 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types);
375 ScheduleNudgeImpl(nudge_delay, nudge_location);
378 void SyncSchedulerImpl::ScheduleInvalidationNudge(
379 syncer::ModelType model_type,
380 scoped_ptr<InvalidationInterface> invalidation,
381 const tracked_objects::Location& nudge_location) {
382 DCHECK(CalledOnValidThread());
384 SDVLOG_LOC(nudge_location, 2)
385 << "Scheduling sync because we received invalidation for "
386 << ModelTypeToString(model_type);
387 base::TimeDelta nudge_delay =
388 nudge_tracker_.RecordRemoteInvalidation(model_type, invalidation.Pass());
389 ScheduleNudgeImpl(nudge_delay, nudge_location);
392 void SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type) {
393 DCHECK(CalledOnValidThread());
395 SDVLOG(2) << "Scheduling non-blocking initial sync for "
396 << ModelTypeToString(model_type);
397 nudge_tracker_.RecordInitialSyncRequired(model_type);
398 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE);
401 // TODO(zea): Consider adding separate throttling/backoff for datatype
402 // refresh requests.
403 void SyncSchedulerImpl::ScheduleNudgeImpl(
404 const TimeDelta& delay,
405 const tracked_objects::Location& nudge_location) {
406 DCHECK(CalledOnValidThread());
408 if (no_scheduling_allowed_) {
409 NOTREACHED() << "Illegal to schedule job while session in progress.";
410 return;
413 if (!started_) {
414 SDVLOG_LOC(nudge_location, 2)
415 << "Dropping nudge, scheduler is not running.";
416 return;
419 SDVLOG_LOC(nudge_location, 2)
420 << "In ScheduleNudgeImpl with delay "
421 << delay.InMilliseconds() << " ms";
423 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
424 return;
426 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
427 if (!scheduled_nudge_time_.is_null() &&
428 (scheduled_nudge_time_ < incoming_run_time)) {
429 // Old job arrives sooner than this one. Don't reschedule it.
430 return;
433 // Either there is no existing nudge in flight or the incoming nudge should be
434 // made to arrive first (preempt) the existing nudge. We reschedule in either
435 // case.
436 SDVLOG_LOC(nudge_location, 2)
437 << "Scheduling a nudge with "
438 << delay.InMilliseconds() << " ms delay";
439 scheduled_nudge_time_ = incoming_run_time;
440 pending_wakeup_timer_.Start(
441 nudge_location,
442 delay,
443 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
444 weak_ptr_factory_.GetWeakPtr()));
447 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
448 switch (mode) {
449 ENUM_CASE(CONFIGURATION_MODE);
450 ENUM_CASE(NORMAL_MODE);
452 return "";
455 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) {
456 DCHECK(CalledOnValidThread());
457 nudge_tracker_.SetDefaultNudgeDelay(delay_ms);
460 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
461 DCHECK(CalledOnValidThread());
462 DCHECK(CanRunNudgeJobNow(priority));
464 DVLOG(2) << "Will run normal mode sync cycle with types "
465 << ModelTypeSetToString(session_context_->GetEnabledTypes());
466 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
467 bool premature_exit = !syncer_->NormalSyncShare(
468 GetEnabledAndUnthrottledTypes(), &nudge_tracker_, session.get());
469 AdjustPolling(FORCE_RESET);
470 // Don't run poll job till the next time poll timer fires.
471 do_poll_after_credentials_updated_ = false;
473 bool success = !premature_exit
474 && !sessions::HasSyncerError(
475 session->status_controller().model_neutral_state());
477 if (success) {
478 // That cycle took care of any outstanding work we had.
479 SDVLOG(2) << "Nudge succeeded.";
480 nudge_tracker_.RecordSuccessfulSyncCycle();
481 scheduled_nudge_time_ = base::TimeTicks();
483 // If we're here, then we successfully reached the server. End all backoff.
484 wait_interval_.reset();
485 NotifyRetryTime(base::Time());
486 } else {
487 HandleFailure(session->status_controller().model_neutral_state());
491 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
492 DCHECK(CalledOnValidThread());
493 DCHECK_EQ(mode_, CONFIGURATION_MODE);
494 DCHECK(pending_configure_params_ != NULL);
496 if (!CanRunJobNow(priority)) {
497 SDVLOG(2) << "Unable to run configure job right now.";
498 if (!pending_configure_params_->retry_task.is_null()) {
499 pending_configure_params_->retry_task.Run();
500 pending_configure_params_->retry_task.Reset();
502 return;
505 SDVLOG(2) << "Will run configure SyncShare with types "
506 << ModelTypeSetToString(session_context_->GetEnabledTypes());
507 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
508 bool premature_exit = !syncer_->ConfigureSyncShare(
509 pending_configure_params_->types_to_download,
510 pending_configure_params_->source,
511 session.get());
512 AdjustPolling(FORCE_RESET);
513 // Don't run poll job till the next time poll timer fires.
514 do_poll_after_credentials_updated_ = false;
516 bool success = !premature_exit
517 && !sessions::HasSyncerError(
518 session->status_controller().model_neutral_state());
520 if (success) {
521 SDVLOG(2) << "Configure succeeded.";
522 pending_configure_params_->ready_task.Run();
523 pending_configure_params_.reset();
525 // If we're here, then we successfully reached the server. End all backoff.
526 wait_interval_.reset();
527 NotifyRetryTime(base::Time());
528 } else {
529 HandleFailure(session->status_controller().model_neutral_state());
530 // Sync cycle might receive response from server that causes scheduler to
531 // stop and draws pending_configure_params_ invalid.
532 if (started_ && !pending_configure_params_->retry_task.is_null()) {
533 pending_configure_params_->retry_task.Run();
534 pending_configure_params_->retry_task.Reset();
539 void SyncSchedulerImpl::HandleFailure(
540 const sessions::ModelNeutralState& model_neutral_state) {
541 if (IsCurrentlyThrottled()) {
542 SDVLOG(2) << "Was throttled during previous sync cycle.";
543 RestartWaiting();
544 } else if (!IsBackingOff()) {
545 // Setup our backoff if this is our first such failure.
546 TimeDelta length = delay_provider_->GetDelay(
547 delay_provider_->GetInitialDelay(model_neutral_state));
548 wait_interval_.reset(
549 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
550 SDVLOG(2) << "Sync cycle failed. Will back off for "
551 << wait_interval_->length.InMilliseconds() << "ms.";
552 RestartWaiting();
556 void SyncSchedulerImpl::DoPollSyncSessionJob() {
557 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
559 SDVLOG(2) << "Polling with types "
560 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
561 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
562 syncer_->PollSyncShare(
563 GetEnabledAndUnthrottledTypes(),
564 session.get());
566 AdjustPolling(FORCE_RESET);
568 if (IsCurrentlyThrottled()) {
569 SDVLOG(2) << "Poll request got us throttled.";
570 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
571 // to do is start the timer.
572 RestartWaiting();
576 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
577 DCHECK(CalledOnValidThread());
578 base::TimeTicks now = TimeTicks::Now();
579 // Update timing information for how often datatypes are triggering nudges.
580 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
581 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
582 last_local_nudges_by_model_type_[iter.Get()] = now;
583 if (previous.is_null())
584 continue;
586 #define PER_DATA_TYPE_MACRO(type_str) \
587 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
588 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
589 #undef PER_DATA_TYPE_MACRO
593 TimeDelta SyncSchedulerImpl::GetPollInterval() {
594 return (!session_context_->notifications_enabled() ||
595 !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
596 syncer_short_poll_interval_seconds_ :
597 syncer_long_poll_interval_seconds_;
600 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
601 DCHECK(CalledOnValidThread());
603 TimeDelta poll = GetPollInterval();
604 bool rate_changed = !poll_timer_.IsRunning() ||
605 poll != poll_timer_.GetCurrentDelay();
607 if (type == FORCE_RESET) {
608 last_poll_reset_ = base::TimeTicks::Now();
609 if (!rate_changed)
610 poll_timer_.Reset();
613 if (!rate_changed)
614 return;
616 // Adjust poll rate.
617 poll_timer_.Stop();
618 poll_timer_.Start(FROM_HERE, poll, this,
619 &SyncSchedulerImpl::PollTimerCallback);
622 void SyncSchedulerImpl::RestartWaiting() {
623 CHECK(wait_interval_.get());
624 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
625 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
626 SDVLOG(2) << "Starting WaitInterval timer of length "
627 << wait_interval_->length.InMilliseconds() << "ms.";
628 if (wait_interval_->mode == WaitInterval::THROTTLED) {
629 pending_wakeup_timer_.Start(
630 FROM_HERE,
631 wait_interval_->length,
632 base::Bind(&SyncSchedulerImpl::Unthrottle,
633 weak_ptr_factory_.GetWeakPtr()));
634 } else {
635 pending_wakeup_timer_.Start(
636 FROM_HERE,
637 wait_interval_->length,
638 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
639 weak_ptr_factory_.GetWeakPtr()));
643 void SyncSchedulerImpl::Stop() {
644 DCHECK(CalledOnValidThread());
645 SDVLOG(2) << "Stop called";
647 // Kill any in-flight method calls.
648 weak_ptr_factory_.InvalidateWeakPtrs();
649 wait_interval_.reset();
650 NotifyRetryTime(base::Time());
651 poll_timer_.Stop();
652 pending_wakeup_timer_.Stop();
653 pending_configure_params_.reset();
654 if (started_)
655 started_ = false;
658 // This is the only place where we invoke DoSyncSessionJob with canary
659 // privileges. Everyone else should use NORMAL_PRIORITY.
660 void SyncSchedulerImpl::TryCanaryJob() {
661 next_sync_session_job_priority_ = CANARY_PRIORITY;
662 TrySyncSessionJob();
665 void SyncSchedulerImpl::TrySyncSessionJob() {
666 // Post call to TrySyncSessionJobImpl on current thread. Later request for
667 // access token will be here.
668 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
669 &SyncSchedulerImpl::TrySyncSessionJobImpl,
670 weak_ptr_factory_.GetWeakPtr()));
673 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
674 JobPriority priority = next_sync_session_job_priority_;
675 next_sync_session_job_priority_ = NORMAL_PRIORITY;
677 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
679 DCHECK(CalledOnValidThread());
680 if (mode_ == CONFIGURATION_MODE) {
681 if (pending_configure_params_) {
682 SDVLOG(2) << "Found pending configure job";
683 DoConfigurationSyncSessionJob(priority);
685 } else if (CanRunNudgeJobNow(priority)) {
686 if (nudge_tracker_.IsSyncRequired()) {
687 SDVLOG(2) << "Found pending nudge job";
688 DoNudgeSyncSessionJob(priority);
689 } else if (do_poll_after_credentials_updated_ ||
690 ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
691 DoPollSyncSessionJob();
692 // Poll timer fires infrequently. Usually by this time access token is
693 // already expired and poll job will fail with auth error. Set flag to
694 // retry poll once ProfileSyncService gets new access token, TryCanaryJob
695 // will be called after access token is retrieved.
696 if (HttpResponse::SYNC_AUTH_ERROR ==
697 session_context_->connection_manager()->server_status()) {
698 do_poll_after_credentials_updated_ = true;
703 if (priority == CANARY_PRIORITY) {
704 // If this is canary job then whatever result was don't run poll job till
705 // the next time poll timer fires.
706 do_poll_after_credentials_updated_ = false;
709 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
710 // If we succeeded, our wait interval would have been cleared. If it hasn't
711 // been cleared, then we should increase our backoff interval and schedule
712 // another retry.
713 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
714 wait_interval_.reset(
715 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
716 SDVLOG(2) << "Sync cycle failed. Will back off for "
717 << wait_interval_->length.InMilliseconds() << "ms.";
718 RestartWaiting();
722 void SyncSchedulerImpl::PollTimerCallback() {
723 DCHECK(CalledOnValidThread());
724 if (no_scheduling_allowed_) {
725 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
726 // functions that are called only on the sync thread. This function is also
727 // called only on the sync thread, and only when it is posted by an expiring
728 // timer. If we find that no_scheduling_allowed_ is set here, then
729 // something is very wrong. Maybe someone mistakenly called us directly, or
730 // mishandled the book-keeping for no_scheduling_allowed_.
731 NOTREACHED() << "Illegal to schedule job while session in progress.";
732 return;
735 TrySyncSessionJob();
738 void SyncSchedulerImpl::RetryTimerCallback() {
739 TrySyncSessionJob();
742 void SyncSchedulerImpl::Unthrottle() {
743 DCHECK(CalledOnValidThread());
744 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
746 // We're no longer throttled, so clear the wait interval.
747 wait_interval_.reset();
748 NotifyRetryTime(base::Time());
749 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
751 // We treat this as a 'canary' in the sense that it was originally scheduled
752 // to run some time ago, failed, and we now want to retry, versus a job that
753 // was just created (e.g via ScheduleNudgeImpl). The main implication is
754 // that we're careful to update routing info (etc) with such potentially
755 // stale canary jobs.
756 TryCanaryJob();
759 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
760 DCHECK(CalledOnValidThread());
761 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
762 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
764 if (nudge_tracker_.IsAnyTypeThrottled()) {
765 const base::TimeTicks now = base::TimeTicks::Now();
766 base::TimeDelta time_until_next_unthrottle =
767 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
768 type_unthrottle_timer_.Start(
769 FROM_HERE,
770 time_until_next_unthrottle,
771 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
772 weak_ptr_factory_.GetWeakPtr(),
773 now + time_until_next_unthrottle));
776 // Maybe this is a good time to run a nudge job. Let's try it.
777 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
778 TrySyncSessionJob();
781 void SyncSchedulerImpl::PerformDelayedNudge() {
782 // Circumstances may have changed since we scheduled this delayed nudge.
783 // We must check to see if it's OK to run the job before we do so.
784 if (CanRunNudgeJobNow(NORMAL_PRIORITY))
785 TrySyncSessionJob();
787 // We're not responsible for setting up any retries here. The functions that
788 // first put us into a state that prevents successful sync cycles (eg. global
789 // throttling, type throttling, network errors, transient errors) will also
790 // setup the appropriate retry logic (eg. retry after timeout, exponential
791 // backoff, retry when the network changes).
794 void SyncSchedulerImpl::ExponentialBackoffRetry() {
795 TryCanaryJob();
798 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
799 FOR_EACH_OBSERVER(SyncEngineEventListener,
800 *session_context_->listeners(),
801 OnRetryTimeChanged(retry_time));
804 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
805 FOR_EACH_OBSERVER(SyncEngineEventListener,
806 *session_context_->listeners(),
807 OnThrottledTypesChanged(types));
810 bool SyncSchedulerImpl::IsBackingOff() const {
811 DCHECK(CalledOnValidThread());
812 return wait_interval_.get() && wait_interval_->mode ==
813 WaitInterval::EXPONENTIAL_BACKOFF;
816 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
817 DCHECK(CalledOnValidThread());
818 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
819 throttle_duration));
820 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
821 NotifyThrottledTypesChanged(ModelTypeSet::All());
824 void SyncSchedulerImpl::OnTypesThrottled(
825 ModelTypeSet types,
826 const base::TimeDelta& throttle_duration) {
827 base::TimeTicks now = base::TimeTicks::Now();
829 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
830 base::TimeDelta time_until_next_unthrottle =
831 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
832 type_unthrottle_timer_.Start(
833 FROM_HERE,
834 time_until_next_unthrottle,
835 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
836 weak_ptr_factory_.GetWeakPtr(),
837 now + time_until_next_unthrottle));
838 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
841 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
842 DCHECK(CalledOnValidThread());
843 return wait_interval_.get() && wait_interval_->mode ==
844 WaitInterval::THROTTLED;
847 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
848 const base::TimeDelta& new_interval) {
849 DCHECK(CalledOnValidThread());
850 syncer_short_poll_interval_seconds_ = new_interval;
853 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
854 const base::TimeDelta& new_interval) {
855 DCHECK(CalledOnValidThread());
856 syncer_long_poll_interval_seconds_ = new_interval;
859 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
860 const std::map<ModelType, base::TimeDelta>& nudge_delays) {
861 DCHECK(CalledOnValidThread());
862 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays);
865 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
866 if (size > 0)
867 nudge_tracker_.SetHintBufferSize(size);
868 else
869 NOTREACHED() << "Hint buffer size should be > 0.";
872 void SyncSchedulerImpl::OnSyncProtocolError(
873 const SyncProtocolError& sync_protocol_error) {
874 DCHECK(CalledOnValidThread());
875 if (ShouldRequestEarlyExit(sync_protocol_error)) {
876 SDVLOG(2) << "Sync Scheduler requesting early exit.";
877 Stop();
879 if (IsActionableError(sync_protocol_error)) {
880 SDVLOG(2) << "OnActionableError";
881 FOR_EACH_OBSERVER(SyncEngineEventListener,
882 *session_context_->listeners(),
883 OnActionableError(sync_protocol_error));
887 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
888 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
889 retry_timer_.Start(FROM_HERE, delay, this,
890 &SyncSchedulerImpl::RetryTimerCallback);
893 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
894 FOR_EACH_OBSERVER(SyncEngineEventListener,
895 *session_context_->listeners(),
896 OnMigrationRequested(types));
899 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
900 DCHECK(CalledOnValidThread());
901 session_context_->set_notifications_enabled(notifications_enabled);
902 if (notifications_enabled)
903 nudge_tracker_.OnInvalidationsEnabled();
904 else
905 nudge_tracker_.OnInvalidationsDisabled();
908 #undef SDVLOG_LOC
910 #undef SDVLOG
912 #undef SLOG
914 #undef ENUM_CASE
916 } // namespace syncer