1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/notifier/sync_system_resources.h"
11 #include "base/bind.h"
12 #include "base/logging.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/stl_util.h"
15 #include "base/strings/string_util.h"
16 #include "base/strings/stringprintf.h"
17 #include "google/cacheinvalidation/client_gateway.pb.h"
18 #include "google/cacheinvalidation/deps/callback.h"
19 #include "google/cacheinvalidation/include/types.h"
20 #include "jingle/notifier/listener/push_client.h"
21 #include "sync/notifier/gcm_network_channel.h"
22 #include "sync/notifier/gcm_network_channel_delegate.h"
23 #include "sync/notifier/invalidation_util.h"
24 #include "sync/notifier/push_client_channel.h"
// Constructs a SyncLogger; the body is empty, so there is nothing to set up.
28 SyncLogger::SyncLogger() {}
// Destroys a SyncLogger; the body is empty, so there is nothing to release.
29 SyncLogger::~SyncLogger() {}
// Formats a printf-style message (|format| plus variadic arguments) and
// emits it through logging::LogMessage, attributed to |file| and |line|.
//
// Four (severity, enabled) pairs are assigned below: VLOG(2), VLOG(1),
// logging::LOG_WARNING and logging::LOG_ERROR. Each pair also records in
// |emit_log| whether that verbosity/severity is currently switched on.
// NOTE(review): the statement that picks a pair based on |level|, and the
// test of |emit_log| before formatting, are not visible here -- confirm
// which LogLevel value maps to which pair.
31 void SyncLogger::Log(LogLevel level
, const char* file
, int line
,
32 const char* format
, ...) {
// Defaults: most verbose severity, logging disabled.
33 logging::LogSeverity log_severity
= -2; // VLOG(2)
34 bool emit_log
= false;
37 log_severity
= -2; // VLOG(2)
38 emit_log
= VLOG_IS_ON(2);
41 log_severity
= -1; // VLOG(1)
42 emit_log
= VLOG_IS_ON(1);
45 log_severity
= logging::LOG_WARNING
;
46 emit_log
= LOG_IS_ON(WARNING
);
49 log_severity
= logging::LOG_ERROR
;
50 emit_log
= LOG_IS_ON(ERROR
);
// Append the formatted text to |result|, then stream it into a LogMessage
// carrying the caller-supplied file/line and the severity chosen above.
57 base::StringAppendV(&result
, format
, ap
);
58 logging::LogMessage(file
, line
, log_severity
).stream() << result
;
// Receives the invalidation::SystemResources this logger belongs to.
// NOTE(review): the body is not visible here -- confirm whether |resources|
// is used at all.
63 void SyncLogger::SetSystemResources(invalidation::SystemResources
* resources
) {
// Records the MessageLoop that is current at construction time in
// |created_on_loop_| and CHECKs that such a loop exists. The other methods
// compare against this loop to verify they run on the same one.
// NOTE(review): any further member initializers are not visible here.
67 SyncInvalidationScheduler::SyncInvalidationScheduler()
68 : created_on_loop_(base::MessageLoop::current()),
72 CHECK(created_on_loop_
);
// CHECKs that destruction happens on the MessageLoop recorded at
// construction.
75 SyncInvalidationScheduler::~SyncInvalidationScheduler() {
76 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
// Must be called on the MessageLoop recorded at construction (CHECKed).
// Invalidates every weak pointer previously handed out by |weak_factory_|;
// Schedule() binds posted tasks to such weak pointers.
80 void SyncInvalidationScheduler::Start() {
81 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
85 weak_factory_
.InvalidateWeakPtrs();
// Must be called on the MessageLoop recorded at construction (CHECKed).
// Invalidates outstanding weak pointers, then deletes every closure still
// held in |posted_tasks_| and empties the container.
88 void SyncInvalidationScheduler::Stop() {
89 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
92 weak_factory_
.InvalidateWeakPtrs();
93 STLDeleteElements(&posted_tasks_
);
94 posted_tasks_
.clear();
// Queues |task| to be run later on the current MessageLoop.
//
// |task| must be a repeatable callback (DCHECKed) and the call must happen
// on the MessageLoop recorded at construction (CHECKed). The closure is
// tracked in |posted_tasks_| and a delayed task is posted that calls
// RunPostedTask(task) through a weak pointer to this scheduler.
// NOTE(review): the delay argument passed to PostDelayedTask is not visible
// here (presumably |delay|) -- confirm.
97 void SyncInvalidationScheduler::Schedule(invalidation::TimeDelta delay
,
98 invalidation::Closure
* task
) {
99 DCHECK(invalidation::IsCallbackRepeatable(task
));
100 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
// Remember the closure so Stop() can delete it if it never runs.
107 posted_tasks_
.insert(task
);
108 base::MessageLoop::current()->PostDelayedTask(
109 FROM_HERE
, base::Bind(&SyncInvalidationScheduler::RunPostedTask
,
110 weak_factory_
.GetWeakPtr(), task
),
// Returns true when the current MessageLoop is the one recorded at
// construction.
114 bool SyncInvalidationScheduler::IsRunningOnThread() const {
115 return created_on_loop_
== base::MessageLoop::current();
// Returns base::Time::Now(). Must be called on the MessageLoop recorded at
// construction (CHECKed).
118 invalidation::Time
SyncInvalidationScheduler::GetCurrentTime() const {
119 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
120 return base::Time::Now();
// Receives the invalidation::SystemResources this scheduler belongs to.
// NOTE(review): the body is not visible here -- confirm whether |resources|
// is used at all.
123 void SyncInvalidationScheduler::SetSystemResources(
124 invalidation::SystemResources
* resources
) {
// Target of the delayed tasks posted by Schedule(). CHECKs that it runs on
// the MessageLoop recorded at construction and removes |task| from
// |posted_tasks_|.
// NOTE(review): the statements that run and dispose of |task| are not
// visible here -- confirm who owns the closure after this call.
128 void SyncInvalidationScheduler::RunPostedTask(invalidation::Closure
* task
) {
129 CHECK_EQ(created_on_loop_
, base::MessageLoop::current());
131 posted_tasks_
.erase(task
);
// Starts with |invalidator_state_| set to DEFAULT_INVALIDATION_ERROR and
// |scheduling_hash_| set to 0.
135 SyncNetworkChannel::SyncNetworkChannel()
136 : invalidator_state_(DEFAULT_INVALIDATION_ERROR
),
137 scheduling_hash_(0) {
// Deletes every callback stored in |network_status_receivers_|.
140 SyncNetworkChannel::~SyncNetworkChannel() {
141 STLDeleteElements(&network_status_receivers_
);
// Wraps |outgoing_message| with EncodeMessage(), passing the stored
// |service_context_|, and hands the encoded string to SendEncodedMessage().
// NOTE(review): the remaining EncodeMessage argument(s) are not visible
// here -- presumably |scheduling_hash_|; confirm.
144 void SyncNetworkChannel::SendMessage(const std::string
& outgoing_message
) {
145 std::string encoded_message
;
146 EncodeMessage(&encoded_message
, outgoing_message
, service_context_
,
148 SendEncodedMessage(encoded_message
);
// Stores |incoming_receiver| in |incoming_receiver_| via reset();
// DeliverIncomingMessage() later runs it with each decoded message.
151 void SyncNetworkChannel::SetMessageReceiver(
152 invalidation::MessageCallback
* incoming_receiver
) {
153 incoming_receiver_
.reset(incoming_receiver
);
// Registers |network_status_receiver| for state-change notifications.
// The receiver is run immediately with the current status (true iff
// |invalidator_state_| is INVALIDATIONS_ENABLED) and then appended to
// |network_status_receivers_|, whose elements the destructor deletes.
156 void SyncNetworkChannel::AddNetworkStatusReceiver(
157 invalidation::NetworkStatusCallback
* network_status_receiver
) {
158 network_status_receiver
->Run(invalidator_state_
== INVALIDATIONS_ENABLED
);
159 network_status_receivers_
.push_back(network_status_receiver
);
// Receives the invalidation::SystemResources this channel belongs to.
// NOTE(review): the body is not visible here -- confirm whether |resources|
// is used at all.
162 void SyncNetworkChannel::SetSystemResources(
163 invalidation::SystemResources
* resources
) {
// Adds |observer| to |observers_|, the list NotifyStateChange() notifies.
167 void SyncNetworkChannel::AddObserver(Observer
* observer
) {
168 observers_
.AddObserver(observer
);
// Removes |observer| from |observers_|.
171 void SyncNetworkChannel::RemoveObserver(Observer
* observer
) {
172 observers_
.RemoveObserver(observer
);
// Factory: builds a PushClient with
// notifier::PushClient::CreateDefaultOnIOThread(notifier_options), passes
// ownership of it to a new PushClientChannel, and returns that channel as a
// scoped_ptr<SyncNetworkChannel>.
175 scoped_ptr
<SyncNetworkChannel
> SyncNetworkChannel::CreatePushClientChannel(
176 const notifier::NotifierOptions
& notifier_options
) {
177 scoped_ptr
<notifier::PushClient
> push_client(
178 notifier::PushClient::CreateDefaultOnIOThread(notifier_options
));
179 return scoped_ptr
<SyncNetworkChannel
>(
180 new PushClientChannel(push_client
.Pass()));
// Factory: constructs a GCMNetworkChannel from |request_context_getter| and
// |delegate| (ownership of the delegate is passed on) and returns it as a
// scoped_ptr<SyncNetworkChannel>.
183 scoped_ptr
<SyncNetworkChannel
> SyncNetworkChannel::CreateGCMNetworkChannel(
184 scoped_refptr
<net::URLRequestContextGetter
> request_context_getter
,
185 scoped_ptr
<GCMNetworkChannelDelegate
> delegate
) {
186 return scoped_ptr
<SyncNetworkChannel
>(new GCMNetworkChannel(
187 request_context_getter
, delegate
.Pass()));
// Test-only accessor: returns a reference to the stored |service_context_|.
190 const std::string
& SyncNetworkChannel::GetServiceContextForTest() const {
191 return service_context_
;
// Test-only accessor: returns the stored |scheduling_hash_|.
194 int64
SyncNetworkChannel::GetSchedulingHashForTest() const {
195 return scheduling_hash_
;
// Test-only wrapper around EncodeMessage(): encodes |message| with the
// given |service_context| and |scheduling_hash| and returns the result by
// value.
198 std::string
SyncNetworkChannel::EncodeMessageForTest(
199 const std::string
& message
, const std::string
& service_context
,
200 int64 scheduling_hash
) {
201 std::string encoded_message
;
202 EncodeMessage(&encoded_message
, message
, service_context
, scheduling_hash
);
203 return encoded_message
;
// Test-only wrapper around DecodeMessage(): forwards all arguments
// unchanged and returns its result.
206 bool SyncNetworkChannel::DecodeMessageForTest(
207 const std::string
& data
,
208 std::string
* message
,
209 std::string
* service_context
,
210 int64
* scheduling_hash
) {
211 return DecodeMessage(data
, message
, service_context
, scheduling_hash
);
// Records the new |invalidator_state| and broadcasts it to both audiences:
// every registered network status receiver is run with a bool (true iff the
// state is INVALIDATIONS_ENABLED), and every Observer in |observers_| gets
// OnNetworkChannelStateChanged() with the full state value.
214 void SyncNetworkChannel::NotifyStateChange(InvalidatorState invalidator_state
) {
215 // Remember state for future NetworkStatusReceivers.
216 invalidator_state_
= invalidator_state
;
217 // Notify NetworkStatusReceivers in cacheinvalidation.
218 for (NetworkStatusReceiverList::const_iterator it
=
219 network_status_receivers_
.begin();
220 it
!= network_status_receivers_
.end(); ++it
) {
221 (*it
)->Run(invalidator_state_
== INVALIDATIONS_ENABLED
);
// Notify the observers of this channel with the full state value.
224 FOR_EACH_OBSERVER(Observer
, observers_
,
225 OnNetworkChannelStateChanged(invalidator_state_
));
// Handles a raw message received from the network.
//
// Logs a debug error when no receiver has been set. Otherwise decodes
// |data| with DecodeMessage(), which writes directly into the stored
// |service_context_| and |scheduling_hash_|, logs a debug error when
// decoding fails, and runs |incoming_receiver_| with the decoded message.
// NOTE(review): the declaration of |message| and the control flow after
// each error log (presumably an early return) are not visible here --
// confirm.
228 void SyncNetworkChannel::DeliverIncomingMessage(const std::string
& data
) {
229 if (!incoming_receiver_
) {
230 DLOG(ERROR
) << "No receiver for incoming notification";
234 if (!DecodeMessage(data
,
235 &message
, &service_context_
, &scheduling_hash_
)) {
236 DLOG(ERROR
) << "Could not parse ClientGatewayMessage";
239 incoming_receiver_
->Run(message
);
// Wraps |message| in an ipc::invalidation::ClientGatewayMessage envelope
// and serializes the envelope into |*encoded_message|.
//
// The envelope is always marked client-to-server and always carries
// |message| as its network message. When |service_context| is non-empty the
// envelope also carries |service_context| and |scheduling_hash|.
// NOTE(review): the brace closing the if-block is not visible here;
// |scheduling_hash| appears to be set only inside it -- confirm.
// The bool returned by SerializeToString() is not checked here.
242 void SyncNetworkChannel::EncodeMessage(
243 std::string
* encoded_message
,
244 const std::string
& message
,
245 const std::string
& service_context
,
246 int64 scheduling_hash
) {
247 ipc::invalidation::ClientGatewayMessage envelope
;
248 envelope
.set_is_client_to_server(true);
249 if (!service_context
.empty()) {
250 envelope
.set_service_context(service_context
);
251 envelope
.set_rpc_scheduling_hash(scheduling_hash
);
253 envelope
.set_network_message(message
);
254 envelope
.SerializeToString(encoded_message
);
// Parses |data| as an ipc::invalidation::ClientGatewayMessage and copies
// its fields into the output parameters.
//
// |*message| receives the envelope's network message. |*service_context|
// and |*scheduling_hash| are assigned only when the envelope has the
// corresponding field.
// NOTE(review): the handling of a parse failure and the return statements
// are not visible here -- presumably false on parse failure and true
// otherwise; confirm.
258 bool SyncNetworkChannel::DecodeMessage(
259 const std::string
& data
,
260 std::string
* message
,
261 std::string
* service_context
,
262 int64
* scheduling_hash
) {
263 ipc::invalidation::ClientGatewayMessage envelope
;
264 if (!envelope
.ParseFromString(data
)) {
267 *message
= envelope
.network_message();
268 if (envelope
.has_service_context()) {
269 *service_context
= envelope
.service_context();
271 if (envelope
.has_rpc_scheduling_hash()) {
272 *scheduling_hash
= envelope
.rpc_scheduling_hash();
// Stores the |state_writer| and |scheduler| pointers as given and DCHECKs
// that the state writer is non-null.
278 SyncStorage::SyncStorage(StateWriter
* state_writer
,
279 invalidation::Scheduler
* scheduler
)
280 : state_writer_(state_writer
),
281 scheduler_(scheduler
) {
282 DCHECK(state_writer_
);
// Destructor with an empty body.
286 SyncStorage::~SyncStorage() {}
// Persists |value| through |state_writer_| and caches it in
// |cached_state_|, which ReadKey() serves. |key| is not used by the visible
// statements (see the TODO below).
//
// The completion callback is not run inline: a permanent callback bound to
// RunAndDeleteWriteKeyCallback is scheduled on |scheduler_| with no delay.
// NOTE(review): the arguments bound to that callback are not visible here
// -- presumably |done|; confirm.
288 void SyncStorage::WriteKey(const std::string
& key
, const std::string
& value
,
289 invalidation::WriteKeyCallback
* done
) {
290 CHECK(state_writer_
);
291 // TODO(ghc): actually write key,value associations, and don't invoke the
292 // callback until the operation completes.
293 state_writer_
->WriteState(value
);
294 cached_state_
= value
;
295 // According to the cache invalidation API folks, we can do this as
296 // long as we make sure to clear the persistent state that we start
297 // up the cache invalidation client with. However, we mustn't do it
298 // right away, as we may be called under a lock that the callback
300 scheduler_
->Schedule(
301 invalidation::Scheduler::NoDelay(),
302 invalidation::NewPermanentCallback(
303 this, &SyncStorage::RunAndDeleteWriteKeyCallback
,
// Answers a read synchronously from |cached_state_| by calling
// RunAndDeleteReadKeyCallback(done, cached_state_). |key| is not used by
// the visible statements. DCHECKs that the call happens on the scheduler's
// thread.
307 void SyncStorage::ReadKey(const std::string
& key
,
308 invalidation::ReadKeyCallback
* done
) {
309 DCHECK(scheduler_
->IsRunningOnThread()) << "not running on scheduler thread";
310 RunAndDeleteReadKeyCallback(done
, cached_state_
);
// Not implemented: only logs a warning naming |key|.
// NOTE(review): no use of |done| is visible here -- confirm whether the
// callback is ever run or deleted.
313 void SyncStorage::DeleteKey(const std::string
& key
,
314 invalidation::DeleteKeyCallback
* done
) {
315 // TODO(ghc): Implement.
316 LOG(WARNING
) << "ignoring call to DeleteKey(" << key
<< ", callback)";
// Not implemented: only logs a warning.
// NOTE(review): no use of |done| is visible here -- confirm whether the
// callback is ever run or deleted.
319 void SyncStorage::ReadAllKeys(invalidation::ReadAllKeysCallback
* done
) {
320 // TODO(ghc): Implement.
321 LOG(WARNING
) << "ignoring call to ReadAllKeys(callback)";
// Receives the invalidation::SystemResources this storage belongs to.
// NOTE(review): the body is not visible here -- confirm whether |resources|
// is used at all.
324 void SyncStorage::SetSystemResources(
325 invalidation::SystemResources
* resources
) {
// Completion helper that WriteKey() schedules. The visible statement
// builds an invalidation::Status of SUCCESS with an empty message string.
// NOTE(review): the call that receives this Status (presumably
// callback->Run) and the deletion of |callback| are not visible here --
// confirm.
329 void SyncStorage::RunAndDeleteWriteKeyCallback(
330 invalidation::WriteKeyCallback
* callback
) {
332 invalidation::Status(invalidation::Status::SUCCESS
, std::string()));
// Completion helper for ReadKey(). Runs |callback| with a pair whose first
// element is an invalidation::Status of SUCCESS with an empty message.
// NOTE(review): the second element of the pair (presumably |value|) and the
// deletion of |callback| are not visible here -- confirm.
336 void SyncStorage::RunAndDeleteReadKeyCallback(
337 invalidation::ReadKeyCallback
* callback
, const std::string
& value
) {
338 callback
->Run(std::make_pair(
339 invalidation::Status(invalidation::Status::SUCCESS
, std::string()),
// Builds the resource bundle handed to the invalidation client.
//
// Creates a SyncLogger, two separate SyncInvalidationScheduler instances
// (internal and listener) and a SyncStorage that writes through
// |state_writer| and schedules on the internal scheduler. The
// |sync_network_channel| pointer is stored as given. |is_started_| begins
// as false.
344 SyncSystemResources::SyncSystemResources(
345 SyncNetworkChannel
* sync_network_channel
,
346 StateWriter
* state_writer
)
347 : is_started_(false),
348 logger_(new SyncLogger()),
349 internal_scheduler_(new SyncInvalidationScheduler()),
350 listener_scheduler_(new SyncInvalidationScheduler()),
351 storage_(new SyncStorage(state_writer
, internal_scheduler_
.get())),
352 sync_network_channel_(sync_network_channel
) {
// Destructor.
// NOTE(review): the body is not visible here -- confirm whether it stops
// the schedulers before the members are destroyed.
355 SyncSystemResources::~SyncSystemResources() {
// Starts the internal scheduler, then the listener scheduler.
359 void SyncSystemResources::Start() {
360 internal_scheduler_
->Start();
361 listener_scheduler_
->Start();
// Stops the internal scheduler, then the listener scheduler.
365 void SyncSystemResources::Stop() {
366 internal_scheduler_
->Stop();
367 listener_scheduler_
->Stop();
// NOTE(review): the body is not visible here -- presumably returns
// |is_started_|; confirm.
370 bool SyncSystemResources::IsStarted() const {
// Stores a copy of |platform| in |platform_|.
374 void SyncSystemResources::set_platform(const std::string
& platform
) {
375 platform_
= platform
;
// Returns a platform string by value.
// NOTE(review): the body is not visible here -- presumably returns
// |platform_|; confirm.
378 std::string
SyncSystemResources::platform() const {
// Returns the raw pointer held by |logger_|, obtained via get().
382 SyncLogger
* SyncSystemResources::logger() {
383 return logger_
.get();
// Returns the raw pointer held by |storage_|, obtained via get().
386 SyncStorage
* SyncSystemResources::storage() {
387 return storage_
.get();
// Returns the SyncNetworkChannel pointer that was passed to the
// constructor.
390 SyncNetworkChannel
* SyncSystemResources::network() {
391 return sync_network_channel_
;
// Returns the raw pointer held by |internal_scheduler_|, the scheduler that
// the constructor also hands to SyncStorage.
394 SyncInvalidationScheduler
* SyncSystemResources::internal_scheduler() {
395 return internal_scheduler_
.get();
// Returns the raw pointer held by |listener_scheduler_|, obtained via
// get().
398 SyncInvalidationScheduler
* SyncSystemResources::listener_scheduler() {
399 return listener_scheduler_
.get();
402 } // namespace syncer