1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/invalidation/sync_invalidation_listener.h"
10 #include "base/callback.h"
11 #include "base/compiler_specific.h"
12 #include "base/logging.h"
13 #include "base/tracked_objects.h"
14 #include "components/invalidation/invalidation_util.h"
15 #include "components/invalidation/object_id_invalidation_map.h"
16 #include "components/invalidation/registration_manager.h"
17 #include "google/cacheinvalidation/include/invalidation-client.h"
18 #include "google/cacheinvalidation/include/types.h"
19 #include "jingle/notifier/listener/push_client.h"
// Application name string constant.
// NOTE(review): no use of this constant is visible in this view — confirm
// where it is consumed before changing its value.
23 const char kApplicationName
[] = "chrome-sync";
// Out-of-line definition of the Delegate destructor; the body is empty.
29 SyncInvalidationListener::Delegate::~Delegate() {}
// Constructs the listener around |network_channel|.
//
// Ownership of |network_channel| is transferred into |sync_network_channel_|.
// |sync_system_resources_| is built from the raw channel pointer and |this|.
// Both |ticl_state_| and |push_client_state_| start out as
// DEFAULT_INVALIDATION_ERROR, and |this| is registered as an observer of the
// network channel.
31 SyncInvalidationListener::SyncInvalidationListener(
32 scoped_ptr
<SyncNetworkChannel
> network_channel
)
33 : sync_network_channel_(network_channel
.Pass()),
34 sync_system_resources_(sync_network_channel_
.get(), this),
36 ticl_state_(DEFAULT_INVALIDATION_ERROR
),
37 push_client_state_(DEFAULT_INVALIDATION_ERROR
),
38 weak_ptr_factory_(this) {
39 DCHECK(CalledOnValidThread());
40 sync_network_channel_
->AddObserver(this);
// Unregisters |this| as an observer of the network channel (the counterpart
// of the AddObserver() call made in the constructor).
43 SyncInvalidationListener::~SyncInvalidationListener() {
44 DCHECK(CalledOnValidThread());
45 sync_network_channel_
->RemoveObserver(this);
// Starts the listener.
//
// In order: sets |client_info| as the platform string on
// |sync_system_resources_| and starts it, seeds its storage with
// |invalidation_bootstrap_data|, records |initial_unacked_invalidations|,
// |invalidation_state_tracker| and its task runner, then creates and starts
// the invalidation client via |create_invalidation_client_callback| and
// wraps it in a new RegistrationManager.
//
// |invalidation_state_tracker_task_runner| must be non-null (DCHECKed).
//
// NOTE(review): |client_id| is not used by any statement visible in this
// view, and the tails of both the parameter list and the factory call's
// argument list are not visible — confirm against the complete file.
50 void SyncInvalidationListener::Start(
51 const CreateInvalidationClientCallback
& create_invalidation_client_callback
,
52 const std::string
& client_id
,
53 const std::string
& client_info
,
54 const std::string
& invalidation_bootstrap_data
,
55 const UnackedInvalidationsMap
& initial_unacked_invalidations
,
56 const base::WeakPtr
<InvalidationStateTracker
>& invalidation_state_tracker
,
57 const scoped_refptr
<base::SequencedTaskRunner
>&
58 invalidation_state_tracker_task_runner
,
60 DCHECK(CalledOnValidThread());
63 sync_system_resources_
.set_platform(client_info
);
64 sync_system_resources_
.Start();
66 // The Storage resource is implemented as a write-through cache. We populate
67 // it with the initial state on startup, so subsequent writes go to disk and
68 // update the in-memory cache, while reads just return the cached state.
69 sync_system_resources_
.storage()->SetInitialState(
70 invalidation_bootstrap_data
);
// Keep the persisted-state handles; later writes are posted to
// |invalidation_state_tracker_| on |invalidation_state_tracker_task_runner_|.
72 unacked_invalidations_map_
= initial_unacked_invalidations
;
73 invalidation_state_tracker_
= invalidation_state_tracker
;
74 invalidation_state_tracker_task_runner_
=
75 invalidation_state_tracker_task_runner
;
76 DCHECK(invalidation_state_tracker_task_runner_
.get());
// The factory callback's result is owned by |invalidation_client_|.
82 invalidation_client_
.reset(create_invalidation_client_callback
.Run(
83 &sync_system_resources_
,
84 sync_network_channel_
->GetInvalidationClientType(),
88 invalidation_client_
->Start();
// The RegistrationManager only receives a raw pointer to the client.
90 registration_manager_
.reset(
91 new RegistrationManager(invalidation_client_
.get()));
// Forwards |email| and |token| unchanged to the network channel.
94 void SyncInvalidationListener::UpdateCredentials(
95 const std::string
& email
, const std::string
& token
) {
96 DCHECK(CalledOnValidThread());
97 sync_network_channel_
->UpdateCredentials(email
, token
);
// Replaces |registered_ids_| with |ids|. If the ticl is already enabled and
// a registration manager exists, the new set is pushed out immediately via
// DoRegistrationUpdate(); otherwise the ids are only stored (Ready() runs
// DoRegistrationUpdate() once the ticl becomes enabled).
100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet
& ids
) {
101 DCHECK(CalledOnValidThread());
102 registered_ids_
= ids
;
103 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
104 // working XMPP connection (as observed by us), so check it instead
105 // of GetState() (see http://crbug.com/139424).
106 if (ticl_state_
== INVALIDATIONS_ENABLED
&& registration_manager_
) {
107 DoRegistrationUpdate();
// Marks the ticl as enabled (|ticl_state_| = INVALIDATIONS_ENABLED) and runs
// a registration update. |client| must be our own |invalidation_client_|
// (DCHECKed).
111 void SyncInvalidationListener::Ready(
112 invalidation::InvalidationClient
* client
) {
113 DCHECK(CalledOnValidThread());
114 DCHECK_EQ(client
, invalidation_client_
.get());
115 ticl_state_
= INVALIDATIONS_ENABLED
;
117 DoRegistrationUpdate();
// Handles an invalidation for a single object with a known version.
//
// |ack_handle| is acknowledged to |client| right away. The object id,
// version and (when present) payload are then wrapped in an Invalidation
// whose ack handler is this listener, and the result is passed to
// DispatchInvalidations(). |client| must be |invalidation_client_|
// (DCHECKed).
//
// NOTE(review): the declaration of |payload| is not visible in this view —
// confirm its type and initial value against the complete file.
120 void SyncInvalidationListener::Invalidate(
121 invalidation::InvalidationClient
* client
,
122 const invalidation::Invalidation
& invalidation
,
123 const invalidation::AckHandle
& ack_handle
) {
124 DCHECK(CalledOnValidThread());
125 DCHECK_EQ(client
, invalidation_client_
.get());
126 client
->Acknowledge(ack_handle
);
128 const invalidation::ObjectId
& id
= invalidation
.object_id();
131 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
132 if (invalidation
.has_payload())
133 payload
= invalidation
.payload();
135 DVLOG(2) << "Received invalidation with version " << invalidation
.version()
136 << " for " << ObjectIdToString(id
);
138 ObjectIdInvalidationMap invalidations
;
139 Invalidation inv
= Invalidation::Init(id
, invalidation
.version(), payload
);
// Acks/drops for this invalidation are routed back to this listener through
// a weak pointer, on the current message loop.
140 inv
.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current());
141 invalidations
.Insert(inv
);
143 DispatchInvalidations(invalidations
);
// Handles an invalidation for |object_id| whose version is unknown.
//
// |ack_handle| is acknowledged to |client|, then a single unknown-version
// Invalidation (with this listener as its ack handler) is dispatched.
// |client| must be |invalidation_client_| (DCHECKed).
146 void SyncInvalidationListener::InvalidateUnknownVersion(
147 invalidation::InvalidationClient
* client
,
148 const invalidation::ObjectId
& object_id
,
149 const invalidation::AckHandle
& ack_handle
) {
150 DCHECK(CalledOnValidThread());
151 DCHECK_EQ(client
, invalidation_client_
.get());
152 DVLOG(1) << "InvalidateUnknownVersion";
153 client
->Acknowledge(ack_handle
);
155 ObjectIdInvalidationMap invalidations
;
156 Invalidation unknown_version
= Invalidation::InitUnknownVersion(object_id
);
157 unknown_version
.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current());
158 invalidations
.Insert(unknown_version
);
160 DispatchInvalidations(invalidations
);
163 // This should behave as if we got an invalidation with version
164 // UNKNOWN_OBJECT_VERSION for all known data types.
//
// Concretely: |ack_handle| is acknowledged to |client|, then one
// unknown-version Invalidation is built for every id currently in
// |registered_ids_| and the whole batch is dispatched in a single call.
// |client| must be |invalidation_client_| (DCHECKed).
165 void SyncInvalidationListener::InvalidateAll(
166 invalidation::InvalidationClient
* client
,
167 const invalidation::AckHandle
& ack_handle
) {
168 DCHECK(CalledOnValidThread());
169 DCHECK_EQ(client
, invalidation_client_
.get());
170 DVLOG(1) << "InvalidateAll";
171 client
->Acknowledge(ack_handle
);
173 ObjectIdInvalidationMap invalidations
;
174 for (ObjectIdSet::iterator it
= registered_ids_
.begin();
175 it
!= registered_ids_
.end(); ++it
) {
176 Invalidation unknown_version
= Invalidation::InitUnknownVersion(*it
);
177 unknown_version
.SetAckHandler(AsWeakPtr(),
178 base::MessageLoopProxy::current());
179 invalidations
.Insert(unknown_version
);
182 DispatchInvalidations(invalidations
);
185 // If a handler is registered, emit right away. Otherwise, save it for later.
//
// More precisely, as written: every entry of |invalidations| is saved
// (including the ones that are emitted), and the subset whose object ids are
// in |registered_ids_| is additionally emitted immediately.
186 void SyncInvalidationListener::DispatchInvalidations(
187 const ObjectIdInvalidationMap
& invalidations
) {
188 DCHECK(CalledOnValidThread());
190 ObjectIdInvalidationMap to_save
= invalidations
;
191 ObjectIdInvalidationMap to_emit
=
192 invalidations
.GetSubsetWithObjectIds(registered_ids_
);
// Save before emitting.
194 SaveInvalidations(to_save
);
195 EmitSavedInvalidations(to_emit
);
// Merges |to_save| into |unacked_invalidations_map_| and persists the result.
//
// For each object id in |to_save|, the matching UnackedInvalidationSet is
// looked up (and created if this is the first invalidation for that id), and
// the invalidations for that id are added to it. The updated map is then
// posted to InvalidationStateTracker::SetSavedInvalidations on the state
// tracker's task runner.
198 void SyncInvalidationListener::SaveInvalidations(
199 const ObjectIdInvalidationMap
& to_save
) {
200 ObjectIdSet objects_to_save
= to_save
.GetObjectIds();
201 for (ObjectIdSet::const_iterator it
= objects_to_save
.begin();
202 it
!= objects_to_save
.end(); ++it
) {
203 UnackedInvalidationsMap::iterator lookup
=
204 unacked_invalidations_map_
.find(*it
);
// No entry yet for this id: insert an empty set and continue with it.
205 if (lookup
== unacked_invalidations_map_
.end()) {
206 lookup
= unacked_invalidations_map_
.insert(
207 std::make_pair(*it
, UnackedInvalidationSet(*it
))).first
;
209 lookup
->second
.AddSet(to_save
.ForObject(*it
));
212 invalidation_state_tracker_task_runner_
->PostTask(
214 base::Bind(&InvalidationStateTracker::SetSavedInvalidations
,
215 invalidation_state_tracker_
,
216 unacked_invalidations_map_
));
// Logs |to_emit| at verbose level 2 and hands it to |delegate_| through
// OnInvalidate().
// NOTE(review): |delegate_| is dereferenced without a null check here;
// where it is assigned is not visible in this view — confirm it is always
// set before invalidations can arrive.
219 void SyncInvalidationListener::EmitSavedInvalidations(
220 const ObjectIdInvalidationMap
& to_emit
) {
221 DVLOG(2) << "Emitting invalidations: " << to_emit
.ToString();
222 delegate_
->OnInvalidate(to_emit
);
// Logs the new registration state of |object_id|. For any state other than
// REGISTERED, the registration is marked lost in |registration_manager_|.
// |client| must be |invalidation_client_| (DCHECKed).
225 void SyncInvalidationListener::InformRegistrationStatus(
226 invalidation::InvalidationClient
* client
,
227 const invalidation::ObjectId
& object_id
,
228 InvalidationListener::RegistrationState new_state
) {
229 DCHECK(CalledOnValidThread());
230 DCHECK_EQ(client
, invalidation_client_
.get());
231 DVLOG(1) << "InformRegistrationStatus: "
232 << ObjectIdToString(object_id
) << " " << new_state
;
234 if (new_state
!= InvalidationListener::REGISTERED
) {
235 // Let |registration_manager_| handle the registration backoff policy.
236 registration_manager_
->MarkRegistrationLost(object_id
);
// Handles a failed registration for |object_id|.
//
// The failure is logged, then |registration_manager_| is told either that
// the registration was lost (MarkRegistrationLost) or that the id should be
// disabled (DisableId). |client| must be |invalidation_client_| (DCHECKed).
//
// NOTE(review): |is_transient| is logged below but neither its parameter
// declaration nor any branch on it is visible in this view. Judging by the
// existing comments, DisableId() is presumably reached only for
// non-transient failures — confirm against the complete file.
// NOTE(review): the log statement streams the object id and "is_transient="
// back to back with no separator, so they run together in the output.
240 void SyncInvalidationListener::InformRegistrationFailure(
241 invalidation::InvalidationClient
* client
,
242 const invalidation::ObjectId
& object_id
,
244 const std::string
& error_message
) {
245 DCHECK(CalledOnValidThread());
246 DCHECK_EQ(client
, invalidation_client_
.get());
247 DVLOG(1) << "InformRegistrationFailure: "
248 << ObjectIdToString(object_id
)
249 << "is_transient=" << is_transient
250 << ", message=" << error_message
;
// NOTE(review): |unknown_hint| below does not match any parameter visible in
// this signature; the comment may be stale.
253 // We don't care about |unknown_hint|; we let
254 // |registration_manager_| handle the registration backoff policy.
255 registration_manager_
->MarkRegistrationLost(object_id
);
257 // Non-transient failures require an action to resolve. This could happen
259 // - the server doesn't yet recognize the data type, which could happen for
260 // brand-new data types.
261 // - the user has changed his password and hasn't updated it yet locally.
262 // Either way, block future registration attempts for |object_id|. However,
263 // we don't forget any saved invalidation state since we may use it once the
264 // error is addressed.
265 registration_manager_
->DisableId(object_id
);
// Marks every registration as lost in |registration_manager_|.
// |client| must be |invalidation_client_| (DCHECKed). |prefix| is not used
// by any statement visible here.
// NOTE(review): the end of the parameter list is not visible in this view.
269 void SyncInvalidationListener::ReissueRegistrations(
270 invalidation::InvalidationClient
* client
,
271 const std::string
& prefix
,
273 DCHECK(CalledOnValidThread());
274 DCHECK_EQ(client
, invalidation_client_
.get());
275 DVLOG(1) << "AllRegistrationsLost";
276 registration_manager_
->MarkAllRegistrationsLost();
// Logs a ticl error (reason, message, transient flag) at ERROR level and
// updates |ticl_state_|: an AUTH_FAILURE reason maps to
// INVALIDATION_CREDENTIALS_REJECTED; TRANSIENT_INVALIDATION_ERROR is the
// other value assigned. |client| must be |invalidation_client_| (DCHECKed).
//
// NOTE(review): the `else` separating the two assignments is not visible in
// this view; presumably the second assignment covers all non-auth errors —
// confirm against the complete file.
279 void SyncInvalidationListener::InformError(
280 invalidation::InvalidationClient
* client
,
281 const invalidation::ErrorInfo
& error_info
) {
282 DCHECK(CalledOnValidThread());
283 DCHECK_EQ(client
, invalidation_client_
.get());
284 LOG(ERROR
) << "Ticl error " << error_info
.error_reason() << ": "
285 << error_info
.error_message()
286 << " (transient = " << error_info
.is_transient() << ")";
287 if (error_info
.error_reason() == invalidation::ErrorReason::AUTH_FAILURE
) {
288 ticl_state_
= INVALIDATION_CREDENTIALS_REJECTED
;
290 ticl_state_
= TRANSIENT_INVALIDATION_ERROR
;
// Records an acknowledgement of |handle| in the unacked-invalidation set for
// |id|, then posts the updated map to
// InvalidationStateTracker::SetSavedInvalidations on the state tracker's
// task runner. An |id| with no entry in the map produces a debug warning.
//
// NOTE(review): the early return after the warning is not visible in this
// view. Without it |lookup| would be dereferenced while equal to end() —
// confirm against the complete file.
295 void SyncInvalidationListener::Acknowledge(
296 const invalidation::ObjectId
& id
,
297 const syncer::AckHandle
& handle
) {
298 UnackedInvalidationsMap::iterator lookup
=
299 unacked_invalidations_map_
.find(id
);
300 if (lookup
== unacked_invalidations_map_
.end()) {
301 DLOG(WARNING
) << "Received acknowledgement for untracked object ID";
304 lookup
->second
.Acknowledge(handle
);
305 invalidation_state_tracker_task_runner_
->PostTask(
307 base::Bind(&InvalidationStateTracker::SetSavedInvalidations
,
308 invalidation_state_tracker_
,
309 unacked_invalidations_map_
));
// Records a drop of |handle| in the unacked-invalidation set for |id|, then
// posts the updated map to InvalidationStateTracker::SetSavedInvalidations
// on the state tracker's task runner. An |id| with no entry in the map
// produces a debug warning.
//
// NOTE(review): the early return after the warning is not visible in this
// view. Without it |lookup| would be dereferenced while equal to end() —
// confirm against the complete file.
312 void SyncInvalidationListener::Drop(
313 const invalidation::ObjectId
& id
,
314 const syncer::AckHandle
& handle
) {
315 UnackedInvalidationsMap::iterator lookup
=
316 unacked_invalidations_map_
.find(id
);
317 if (lookup
== unacked_invalidations_map_
.end()) {
318 DLOG(WARNING
) << "Received drop for untracked object ID";
321 lookup
->second
.Drop(handle
);
322 invalidation_state_tracker_task_runner_
->PostTask(
324 base::Bind(&InvalidationStateTracker::SetSavedInvalidations
,
325 invalidation_state_tracker_
,
326 unacked_invalidations_map_
));
// Posts a call to InvalidationStateTracker::SetBootstrapData on the state
// tracker's task runner, bound to |invalidation_state_tracker_|.
// NOTE(review): the remaining bound arguments are cut off in this view;
// presumably |state| is passed as the bootstrap data — confirm.
329 void SyncInvalidationListener::WriteState(const std::string
& state
) {
330 DCHECK(CalledOnValidThread());
331 DVLOG(1) << "WriteState";
332 invalidation_state_tracker_task_runner_
->PostTask(
334 base::Bind(&InvalidationStateTracker::SetBootstrapData
,
335 invalidation_state_tracker_
,
// Synchronizes registrations with |registered_ids_| and re-emits saved
// invalidations.
//
// Steps:
//   1. Pass |registered_ids_| to |registration_manager_|; it returns a set
//      of ids (named |unregistered_ids| here).
//   2. Erase the saved invalidations for each returned id and post the
//      pruned map to InvalidationStateTracker::SetSavedInvalidations.
//   3. Export the remaining saved invalidations, with this listener as their
//      ack handler, into one map and emit it.
//
// NOTE(review): the statement inside the `if` in the second loop is not
// visible in this view; presumably entries whose id is not in
// |registered_ids_| are skipped — confirm against the complete file.
339 void SyncInvalidationListener::DoRegistrationUpdate() {
340 DCHECK(CalledOnValidThread());
341 const ObjectIdSet
& unregistered_ids
=
342 registration_manager_
->UpdateRegisteredIds(registered_ids_
);
343 for (ObjectIdSet::iterator it
= unregistered_ids
.begin();
344 it
!= unregistered_ids
.end(); ++it
) {
345 unacked_invalidations_map_
.erase(*it
);
347 invalidation_state_tracker_task_runner_
->PostTask(
349 base::Bind(&InvalidationStateTracker::SetSavedInvalidations
,
350 invalidation_state_tracker_
,
351 unacked_invalidations_map_
));
353 ObjectIdInvalidationMap object_id_invalidation_map
;
354 for (UnackedInvalidationsMap::iterator map_it
=
355 unacked_invalidations_map_
.begin();
356 map_it
!= unacked_invalidations_map_
.end(); ++map_it
) {
357 if (registered_ids_
.find(map_it
->first
) == registered_ids_
.end()) {
360 map_it
->second
.ExportInvalidations(AsWeakPtr(),
361 base::MessageLoopProxy::current(),
362 &object_id_invalidation_map
);
365 // There's no need to run these through DispatchInvalidations(); they've
366 // already been saved to storage (that's where we found them) so all we need
367 // to do now is emit them.
368 EmitSavedInvalidations(object_id_invalidation_map
);
// Passes |callback| to the network channel's RequestDetailedStatus(), then
// runs |callback| directly with this listener's own debug data from
// CollectDebugData().
371 void SyncInvalidationListener::RequestDetailedStatus(
372 base::Callback
<void(const base::DictionaryValue
&)> callback
) const {
373 DCHECK(CalledOnValidThread());
374 sync_network_channel_
->RequestDetailedStatus(callback
);
375 callback
.Run(*CollectDebugData());
// Builds a dictionary describing this listener's state.
//
// Keys written:
//   "SyncInvalidationListener.PushClientState" - string form of
//       |push_client_state_|.
//   "SyncInvalidationListener.TiclState" - string form of |ticl_state_|.
//   "SyncInvalidationListener.UnackedInvalidationsMap" - nested dictionary
//       keyed by each object id's name(), holding that id's unacked set as
//       produced by ToValue().
//
// Returns the dictionary; ownership passes to the caller.
// NOTE(review): the loop's increment clause is not visible in this view.
378 scoped_ptr
<base::DictionaryValue
>
379 SyncInvalidationListener::CollectDebugData() const {
380 scoped_ptr
<base::DictionaryValue
> return_value(new base::DictionaryValue());
381 return_value
->SetString(
382 "SyncInvalidationListener.PushClientState",
383 std::string(InvalidatorStateToString(push_client_state_
)));
384 return_value
->SetString("SyncInvalidationListener.TiclState",
385 std::string(InvalidatorStateToString(ticl_state_
)));
386 scoped_ptr
<base::DictionaryValue
> unacked_map(new base::DictionaryValue());
387 for (UnackedInvalidationsMap::const_iterator it
=
388 unacked_invalidations_map_
.begin();
389 it
!= unacked_invalidations_map_
.end();
// Set() is handed the released raw pointer.
391 unacked_map
->Set((it
->first
).name(), (it
->second
).ToValue().release());
393 return_value
->Set("SyncInvalidationListener.UnackedInvalidationsMap",
394 unacked_map
.release());
395 return return_value
.Pass();
// Test-only entry point.
// NOTE(review): only the thread check is visible in this view; presumably
// the body goes on to call Stop() — confirm against the complete file.
398 void SyncInvalidationListener::StopForTest() {
399 DCHECK(CalledOnValidThread());
// Shuts down the invalidation machinery: destroys the registration manager,
// stops |sync_system_resources_|, stops and destroys the invalidation
// client, and resets both |ticl_state_| and |push_client_state_| to
// DEFAULT_INVALIDATION_ERROR.
//
// NOTE(review): the body of the `if (!invalidation_client_)` guard is not
// visible in this view; presumably it returns early when there is no client
// to stop — confirm against the complete file.
403 void SyncInvalidationListener::Stop() {
404 DCHECK(CalledOnValidThread());
405 if (!invalidation_client_
) {
// The registration manager goes first: Start() constructs it with a raw
// pointer to |invalidation_client_|, which is reset further down.
409 registration_manager_
.reset();
410 sync_system_resources_
.Stop();
411 invalidation_client_
->Stop();
413 invalidation_client_
.reset();
416 ticl_state_
= DEFAULT_INVALIDATION_ERROR
;
417 push_client_state_
= DEFAULT_INVALIDATION_ERROR
;
// Combines |ticl_state_| and |push_client_state_| into one InvalidatorState.
//
// Returns:
//   INVALIDATION_CREDENTIALS_REJECTED if either component reports it;
//   INVALIDATIONS_ENABLED if both components are enabled;
//   TRANSIENT_INVALIDATION_ERROR in every other case.
420 InvalidatorState
SyncInvalidationListener::GetState() const {
421 DCHECK(CalledOnValidThread());
422 if (ticl_state_
== INVALIDATION_CREDENTIALS_REJECTED
||
423 push_client_state_
== INVALIDATION_CREDENTIALS_REJECTED
) {
424 // If either the ticl or the push client rejected our credentials,
425 // return INVALIDATION_CREDENTIALS_REJECTED.
426 return INVALIDATION_CREDENTIALS_REJECTED
;
428 if (ticl_state_
== INVALIDATIONS_ENABLED
&&
429 push_client_state_
== INVALIDATIONS_ENABLED
) {
430 // If the ticl is ready and the push client notifications are
431 // enabled, return INVALIDATIONS_ENABLED.
432 return INVALIDATIONS_ENABLED
;
434 // Otherwise, we have a transient error.
435 return TRANSIENT_INVALIDATION_ERROR
;
// Notifies |delegate_| of the current combined state, as computed by
// GetState().
438 void SyncInvalidationListener::EmitStateChange() {
439 DCHECK(CalledOnValidThread());
440 delegate_
->OnInvalidatorStateChange(GetState());
// Obtains a WeakPtr<AckHandler> to this listener from |weak_ptr_factory_|
// and calls get() on it once so that the pointer is bound to the current
// thread.
// NOTE(review): the return statement is not visible in this view;
// presumably |weak_ptr| is returned — confirm against the complete file.
443 base::WeakPtr
<AckHandler
> SyncInvalidationListener::AsWeakPtr() {
444 DCHECK(CalledOnValidThread());
445 base::WeakPtr
<AckHandler
> weak_ptr
= weak_ptr_factory_
.GetWeakPtr();
446 weak_ptr
.get(); // Binds the pointer to this thread.
// Records the network channel's new state in |push_client_state_|, which
// GetState() combines with |ticl_state_|.
450 void SyncInvalidationListener::OnNetworkChannelStateChanged(
451 InvalidatorState invalidator_state
) {
452 DCHECK(CalledOnValidThread());
453 push_client_state_
= invalidator_state
;
457 } // namespace syncer