1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/invalidation/sync_system_resources.h"
11 #include "base/bind.h"
12 #include "base/logging.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/stl_util.h"
15 #include "base/strings/string_util.h"
16 #include "base/strings/stringprintf.h"
17 #include "components/invalidation/gcm_network_channel.h"
18 #include "components/invalidation/gcm_network_channel_delegate.h"
19 #include "components/invalidation/invalidation_util.h"
20 #include "components/invalidation/push_client_channel.h"
21 #include "google/cacheinvalidation/deps/callback.h"
22 #include "google/cacheinvalidation/include/types.h"
23 #include "jingle/notifier/listener/push_client.h"
// Default constructor; the body is empty.
27 SyncLogger::SyncLogger() {}
// Destructor; the body is empty.
28 SyncLogger::~SyncLogger() {}
// Formats a printf-style message (|format| plus varargs) and writes it via
// logging::LogMessage, attributing it to |file|:|line|.
//
// The visible assignments pair each severity with the matching "is logging
// enabled" check: -2 with VLOG_IS_ON(2), -1 with VLOG_IS_ON(1), LOG_WARNING
// with LOG_IS_ON(WARNING) and LOG_ERROR with LOG_IS_ON(ERROR).
//
// NOTE(review): the code that selects one of these pairs based on |level|,
// the test of |emit_log|, and the declaration/setup of |result| and |ap| are
// not visible here -- confirm against the full file.
30 void SyncLogger::Log(LogLevel level
, const char* file
, int line
,
31 const char* format
, ...) {
// Defaults: most verbose severity, and do not emit.
32 logging::LogSeverity log_severity
= -2; // VLOG(2)
33 bool emit_log
= false;
// Severity/enable pair for VLOG(2).
36 log_severity
= -2; // VLOG(2)
37 emit_log
= VLOG_IS_ON(2);
// Severity/enable pair for VLOG(1).
40 log_severity
= -1; // VLOG(1)
41 emit_log
= VLOG_IS_ON(1);
// Severity/enable pair for warnings.
44 log_severity
= logging::LOG_WARNING
;
45 emit_log
= LOG_IS_ON(WARNING
);
// Severity/enable pair for errors.
48 log_severity
= logging::LOG_ERROR
;
49 emit_log
= LOG_IS_ON(ERROR
);
// Expand the format string and its arguments into |result|.
56 base::StringAppendV(&result
, format
, ap
);
// Emit the formatted text at the chosen severity, tagged with the caller's
// file and line rather than this one.
57 logging::LogMessage(file
, line
, log_severity
).stream() << result
;
// Receives the invalidation::SystemResources this logger belongs to.
// NOTE(review): the body is not visible here, so what (if anything) is done
// with |resources| cannot be stated.
62 void SyncLogger::SetSystemResources(invalidation::SystemResources
* resources
) {
// Records the MessageLoop that is current at construction time in
// |created_on_loop_| and CHECKs that one exists; the other methods compare
// against this loop.
// NOTE(review): the initializer list ends with a comma, so further member
// initializers follow that are not visible here.
66 SyncInvalidationScheduler::SyncInvalidationScheduler()
67 : created_on_loop_(base::MessageLoop::current()),
71 CHECK(created_on_loop_
);
// Destructor. CHECKs that destruction happens on the same MessageLoop the
// scheduler was created on.
// NOTE(review): any cleanup after the CHECK is not visible here.
74 SyncInvalidationScheduler::~SyncInvalidationScheduler() {
75 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
// Starts the scheduler. Must be called on the creating MessageLoop (CHECKed).
// Invalidates all weak pointers previously handed out by |weak_factory_|.
// NOTE(review): the statements between the CHECK and the invalidation are not
// visible here.
79 void SyncInvalidationScheduler::Start() {
80 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
84 weak_factory_
.InvalidateWeakPtrs();
// Stops the scheduler. Must be called on the creating MessageLoop (CHECKed).
87 void SyncInvalidationScheduler::Stop() {
88 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
// Invalidate the weak pointers that Schedule() bound into posted tasks.
91 weak_factory_
.InvalidateWeakPtrs();
// Hand the outstanding task pointers to STLDeleteElements (base/stl_util.h);
// presumably this deletes each closure and empties the container -- confirm.
92 STLDeleteElements(&posted_tasks_
);
// Schedules |task| to be run through RunPostedTask() on the current
// MessageLoop.
//
// |task| must be a repeatable callback (DCHECKed) and the call must happen on
// the creating MessageLoop (CHECKed). The pointer is recorded in
// |posted_tasks_| before posting.
//
// NOTE(review): the lines between the CHECK and the insert, and the trailing
// arguments of PostDelayedTask (presumably derived from |delay|), are not
// visible here.
95 void SyncInvalidationScheduler::Schedule(invalidation::TimeDelta delay
,
96 invalidation::Closure
* task
) {
97 DCHECK(invalidation::IsCallbackRepeatable(task
));
98 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
// Track the task so it can be found again by Stop() and RunPostedTask().
105 posted_tasks_
.insert(task
);
// Bind through a weak pointer to this scheduler rather than a raw |this|.
106 base::MessageLoop::current()->PostDelayedTask(
107 FROM_HERE
, base::Bind(&SyncInvalidationScheduler::RunPostedTask
,
108 weak_factory_
.GetWeakPtr(), task
),
// Returns true when the calling code is running on the MessageLoop this
// scheduler was created on.
112 bool SyncInvalidationScheduler::IsRunningOnThread() const {
113 return created_on_loop_
== base::MessageLoop::current();
// Returns the current wall-clock time from base::Time::Now(). Must be called
// on the creating MessageLoop (CHECKed).
116 invalidation::Time
SyncInvalidationScheduler::GetCurrentTime() const {
117 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
118 return base::Time::Now();
// Receives the invalidation::SystemResources this scheduler belongs to.
// NOTE(review): the body is not visible here, so what (if anything) is done
// with |resources| cannot be stated.
121 void SyncInvalidationScheduler::SetSystemResources(
122 invalidation::SystemResources
* resources
) {
// Target of the closure posted by Schedule(). Must run on the creating
// MessageLoop (CHECKed). Removes |task| from |posted_tasks_|.
// NOTE(review): the statements around the erase (presumably running and then
// releasing |task|) are not visible here -- confirm.
126 void SyncInvalidationScheduler::RunPostedTask(invalidation::Closure
* task
) {
127 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
// The task is no longer outstanding, so stop tracking it.
129 posted_tasks_
.erase(task
);
// Constructs a channel whose last known network status is false and whose
// received-message counter is zero.
133 SyncNetworkChannel::SyncNetworkChannel()
134 : last_network_status_(false),
135 received_messages_count_(0) {}
// Destructor. Passes the registered network status receivers to
// STLDeleteElements (base/stl_util.h); presumably this deletes each callback
// added via AddNetworkStatusReceiver() -- confirm.
137 SyncNetworkChannel::~SyncNetworkChannel() {
138 STLDeleteElements(&network_status_receivers_
);
// Installs |incoming_receiver| as the callback that DeliverIncomingMessage()
// runs for each incoming message, replacing any previous one.
// NOTE(review): |incoming_receiver_| is reset() with the raw pointer, so it is
// presumably an owning smart pointer that takes ownership -- confirm in the
// header.
141 void SyncNetworkChannel::SetMessageReceiver(
142 invalidation::MessageCallback
* incoming_receiver
) {
143 incoming_receiver_
.reset(incoming_receiver
);
// Registers |network_status_receiver| for network status notifications.
146 void SyncNetworkChannel::AddNetworkStatusReceiver(
147 invalidation::NetworkStatusCallback
* network_status_receiver
) {
// Tell the new receiver the last remembered status right away, so it does not
// have to wait for the next change.
148 network_status_receiver
->Run(last_network_status_
);
// Keep the receiver for later status changes.
149 network_status_receivers_
.push_back(network_status_receiver
);
// Receives the invalidation::SystemResources this channel belongs to.
// NOTE(review): the body is not visible here, so what (if anything) is done
// with |resources| cannot be stated.
152 void SyncNetworkChannel::SetSystemResources(
153 invalidation::SystemResources
* resources
) {
// Adds |observer| to |observers_|, the list that NotifyChannelStateChange()
// notifies.
157 void SyncNetworkChannel::AddObserver(Observer
* observer
) {
158 observers_
.AddObserver(observer
);
// Removes |observer| from |observers_|, the list that
// NotifyChannelStateChange() notifies.
161 void SyncNetworkChannel::RemoveObserver(Observer
* observer
) {
162 observers_
.RemoveObserver(observer
);
// Factory: builds a PushClientChannel and returns it as a
// scoped_ptr<SyncNetworkChannel>.
165 scoped_ptr
<SyncNetworkChannel
> SyncNetworkChannel::CreatePushClientChannel(
166 const notifier::NotifierOptions
& notifier_options
) {
// Create the underlying push client from the supplied options.
167 scoped_ptr
<notifier::PushClient
> push_client(
168 notifier::PushClient::CreateDefaultOnIOThread(notifier_options
));
// Ownership of the push client moves into the new channel via Pass().
169 return scoped_ptr
<SyncNetworkChannel
>(
170 new PushClientChannel(push_client
.Pass()));
// Factory: builds a GCMNetworkChannel from |request_context_getter| and
// |delegate| and returns it as a scoped_ptr<SyncNetworkChannel>. Ownership of
// |delegate| moves into the new channel via Pass().
173 scoped_ptr
<SyncNetworkChannel
> SyncNetworkChannel::CreateGCMNetworkChannel(
174 scoped_refptr
<net::URLRequestContextGetter
> request_context_getter
,
175 scoped_ptr
<GCMNetworkChannelDelegate
> delegate
) {
176 return scoped_ptr
<SyncNetworkChannel
>(new GCMNetworkChannel(
177 request_context_getter
, delegate
.Pass()));
// Records the new network status and walks the registered network status
// receivers.
// NOTE(review): the loop body is not visible here; per the comment below it
// presumably runs each receiver with |online| -- confirm.
180 void SyncNetworkChannel::NotifyNetworkStatusChange(bool online
) {
181 // Remember network state for future NetworkStatusReceivers.
182 last_network_status_
= online
;
183 // Notify NetworkStatusReceivers in cacheinvalidation.
184 for (NetworkStatusReceiverList::const_iterator it
=
185 network_status_receivers_
.begin();
186 it
!= network_status_receivers_
.end(); ++it
) {
// Calls OnNetworkChannelStateChanged(|invalidator_state|) on every observer
// in |observers_|.
191 void SyncNetworkChannel::NotifyChannelStateChange(
192 InvalidatorState invalidator_state
) {
193 FOR_EACH_OBSERVER(Observer
, observers_
,
194 OnNetworkChannelStateChanged(invalidator_state
));
// Hands |message| to the receiver installed by SetMessageReceiver() and
// counts it in |received_messages_count_|.
// NOTE(review): the return statements are not visible here; presumably the
// no-receiver branch returns false without counting and the normal path
// returns true -- confirm.
197 bool SyncNetworkChannel::DeliverIncomingMessage(const std::string
& message
) {
// Without a receiver there is nobody to deliver to; log it in debug builds.
198 if (!incoming_receiver_
) {
199 DLOG(ERROR
) << "No receiver for incoming notification";
// The counter is bumped before the receiver runs.
202 received_messages_count_
++;
203 incoming_receiver_
->Run(message
);
// Returns the counter that DeliverIncomingMessage() increments for each
// message it passes to the receiver.
207 int SyncNetworkChannel::GetReceivedMessagesCount() const {
208 return received_messages_count_
;
// Stores the non-owning |state_writer| and |scheduler| pointers for later use
// by WriteKey()/ReadKey(). |state_writer| must be non-null (DCHECKed).
// NOTE(review): any further checks in the body are not visible here.
211 SyncStorage::SyncStorage(StateWriter
* state_writer
,
212 invalidation::Scheduler
* scheduler
)
213 : state_writer_(state_writer
),
214 scheduler_(scheduler
) {
215 DCHECK(state_writer_
);
// Destructor; the body is empty.
219 SyncStorage::~SyncStorage() {}
// Persists |value| through the StateWriter, caches it in |cached_state_|, and
// schedules the completion callback on |scheduler_| with no delay.
//
// In the visible code |key| is not used: every write replaces the single
// stored state (see the TODO below).
//
// NOTE(review): the remaining arguments of the NewPermanentCallback/Schedule
// call (presumably |done|) are not visible here.
221 void SyncStorage::WriteKey(const std::string
& key
, const std::string
& value
,
222 invalidation::WriteKeyCallback
* done
) {
223 CHECK(state_writer_
);
224 // TODO(ghc): actually write key,value associations, and don't invoke the
225 // callback until the operation completes.
226 state_writer_
->WriteState(value
);
// Keep a copy so ReadKey() can answer from memory.
227 cached_state_
= value
;
228 // According to the cache invalidation API folks, we can do this as
229 // long as we make sure to clear the persistent state that we start
230 // up the cache invalidation client with. However, we mustn't do it
231 // right away, as we may be called under a lock that the callback
// Defer the completion callback by scheduling it instead of calling it here.
233 scheduler_
->Schedule(
234 invalidation::Scheduler::NoDelay(),
235 invalidation::NewPermanentCallback(
236 this, &SyncStorage::RunAndDeleteWriteKeyCallback
,
// Answers a read immediately by passing |done| and the cached state to
// RunAndDeleteReadKeyCallback(). In the visible code |key| is not consulted;
// the same cached value is returned for any key.
// Expected to be called on the scheduler's thread (DCHECKed).
240 void SyncStorage::ReadKey(const std::string
& key
,
241 invalidation::ReadKeyCallback
* done
) {
242 DCHECK(scheduler_
->IsRunningOnThread()) << "not running on scheduler thread";
243 RunAndDeleteReadKeyCallback(done
, cached_state_
);
// Not implemented: logs a warning naming |key|.
// NOTE(review): whatever is done with |done| after the log statement is not
// visible here.
246 void SyncStorage::DeleteKey(const std::string
& key
,
247 invalidation::DeleteKeyCallback
* done
) {
248 // TODO(ghc): Implement.
249 LOG(WARNING
) << "ignoring call to DeleteKey(" << key
<< ", callback)";
// Not implemented: logs a warning.
// NOTE(review): whatever is done with |done| after the log statement is not
// visible here.
252 void SyncStorage::ReadAllKeys(invalidation::ReadAllKeysCallback
* done
) {
253 // TODO(ghc): Implement.
254 LOG(WARNING
) << "ignoring call to ReadAllKeys(callback)";
// Receives the invalidation::SystemResources this storage belongs to.
// NOTE(review): the body is not visible here, so what (if anything) is done
// with |resources| cannot be stated.
257 void SyncStorage::SetSystemResources(
258 invalidation::SystemResources
* resources
) {
// Completion step for WriteKey(). The visible argument is a SUCCESS status
// with an empty message.
// NOTE(review): the call that receives this status (presumably
// |callback|->Run) and the deletion implied by the name are not visible here.
262 void SyncStorage::RunAndDeleteWriteKeyCallback(
263 invalidation::WriteKeyCallback
* callback
) {
265 invalidation::Status(invalidation::Status::SUCCESS
, std::string()));
// Completion step for ReadKey(): runs |callback| with a pair whose first
// element is a SUCCESS status with an empty message.
// NOTE(review): the second element of the pair (presumably |value|) and the
// deletion implied by the name are not visible here.
269 void SyncStorage::RunAndDeleteReadKeyCallback(
270 invalidation::ReadKeyCallback
* callback
, const std::string
& value
) {
271 callback
->Run(std::make_pair(
272 invalidation::Status(invalidation::Status::SUCCESS
, std::string()),
// Builds the resource bundle. It creates its own logger, two
// SyncInvalidationSchedulers (internal and listener), and a SyncStorage that
// writes through |state_writer| and uses the internal scheduler.
// |sync_network_channel| is stored as passed in rather than created here.
// The instance starts in the not-started state.
// NOTE(review): the constructor body is not visible here.
277 SyncSystemResources::SyncSystemResources(
278 SyncNetworkChannel
* sync_network_channel
,
279 StateWriter
* state_writer
)
280 : is_started_(false),
281 logger_(new SyncLogger()),
282 internal_scheduler_(new SyncInvalidationScheduler()),
283 listener_scheduler_(new SyncInvalidationScheduler()),
284 storage_(new SyncStorage(state_writer
, internal_scheduler_
.get())),
285 sync_network_channel_(sync_network_channel
) {
// Destructor.
// NOTE(review): the body is not visible here.
288 SyncSystemResources::~SyncSystemResources() {
// Starts both schedulers, internal first and then listener.
// NOTE(review): statements after the scheduler calls (presumably updating
// |is_started_|) are not visible here.
292 void SyncSystemResources::Start() {
293 internal_scheduler_
->Start();
294 listener_scheduler_
->Start();
// Stops both schedulers, internal first and then listener.
// NOTE(review): any statements after the scheduler calls are not visible
// here.
298 void SyncSystemResources::Stop() {
299 internal_scheduler_
->Stop();
300 listener_scheduler_
->Stop();
// Reports whether the resources have been started.
// NOTE(review): the body is not visible here; presumably it returns
// |is_started_| -- confirm.
303 bool SyncSystemResources::IsStarted() const {
// Stores a copy of |platform| in |platform_|.
307 void SyncSystemResources::set_platform(const std::string
& platform
) {
308 platform_
= platform
;
// Returns the platform string by value.
// NOTE(review): the body is not visible here; presumably it returns
// |platform_| as set by set_platform() -- confirm.
311 std::string
SyncSystemResources::platform() const {
// Returns a non-owning pointer to the logger created in the constructor.
315 SyncLogger
* SyncSystemResources::logger() {
316 return logger_
.get();
// Returns a non-owning pointer to the storage created in the constructor.
319 SyncStorage
* SyncSystemResources::storage() {
320 return storage_
.get();
// Returns the network channel pointer that was passed to the constructor.
323 SyncNetworkChannel
* SyncSystemResources::network() {
324 return sync_network_channel_
;
// Returns a non-owning pointer to the internal scheduler, the same one the
// storage object was constructed with.
327 SyncInvalidationScheduler
* SyncSystemResources::internal_scheduler() {
328 return internal_scheduler_
.get();
// Returns a non-owning pointer to the listener scheduler created in the
// constructor.
331 SyncInvalidationScheduler
* SyncSystemResources::listener_scheduler() {
332 return listener_scheduler_
.get();
335 } // namespace syncer