1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/notifier/sync_invalidation_listener.h"
10 #include "base/callback.h"
11 #include "base/compiler_specific.h"
12 #include "base/logging.h"
13 #include "base/tracked_objects.h"
14 #include "google/cacheinvalidation/include/invalidation-client.h"
15 #include "google/cacheinvalidation/include/types.h"
16 #include "google/cacheinvalidation/types.pb.h"
17 #include "jingle/notifier/listener/push_client.h"
18 #include "sync/notifier/invalidation_util.h"
19 #include "sync/notifier/registration_manager.h"
// Application name handed to the invalidation-client factory callback in
// Start().
23 const char kApplicationName
[] = "chrome-sync";
// Version value that EmitInvalidation() and OnTimeout() compare against to
// decide whether to build an unknown-version Invalidation.
25 static const int64 kUnknownVersion
= -1;
// Out-of-line destructor for the nested Delegate type; the body is empty.
31 SyncInvalidationListener::Delegate::~Delegate() {}
// Builds a listener around |push_client|.
//   tick_clock:  forwarded to |ack_tracker_| together with |this|.
//   push_client: its raw pointer is cached in |push_client_| and the
//                scoped_ptr itself is passed on to |sync_system_resources_|.
// Both |ticl_state_| and |push_client_state_| start out as
// DEFAULT_INVALIDATION_ERROR.
// NOTE(review): |push_client_| reads push_client.get() while
// |sync_system_resources_| consumes push_client.Pass(). Members are
// initialized in declaration order, which is not visible here -- confirm in
// the header that |push_client_| is declared before |sync_system_resources_|.
33 SyncInvalidationListener::SyncInvalidationListener(
34 base::TickClock
* tick_clock
,
35 scoped_ptr
<notifier::PushClient
> push_client
)
36 : ack_tracker_(tick_clock
, this),
37 push_client_(push_client
.get()),
38 sync_system_resources_(push_client
.Pass(), this),
40 ticl_state_(DEFAULT_INVALIDATION_ERROR
),
41 push_client_state_(DEFAULT_INVALIDATION_ERROR
),
42 weak_ptr_factory_(this) {
43 DCHECK(CalledOnValidThread());
// Start observing the push client; the destructor removes this observer.
44 push_client_
->AddObserver(this);
// Stops observing the push client that the constructor registered with.
// NOTE(review): the embedded line numbering skips ahead after the last
// statement shown, so any further teardown is not visible in this listing.
47 SyncInvalidationListener::~SyncInvalidationListener() {
48 DCHECK(CalledOnValidThread());
49 push_client_
->RemoveObserver(this);
// Starts the listener: boots |sync_system_resources_|, loads the initial
// invalidation state, creates and starts the invalidation client, creates the
// registration manager and begins tracking ids collected in
// |unacknowledged_ids|.
//   create_invalidation_client_callback: run to obtain the invalidation
//       client; it receives |sync_system_resources_|, the client type,
//       |client_id|, kApplicationName and |this|.
//   client_id: forwarded to the callback.
//   client_info: set as the platform on |sync_system_resources_|.
//   invalidation_bootstrap_data: initial state for the storage resource.
//   initial_invalidation_state_map: copied into |invalidation_state_map_|.
//   invalidation_state_tracker: stored in |invalidation_state_tracker_| and
//       DCHECKed to be initialized.
// NOTE(review): the last visible parameter ends with a comma, so the
// parameter list continues beyond what this listing shows.
54 void SyncInvalidationListener::Start(
55 const CreateInvalidationClientCallback
&
56 create_invalidation_client_callback
,
57 const std::string
& client_id
, const std::string
& client_info
,
58 const std::string
& invalidation_bootstrap_data
,
59 const InvalidationStateMap
& initial_invalidation_state_map
,
60 const WeakHandle
<InvalidationStateTracker
>& invalidation_state_tracker
,
62 DCHECK(CalledOnValidThread());
65 sync_system_resources_
.set_platform(client_info
);
66 sync_system_resources_
.Start();
68 // The Storage resource is implemented as a write-through cache. We populate
69 // it with the initial state on startup, so subsequent writes go to disk and
70 // update the in-memory cache, while reads just return the cached state.
71 sync_system_resources_
.storage()->SetInitialState(
72 invalidation_bootstrap_data
);
// Adopt the caller-supplied state map and log what was loaded.
74 invalidation_state_map_
= initial_invalidation_state_map
;
75 if (invalidation_state_map_
.empty()) {
76 DVLOG(2) << "No initial max invalidation versions for any id";
78 for (InvalidationStateMap::const_iterator it
=
79 invalidation_state_map_
.begin();
80 it
!= invalidation_state_map_
.end(); ++it
) {
81 DVLOG(2) << "Initial max invalidation version for "
82 << ObjectIdToString(it
->first
) << " is "
83 << it
->second
.version
;
86 invalidation_state_tracker_
= invalidation_state_tracker
;
87 DCHECK(invalidation_state_tracker_
.IsInitialized());
// NOTE(review): |client_type| is declared twice below; presumably only one
// declaration is compiled per platform through preprocessor conditionals that
// are not visible in this listing -- confirm.
94 int client_type
= ipc::invalidation::ClientType::CHROME_SYNC_IOS
;
96 int client_type
= ipc::invalidation::ClientType::CHROME_SYNC
;
// Create the invalidation client through the callback, with |this| passed as
// the final argument, then start it.
98 invalidation_client_
.reset(
99 create_invalidation_client_callback
.Run(
100 &sync_system_resources_
, client_type
, client_id
,
101 kApplicationName
, this));
102 invalidation_client_
->Start();
// The registration manager is built on the raw client pointer.
104 registration_manager_
.reset(
105 new RegistrationManager(invalidation_client_
.get()));
107 // Set up reminders for any invalidations that have not been locally acknowledged.
109 ObjectIdSet unacknowledged_ids
;
110 for (InvalidationStateMap::const_iterator it
=
111 invalidation_state_map_
.begin();
112 it
!= invalidation_state_map_
.end(); ++it
) {
// NOTE(review): as listed, the insert below reads as the body of this |if|,
// which would collect ids whose expected and current handles are equal. The
// embedded numbering skips a line here; presumably a statement that skips
// such entries sits between the two -- confirm.
113 if (it
->second
.expected
.Equals(it
->second
.current
))
115 unacknowledged_ids
.insert(it
->first
);
// Only hand a non-empty set to the ack tracker.
117 if (!unacknowledged_ids
.empty())
118 ack_tracker_
.Track(unacknowledged_ids
);
// Forwards the new credentials (|email|, |token|) to the network resource
// obtained from |sync_system_resources_|.
121 void SyncInvalidationListener::UpdateCredentials(
122 const std::string
& email
, const std::string
& token
) {
123 DCHECK(CalledOnValidThread());
124 sync_system_resources_
.network()->UpdateCredentials(email
, token
);
// Replaces |registered_ids_| with |ids|. If the ticl is enabled and a
// registration manager exists, the change is pushed immediately through
// DoRegistrationUpdate(); otherwise only the stored set is updated here.
127 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet
& ids
) {
128 DCHECK(CalledOnValidThread());
129 registered_ids_
= ids
;
130 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
131 // working XMPP connection (as observed by us), so check it instead
132 // of GetState() (see http://crbug.com/139424).
133 if (ticl_state_
== INVALIDATIONS_ENABLED
&& registration_manager_
) {
134 DoRegistrationUpdate();
// Records a local acknowledgement for |id|.
//   id:         object whose invalidation is being acknowledged.
//   ack_handle: handle being acknowledged; stored as the entry's |current|
//               handle.
// The acknowledgement is also forwarded to the state tracker
// (InvalidationStateTracker::Acknowledge), and when |ack_handle| equals the
// entry's |expected| handle the id is acked in |ack_tracker_|.
// NOTE(review): several lines are absent from this listing (the body of the
// not-found check, the remaining arguments of the Call(), and the declaration
// of |ids|); confirm details against the complete file.
138 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId
& id
,
139 const AckHandle
& ack_handle
) {
140 DCHECK(CalledOnValidThread());
141 InvalidationStateMap::iterator state_it
= invalidation_state_map_
.find(id
);
// No saved state for |id|; what happens in this case is not visible here.
142 if (state_it
== invalidation_state_map_
.end())
144 invalidation_state_tracker_
.Call(
146 &InvalidationStateTracker::Acknowledge
,
// Remember the most recently acknowledged handle in the in-memory state.
149 state_it
->second
.current
= ack_handle
;
150 if (state_it
->second
.expected
.Equals(ack_handle
)) {
151 // If the received ack matches the expected ack, then we no longer need to
152 // keep track of |id| since it is up-to-date.
155 ack_tracker_
.Ack(ids
);
// Marks the ticl as enabled and refreshes registrations.
//   client: must equal invalidation_client_.get() (DCHECKed).
// NOTE(review): the embedded numbering skips one line between the state
// assignment and DoRegistrationUpdate(); its content is not visible here.
159 void SyncInvalidationListener::Ready(
160 invalidation::InvalidationClient
* client
) {
161 DCHECK(CalledOnValidThread());
162 DCHECK_EQ(client
, invalidation_client_
.get());
163 ticl_state_
= INVALIDATIONS_ENABLED
;
165 DoRegistrationUpdate();
// Handles a versioned invalidation for a single object id.
//   client:       must equal invalidation_client_.get() (DCHECKed).
//   invalidation: supplies the object id, version and optional payload.
//   ack_handle:   acknowledged to |client| directly when the invalidation is
//                 redundant; otherwise passed on to PrepareInvalidation().
// For a non-redundant invalidation the version and payload are stored in
// |invalidation_state_map_|, forwarded to the state tracker
// (SetMaxVersionAndPayload) and then handed to PrepareInvalidation().
// NOTE(review): the declarations of |payload| and |ids|, the exit from the
// redundant-invalidation branch and part of the Call() argument list are not
// visible in this listing.
168 void SyncInvalidationListener::Invalidate(
169 invalidation::InvalidationClient
* client
,
170 const invalidation::Invalidation
& invalidation
,
171 const invalidation::AckHandle
& ack_handle
) {
172 DCHECK(CalledOnValidThread());
173 DCHECK_EQ(client
, invalidation_client_
.get());
174 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation
);
176 const invalidation::ObjectId
& id
= invalidation
.object_id();
178 // The invalidation API spec allows for the possibility of redundant
179 // invalidations, so keep track of the max versions and drop
180 // invalidations with old versions.
182 // TODO(akalin): Now that we keep track of registered ids, we
183 // should drop invalidations for unregistered ids. We may also
184 // have to filter it at a higher level, as invalidations for
185 // newly-unregistered ids may already be in flight.
186 InvalidationStateMap::const_iterator it
= invalidation_state_map_
.find(id
);
// Redundant means: state exists for |id| and the incoming version is not
// greater than the stored one.
187 if ((it
!= invalidation_state_map_
.end()) &&
188 (invalidation
.version() <= it
->second
.version
)) {
189 // Drop redundant invalidations.
190 client
->Acknowledge(ack_handle
);
195 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
196 if (invalidation
.has_payload())
197 payload
= invalidation
.payload();
199 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id
)
200 << " to " << invalidation
.version();
// Update the in-memory state first, then mirror it to the state tracker.
201 invalidation_state_map_
[id
].version
= invalidation
.version();
202 invalidation_state_map_
[id
].payload
= payload
;
203 invalidation_state_tracker_
.Call(
205 &InvalidationStateTracker::SetMaxVersionAndPayload
,
206 id
, invalidation
.version(), payload
);
210 PrepareInvalidation(ids
, invalidation
.version(), payload
, client
, ack_handle
);
// Handles an invalidation for |object_id| that carries no version.
//   client:     must equal invalidation_client_.get() (DCHECKed).
//   object_id:  inserted into |ids|.
//   ack_handle: not used by any statement visible here.
// NOTE(review): the declaration of |ids| and whatever consumes it afterwards
// are absent from this listing; presumably the set is handed to
// PrepareInvalidation() as in Invalidate() -- confirm.
213 void SyncInvalidationListener::InvalidateUnknownVersion(
214 invalidation::InvalidationClient
* client
,
215 const invalidation::ObjectId
& object_id
,
216 const invalidation::AckHandle
& ack_handle
) {
217 DCHECK(CalledOnValidThread());
218 DCHECK_EQ(client
, invalidation_client_
.get());
219 DVLOG(1) << "InvalidateUnknownVersion";
222 ids
.insert(object_id
);
231 // This should behave as if we got an invalidation with version
232 // UNKNOWN_OBJECT_VERSION for all known data types.
//   client:     must equal invalidation_client_.get() (DCHECKed).
//   ack_handle: not used by any statement visible here.
// NOTE(review): only the thread/client checks and the log statement are
// visible in this listing; the statements that perform the invalidation are
// not shown.
233 void SyncInvalidationListener::InvalidateAll(
234 invalidation::InvalidationClient
* client
,
235 const invalidation::AckHandle
& ack_handle
) {
236 DCHECK(CalledOnValidThread());
237 DCHECK_EQ(client
, invalidation_client_
.get());
238 DVLOG(1) << "InvalidateAll";
// First stage of delivering an invalidation: acks |ids| in |ack_tracker_| and
// asks the state tracker to generate ack handles
// (InvalidationStateTracker::GenerateAckHandles), supplying the current
// message loop proxy and a callback bound to EmitInvalidation() through a
// weak pointer to this object.
//   ids:        object ids being invalidated.
//   payload, client, ack_handle: not used by the statements visible here;
//               presumably bound into the EmitInvalidation() callback --
//               confirm.
// NOTE(review): one parameter line and the tail of the Call()/Bind() argument
// lists are absent from this listing.
248 void SyncInvalidationListener::PrepareInvalidation(
249 const ObjectIdSet
& ids
,
251 const std::string
& payload
,
252 invalidation::InvalidationClient
* client
,
253 const invalidation::AckHandle
& ack_handle
) {
254 DCHECK(CalledOnValidThread());
256 // A server invalidation resets the local retry count.
257 ack_tracker_
.Ack(ids
);
258 invalidation_state_tracker_
.Call(
260 &InvalidationStateTracker::GenerateAckHandles
,
262 base::MessageLoopProxy::current(),
263 base::Bind(&SyncInvalidationListener::EmitInvalidation
,
264 weak_ptr_factory_
.GetWeakPtr(),
// Second stage of delivering an invalidation: builds one Invalidation per
// entry of |local_ack_handles| and reports them to the delegate.
//   ids:               ids to start tracking in |ack_tracker_|.
//   payload:           payload used for versioned invalidations.
//   client:            receives the acknowledgement of |ack_handle|.
//   ack_handle:        handle acknowledged to |client| at the end.
//   local_ack_handles: object id -> ack handle; each handle becomes the
//                      entry's |expected| handle and is attached to the
//                      emitted Invalidation.
// NOTE(review): |version| is used below but its parameter line is absent from
// this listing, as is the line separating the two branches of the version
// check (presumably an else) -- confirm.
272 void SyncInvalidationListener::EmitInvalidation(
273 const ObjectIdSet
& ids
,
275 const std::string
& payload
,
276 invalidation::InvalidationClient
* client
,
277 const invalidation::AckHandle
& ack_handle
,
278 const AckHandleMap
& local_ack_handles
) {
279 DCHECK(CalledOnValidThread());
281 ObjectIdInvalidationMap invalidation_map
;
282 for (AckHandleMap::const_iterator it
= local_ack_handles
.begin();
283 it
!= local_ack_handles
.end(); ++it
) {
284 // Update in-memory copy of the invalidation state.
285 invalidation_state_map_
[it
->first
].expected
= it
->second
;
// Unknown-version invalidations carry no version or payload.
287 if (version
== kUnknownVersion
) {
288 Invalidation inv
= Invalidation::InitUnknownVersion(it
->first
);
289 inv
.set_ack_handle(it
->second
);
290 invalidation_map
.Insert(inv
);
292 Invalidation inv
= Invalidation::Init(it
->first
, version
, payload
);
293 inv
.set_ack_handle(it
->second
);
294 invalidation_map
.Insert(inv
);
// Track the ids in |ack_tracker_|, notify the delegate, then acknowledge
// |ack_handle| to |client|.
297 ack_tracker_
.Track(ids
);
298 delegate_
->OnInvalidate(invalidation_map
);
299 client
->Acknowledge(ack_handle
);
// Re-emits invalidations for |ids| to the delegate, rebuilt from the version,
// payload and expected ack handle saved in |invalidation_state_map_|.
// Unlike most methods in this file, no CalledOnValidThread() check is made.
// NOTE(review): entries are read with operator[]; if InvalidationStateMap is
// a std::map-like container (its type is not visible here), an id without
// saved state gets a default entry inserted -- confirm that is intended.
// NOTE(review): the first argument line of Invalidation::Init() and the line
// separating the two branches are absent from this listing.
302 void SyncInvalidationListener::OnTimeout(const ObjectIdSet
& ids
) {
303 ObjectIdInvalidationMap invalidation_map
;
304 for (ObjectIdSet::const_iterator it
= ids
.begin(); it
!= ids
.end(); ++it
) {
// Same unknown-version / versioned split as in EmitInvalidation().
305 if (invalidation_state_map_
[*it
].version
== kUnknownVersion
) {
306 Invalidation inv
= Invalidation::InitUnknownVersion(*it
);
307 inv
.set_ack_handle(invalidation_state_map_
[*it
].expected
);
308 invalidation_map
.Insert(inv
);
310 Invalidation inv
= Invalidation::Init(
312 invalidation_state_map_
[*it
].version
,
313 invalidation_state_map_
[*it
].payload
);
314 inv
.set_ack_handle(invalidation_state_map_
[*it
].expected
);
315 invalidation_map
.Insert(inv
);
318 delegate_
->OnInvalidate(invalidation_map
);
// Logs the new registration state of |object_id|; any state other than
// REGISTERED is reported to |registration_manager_| as a lost registration.
//   client:    must equal invalidation_client_.get() (DCHECKed).
//   object_id: object whose registration state changed.
//   new_state: the state being reported.
321 void SyncInvalidationListener::InformRegistrationStatus(
322 invalidation::InvalidationClient
* client
,
323 const invalidation::ObjectId
& object_id
,
324 InvalidationListener::RegistrationState new_state
) {
325 DCHECK(CalledOnValidThread());
326 DCHECK_EQ(client
, invalidation_client_
.get());
327 DVLOG(1) << "InformRegistrationStatus: "
328 << ObjectIdToString(object_id
) << " " << new_state
;
330 if (new_state
!= InvalidationListener::REGISTERED
) {
331 // Let |registration_manager_| handle the registration backoff policy.
332 registration_manager_
->MarkRegistrationLost(object_id
);
// Handles a failed registration for |object_id|. The visible statements log
// the failure, mark the registration as lost and disable the id in
// |registration_manager_|.
//   client:        must equal invalidation_client_.get() (DCHECKed).
//   object_id:     object whose registration failed.
//   error_message: only logged.
// NOTE(review): |is_transient| is used in the log statement but its parameter
// line is absent from this listing. The two comments below describe a
// transient and a non-transient case, so presumably MarkRegistrationLost()
// and DisableId() sit on opposite branches of a check on |is_transient| that
// is not visible here -- confirm.
336 void SyncInvalidationListener::InformRegistrationFailure(
337 invalidation::InvalidationClient
* client
,
338 const invalidation::ObjectId
& object_id
,
340 const std::string
& error_message
) {
341 DCHECK(CalledOnValidThread());
342 DCHECK_EQ(client
, invalidation_client_
.get());
// NOTE(review): no separator is streamed between the object id string and
// "is_transient=", so the two run together in the log output.
343 DVLOG(1) << "InformRegistrationFailure: "
344 << ObjectIdToString(object_id
)
345 << "is_transient=" << is_transient
346 << ", message=" << error_message
;
// NOTE(review): no parameter named |unknown_hint| is visible in this
// signature; the comment below may be stale.
349 // We don't care about |unknown_hint|; we let
350 // |registration_manager_| handle the registration backoff policy.
351 registration_manager_
->MarkRegistrationLost(object_id
);
353 // Non-transient failures require an action to resolve. This could happen because:
355 // - the server doesn't yet recognize the data type, which could happen for
356 // brand-new data types.
357 // - the user has changed his password and hasn't updated it yet locally.
358 // Either way, block future registration attempts for |object_id|. However,
359 // we don't forget any saved invalidation state since we may use it once the
360 // error is addressed.
361 registration_manager_
->DisableId(object_id
);
// Marks every registration as lost in |registration_manager_|.
//   client: must equal invalidation_client_.get() (DCHECKed).
//   prefix: not used by any statement visible here.
// NOTE(review): the parameter list continues past |prefix| (it ends with a
// comma) but the remaining line is absent from this listing.
// NOTE(review): the log text reads "AllRegistrationsLost", which differs from
// the method name.
365 void SyncInvalidationListener::ReissueRegistrations(
366 invalidation::InvalidationClient
* client
,
367 const std::string
& prefix
,
369 DCHECK(CalledOnValidThread());
370 DCHECK_EQ(client
, invalidation_client_
.get());
371 DVLOG(1) << "AllRegistrationsLost";
372 registration_manager_
->MarkAllRegistrationsLost();
// Logs a ticl error and updates |ticl_state_| from it.
//   client:     must equal invalidation_client_.get() (DCHECKed).
//   error_info: supplies the reason, message and transient flag that are
//               logged; the reason also selects the new state.
// An AUTH_FAILURE reason sets INVALIDATION_CREDENTIALS_REJECTED.
// NOTE(review): the TRANSIENT_INVALIDATION_ERROR assignment is presumably the
// else branch; the line separating the two assignments and the end of the
// function are absent from this listing -- confirm.
375 void SyncInvalidationListener::InformError(
376 invalidation::InvalidationClient
* client
,
377 const invalidation::ErrorInfo
& error_info
) {
378 DCHECK(CalledOnValidThread());
379 DCHECK_EQ(client
, invalidation_client_
.get());
380 LOG(ERROR
) << "Ticl error " << error_info
.error_reason() << ": "
381 << error_info
.error_message()
382 << " (transient = " << error_info
.is_transient() << ")";
383 if (error_info
.error_reason() == invalidation::ErrorReason::AUTH_FAILURE
) {
384 ticl_state_
= INVALIDATION_CREDENTIALS_REJECTED
;
386 ticl_state_
= TRANSIENT_INVALIDATION_ERROR
;
// Forwards |state| to the state tracker as bootstrap data
// (InvalidationStateTracker::SetBootstrapData).
391 void SyncInvalidationListener::WriteState(const std::string
& state
) {
392 DCHECK(CalledOnValidThread());
393 DVLOG(1) << "WriteState";
394 invalidation_state_tracker_
.Call(
395 FROM_HERE
, &InvalidationStateTracker::SetBootstrapData
, state
);
// Sends |registered_ids_| to the registration manager and drops all state for
// the ids it reports back as unregistered: they are erased from
// |invalidation_state_map_|, forgotten by the state tracker and acked in
// |ack_tracker_|.
// |registration_manager_| is dereferenced without a null check here; the
// caller UpdateRegisteredIds() tests it before calling.
398 void SyncInvalidationListener::DoRegistrationUpdate() {
399 DCHECK(CalledOnValidThread());
400 const ObjectIdSet
& unregistered_ids
=
401 registration_manager_
->UpdateRegisteredIds(registered_ids_
);
402 for (ObjectIdSet::const_iterator it
= unregistered_ids
.begin();
403 it
!= unregistered_ids
.end(); ++it
) {
404 invalidation_state_map_
.erase(*it
);
406 invalidation_state_tracker_
.Call(
407 FROM_HERE
, &InvalidationStateTracker::Forget
, unregistered_ids
);
408 ack_tracker_
.Ack(unregistered_ids
);
// Test-only hook.
// NOTE(review): only the thread check is visible in this listing; presumably
// the omitted line calls Stop() -- confirm.
411 void SyncInvalidationListener::StopForTest() {
412 DCHECK(CalledOnValidThread());
// Test-only accessor: returns |invalidation_state_map_| by value.
416 InvalidationStateMap
SyncInvalidationListener::GetStateMapForTest() const {
417 DCHECK(CalledOnValidThread());
418 return invalidation_state_map_
;
// Test-only accessor: returns a pointer to the |ack_tracker_| member. No
// thread check is made here.
421 AckTracker
* SyncInvalidationListener::GetAckTrackerForTest() {
422 return &ack_tracker_
;
// Shuts the listener down. In the visible order it clears |ack_tracker_|,
// resets the registration manager, stops |sync_system_resources_| and the
// invalidation client, releases the client, resets the state tracker, clears
// |invalidation_state_map_| and returns both state fields to
// DEFAULT_INVALIDATION_ERROR.
// NOTE(review): the body of the |!invalidation_client_| check is absent from
// this listing; presumably it returns early when there is no client --
// confirm. Other skipped lines may hold further teardown.
425 void SyncInvalidationListener::Stop() {
426 DCHECK(CalledOnValidThread());
427 if (!invalidation_client_
) {
431 ack_tracker_
.Clear();
// The registration manager, which Start() builds on the raw client pointer,
// is reset before the client itself is stopped and released.
433 registration_manager_
.reset();
434 sync_system_resources_
.Stop();
435 invalidation_client_
->Stop();
437 invalidation_client_
.reset();
440 invalidation_state_tracker_
.Reset();
441 invalidation_state_map_
.clear();
442 ticl_state_
= DEFAULT_INVALIDATION_ERROR
;
443 push_client_state_
= DEFAULT_INVALIDATION_ERROR
;
// Combines |ticl_state_| and |push_client_state_| into one InvalidatorState:
//   - INVALIDATION_CREDENTIALS_REJECTED if either side reports it;
//   - INVALIDATIONS_ENABLED only if both sides report it;
//   - TRANSIENT_INVALIDATION_ERROR in every other case.
446 InvalidatorState
SyncInvalidationListener::GetState() const {
447 DCHECK(CalledOnValidThread());
448 if (ticl_state_
== INVALIDATION_CREDENTIALS_REJECTED
||
449 push_client_state_
== INVALIDATION_CREDENTIALS_REJECTED
) {
450 // If either the ticl or the push client rejected our credentials,
451 // return INVALIDATION_CREDENTIALS_REJECTED.
452 return INVALIDATION_CREDENTIALS_REJECTED
;
454 if (ticl_state_
== INVALIDATIONS_ENABLED
&&
455 push_client_state_
== INVALIDATIONS_ENABLED
) {
456 // If the ticl is ready and the push client notifications are
457 // enabled, return INVALIDATIONS_ENABLED.
458 return INVALIDATIONS_ENABLED
;
460 // Otherwise, we have a transient error.
461 return TRANSIENT_INVALIDATION_ERROR
;
// Reports the current combined state, as computed by GetState(), to the
// delegate. |delegate_| is dereferenced without a null check.
464 void SyncInvalidationListener::EmitStateChange() {
465 DCHECK(CalledOnValidThread());
466 delegate_
->OnInvalidatorStateChange(GetState());
// Records that the push client has notifications enabled by setting
// |push_client_state_| to INVALIDATIONS_ENABLED.
// NOTE(review): the embedded numbering skips lines after the assignment;
// whether the delegate is notified of the change is not visible here.
469 void SyncInvalidationListener::OnNotificationsEnabled() {
470 DCHECK(CalledOnValidThread());
471 push_client_state_
= INVALIDATIONS_ENABLED
;
// Records that push-client notifications are disabled: |push_client_state_|
// becomes whatever FromNotifierReason() maps |reason| to.
// NOTE(review): the embedded numbering skips lines after the assignment;
// whether the delegate is notified of the change is not visible here.
475 void SyncInvalidationListener::OnNotificationsDisabled(
476 notifier::NotificationsDisabledReason reason
) {
477 DCHECK(CalledOnValidThread());
478 push_client_state_
= FromNotifierReason(reason
);
// Push-client notification callback. |notification| is intentionally ignored;
// only the thread check runs.
482 void SyncInvalidationListener::OnIncomingNotification(
483 const notifier::Notification
& notification
) {
484 DCHECK(CalledOnValidThread());
485 // Do nothing, since this is already handled by |invalidation_client_|.
488 } // namespace syncer