[Media Router] Add integration tests and e2e tests for media router and presentation...
[chromium-blink-merge.git] / components / invalidation / sync_invalidation_listener.cc
blobd7dd8f4089adac44e0ee41c765bdf814380cf52b
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/invalidation/sync_invalidation_listener.h"
7 #include <vector>
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/compiler_specific.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/thread_task_runner_handle.h"
16 #include "base/tracked_objects.h"
17 #include "components/invalidation/invalidation_util.h"
18 #include "components/invalidation/object_id_invalidation_map.h"
19 #include "components/invalidation/registration_manager.h"
20 #include "google/cacheinvalidation/include/invalidation-client.h"
21 #include "google/cacheinvalidation/include/types.h"
22 #include "jingle/notifier/listener/push_client.h"
24 namespace {
26 const char kApplicationName[] = "chrome-sync";
28 } // namespace
30 namespace syncer {
32 SyncInvalidationListener::Delegate::~Delegate() {}
34 SyncInvalidationListener::SyncInvalidationListener(
35 scoped_ptr<SyncNetworkChannel> network_channel)
36 : sync_network_channel_(network_channel.Pass()),
37 sync_system_resources_(sync_network_channel_.get(), this),
38 delegate_(NULL),
39 ticl_state_(DEFAULT_INVALIDATION_ERROR),
40 push_client_state_(DEFAULT_INVALIDATION_ERROR),
41 weak_ptr_factory_(this) {
42 DCHECK(CalledOnValidThread());
43 sync_network_channel_->AddObserver(this);
46 SyncInvalidationListener::~SyncInvalidationListener() {
47 DCHECK(CalledOnValidThread());
48 sync_network_channel_->RemoveObserver(this);
49 Stop();
50 DCHECK(!delegate_);
53 void SyncInvalidationListener::Start(
54 const CreateInvalidationClientCallback& create_invalidation_client_callback,
55 const std::string& client_id,
56 const std::string& client_info,
57 const std::string& invalidation_bootstrap_data,
58 const UnackedInvalidationsMap& initial_unacked_invalidations,
59 const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker,
60 const scoped_refptr<base::SequencedTaskRunner>&
61 invalidation_state_tracker_task_runner,
62 Delegate* delegate) {
63 DCHECK(CalledOnValidThread());
64 Stop();
66 sync_system_resources_.set_platform(client_info);
67 sync_system_resources_.Start();
69 // The Storage resource is implemented as a write-through cache. We populate
70 // it with the initial state on startup, so subsequent writes go to disk and
71 // update the in-memory cache, while reads just return the cached state.
72 sync_system_resources_.storage()->SetInitialState(
73 invalidation_bootstrap_data);
75 unacked_invalidations_map_ = initial_unacked_invalidations;
76 invalidation_state_tracker_ = invalidation_state_tracker;
77 invalidation_state_tracker_task_runner_ =
78 invalidation_state_tracker_task_runner;
79 DCHECK(invalidation_state_tracker_task_runner_.get());
81 DCHECK(!delegate_);
82 DCHECK(delegate);
83 delegate_ = delegate;
85 invalidation_client_.reset(create_invalidation_client_callback.Run(
86 &sync_system_resources_,
87 sync_network_channel_->GetInvalidationClientType(),
88 client_id,
89 kApplicationName,
90 this));
91 invalidation_client_->Start();
93 registration_manager_.reset(
94 new RegistrationManager(invalidation_client_.get()));
97 void SyncInvalidationListener::UpdateCredentials(
98 const std::string& email, const std::string& token) {
99 DCHECK(CalledOnValidThread());
100 sync_network_channel_->UpdateCredentials(email, token);
103 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
104 DCHECK(CalledOnValidThread());
105 registered_ids_ = ids;
106 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
107 // working XMPP connection (as observed by us), so check it instead
108 // of GetState() (see http://crbug.com/139424).
109 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
110 DoRegistrationUpdate();
114 void SyncInvalidationListener::Ready(
115 invalidation::InvalidationClient* client) {
116 DCHECK(CalledOnValidThread());
117 DCHECK_EQ(client, invalidation_client_.get());
118 ticl_state_ = INVALIDATIONS_ENABLED;
119 EmitStateChange();
120 DoRegistrationUpdate();
123 void SyncInvalidationListener::Invalidate(
124 invalidation::InvalidationClient* client,
125 const invalidation::Invalidation& invalidation,
126 const invalidation::AckHandle& ack_handle) {
127 DCHECK(CalledOnValidThread());
128 DCHECK_EQ(client, invalidation_client_.get());
129 client->Acknowledge(ack_handle);
131 const invalidation::ObjectId& id = invalidation.object_id();
133 std::string payload;
134 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
135 if (invalidation.has_payload())
136 payload = invalidation.payload();
138 DVLOG(2) << "Received invalidation with version " << invalidation.version()
139 << " for " << ObjectIdToString(id);
141 ObjectIdInvalidationMap invalidations;
142 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
143 inv.SetAckHandler(AsWeakPtr(), base::ThreadTaskRunnerHandle::Get());
144 invalidations.Insert(inv);
146 DispatchInvalidations(invalidations);
149 void SyncInvalidationListener::InvalidateUnknownVersion(
150 invalidation::InvalidationClient* client,
151 const invalidation::ObjectId& object_id,
152 const invalidation::AckHandle& ack_handle) {
153 DCHECK(CalledOnValidThread());
154 DCHECK_EQ(client, invalidation_client_.get());
155 DVLOG(1) << "InvalidateUnknownVersion";
156 client->Acknowledge(ack_handle);
158 ObjectIdInvalidationMap invalidations;
159 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
160 unknown_version.SetAckHandler(AsWeakPtr(),
161 base::ThreadTaskRunnerHandle::Get());
162 invalidations.Insert(unknown_version);
164 DispatchInvalidations(invalidations);
167 // This should behave as if we got an invalidation with version
168 // UNKNOWN_OBJECT_VERSION for all known data types.
169 void SyncInvalidationListener::InvalidateAll(
170 invalidation::InvalidationClient* client,
171 const invalidation::AckHandle& ack_handle) {
172 DCHECK(CalledOnValidThread());
173 DCHECK_EQ(client, invalidation_client_.get());
174 DVLOG(1) << "InvalidateAll";
175 client->Acknowledge(ack_handle);
177 ObjectIdInvalidationMap invalidations;
178 for (ObjectIdSet::iterator it = registered_ids_.begin();
179 it != registered_ids_.end(); ++it) {
180 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
181 unknown_version.SetAckHandler(AsWeakPtr(),
182 base::ThreadTaskRunnerHandle::Get());
183 invalidations.Insert(unknown_version);
186 DispatchInvalidations(invalidations);
189 // If a handler is registered, emit right away. Otherwise, save it for later.
190 void SyncInvalidationListener::DispatchInvalidations(
191 const ObjectIdInvalidationMap& invalidations) {
192 DCHECK(CalledOnValidThread());
194 ObjectIdInvalidationMap to_save = invalidations;
195 ObjectIdInvalidationMap to_emit =
196 invalidations.GetSubsetWithObjectIds(registered_ids_);
198 SaveInvalidations(to_save);
199 EmitSavedInvalidations(to_emit);
202 void SyncInvalidationListener::SaveInvalidations(
203 const ObjectIdInvalidationMap& to_save) {
204 ObjectIdSet objects_to_save = to_save.GetObjectIds();
205 for (ObjectIdSet::const_iterator it = objects_to_save.begin();
206 it != objects_to_save.end(); ++it) {
207 UnackedInvalidationsMap::iterator lookup =
208 unacked_invalidations_map_.find(*it);
209 if (lookup == unacked_invalidations_map_.end()) {
210 lookup = unacked_invalidations_map_.insert(
211 std::make_pair(*it, UnackedInvalidationSet(*it))).first;
213 lookup->second.AddSet(to_save.ForObject(*it));
216 invalidation_state_tracker_task_runner_->PostTask(
217 FROM_HERE,
218 base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
219 invalidation_state_tracker_,
220 unacked_invalidations_map_));
223 void SyncInvalidationListener::EmitSavedInvalidations(
224 const ObjectIdInvalidationMap& to_emit) {
225 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
226 delegate_->OnInvalidate(to_emit);
229 void SyncInvalidationListener::InformRegistrationStatus(
230 invalidation::InvalidationClient* client,
231 const invalidation::ObjectId& object_id,
232 InvalidationListener::RegistrationState new_state) {
233 DCHECK(CalledOnValidThread());
234 DCHECK_EQ(client, invalidation_client_.get());
235 DVLOG(1) << "InformRegistrationStatus: "
236 << ObjectIdToString(object_id) << " " << new_state;
238 if (new_state != InvalidationListener::REGISTERED) {
239 // Let |registration_manager_| handle the registration backoff policy.
240 registration_manager_->MarkRegistrationLost(object_id);
244 void SyncInvalidationListener::InformRegistrationFailure(
245 invalidation::InvalidationClient* client,
246 const invalidation::ObjectId& object_id,
247 bool is_transient,
248 const std::string& error_message) {
249 DCHECK(CalledOnValidThread());
250 DCHECK_EQ(client, invalidation_client_.get());
251 DVLOG(1) << "InformRegistrationFailure: "
252 << ObjectIdToString(object_id)
253 << "is_transient=" << is_transient
254 << ", message=" << error_message;
256 if (is_transient) {
257 // We don't care about |unknown_hint|; we let
258 // |registration_manager_| handle the registration backoff policy.
259 registration_manager_->MarkRegistrationLost(object_id);
260 } else {
261 // Non-transient failures require an action to resolve. This could happen
262 // because:
263 // - the server doesn't yet recognize the data type, which could happen for
264 // brand-new data types.
265 // - the user has changed his password and hasn't updated it yet locally.
266 // Either way, block future registration attempts for |object_id|. However,
267 // we don't forget any saved invalidation state since we may use it once the
268 // error is addressed.
269 registration_manager_->DisableId(object_id);
273 void SyncInvalidationListener::ReissueRegistrations(
274 invalidation::InvalidationClient* client,
275 const std::string& prefix,
276 int prefix_length) {
277 DCHECK(CalledOnValidThread());
278 DCHECK_EQ(client, invalidation_client_.get());
279 DVLOG(1) << "AllRegistrationsLost";
280 registration_manager_->MarkAllRegistrationsLost();
283 void SyncInvalidationListener::InformError(
284 invalidation::InvalidationClient* client,
285 const invalidation::ErrorInfo& error_info) {
286 DCHECK(CalledOnValidThread());
287 DCHECK_EQ(client, invalidation_client_.get());
288 LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
289 << error_info.error_message()
290 << " (transient = " << error_info.is_transient() << ")";
291 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
292 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
293 } else {
294 ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
296 EmitStateChange();
299 void SyncInvalidationListener::Acknowledge(
300 const invalidation::ObjectId& id,
301 const syncer::AckHandle& handle) {
302 UnackedInvalidationsMap::iterator lookup =
303 unacked_invalidations_map_.find(id);
304 if (lookup == unacked_invalidations_map_.end()) {
305 DLOG(WARNING) << "Received acknowledgement for untracked object ID";
306 return;
308 lookup->second.Acknowledge(handle);
309 invalidation_state_tracker_task_runner_->PostTask(
310 FROM_HERE,
311 base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
312 invalidation_state_tracker_,
313 unacked_invalidations_map_));
316 void SyncInvalidationListener::Drop(
317 const invalidation::ObjectId& id,
318 const syncer::AckHandle& handle) {
319 UnackedInvalidationsMap::iterator lookup =
320 unacked_invalidations_map_.find(id);
321 if (lookup == unacked_invalidations_map_.end()) {
322 DLOG(WARNING) << "Received drop for untracked object ID";
323 return;
325 lookup->second.Drop(handle);
326 invalidation_state_tracker_task_runner_->PostTask(
327 FROM_HERE,
328 base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
329 invalidation_state_tracker_,
330 unacked_invalidations_map_));
333 void SyncInvalidationListener::WriteState(const std::string& state) {
334 DCHECK(CalledOnValidThread());
335 DVLOG(1) << "WriteState";
336 invalidation_state_tracker_task_runner_->PostTask(
337 FROM_HERE,
338 base::Bind(&InvalidationStateTracker::SetBootstrapData,
339 invalidation_state_tracker_,
340 state));
343 void SyncInvalidationListener::DoRegistrationUpdate() {
344 DCHECK(CalledOnValidThread());
345 const ObjectIdSet& unregistered_ids =
346 registration_manager_->UpdateRegisteredIds(registered_ids_);
347 for (ObjectIdSet::iterator it = unregistered_ids.begin();
348 it != unregistered_ids.end(); ++it) {
349 unacked_invalidations_map_.erase(*it);
351 invalidation_state_tracker_task_runner_->PostTask(
352 FROM_HERE,
353 base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
354 invalidation_state_tracker_,
355 unacked_invalidations_map_));
357 ObjectIdInvalidationMap object_id_invalidation_map;
358 for (UnackedInvalidationsMap::iterator map_it =
359 unacked_invalidations_map_.begin();
360 map_it != unacked_invalidations_map_.end(); ++map_it) {
361 if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
362 continue;
364 map_it->second.ExportInvalidations(AsWeakPtr(),
365 base::ThreadTaskRunnerHandle::Get(),
366 &object_id_invalidation_map);
369 // There's no need to run these through DispatchInvalidations(); they've
370 // already been saved to storage (that's where we found them) so all we need
371 // to do now is emit them.
372 EmitSavedInvalidations(object_id_invalidation_map);
375 void SyncInvalidationListener::RequestDetailedStatus(
376 base::Callback<void(const base::DictionaryValue&)> callback) const {
377 DCHECK(CalledOnValidThread());
378 sync_network_channel_->RequestDetailedStatus(callback);
379 callback.Run(*CollectDebugData());
382 scoped_ptr<base::DictionaryValue>
383 SyncInvalidationListener::CollectDebugData() const {
384 scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue());
385 return_value->SetString(
386 "SyncInvalidationListener.PushClientState",
387 std::string(InvalidatorStateToString(push_client_state_)));
388 return_value->SetString("SyncInvalidationListener.TiclState",
389 std::string(InvalidatorStateToString(ticl_state_)));
390 scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue());
391 for (UnackedInvalidationsMap::const_iterator it =
392 unacked_invalidations_map_.begin();
393 it != unacked_invalidations_map_.end();
394 ++it) {
395 unacked_map->Set((it->first).name(), (it->second).ToValue().release());
397 return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap",
398 unacked_map.release());
399 return return_value.Pass();
402 void SyncInvalidationListener::StopForTest() {
403 DCHECK(CalledOnValidThread());
404 Stop();
407 void SyncInvalidationListener::Stop() {
408 DCHECK(CalledOnValidThread());
409 if (!invalidation_client_) {
410 return;
413 registration_manager_.reset();
414 sync_system_resources_.Stop();
415 invalidation_client_->Stop();
417 invalidation_client_.reset();
418 delegate_ = NULL;
420 ticl_state_ = DEFAULT_INVALIDATION_ERROR;
421 push_client_state_ = DEFAULT_INVALIDATION_ERROR;
424 InvalidatorState SyncInvalidationListener::GetState() const {
425 DCHECK(CalledOnValidThread());
426 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
427 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
428 // If either the ticl or the push client rejected our credentials,
429 // return INVALIDATION_CREDENTIALS_REJECTED.
430 return INVALIDATION_CREDENTIALS_REJECTED;
432 if (ticl_state_ == INVALIDATIONS_ENABLED &&
433 push_client_state_ == INVALIDATIONS_ENABLED) {
434 // If the ticl is ready and the push client notifications are
435 // enabled, return INVALIDATIONS_ENABLED.
436 return INVALIDATIONS_ENABLED;
438 // Otherwise, we have a transient error.
439 return TRANSIENT_INVALIDATION_ERROR;
442 void SyncInvalidationListener::EmitStateChange() {
443 DCHECK(CalledOnValidThread());
444 delegate_->OnInvalidatorStateChange(GetState());
447 base::WeakPtr<AckHandler> SyncInvalidationListener::AsWeakPtr() {
448 DCHECK(CalledOnValidThread());
449 base::WeakPtr<AckHandler> weak_ptr = weak_ptr_factory_.GetWeakPtr();
450 weak_ptr.get(); // Binds the pointer to this thread.
451 return weak_ptr;
454 void SyncInvalidationListener::OnNetworkChannelStateChanged(
455 InvalidatorState invalidator_state) {
456 DCHECK(CalledOnValidThread());
457 push_client_state_ = invalidator_state;
458 EmitStateChange();
461 } // namespace syncer