Blink roll 25b6bd3a7a131ffe68d809546ad1a20707915cdc:3a503f41ae42e5b79cfcd2ff10e65afde...
[chromium-blink-merge.git] / components / invalidation / sync_invalidation_listener.cc
blob7259f73ebd2f2bbdbd00a3e1bcdff14a6c8ef6c9
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/invalidation/sync_invalidation_listener.h"
7 #include <vector>
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/compiler_specific.h"
12 #include "base/logging.h"
13 #include "base/tracked_objects.h"
14 #include "components/invalidation/invalidation_util.h"
15 #include "components/invalidation/object_id_invalidation_map.h"
16 #include "components/invalidation/registration_manager.h"
17 #include "google/cacheinvalidation/include/invalidation-client.h"
18 #include "google/cacheinvalidation/include/types.h"
19 #include "jingle/notifier/listener/push_client.h"
21 namespace {
// Application name passed to the invalidation client factory callback in
// SyncInvalidationListener::Start().
23 const char kApplicationName[] = "chrome-sync";
25 } // namespace
27 namespace syncer {
// Out-of-line definition of the Delegate destructor; the body is empty.
29 SyncInvalidationListener::Delegate::~Delegate() {}
31 SyncInvalidationListener::SyncInvalidationListener(
32 scoped_ptr<SyncNetworkChannel> network_channel)
33 : sync_network_channel_(network_channel.Pass()),
34 sync_system_resources_(sync_network_channel_.get(), this),
35 delegate_(NULL),
36 ticl_state_(DEFAULT_INVALIDATION_ERROR),
37 push_client_state_(DEFAULT_INVALIDATION_ERROR),
38 weak_ptr_factory_(this) {
39 DCHECK(CalledOnValidThread());
40 sync_network_channel_->AddObserver(this);
43 SyncInvalidationListener::~SyncInvalidationListener() {
44 DCHECK(CalledOnValidThread());
45 sync_network_channel_->RemoveObserver(this);
46 Stop();
47 DCHECK(!delegate_);
50 void SyncInvalidationListener::Start(
51 const CreateInvalidationClientCallback& create_invalidation_client_callback,
52 const std::string& client_id,
53 const std::string& client_info,
54 const std::string& invalidation_bootstrap_data,
55 const UnackedInvalidationsMap& initial_unacked_invalidations,
56 const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker,
57 const scoped_refptr<base::SequencedTaskRunner>&
58 invalidation_state_tracker_task_runner,
59 Delegate* delegate) {
60 DCHECK(CalledOnValidThread());
61 Stop();
63 sync_system_resources_.set_platform(client_info);
64 sync_system_resources_.Start();
66 // The Storage resource is implemented as a write-through cache. We populate
67 // it with the initial state on startup, so subsequent writes go to disk and
68 // update the in-memory cache, while reads just return the cached state.
69 sync_system_resources_.storage()->SetInitialState(
70 invalidation_bootstrap_data);
72 unacked_invalidations_map_ = initial_unacked_invalidations;
73 invalidation_state_tracker_ = invalidation_state_tracker;
74 invalidation_state_tracker_task_runner_ =
75 invalidation_state_tracker_task_runner;
76 DCHECK(invalidation_state_tracker_task_runner_.get());
78 DCHECK(!delegate_);
79 DCHECK(delegate);
80 delegate_ = delegate;
82 invalidation_client_.reset(create_invalidation_client_callback.Run(
83 &sync_system_resources_,
84 sync_network_channel_->GetInvalidationClientType(),
85 client_id,
86 kApplicationName,
87 this));
88 invalidation_client_->Start();
90 registration_manager_.reset(
91 new RegistrationManager(invalidation_client_.get()));
94 void SyncInvalidationListener::UpdateCredentials(
95 const std::string& email, const std::string& token) {
96 DCHECK(CalledOnValidThread());
97 sync_network_channel_->UpdateCredentials(email, token);
100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
101 DCHECK(CalledOnValidThread());
102 registered_ids_ = ids;
103 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
104 // working XMPP connection (as observed by us), so check it instead
105 // of GetState() (see http://crbug.com/139424).
106 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
107 DoRegistrationUpdate();
111 void SyncInvalidationListener::Ready(
112 invalidation::InvalidationClient* client) {
113 DCHECK(CalledOnValidThread());
114 DCHECK_EQ(client, invalidation_client_.get());
115 ticl_state_ = INVALIDATIONS_ENABLED;
116 EmitStateChange();
117 DoRegistrationUpdate();
120 void SyncInvalidationListener::Invalidate(
121 invalidation::InvalidationClient* client,
122 const invalidation::Invalidation& invalidation,
123 const invalidation::AckHandle& ack_handle) {
124 DCHECK(CalledOnValidThread());
125 DCHECK_EQ(client, invalidation_client_.get());
126 client->Acknowledge(ack_handle);
128 const invalidation::ObjectId& id = invalidation.object_id();
130 std::string payload;
131 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
132 if (invalidation.has_payload())
133 payload = invalidation.payload();
135 DVLOG(2) << "Received invalidation with version " << invalidation.version()
136 << " for " << ObjectIdToString(id);
138 ObjectIdInvalidationMap invalidations;
139 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
140 inv.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current());
141 invalidations.Insert(inv);
143 DispatchInvalidations(invalidations);
146 void SyncInvalidationListener::InvalidateUnknownVersion(
147 invalidation::InvalidationClient* client,
148 const invalidation::ObjectId& object_id,
149 const invalidation::AckHandle& ack_handle) {
150 DCHECK(CalledOnValidThread());
151 DCHECK_EQ(client, invalidation_client_.get());
152 DVLOG(1) << "InvalidateUnknownVersion";
153 client->Acknowledge(ack_handle);
155 ObjectIdInvalidationMap invalidations;
156 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
157 unknown_version.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current());
158 invalidations.Insert(unknown_version);
160 DispatchInvalidations(invalidations);
163 // This should behave as if we got an invalidation with version
164 // UNKNOWN_OBJECT_VERSION for all known data types.
165 void SyncInvalidationListener::InvalidateAll(
166 invalidation::InvalidationClient* client,
167 const invalidation::AckHandle& ack_handle) {
168 DCHECK(CalledOnValidThread());
169 DCHECK_EQ(client, invalidation_client_.get());
170 DVLOG(1) << "InvalidateAll";
171 client->Acknowledge(ack_handle);
173 ObjectIdInvalidationMap invalidations;
174 for (ObjectIdSet::iterator it = registered_ids_.begin();
175 it != registered_ids_.end(); ++it) {
176 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
177 unknown_version.SetAckHandler(AsWeakPtr(),
178 base::MessageLoopProxy::current());
179 invalidations.Insert(unknown_version);
182 DispatchInvalidations(invalidations);
185 // If a handler is registered, emit right away. Otherwise, save it for later.
186 void SyncInvalidationListener::DispatchInvalidations(
187 const ObjectIdInvalidationMap& invalidations) {
188 DCHECK(CalledOnValidThread());
190 ObjectIdInvalidationMap to_save = invalidations;
191 ObjectIdInvalidationMap to_emit =
192 invalidations.GetSubsetWithObjectIds(registered_ids_);
194 SaveInvalidations(to_save);
195 EmitSavedInvalidations(to_emit);
198 void SyncInvalidationListener::SaveInvalidations(
199 const ObjectIdInvalidationMap& to_save) {
200 ObjectIdSet objects_to_save = to_save.GetObjectIds();
201 for (ObjectIdSet::const_iterator it = objects_to_save.begin();
202 it != objects_to_save.end(); ++it) {
203 UnackedInvalidationsMap::iterator lookup =
204 unacked_invalidations_map_.find(*it);
205 if (lookup == unacked_invalidations_map_.end()) {
206 lookup = unacked_invalidations_map_.insert(
207 std::make_pair(*it, UnackedInvalidationSet(*it))).first;
209 lookup->second.AddSet(to_save.ForObject(*it));
212 invalidation_state_tracker_task_runner_->PostTask(
213 FROM_HERE,
214 base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
215 invalidation_state_tracker_,
216 unacked_invalidations_map_));
219 void SyncInvalidationListener::EmitSavedInvalidations(
220 const ObjectIdInvalidationMap& to_emit) {
221 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
222 delegate_->OnInvalidate(to_emit);
225 void SyncInvalidationListener::InformRegistrationStatus(
226 invalidation::InvalidationClient* client,
227 const invalidation::ObjectId& object_id,
228 InvalidationListener::RegistrationState new_state) {
229 DCHECK(CalledOnValidThread());
230 DCHECK_EQ(client, invalidation_client_.get());
231 DVLOG(1) << "InformRegistrationStatus: "
232 << ObjectIdToString(object_id) << " " << new_state;
234 if (new_state != InvalidationListener::REGISTERED) {
235 // Let |registration_manager_| handle the registration backoff policy.
236 registration_manager_->MarkRegistrationLost(object_id);
240 void SyncInvalidationListener::InformRegistrationFailure(
241 invalidation::InvalidationClient* client,
242 const invalidation::ObjectId& object_id,
243 bool is_transient,
244 const std::string& error_message) {
245 DCHECK(CalledOnValidThread());
246 DCHECK_EQ(client, invalidation_client_.get());
247 DVLOG(1) << "InformRegistrationFailure: "
248 << ObjectIdToString(object_id)
249 << "is_transient=" << is_transient
250 << ", message=" << error_message;
252 if (is_transient) {
253 // We don't care about |unknown_hint|; we let
254 // |registration_manager_| handle the registration backoff policy.
255 registration_manager_->MarkRegistrationLost(object_id);
256 } else {
257 // Non-transient failures require an action to resolve. This could happen
258 // because:
259 // - the server doesn't yet recognize the data type, which could happen for
260 // brand-new data types.
261 // - the user has changed his password and hasn't updated it yet locally.
262 // Either way, block future registration attempts for |object_id|. However,
263 // we don't forget any saved invalidation state since we may use it once the
264 // error is addressed.
265 registration_manager_->DisableId(object_id);
269 void SyncInvalidationListener::ReissueRegistrations(
270 invalidation::InvalidationClient* client,
271 const std::string& prefix,
272 int prefix_length) {
273 DCHECK(CalledOnValidThread());
274 DCHECK_EQ(client, invalidation_client_.get());
275 DVLOG(1) << "AllRegistrationsLost";
276 registration_manager_->MarkAllRegistrationsLost();
279 void SyncInvalidationListener::InformError(
280 invalidation::InvalidationClient* client,
281 const invalidation::ErrorInfo& error_info) {
282 DCHECK(CalledOnValidThread());
283 DCHECK_EQ(client, invalidation_client_.get());
284 LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
285 << error_info.error_message()
286 << " (transient = " << error_info.is_transient() << ")";
287 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
288 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
289 } else {
290 ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
292 EmitStateChange();
295 void SyncInvalidationListener::Acknowledge(
296 const invalidation::ObjectId& id,
297 const syncer::AckHandle& handle) {
298 UnackedInvalidationsMap::iterator lookup =
299 unacked_invalidations_map_.find(id);
300 if (lookup == unacked_invalidations_map_.end()) {
301 DLOG(WARNING) << "Received acknowledgement for untracked object ID";
302 return;
304 lookup->second.Acknowledge(handle);
305 invalidation_state_tracker_task_runner_->PostTask(
306 FROM_HERE,
307 base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
308 invalidation_state_tracker_,
309 unacked_invalidations_map_));
312 void SyncInvalidationListener::Drop(
313 const invalidation::ObjectId& id,
314 const syncer::AckHandle& handle) {
315 UnackedInvalidationsMap::iterator lookup =
316 unacked_invalidations_map_.find(id);
317 if (lookup == unacked_invalidations_map_.end()) {
318 DLOG(WARNING) << "Received drop for untracked object ID";
319 return;
321 lookup->second.Drop(handle);
322 invalidation_state_tracker_task_runner_->PostTask(
323 FROM_HERE,
324 base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
325 invalidation_state_tracker_,
326 unacked_invalidations_map_));
329 void SyncInvalidationListener::WriteState(const std::string& state) {
330 DCHECK(CalledOnValidThread());
331 DVLOG(1) << "WriteState";
332 invalidation_state_tracker_task_runner_->PostTask(
333 FROM_HERE,
334 base::Bind(&InvalidationStateTracker::SetBootstrapData,
335 invalidation_state_tracker_,
336 state));
339 void SyncInvalidationListener::DoRegistrationUpdate() {
340 DCHECK(CalledOnValidThread());
341 const ObjectIdSet& unregistered_ids =
342 registration_manager_->UpdateRegisteredIds(registered_ids_);
343 for (ObjectIdSet::iterator it = unregistered_ids.begin();
344 it != unregistered_ids.end(); ++it) {
345 unacked_invalidations_map_.erase(*it);
347 invalidation_state_tracker_task_runner_->PostTask(
348 FROM_HERE,
349 base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
350 invalidation_state_tracker_,
351 unacked_invalidations_map_));
353 ObjectIdInvalidationMap object_id_invalidation_map;
354 for (UnackedInvalidationsMap::iterator map_it =
355 unacked_invalidations_map_.begin();
356 map_it != unacked_invalidations_map_.end(); ++map_it) {
357 if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
358 continue;
360 map_it->second.ExportInvalidations(AsWeakPtr(),
361 base::MessageLoopProxy::current(),
362 &object_id_invalidation_map);
365 // There's no need to run these through DispatchInvalidations(); they've
366 // already been saved to storage (that's where we found them) so all we need
367 // to do now is emit them.
368 EmitSavedInvalidations(object_id_invalidation_map);
371 void SyncInvalidationListener::RequestDetailedStatus(
372 base::Callback<void(const base::DictionaryValue&)> callback) const {
373 DCHECK(CalledOnValidThread());
374 sync_network_channel_->RequestDetailedStatus(callback);
375 callback.Run(*CollectDebugData());
378 scoped_ptr<base::DictionaryValue>
379 SyncInvalidationListener::CollectDebugData() const {
380 scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue());
381 return_value->SetString(
382 "SyncInvalidationListener.PushClientState",
383 std::string(InvalidatorStateToString(push_client_state_)));
384 return_value->SetString("SyncInvalidationListener.TiclState",
385 std::string(InvalidatorStateToString(ticl_state_)));
386 scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue());
387 for (UnackedInvalidationsMap::const_iterator it =
388 unacked_invalidations_map_.begin();
389 it != unacked_invalidations_map_.end();
390 ++it) {
391 unacked_map->Set((it->first).name(), (it->second).ToValue().release());
393 return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap",
394 unacked_map.release());
395 return return_value.Pass();
398 void SyncInvalidationListener::StopForTest() {
399 DCHECK(CalledOnValidThread());
400 Stop();
403 void SyncInvalidationListener::Stop() {
404 DCHECK(CalledOnValidThread());
405 if (!invalidation_client_) {
406 return;
409 registration_manager_.reset();
410 sync_system_resources_.Stop();
411 invalidation_client_->Stop();
413 invalidation_client_.reset();
414 delegate_ = NULL;
416 ticl_state_ = DEFAULT_INVALIDATION_ERROR;
417 push_client_state_ = DEFAULT_INVALIDATION_ERROR;
420 InvalidatorState SyncInvalidationListener::GetState() const {
421 DCHECK(CalledOnValidThread());
422 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
423 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
424 // If either the ticl or the push client rejected our credentials,
425 // return INVALIDATION_CREDENTIALS_REJECTED.
426 return INVALIDATION_CREDENTIALS_REJECTED;
428 if (ticl_state_ == INVALIDATIONS_ENABLED &&
429 push_client_state_ == INVALIDATIONS_ENABLED) {
430 // If the ticl is ready and the push client notifications are
431 // enabled, return INVALIDATIONS_ENABLED.
432 return INVALIDATIONS_ENABLED;
434 // Otherwise, we have a transient error.
435 return TRANSIENT_INVALIDATION_ERROR;
438 void SyncInvalidationListener::EmitStateChange() {
439 DCHECK(CalledOnValidThread());
440 delegate_->OnInvalidatorStateChange(GetState());
443 base::WeakPtr<AckHandler> SyncInvalidationListener::AsWeakPtr() {
444 DCHECK(CalledOnValidThread());
445 base::WeakPtr<AckHandler> weak_ptr = weak_ptr_factory_.GetWeakPtr();
446 weak_ptr.get(); // Binds the pointer to this thread.
447 return weak_ptr;
450 void SyncInvalidationListener::OnNetworkChannelStateChanged(
451 InvalidatorState invalidator_state) {
452 DCHECK(CalledOnValidThread());
453 push_client_state_ = invalidator_state;
454 EmitStateChange();
457 } // namespace syncer