[Media Router] Add integration tests and e2e tests for media router and presentation...
[chromium-blink-merge.git] / components / invalidation / non_blocking_invalidator.cc
blob2ae6926c174f766600d8340de4a6552e73a9abfa
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/invalidation/non_blocking_invalidator.h"
7 #include <cstddef>
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/profiler/scoped_tracker.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/threading/thread.h"
16 #include "components/invalidation/gcm_network_channel_delegate.h"
17 #include "components/invalidation/invalidation_handler.h"
18 #include "components/invalidation/invalidation_notifier.h"
19 #include "components/invalidation/object_id_invalidation_map.h"
20 #include "components/invalidation/sync_system_resources.h"
21 #include "jingle/notifier/listener/push_client.h"
23 namespace syncer {
25 struct NonBlockingInvalidator::InitializeOptions {
26 InitializeOptions(
27 NetworkChannelCreator network_channel_creator,
28 const std::string& invalidator_client_id,
29 const UnackedInvalidationsMap& saved_invalidations,
30 const std::string& invalidation_bootstrap_data,
31 const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker,
32 const scoped_refptr<base::SingleThreadTaskRunner>&
33 invalidation_state_tracker_task_runner,
34 const std::string& client_info,
35 scoped_refptr<net::URLRequestContextGetter> request_context_getter)
36 : network_channel_creator(network_channel_creator),
37 invalidator_client_id(invalidator_client_id),
38 saved_invalidations(saved_invalidations),
39 invalidation_bootstrap_data(invalidation_bootstrap_data),
40 invalidation_state_tracker(invalidation_state_tracker),
41 invalidation_state_tracker_task_runner(
42 invalidation_state_tracker_task_runner),
43 client_info(client_info),
44 request_context_getter(request_context_getter) {}
46 NetworkChannelCreator network_channel_creator;
47 std::string invalidator_client_id;
48 UnackedInvalidationsMap saved_invalidations;
49 std::string invalidation_bootstrap_data;
50 base::WeakPtr<InvalidationStateTracker> invalidation_state_tracker;
51 scoped_refptr<base::SingleThreadTaskRunner>
52 invalidation_state_tracker_task_runner;
53 std::string client_info;
54 scoped_refptr<net::URLRequestContextGetter> request_context_getter;
57 namespace {
58 // This class provides a wrapper for a logging class in order to receive
59 // callbacks across threads, without having to worry about owner threads.
60 class CallbackProxy {
61 public:
62 explicit CallbackProxy(
63 base::Callback<void(const base::DictionaryValue&)> callback);
64 ~CallbackProxy();
66 void Run(const base::DictionaryValue& value);
68 private:
69 static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback,
70 scoped_ptr<base::DictionaryValue> value);
72 base::Callback<void(const base::DictionaryValue&)> callback_;
73 scoped_refptr<base::SingleThreadTaskRunner> running_thread_;
75 DISALLOW_COPY_AND_ASSIGN(CallbackProxy);
78 CallbackProxy::CallbackProxy(
79 base::Callback<void(const base::DictionaryValue&)> callback)
80 : callback_(callback),
81 running_thread_(base::ThreadTaskRunnerHandle::Get()) {}
83 CallbackProxy::~CallbackProxy() {}
85 void CallbackProxy::DoRun(
86 base::Callback<void(const base::DictionaryValue&)> callback,
87 scoped_ptr<base::DictionaryValue> value) {
88 callback.Run(*value);
91 void CallbackProxy::Run(const base::DictionaryValue& value) {
92 scoped_ptr<base::DictionaryValue> copied(value.DeepCopy());
93 running_thread_->PostTask(
94 FROM_HERE,
95 base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied)));
99 class NonBlockingInvalidator::Core
100 : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>,
101 // InvalidationHandler to observe the InvalidationNotifier we create.
102 public InvalidationHandler {
103 public:
104 // Called on parent thread. |delegate_observer| should be initialized.
105 Core(const base::WeakPtr<NonBlockingInvalidator>& delegate_observer,
106 const scoped_refptr<base::SingleThreadTaskRunner>&
107 delegate_observer_task_runner);
109 // Helpers called on I/O thread.
110 void Initialize(
111 const NonBlockingInvalidator::InitializeOptions& initialize_options);
112 void Teardown();
113 void UpdateRegisteredIds(const ObjectIdSet& ids);
114 void UpdateCredentials(const std::string& email, const std::string& token);
115 void RequestDetailedStatus(
116 base::Callback<void(const base::DictionaryValue&)> callback) const;
118 // InvalidationHandler implementation (all called on I/O thread by
119 // InvalidationNotifier).
120 void OnInvalidatorStateChange(InvalidatorState reason) override;
121 void OnIncomingInvalidation(
122 const ObjectIdInvalidationMap& invalidation_map) override;
123 std::string GetOwnerName() const override;
125 private:
126 friend class
127 base::RefCountedThreadSafe<NonBlockingInvalidator::Core>;
128 // Called on parent or I/O thread.
129 ~Core() override;
131 // The variables below should be used only on the I/O thread.
132 const base::WeakPtr<NonBlockingInvalidator> delegate_observer_;
133 scoped_refptr<base::SingleThreadTaskRunner> delegate_observer_task_runner_;
134 scoped_ptr<InvalidationNotifier> invalidation_notifier_;
135 scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
137 DISALLOW_COPY_AND_ASSIGN(Core);
140 NonBlockingInvalidator::Core::Core(
141 const base::WeakPtr<NonBlockingInvalidator>& delegate_observer,
142 const scoped_refptr<base::SingleThreadTaskRunner>&
143 delegate_observer_task_runner)
144 : delegate_observer_(delegate_observer),
145 delegate_observer_task_runner_(delegate_observer_task_runner) {
146 DCHECK(delegate_observer_);
147 DCHECK(delegate_observer_task_runner_.get());
150 NonBlockingInvalidator::Core::~Core() {
153 void NonBlockingInvalidator::Core::Initialize(
154 const NonBlockingInvalidator::InitializeOptions& initialize_options) {
155 DCHECK(initialize_options.request_context_getter.get());
156 network_task_runner_ =
157 initialize_options.request_context_getter->GetNetworkTaskRunner();
158 DCHECK(network_task_runner_->BelongsToCurrentThread());
159 scoped_ptr<SyncNetworkChannel> network_channel =
160 initialize_options.network_channel_creator.Run();
161 invalidation_notifier_.reset(new InvalidationNotifier(
162 network_channel.Pass(),
163 initialize_options.invalidator_client_id,
164 initialize_options.saved_invalidations,
165 initialize_options.invalidation_bootstrap_data,
166 initialize_options.invalidation_state_tracker,
167 initialize_options.invalidation_state_tracker_task_runner,
168 initialize_options.client_info));
169 invalidation_notifier_->RegisterHandler(this);
172 void NonBlockingInvalidator::Core::Teardown() {
173 DCHECK(network_task_runner_->BelongsToCurrentThread());
174 invalidation_notifier_->UnregisterHandler(this);
175 invalidation_notifier_.reset();
176 network_task_runner_ = NULL;
179 void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) {
180 DCHECK(network_task_runner_->BelongsToCurrentThread());
181 invalidation_notifier_->UpdateRegisteredIds(this, ids);
184 void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email,
185 const std::string& token) {
186 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
187 tracked_objects::ScopedTracker tracking_profile(
188 FROM_HERE_WITH_EXPLICIT_FUNCTION(
189 "477117 NonBlockingInvalidator::Core::UpdateCredentials"));
190 DCHECK(network_task_runner_->BelongsToCurrentThread());
191 invalidation_notifier_->UpdateCredentials(email, token);
194 void NonBlockingInvalidator::Core::RequestDetailedStatus(
195 base::Callback<void(const base::DictionaryValue&)> callback) const {
196 DCHECK(network_task_runner_->BelongsToCurrentThread());
197 invalidation_notifier_->RequestDetailedStatus(callback);
200 void NonBlockingInvalidator::Core::OnInvalidatorStateChange(
201 InvalidatorState reason) {
202 DCHECK(network_task_runner_->BelongsToCurrentThread());
203 delegate_observer_task_runner_->PostTask(
204 FROM_HERE,
205 base::Bind(&NonBlockingInvalidator::OnInvalidatorStateChange,
206 delegate_observer_,
207 reason));
210 void NonBlockingInvalidator::Core::OnIncomingInvalidation(
211 const ObjectIdInvalidationMap& invalidation_map) {
212 DCHECK(network_task_runner_->BelongsToCurrentThread());
213 delegate_observer_task_runner_->PostTask(
214 FROM_HERE,
215 base::Bind(&NonBlockingInvalidator::OnIncomingInvalidation,
216 delegate_observer_,
217 invalidation_map));
220 std::string NonBlockingInvalidator::Core::GetOwnerName() const {
221 return "Sync";
224 NonBlockingInvalidator::NonBlockingInvalidator(
225 NetworkChannelCreator network_channel_creator,
226 const std::string& invalidator_client_id,
227 const UnackedInvalidationsMap& saved_invalidations,
228 const std::string& invalidation_bootstrap_data,
229 InvalidationStateTracker* invalidation_state_tracker,
230 const std::string& client_info,
231 const scoped_refptr<net::URLRequestContextGetter>& request_context_getter)
232 : invalidation_state_tracker_(invalidation_state_tracker),
233 parent_task_runner_(base::ThreadTaskRunnerHandle::Get()),
234 network_task_runner_(request_context_getter->GetNetworkTaskRunner()),
235 weak_ptr_factory_(this) {
236 base::WeakPtr<NonBlockingInvalidator> weak_ptr_this =
237 weak_ptr_factory_.GetWeakPtr();
238 weak_ptr_this.get(); // Bind to this thread.
240 core_ = new Core(weak_ptr_this, base::ThreadTaskRunnerHandle::Get());
242 InitializeOptions initialize_options(
243 network_channel_creator, invalidator_client_id, saved_invalidations,
244 invalidation_bootstrap_data, weak_ptr_this,
245 base::ThreadTaskRunnerHandle::Get(), client_info, request_context_getter);
247 if (!network_task_runner_->PostTask(
248 FROM_HERE,
249 base::Bind(
250 &NonBlockingInvalidator::Core::Initialize,
251 core_.get(),
252 initialize_options))) {
253 NOTREACHED();
257 NonBlockingInvalidator::~NonBlockingInvalidator() {
258 DCHECK(parent_task_runner_->BelongsToCurrentThread());
259 if (!network_task_runner_->PostTask(
260 FROM_HERE,
261 base::Bind(&NonBlockingInvalidator::Core::Teardown,
262 core_.get()))) {
263 DVLOG(1) << "Network thread stopped before invalidator is destroyed.";
267 void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) {
268 DCHECK(parent_task_runner_->BelongsToCurrentThread());
269 registrar_.RegisterHandler(handler);
272 bool NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler,
273 const ObjectIdSet& ids) {
274 DCHECK(parent_task_runner_->BelongsToCurrentThread());
275 if (!registrar_.UpdateRegisteredIds(handler, ids))
276 return false;
277 if (!network_task_runner_->PostTask(
278 FROM_HERE,
279 base::Bind(
280 &NonBlockingInvalidator::Core::UpdateRegisteredIds,
281 core_.get(),
282 registrar_.GetAllRegisteredIds()))) {
283 NOTREACHED();
285 return true;
288 void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) {
289 DCHECK(parent_task_runner_->BelongsToCurrentThread());
290 registrar_.UnregisterHandler(handler);
293 InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const {
294 DCHECK(parent_task_runner_->BelongsToCurrentThread());
295 return registrar_.GetInvalidatorState();
298 void NonBlockingInvalidator::UpdateCredentials(const std::string& email,
299 const std::string& token) {
300 DCHECK(parent_task_runner_->BelongsToCurrentThread());
301 if (!network_task_runner_->PostTask(
302 FROM_HERE,
303 base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials,
304 core_.get(), email, token))) {
305 NOTREACHED();
309 void NonBlockingInvalidator::RequestDetailedStatus(
310 base::Callback<void(const base::DictionaryValue&)> callback) const {
311 DCHECK(parent_task_runner_->BelongsToCurrentThread());
312 base::Callback<void(const base::DictionaryValue&)> proxy_callback =
313 base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback)));
314 if (!network_task_runner_->PostTask(
315 FROM_HERE,
316 base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus,
317 core_.get(),
318 proxy_callback))) {
319 NOTREACHED();
323 NetworkChannelCreator
324 NonBlockingInvalidator::MakePushClientChannelCreator(
325 const notifier::NotifierOptions& notifier_options) {
326 return base::Bind(SyncNetworkChannel::CreatePushClientChannel,
327 notifier_options);
330 NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator(
331 scoped_refptr<net::URLRequestContextGetter> request_context_getter,
332 scoped_ptr<GCMNetworkChannelDelegate> delegate) {
333 return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel,
334 request_context_getter,
335 base::Passed(&delegate));
338 void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) {
339 DCHECK(parent_task_runner_->BelongsToCurrentThread());
340 invalidation_state_tracker_->ClearAndSetNewClientId(data);
343 std::string NonBlockingInvalidator::GetInvalidatorClientId() const {
344 DCHECK(parent_task_runner_->BelongsToCurrentThread());
345 return invalidation_state_tracker_->GetInvalidatorClientId();
348 void NonBlockingInvalidator::SetBootstrapData(const std::string& data) {
349 DCHECK(parent_task_runner_->BelongsToCurrentThread());
350 invalidation_state_tracker_->SetBootstrapData(data);
353 std::string NonBlockingInvalidator::GetBootstrapData() const {
354 DCHECK(parent_task_runner_->BelongsToCurrentThread());
355 return invalidation_state_tracker_->GetBootstrapData();
358 void NonBlockingInvalidator::SetSavedInvalidations(
359 const UnackedInvalidationsMap& states) {
360 DCHECK(parent_task_runner_->BelongsToCurrentThread());
361 invalidation_state_tracker_->SetSavedInvalidations(states);
364 UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const {
365 DCHECK(parent_task_runner_->BelongsToCurrentThread());
366 return invalidation_state_tracker_->GetSavedInvalidations();
369 void NonBlockingInvalidator::Clear() {
370 DCHECK(parent_task_runner_->BelongsToCurrentThread());
371 invalidation_state_tracker_->Clear();
374 void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) {
375 DCHECK(parent_task_runner_->BelongsToCurrentThread());
376 registrar_.UpdateInvalidatorState(state);
379 void NonBlockingInvalidator::OnIncomingInvalidation(
380 const ObjectIdInvalidationMap& invalidation_map) {
381 DCHECK(parent_task_runner_->BelongsToCurrentThread());
382 registrar_.DispatchInvalidationsToHandlers(invalidation_map);
385 std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; }
387 } // namespace syncer