1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/invalidation/sync_invalidation_listener.h"
10 #include "base/callback.h"
11 #include "base/compiler_specific.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/thread_task_runner_handle.h"
16 #include "base/tracked_objects.h"
17 #include "components/invalidation/invalidation_util.h"
18 #include "components/invalidation/object_id_invalidation_map.h"
19 #include "components/invalidation/registration_manager.h"
20 #include "google/cacheinvalidation/include/invalidation-client.h"
21 #include "google/cacheinvalidation/include/types.h"
22 #include "jingle/notifier/listener/push_client.h"
// Application name string literal. Its consumer is not visible here.
26 const char kApplicationName
[] = "chrome-sync";
// Out-of-line definition of the Delegate destructor; the body is empty.
32 SyncInvalidationListener::Delegate::~Delegate() {}
// Takes ownership of |network_channel|, passes the channel and |this| to
// |sync_system_resources_|, and registers this object as an observer of the
// channel. Both |ticl_state_| and |push_client_state_| start out as
// DEFAULT_INVALIDATION_ERROR.
34 SyncInvalidationListener::SyncInvalidationListener(
35 scoped_ptr
<SyncNetworkChannel
> network_channel
)
36 : sync_network_channel_(network_channel
.Pass()),
37 sync_system_resources_(sync_network_channel_
.get(), this),
39 ticl_state_(DEFAULT_INVALIDATION_ERROR
),
40 push_client_state_(DEFAULT_INVALIDATION_ERROR
),
41 weak_ptr_factory_(this) {
42 DCHECK(CalledOnValidThread());
// The matching RemoveObserver() call is made in the destructor.
43 sync_network_channel_
->AddObserver(this);
// Stops observing |sync_network_channel_|, undoing the AddObserver() call made
// in the constructor.
46 SyncInvalidationListener::~SyncInvalidationListener() {
47 DCHECK(CalledOnValidThread());
48 sync_network_channel_
->RemoveObserver(this);
// Starts the listener. In order, the visible code:
//   - sets |client_info| as the platform on |sync_system_resources_| and
//     starts it;
//   - seeds the storage resource with |invalidation_bootstrap_data|;
//   - records |initial_unacked_invalidations|, the state tracker, and the task
//     runner on which tracker calls are posted;
//   - creates the invalidation client through
//     |create_invalidation_client_callback| and starts it;
//   - builds a RegistrationManager around the new client.
// NOTE(review): the end of the parameter list and some arguments of the
// callback invocation are not visible here (|client_id| has no visible use) --
// confirm against the declaration in the header.
53 void SyncInvalidationListener::Start(
54 const CreateInvalidationClientCallback
& create_invalidation_client_callback
,
55 const std::string
& client_id
,
56 const std::string
& client_info
,
57 const std::string
& invalidation_bootstrap_data
,
58 const UnackedInvalidationsMap
& initial_unacked_invalidations
,
59 const base::WeakPtr
<InvalidationStateTracker
>& invalidation_state_tracker
,
60 const scoped_refptr
<base::SequencedTaskRunner
>&
61 invalidation_state_tracker_task_runner
,
63 DCHECK(CalledOnValidThread());
66 sync_system_resources_
.set_platform(client_info
);
67 sync_system_resources_
.Start();
69 // The Storage resource is implemented as a write-through cache. We populate
70 // it with the initial state on startup, so subsequent writes go to disk and
71 // update the in-memory cache, while reads just return the cached state.
72 sync_system_resources_
.storage()->SetInitialState(
73 invalidation_bootstrap_data
);
75 unacked_invalidations_map_
= initial_unacked_invalidations
;
76 invalidation_state_tracker_
= invalidation_state_tracker
;
77 invalidation_state_tracker_task_runner_
=
78 invalidation_state_tracker_task_runner
;
// The task runner is dereferenced unconditionally by later PostTask() calls,
// so it must be non-null.
79 DCHECK(invalidation_state_tracker_task_runner_
.get());
// Create the invalidation client via the supplied factory callback.
85 invalidation_client_
.reset(create_invalidation_client_callback
.Run(
86 &sync_system_resources_
,
87 sync_network_channel_
->GetInvalidationClientType(),
91 invalidation_client_
->Start();
// The registration manager is handed a raw pointer to the client created
// above.
93 registration_manager_
.reset(
94 new RegistrationManager(invalidation_client_
.get()));
// Forwards the |email| / |token| credentials to |sync_network_channel_|.
97 void SyncInvalidationListener::UpdateCredentials(
98 const std::string
& email
, const std::string
& token
) {
99 DCHECK(CalledOnValidThread());
100 sync_network_channel_
->UpdateCredentials(email
, token
);
// Replaces |registered_ids_| with |ids|. If the ticl is already enabled and a
// registration manager exists, the change is applied immediately through
// DoRegistrationUpdate().
103 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet
& ids
) {
104 DCHECK(CalledOnValidThread());
105 registered_ids_
= ids
;
106 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
107 // working XMPP connection (as observed by us), so check it instead
108 // of GetState() (see http://crbug.com/139424).
109 if (ticl_state_
== INVALIDATIONS_ENABLED
&& registration_manager_
) {
110 DoRegistrationUpdate();
// Handles the Ready notification for |client| (which must be
// |invalidation_client_|): marks the ticl state as INVALIDATIONS_ENABLED and
// runs DoRegistrationUpdate().
114 void SyncInvalidationListener::Ready(
115 invalidation::InvalidationClient
* client
) {
116 DCHECK(CalledOnValidThread());
117 DCHECK_EQ(client
, invalidation_client_
.get());
118 ticl_state_
= INVALIDATIONS_ENABLED
;
120 DoRegistrationUpdate();
// Handles a versioned invalidation for a single object. |ack_handle| is
// acknowledged on |client| straight away; the invalidation is then wrapped in
// an Invalidation (object ID, version, payload) whose ack handler is this
// object, and passed to DispatchInvalidations().
123 void SyncInvalidationListener::Invalidate(
124 invalidation::InvalidationClient
* client
,
125 const invalidation::Invalidation
& invalidation
,
126 const invalidation::AckHandle
& ack_handle
) {
127 DCHECK(CalledOnValidThread());
128 DCHECK_EQ(client
, invalidation_client_
.get());
129 client
->Acknowledge(ack_handle
);
131 const invalidation::ObjectId
& id
= invalidation
.object_id();
// NOTE(review): |payload| is assigned below but its declaration is not visible
// here -- confirm it is declared before this point.
134 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
135 if (invalidation
.has_payload())
136 payload
= invalidation
.payload();
138 DVLOG(2) << "Received invalidation with version " << invalidation
.version()
139 << " for " << ObjectIdToString(id
);
141 ObjectIdInvalidationMap invalidations
;
142 Invalidation inv
= Invalidation::Init(id
, invalidation
.version(), payload
);
// Acks and drops for this invalidation are routed back to this object via the
// current thread's task runner.
143 inv
.SetAckHandler(AsWeakPtr(), base::ThreadTaskRunnerHandle::Get());
144 invalidations
.Insert(inv
);
146 DispatchInvalidations(invalidations
);
// Handles an invalidation of |object_id| that carries no version. Acknowledges
// |ack_handle| on |client|, then dispatches a single unknown-version
// Invalidation whose ack handler is this object.
149 void SyncInvalidationListener::InvalidateUnknownVersion(
150 invalidation::InvalidationClient
* client
,
151 const invalidation::ObjectId
& object_id
,
152 const invalidation::AckHandle
& ack_handle
) {
153 DCHECK(CalledOnValidThread());
154 DCHECK_EQ(client
, invalidation_client_
.get());
155 DVLOG(1) << "InvalidateUnknownVersion";
156 client
->Acknowledge(ack_handle
);
158 ObjectIdInvalidationMap invalidations
;
159 Invalidation unknown_version
= Invalidation::InitUnknownVersion(object_id
);
160 unknown_version
.SetAckHandler(AsWeakPtr(),
161 base::ThreadTaskRunnerHandle::Get());
162 invalidations
.Insert(unknown_version
);
164 DispatchInvalidations(invalidations
);
167 // This should behave as if we got an invalidation with version
168 // UNKNOWN_OBJECT_VERSION for all known data types.
// Acknowledges |ack_handle| on |client|, then dispatches one unknown-version
// Invalidation (with this object as ack handler) for every ID in
// |registered_ids_|.
169 void SyncInvalidationListener::InvalidateAll(
170 invalidation::InvalidationClient
* client
,
171 const invalidation::AckHandle
& ack_handle
) {
172 DCHECK(CalledOnValidThread());
173 DCHECK_EQ(client
, invalidation_client_
.get());
174 DVLOG(1) << "InvalidateAll";
175 client
->Acknowledge(ack_handle
);
177 ObjectIdInvalidationMap invalidations
;
178 for (ObjectIdSet::iterator it
= registered_ids_
.begin();
179 it
!= registered_ids_
.end(); ++it
) {
180 Invalidation unknown_version
= Invalidation::InitUnknownVersion(*it
);
181 unknown_version
.SetAckHandler(AsWeakPtr(),
182 base::ThreadTaskRunnerHandle::Get());
183 invalidations
.Insert(unknown_version
);
186 DispatchInvalidations(invalidations
);
189 // If a handler is registered, emit right away. Otherwise, save it for later.
// Concretely: every invalidation in |invalidations| is saved, and the subset
// whose object IDs are in |registered_ids_| is also emitted to the delegate.
190 void SyncInvalidationListener::DispatchInvalidations(
191 const ObjectIdInvalidationMap
& invalidations
) {
192 DCHECK(CalledOnValidThread());
194 ObjectIdInvalidationMap to_save
= invalidations
;
195 ObjectIdInvalidationMap to_emit
=
196 invalidations
.GetSubsetWithObjectIds(registered_ids_
);
198 SaveInvalidations(to_save
);
199 EmitSavedInvalidations(to_emit
);
// Merges |to_save| into |unacked_invalidations_map_|, creating an
// UnackedInvalidationSet for any object ID that has no entry yet, and then
// posts the updated map to InvalidationStateTracker::SetSavedInvalidations on
// the tracker's task runner.
202 void SyncInvalidationListener::SaveInvalidations(
203 const ObjectIdInvalidationMap
& to_save
) {
204 ObjectIdSet objects_to_save
= to_save
.GetObjectIds();
205 for (ObjectIdSet::const_iterator it
= objects_to_save
.begin();
206 it
!= objects_to_save
.end(); ++it
) {
207 UnackedInvalidationsMap::iterator lookup
=
208 unacked_invalidations_map_
.find(*it
);
// First invalidation seen for this ID: insert an empty set and point |lookup|
// at the new entry.
209 if (lookup
== unacked_invalidations_map_
.end()) {
210 lookup
= unacked_invalidations_map_
.insert(
211 std::make_pair(*it
, UnackedInvalidationSet(*it
))).first
;
213 lookup
->second
.AddSet(to_save
.ForObject(*it
));
216 invalidation_state_tracker_task_runner_
->PostTask(
218 base::Bind(&InvalidationStateTracker::SetSavedInvalidations
,
219 invalidation_state_tracker_
,
220 unacked_invalidations_map_
));
// Logs |to_emit| and hands it to the delegate through OnInvalidate().
223 void SyncInvalidationListener::EmitSavedInvalidations(
224 const ObjectIdInvalidationMap
& to_emit
) {
225 DVLOG(2) << "Emitting invalidations: " << to_emit
.ToString();
226 delegate_
->OnInvalidate(to_emit
);
// Called with the new registration state of |object_id|. Any state other than
// REGISTERED is reported to |registration_manager_| as a lost registration.
229 void SyncInvalidationListener::InformRegistrationStatus(
230 invalidation::InvalidationClient
* client
,
231 const invalidation::ObjectId
& object_id
,
232 InvalidationListener::RegistrationState new_state
) {
233 DCHECK(CalledOnValidThread());
234 DCHECK_EQ(client
, invalidation_client_
.get());
235 DVLOG(1) << "InformRegistrationStatus: "
236 << ObjectIdToString(object_id
) << " " << new_state
;
238 if (new_state
!= InvalidationListener::REGISTERED
) {
239 // Let |registration_manager_| handle the registration backoff policy.
240 registration_manager_
->MarkRegistrationLost(object_id
);
// Called when registering |object_id| failed. Logs the failure and informs
// |registration_manager_|.
// NOTE(review): |is_transient| is used below but its declaration is not
// visible here, and neither is any branch choosing between
// MarkRegistrationLost() and DisableId() -- the inline comments suggest the
// first is for transient failures and the second for non-transient ones;
// confirm against the full definition.
// NOTE(review): the log output has no separator between the object ID string
// and "is_transient=".
244 void SyncInvalidationListener::InformRegistrationFailure(
245 invalidation::InvalidationClient
* client
,
246 const invalidation::ObjectId
& object_id
,
248 const std::string
& error_message
) {
249 DCHECK(CalledOnValidThread());
250 DCHECK_EQ(client
, invalidation_client_
.get());
251 DVLOG(1) << "InformRegistrationFailure: "
252 << ObjectIdToString(object_id
)
253 << "is_transient=" << is_transient
254 << ", message=" << error_message
;
257 // We don't care about |unknown_hint|; we let
258 // |registration_manager_| handle the registration backoff policy.
259 registration_manager_
->MarkRegistrationLost(object_id
);
261 // Non-transient failures require an action to resolve. This could happen
263 // - the server doesn't yet recognize the data type, which could happen for
264 // brand-new data types.
265 // - the user has changed his password and hasn't updated it yet locally.
266 // Either way, block future registration attempts for |object_id|. However,
267 // we don't forget any saved invalidation state since we may use it once the
268 // error is addressed.
269 registration_manager_
->DisableId(object_id
);
// Marks every registration as lost in |registration_manager_|. The visible
// body does not read |prefix|. Note that the log line reads
// "AllRegistrationsLost" rather than this method's name.
273 void SyncInvalidationListener::ReissueRegistrations(
274 invalidation::InvalidationClient
* client
,
275 const std::string
& prefix
,
277 DCHECK(CalledOnValidThread());
278 DCHECK_EQ(client
, invalidation_client_
.get());
279 DVLOG(1) << "AllRegistrationsLost";
280 registration_manager_
->MarkAllRegistrationsLost();
// Logs the ticl error described by |error_info| and updates |ticl_state_|:
// an AUTH_FAILURE reason yields INVALIDATION_CREDENTIALS_REJECTED; the other
// visible assignment sets TRANSIENT_INVALIDATION_ERROR (presumably the else
// branch -- confirm against the full definition).
283 void SyncInvalidationListener::InformError(
284 invalidation::InvalidationClient
* client
,
285 const invalidation::ErrorInfo
& error_info
) {
286 DCHECK(CalledOnValidThread());
287 DCHECK_EQ(client
, invalidation_client_
.get());
288 LOG(ERROR
) << "Ticl error " << error_info
.error_reason() << ": "
289 << error_info
.error_message()
290 << " (transient = " << error_info
.is_transient() << ")";
291 if (error_info
.error_reason() == invalidation::ErrorReason::AUTH_FAILURE
) {
292 ticl_state_
= INVALIDATION_CREDENTIALS_REJECTED
;
294 ticl_state_
= TRANSIENT_INVALIDATION_ERROR
;
// Records an acknowledgement of |handle| in the unacked-invalidation set for
// |id|, then posts the updated map to
// InvalidationStateTracker::SetSavedInvalidations on the tracker's task
// runner. An |id| with no entry in |unacked_invalidations_map_| is reported
// with DLOG(WARNING).
299 void SyncInvalidationListener::Acknowledge(
300 const invalidation::ObjectId
& id
,
301 const syncer::AckHandle
& handle
) {
302 UnackedInvalidationsMap::iterator lookup
=
303 unacked_invalidations_map_
.find(id
);
304 if (lookup
== unacked_invalidations_map_
.end()) {
305 DLOG(WARNING
) << "Received acknowledgement for untracked object ID";
308 lookup
->second
.Acknowledge(handle
);
309 invalidation_state_tracker_task_runner_
->PostTask(
311 base::Bind(&InvalidationStateTracker::SetSavedInvalidations
,
312 invalidation_state_tracker_
,
313 unacked_invalidations_map_
));
// Records that the invalidation identified by |handle| was dropped in the
// unacked-invalidation set for |id|, then posts the updated map to
// InvalidationStateTracker::SetSavedInvalidations on the tracker's task
// runner. An |id| with no entry in |unacked_invalidations_map_| is reported
// with DLOG(WARNING).
316 void SyncInvalidationListener::Drop(
317 const invalidation::ObjectId
& id
,
318 const syncer::AckHandle
& handle
) {
319 UnackedInvalidationsMap::iterator lookup
=
320 unacked_invalidations_map_
.find(id
);
321 if (lookup
== unacked_invalidations_map_
.end()) {
322 DLOG(WARNING
) << "Received drop for untracked object ID";
325 lookup
->second
.Drop(handle
);
326 invalidation_state_tracker_task_runner_
->PostTask(
328 base::Bind(&InvalidationStateTracker::SetSavedInvalidations
,
329 invalidation_state_tracker_
,
330 unacked_invalidations_map_
));
// Posts an InvalidationStateTracker::SetBootstrapData call for
// |invalidation_state_tracker_| onto the tracker's task runner.
// NOTE(review): the remaining bound argument(s) are not visible here;
// presumably |state| is what gets persisted -- confirm.
333 void SyncInvalidationListener::WriteState(const std::string
& state
) {
334 DCHECK(CalledOnValidThread());
335 DVLOG(1) << "WriteState";
336 invalidation_state_tracker_task_runner_
->PostTask(
338 base::Bind(&InvalidationStateTracker::SetBootstrapData
,
339 invalidation_state_tracker_
,
// Pushes |registered_ids_| to |registration_manager_|, erases the unacked
// invalidations of every ID the manager reports back as unregistered, and
// posts the resulting map to the state tracker. It then walks the remaining
// map entries, exports their invalidations (with this object as ack handler)
// into one ObjectIdInvalidationMap, and emits that map to the delegate.
// NOTE(review): the body of the |registered_ids_| membership check inside the
// second loop is not visible here -- confirm how IDs that are not registered
// are handled.
343 void SyncInvalidationListener::DoRegistrationUpdate() {
344 DCHECK(CalledOnValidThread());
345 const ObjectIdSet
& unregistered_ids
=
346 registration_manager_
->UpdateRegisteredIds(registered_ids_
);
347 for (ObjectIdSet::iterator it
= unregistered_ids
.begin();
348 it
!= unregistered_ids
.end(); ++it
) {
349 unacked_invalidations_map_
.erase(*it
);
351 invalidation_state_tracker_task_runner_
->PostTask(
353 base::Bind(&InvalidationStateTracker::SetSavedInvalidations
,
354 invalidation_state_tracker_
,
355 unacked_invalidations_map_
));
357 ObjectIdInvalidationMap object_id_invalidation_map
;
358 for (UnackedInvalidationsMap::iterator map_it
=
359 unacked_invalidations_map_
.begin();
360 map_it
!= unacked_invalidations_map_
.end(); ++map_it
) {
361 if (registered_ids_
.find(map_it
->first
) == registered_ids_
.end()) {
364 map_it
->second
.ExportInvalidations(AsWeakPtr(),
365 base::ThreadTaskRunnerHandle::Get(),
366 &object_id_invalidation_map
);
369 // There's no need to run these through DispatchInvalidations(); they've
370 // already been saved to storage (that's where we found them) so all we need
371 // to do now is emit them.
372 EmitSavedInvalidations(object_id_invalidation_map
);
// Forwards |callback| to |sync_network_channel_| and additionally runs it
// once, directly, with this listener's own debug data from CollectDebugData().
375 void SyncInvalidationListener::RequestDetailedStatus(
376 base::Callback
<void(const base::DictionaryValue
&)> callback
) const {
377 DCHECK(CalledOnValidThread());
378 sync_network_channel_
->RequestDetailedStatus(callback
);
379 callback
.Run(*CollectDebugData());
// Builds and returns a dictionary describing this listener. Keys:
//   "SyncInvalidationListener.PushClientState"        - push client state
//   "SyncInvalidationListener.TiclState"              - ticl state
//   "SyncInvalidationListener.UnackedInvalidationsMap" - nested dictionary
//       with one entry per element of |unacked_invalidations_map_|, keyed by
//       the object ID's name().
382 scoped_ptr
<base::DictionaryValue
>
383 SyncInvalidationListener::CollectDebugData() const {
384 scoped_ptr
<base::DictionaryValue
> return_value(new base::DictionaryValue());
385 return_value
->SetString(
386 "SyncInvalidationListener.PushClientState",
387 std::string(InvalidatorStateToString(push_client_state_
)));
388 return_value
->SetString("SyncInvalidationListener.TiclState",
389 std::string(InvalidatorStateToString(ticl_state_
)));
390 scoped_ptr
<base::DictionaryValue
> unacked_map(new base::DictionaryValue());
391 for (UnackedInvalidationsMap::const_iterator it
=
392 unacked_invalidations_map_
.begin();
393 it
!= unacked_invalidations_map_
.end();
395 unacked_map
->Set((it
->first
).name(), (it
->second
).ToValue().release());
397 return_value
->Set("SyncInvalidationListener.UnackedInvalidationsMap",
398 unacked_map
.release());
399 return return_value
.Pass();
// Test-only entry point (per its name). The visible body only asserts that it
// is called on the valid thread.
402 void SyncInvalidationListener::StopForTest() {
403 DCHECK(CalledOnValidThread());
// Tears down the registration manager and the invalidation client, stops
// |sync_system_resources_|, and resets both |ticl_state_| and
// |push_client_state_| to DEFAULT_INVALIDATION_ERROR. The function first
// checks whether |invalidation_client_| is null.
407 void SyncInvalidationListener::Stop() {
408 DCHECK(CalledOnValidThread());
409 if (!invalidation_client_
) {
// |registration_manager_| was handed a raw pointer to |invalidation_client_|
// in Start(), so it is released before the client is reset.
413 registration_manager_
.reset();
414 sync_system_resources_
.Stop();
415 invalidation_client_
->Stop();
417 invalidation_client_
.reset();
420 ticl_state_
= DEFAULT_INVALIDATION_ERROR
;
421 push_client_state_
= DEFAULT_INVALIDATION_ERROR
;
// Combines |ticl_state_| and |push_client_state_| into one InvalidatorState.
// Rejected credentials on either side take precedence, then the
// both-enabled case; anything else is reported as a transient error.
424 InvalidatorState
SyncInvalidationListener::GetState() const {
425 DCHECK(CalledOnValidThread());
426 if (ticl_state_
== INVALIDATION_CREDENTIALS_REJECTED
||
427 push_client_state_
== INVALIDATION_CREDENTIALS_REJECTED
) {
428 // If either the ticl or the push client rejected our credentials,
429 // return INVALIDATION_CREDENTIALS_REJECTED.
430 return INVALIDATION_CREDENTIALS_REJECTED
;
432 if (ticl_state_
== INVALIDATIONS_ENABLED
&&
433 push_client_state_
== INVALIDATIONS_ENABLED
) {
434 // If the ticl is ready and the push client notifications are
435 // enabled, return INVALIDATIONS_ENABLED.
436 return INVALIDATIONS_ENABLED
;
438 // Otherwise, we have a transient error.
439 return TRANSIENT_INVALIDATION_ERROR
;
// Reports the current combined state, as computed by GetState(), to the
// delegate.
442 void SyncInvalidationListener::EmitStateChange() {
443 DCHECK(CalledOnValidThread());
444 delegate_
->OnInvalidatorStateChange(GetState());
// Obtains a WeakPtr<AckHandler> to this object from |weak_ptr_factory_|. The
// otherwise unused get() call is there for its side effect, described in the
// inline comment.
447 base::WeakPtr
<AckHandler
> SyncInvalidationListener::AsWeakPtr() {
448 DCHECK(CalledOnValidThread());
449 base::WeakPtr
<AckHandler
> weak_ptr
= weak_ptr_factory_
.GetWeakPtr();
450 weak_ptr
.get(); // Binds the pointer to this thread.
// Records |invalidator_state| as the new |push_client_state_|. This object is
// registered as an observer of |sync_network_channel_| in the constructor.
454 void SyncInvalidationListener::OnNetworkChannelStateChanged(
455 InvalidatorState invalidator_state
) {
456 DCHECK(CalledOnValidThread());
457 push_client_state_
= invalidator_state
;
461 } // namespace syncer