Explicitly add python-numpy dependency to install-build-deps.
[chromium-blink-merge.git] / components / invalidation / non_blocking_invalidator.cc
blob1d520b857ab80e9f527ff15ad52cdc63f8597d73
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/invalidation/non_blocking_invalidator.h"
7 #include <cstddef>
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "base/threading/thread.h"
15 #include "components/invalidation/gcm_network_channel_delegate.h"
16 #include "components/invalidation/invalidation_handler.h"
17 #include "components/invalidation/invalidation_notifier.h"
18 #include "components/invalidation/object_id_invalidation_map.h"
19 #include "components/invalidation/sync_system_resources.h"
20 #include "jingle/notifier/listener/push_client.h"
22 namespace syncer {
24 struct NonBlockingInvalidator::InitializeOptions {
25 InitializeOptions(
26 NetworkChannelCreator network_channel_creator,
27 const std::string& invalidator_client_id,
28 const UnackedInvalidationsMap& saved_invalidations,
29 const std::string& invalidation_bootstrap_data,
30 const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker,
31 const scoped_refptr<base::SingleThreadTaskRunner>&
32 invalidation_state_tracker_task_runner,
33 const std::string& client_info,
34 scoped_refptr<net::URLRequestContextGetter> request_context_getter)
35 : network_channel_creator(network_channel_creator),
36 invalidator_client_id(invalidator_client_id),
37 saved_invalidations(saved_invalidations),
38 invalidation_bootstrap_data(invalidation_bootstrap_data),
39 invalidation_state_tracker(invalidation_state_tracker),
40 invalidation_state_tracker_task_runner(
41 invalidation_state_tracker_task_runner),
42 client_info(client_info),
43 request_context_getter(request_context_getter) {}
45 NetworkChannelCreator network_channel_creator;
46 std::string invalidator_client_id;
47 UnackedInvalidationsMap saved_invalidations;
48 std::string invalidation_bootstrap_data;
49 base::WeakPtr<InvalidationStateTracker> invalidation_state_tracker;
50 scoped_refptr<base::SingleThreadTaskRunner>
51 invalidation_state_tracker_task_runner;
52 std::string client_info;
53 scoped_refptr<net::URLRequestContextGetter> request_context_getter;
56 namespace {
57 // This class provides a wrapper for a logging class in order to receive
58 // callbacks across threads, without having to worry about owner threads.
59 class CallbackProxy {
60 public:
61 explicit CallbackProxy(
62 base::Callback<void(const base::DictionaryValue&)> callback);
63 ~CallbackProxy();
65 void Run(const base::DictionaryValue& value);
67 private:
68 static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback,
69 scoped_ptr<base::DictionaryValue> value);
71 base::Callback<void(const base::DictionaryValue&)> callback_;
72 scoped_refptr<base::SingleThreadTaskRunner> running_thread_;
74 DISALLOW_COPY_AND_ASSIGN(CallbackProxy);
77 CallbackProxy::CallbackProxy(
78 base::Callback<void(const base::DictionaryValue&)> callback)
79 : callback_(callback),
80 running_thread_(base::ThreadTaskRunnerHandle::Get()) {}
82 CallbackProxy::~CallbackProxy() {}
84 void CallbackProxy::DoRun(
85 base::Callback<void(const base::DictionaryValue&)> callback,
86 scoped_ptr<base::DictionaryValue> value) {
87 callback.Run(*value);
90 void CallbackProxy::Run(const base::DictionaryValue& value) {
91 scoped_ptr<base::DictionaryValue> copied(value.DeepCopy());
92 running_thread_->PostTask(
93 FROM_HERE,
94 base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied)));
98 class NonBlockingInvalidator::Core
99 : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>,
100 // InvalidationHandler to observe the InvalidationNotifier we create.
101 public InvalidationHandler {
102 public:
103 // Called on parent thread. |delegate_observer| should be initialized.
104 Core(const base::WeakPtr<NonBlockingInvalidator>& delegate_observer,
105 const scoped_refptr<base::SingleThreadTaskRunner>&
106 delegate_observer_task_runner);
108 // Helpers called on I/O thread.
109 void Initialize(
110 const NonBlockingInvalidator::InitializeOptions& initialize_options);
111 void Teardown();
112 void UpdateRegisteredIds(const ObjectIdSet& ids);
113 void UpdateCredentials(const std::string& email, const std::string& token);
114 void RequestDetailedStatus(
115 base::Callback<void(const base::DictionaryValue&)> callback) const;
117 // InvalidationHandler implementation (all called on I/O thread by
118 // InvalidationNotifier).
119 void OnInvalidatorStateChange(InvalidatorState reason) override;
120 void OnIncomingInvalidation(
121 const ObjectIdInvalidationMap& invalidation_map) override;
122 std::string GetOwnerName() const override;
124 private:
125 friend class
126 base::RefCountedThreadSafe<NonBlockingInvalidator::Core>;
127 // Called on parent or I/O thread.
128 ~Core() override;
130 // The variables below should be used only on the I/O thread.
131 const base::WeakPtr<NonBlockingInvalidator> delegate_observer_;
132 scoped_refptr<base::SingleThreadTaskRunner> delegate_observer_task_runner_;
133 scoped_ptr<InvalidationNotifier> invalidation_notifier_;
134 scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
136 DISALLOW_COPY_AND_ASSIGN(Core);
139 NonBlockingInvalidator::Core::Core(
140 const base::WeakPtr<NonBlockingInvalidator>& delegate_observer,
141 const scoped_refptr<base::SingleThreadTaskRunner>&
142 delegate_observer_task_runner)
143 : delegate_observer_(delegate_observer),
144 delegate_observer_task_runner_(delegate_observer_task_runner) {
145 DCHECK(delegate_observer_);
146 DCHECK(delegate_observer_task_runner_.get());
149 NonBlockingInvalidator::Core::~Core() {
152 void NonBlockingInvalidator::Core::Initialize(
153 const NonBlockingInvalidator::InitializeOptions& initialize_options) {
154 DCHECK(initialize_options.request_context_getter.get());
155 network_task_runner_ =
156 initialize_options.request_context_getter->GetNetworkTaskRunner();
157 DCHECK(network_task_runner_->BelongsToCurrentThread());
158 scoped_ptr<SyncNetworkChannel> network_channel =
159 initialize_options.network_channel_creator.Run();
160 invalidation_notifier_.reset(new InvalidationNotifier(
161 network_channel.Pass(),
162 initialize_options.invalidator_client_id,
163 initialize_options.saved_invalidations,
164 initialize_options.invalidation_bootstrap_data,
165 initialize_options.invalidation_state_tracker,
166 initialize_options.invalidation_state_tracker_task_runner,
167 initialize_options.client_info));
168 invalidation_notifier_->RegisterHandler(this);
171 void NonBlockingInvalidator::Core::Teardown() {
172 DCHECK(network_task_runner_->BelongsToCurrentThread());
173 invalidation_notifier_->UnregisterHandler(this);
174 invalidation_notifier_.reset();
175 network_task_runner_ = NULL;
178 void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) {
179 DCHECK(network_task_runner_->BelongsToCurrentThread());
180 invalidation_notifier_->UpdateRegisteredIds(this, ids);
183 void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email,
184 const std::string& token) {
185 DCHECK(network_task_runner_->BelongsToCurrentThread());
186 invalidation_notifier_->UpdateCredentials(email, token);
189 void NonBlockingInvalidator::Core::RequestDetailedStatus(
190 base::Callback<void(const base::DictionaryValue&)> callback) const {
191 DCHECK(network_task_runner_->BelongsToCurrentThread());
192 invalidation_notifier_->RequestDetailedStatus(callback);
195 void NonBlockingInvalidator::Core::OnInvalidatorStateChange(
196 InvalidatorState reason) {
197 DCHECK(network_task_runner_->BelongsToCurrentThread());
198 delegate_observer_task_runner_->PostTask(
199 FROM_HERE,
200 base::Bind(&NonBlockingInvalidator::OnInvalidatorStateChange,
201 delegate_observer_,
202 reason));
205 void NonBlockingInvalidator::Core::OnIncomingInvalidation(
206 const ObjectIdInvalidationMap& invalidation_map) {
207 DCHECK(network_task_runner_->BelongsToCurrentThread());
208 delegate_observer_task_runner_->PostTask(
209 FROM_HERE,
210 base::Bind(&NonBlockingInvalidator::OnIncomingInvalidation,
211 delegate_observer_,
212 invalidation_map));
215 std::string NonBlockingInvalidator::Core::GetOwnerName() const {
216 return "Sync";
219 NonBlockingInvalidator::NonBlockingInvalidator(
220 NetworkChannelCreator network_channel_creator,
221 const std::string& invalidator_client_id,
222 const UnackedInvalidationsMap& saved_invalidations,
223 const std::string& invalidation_bootstrap_data,
224 InvalidationStateTracker* invalidation_state_tracker,
225 const std::string& client_info,
226 const scoped_refptr<net::URLRequestContextGetter>& request_context_getter)
227 : invalidation_state_tracker_(invalidation_state_tracker),
228 parent_task_runner_(base::ThreadTaskRunnerHandle::Get()),
229 network_task_runner_(request_context_getter->GetNetworkTaskRunner()),
230 weak_ptr_factory_(this) {
231 base::WeakPtr<NonBlockingInvalidator> weak_ptr_this =
232 weak_ptr_factory_.GetWeakPtr();
233 weak_ptr_this.get(); // Bind to this thread.
235 core_ = new Core(weak_ptr_this,
236 base::MessageLoopProxy::current());
238 InitializeOptions initialize_options(network_channel_creator,
239 invalidator_client_id,
240 saved_invalidations,
241 invalidation_bootstrap_data,
242 weak_ptr_this,
243 base::MessageLoopProxy::current(),
244 client_info,
245 request_context_getter);
247 if (!network_task_runner_->PostTask(
248 FROM_HERE,
249 base::Bind(
250 &NonBlockingInvalidator::Core::Initialize,
251 core_.get(),
252 initialize_options))) {
253 NOTREACHED();
257 NonBlockingInvalidator::~NonBlockingInvalidator() {
258 DCHECK(parent_task_runner_->BelongsToCurrentThread());
259 if (!network_task_runner_->PostTask(
260 FROM_HERE,
261 base::Bind(&NonBlockingInvalidator::Core::Teardown,
262 core_.get()))) {
263 DVLOG(1) << "Network thread stopped before invalidator is destroyed.";
267 void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) {
268 DCHECK(parent_task_runner_->BelongsToCurrentThread());
269 registrar_.RegisterHandler(handler);
272 void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler,
273 const ObjectIdSet& ids) {
274 DCHECK(parent_task_runner_->BelongsToCurrentThread());
275 registrar_.UpdateRegisteredIds(handler, ids);
276 if (!network_task_runner_->PostTask(
277 FROM_HERE,
278 base::Bind(
279 &NonBlockingInvalidator::Core::UpdateRegisteredIds,
280 core_.get(),
281 registrar_.GetAllRegisteredIds()))) {
282 NOTREACHED();
286 void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) {
287 DCHECK(parent_task_runner_->BelongsToCurrentThread());
288 registrar_.UnregisterHandler(handler);
291 InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const {
292 DCHECK(parent_task_runner_->BelongsToCurrentThread());
293 return registrar_.GetInvalidatorState();
296 void NonBlockingInvalidator::UpdateCredentials(const std::string& email,
297 const std::string& token) {
298 DCHECK(parent_task_runner_->BelongsToCurrentThread());
299 if (!network_task_runner_->PostTask(
300 FROM_HERE,
301 base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials,
302 core_.get(), email, token))) {
303 NOTREACHED();
307 void NonBlockingInvalidator::RequestDetailedStatus(
308 base::Callback<void(const base::DictionaryValue&)> callback) const {
309 DCHECK(parent_task_runner_->BelongsToCurrentThread());
310 base::Callback<void(const base::DictionaryValue&)> proxy_callback =
311 base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback)));
312 if (!network_task_runner_->PostTask(
313 FROM_HERE,
314 base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus,
315 core_.get(),
316 proxy_callback))) {
317 NOTREACHED();
321 NetworkChannelCreator
322 NonBlockingInvalidator::MakePushClientChannelCreator(
323 const notifier::NotifierOptions& notifier_options) {
324 return base::Bind(SyncNetworkChannel::CreatePushClientChannel,
325 notifier_options);
328 NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator(
329 scoped_refptr<net::URLRequestContextGetter> request_context_getter,
330 scoped_ptr<GCMNetworkChannelDelegate> delegate) {
331 return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel,
332 request_context_getter,
333 base::Passed(&delegate));
336 void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) {
337 DCHECK(parent_task_runner_->BelongsToCurrentThread());
338 invalidation_state_tracker_->ClearAndSetNewClientId(data);
341 std::string NonBlockingInvalidator::GetInvalidatorClientId() const {
342 DCHECK(parent_task_runner_->BelongsToCurrentThread());
343 return invalidation_state_tracker_->GetInvalidatorClientId();
346 void NonBlockingInvalidator::SetBootstrapData(const std::string& data) {
347 DCHECK(parent_task_runner_->BelongsToCurrentThread());
348 invalidation_state_tracker_->SetBootstrapData(data);
351 std::string NonBlockingInvalidator::GetBootstrapData() const {
352 DCHECK(parent_task_runner_->BelongsToCurrentThread());
353 return invalidation_state_tracker_->GetBootstrapData();
356 void NonBlockingInvalidator::SetSavedInvalidations(
357 const UnackedInvalidationsMap& states) {
358 DCHECK(parent_task_runner_->BelongsToCurrentThread());
359 invalidation_state_tracker_->SetSavedInvalidations(states);
362 UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const {
363 DCHECK(parent_task_runner_->BelongsToCurrentThread());
364 return invalidation_state_tracker_->GetSavedInvalidations();
367 void NonBlockingInvalidator::Clear() {
368 DCHECK(parent_task_runner_->BelongsToCurrentThread());
369 invalidation_state_tracker_->Clear();
372 void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) {
373 DCHECK(parent_task_runner_->BelongsToCurrentThread());
374 registrar_.UpdateInvalidatorState(state);
377 void NonBlockingInvalidator::OnIncomingInvalidation(
378 const ObjectIdInvalidationMap& invalidation_map) {
379 DCHECK(parent_task_runner_->BelongsToCurrentThread());
380 registrar_.DispatchInvalidationsToHandlers(invalidation_map);
383 std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; }
385 } // namespace syncer