Extension syncing: Introduce a NeedsSync pref
[chromium-blink-merge.git] / components / copresence / copresence_manager_impl.cc
blob25b8a5cb68c9178438909ef00e427b79fe844b35
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/copresence/copresence_manager_impl.h"
7 #include <map>
8 #include <vector>
10 #include "base/bind.h"
11 #include "base/strings/stringprintf.h"
12 #include "base/time/time.h"
13 #include "base/timer/timer.h"
14 #include "components/audio_modem/public/whispernet_client.h"
15 #include "components/copresence/copresence_state_impl.h"
16 #include "components/copresence/handlers/directive_handler_impl.h"
17 #include "components/copresence/handlers/gcm_handler_impl.h"
18 #include "components/copresence/proto/rpcs.pb.h"
19 #include "components/copresence/rpc/rpc_handler.h"
21 using google::protobuf::RepeatedPtrField;
23 using audio_modem::AUDIBLE;
24 using audio_modem::AudioToken;
25 using audio_modem::INAUDIBLE;
namespace {

// Timer periods, in milliseconds.
// Period of the timer that runs PollForMessages().
const int kPollTimerIntervalMs = 3000;
// Period of the timer that runs AudioCheck().
const int kAudioCheckIntervalMs = 1000;

// Limits handed to queued_messages_by_token_ at construction.
// Passed as a base::TimeDelta in seconds.
const int kQueuedMessageTimeout = 10;
// Passed as the second constructor argument (a count).
const int kMaxQueuedMessages = 1000;

}  // namespace
37 namespace copresence {
39 bool SupportedTokenMedium(const TokenObservation& token) {
40 for (const TokenSignals& signals : token.signals()) {
41 if (signals.medium() == AUDIO_ULTRASOUND_PASSBAND ||
42 signals.medium() == AUDIO_AUDIBLE_DTMF)
43 return true;
45 return false;
49 // Public functions.
51 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
52 : delegate_(delegate),
53 whispernet_init_callback_(base::Bind(
54 &CopresenceManagerImpl::WhispernetInitComplete,
55 // This callback gets cancelled when we are destroyed.
56 base::Unretained(this))),
57 init_failed_(false),
58 state_(new CopresenceStateImpl),
59 directive_handler_(new DirectiveHandlerImpl(
60 // The directive handler and its descendants
61 // will be destructed before the CopresenceState instance.
62 base::Bind(&CopresenceStateImpl::UpdateDirectives,
63 base::Unretained(state_.get())))),
64 poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
65 audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
66 queued_messages_by_token_(
67 base::TimeDelta::FromSeconds(kQueuedMessageTimeout),
68 kMaxQueuedMessages) {
69 DCHECK(delegate_);
70 DCHECK(delegate_->GetWhispernetClient());
71 // TODO(ckehoe): Handle whispernet initialization in the whispernet component.
72 delegate_->GetWhispernetClient()->Initialize(
73 whispernet_init_callback_.callback());
75 MessagesCallback messages_callback = base::Bind(
76 &CopresenceManagerImpl::DispatchMessages,
77 // This will only be passed to objects that we own.
78 base::Unretained(this));
80 if (delegate->GetGCMDriver())
81 gcm_handler_.reset(new GCMHandlerImpl(delegate->GetGCMDriver(),
82 directive_handler_.get(),
83 messages_callback));
85 rpc_handler_.reset(new RpcHandler(delegate,
86 directive_handler_.get(),
87 state_.get(),
88 gcm_handler_.get(),
89 messages_callback));
91 directive_handler_->Start(delegate_->GetWhispernetClient(),
92 base::Bind(&CopresenceManagerImpl::ReceivedTokens,
93 base::Unretained(this)));
96 CopresenceManagerImpl::~CopresenceManagerImpl() {
97 whispernet_init_callback_.Cancel();
100 CopresenceState* CopresenceManagerImpl::state() {
101 return state_.get();
104 // Returns false if any operations were malformed.
105 void CopresenceManagerImpl::ExecuteReportRequest(
106 const ReportRequest& request,
107 const std::string& app_id,
108 const std::string& auth_token,
109 const StatusCallback& callback) {
110 // If initialization has failed, reject all requests.
111 if (init_failed_) {
112 callback.Run(FAIL);
113 return;
116 // We'll need to modify the ReportRequest, so we make our own copy to send.
117 scoped_ptr<ReportRequest> request_copy(new ReportRequest(request));
118 rpc_handler_->SendReportRequest(
119 request_copy.Pass(), app_id, auth_token, callback);
123 // Private functions.
125 void CopresenceManagerImpl::WhispernetInitComplete(bool success) {
126 if (success) {
127 DVLOG(3) << "Whispernet initialized successfully.";
128 poll_timer_->Start(FROM_HERE,
129 base::TimeDelta::FromMilliseconds(kPollTimerIntervalMs),
130 base::Bind(&CopresenceManagerImpl::PollForMessages,
131 base::Unretained(this)));
132 audio_check_timer_->Start(
133 FROM_HERE, base::TimeDelta::FromMilliseconds(kAudioCheckIntervalMs),
134 base::Bind(&CopresenceManagerImpl::AudioCheck, base::Unretained(this)));
135 } else {
136 LOG(ERROR) << "Whispernet initialization failed!";
137 init_failed_ = true;
141 void CopresenceManagerImpl::ReceivedTokens(
142 const std::vector<AudioToken>& tokens) {
143 rpc_handler_->ReportTokens(tokens);
145 for (const AudioToken audio_token : tokens) {
146 const std::string& token_id = audio_token.token;
147 DVLOG(3) << "Heard token: " << token_id;
149 // Update the CopresenceState.
150 ReceivedToken token(
151 token_id,
152 audio_token.audible ? AUDIO_AUDIBLE_DTMF : AUDIO_ULTRASOUND_PASSBAND,
153 base::Time::Now());
154 state_->UpdateReceivedToken(token);
156 // Deliver messages that were pre-sent on this token.
157 if (queued_messages_by_token_.HasKey(token_id)) {
158 // Not const because we have to remove the required tokens for delivery.
159 // We're going to delete this whole vector at the end anyway.
160 RepeatedPtrField<SubscribedMessage>* messages =
161 queued_messages_by_token_.GetMutableValue(token_id);
162 DCHECK_GT(messages->size(), 0)
163 << "Empty entry in queued_messages_by_token_";
165 // These messages still have their required tokens stored, and
166 // DispatchMessages() will still check for them. If we don't remove
167 // the tokens before delivery, we'll just end up re-queuing the message.
168 for (SubscribedMessage& message : *messages)
169 message.mutable_required_token()->Clear();
171 DVLOG(3) << "Delivering " << messages->size()
172 << " message(s) pre-sent on token " << token_id;
173 DispatchMessages(*messages);
175 // The messages have been delivered, so we don't need to keep them
176 // in the queue. Note that the token will still be reported
177 // to the server (above), so we'll keep getting the message.
178 // But we can now drop our local copy of it.
179 int erase_count = queued_messages_by_token_.Erase(token_id);
180 DCHECK_GT(erase_count, 0);
185 void CopresenceManagerImpl::AudioCheck() {
186 if (!directive_handler_->GetCurrentAudioToken(AUDIBLE).empty() &&
187 !directive_handler_->IsAudioTokenHeard(AUDIBLE)) {
188 delegate_->HandleStatusUpdate(AUDIO_FAIL);
189 } else if (!directive_handler_->GetCurrentAudioToken(INAUDIBLE).empty() &&
190 !directive_handler_->IsAudioTokenHeard(INAUDIBLE)) {
191 delegate_->HandleStatusUpdate(AUDIO_FAIL);
195 // Report our currently playing tokens to the server.
196 void CopresenceManagerImpl::PollForMessages() {
197 const std::string& audible_token =
198 directive_handler_->GetCurrentAudioToken(AUDIBLE);
199 const std::string& inaudible_token =
200 directive_handler_->GetCurrentAudioToken(INAUDIBLE);
202 std::vector<AudioToken> tokens;
203 if (!audible_token.empty())
204 tokens.push_back(AudioToken(audible_token, true));
205 if (!inaudible_token.empty())
206 tokens.push_back(AudioToken(inaudible_token, false));
208 if (!tokens.empty())
209 rpc_handler_->ReportTokens(tokens);
212 void CopresenceManagerImpl::DispatchMessages(
213 const RepeatedPtrField<SubscribedMessage>& messages) {
214 if (messages.size() == 0)
215 return;
217 // Index the messages by subscription id.
218 std::map<std::string, std::vector<Message>> messages_by_subscription;
219 DVLOG(3) << "Processing " << messages.size() << " received message(s).";
220 int immediate_message_count = 0;
221 for (const SubscribedMessage& message : messages) {
222 // If tokens are required for this message, queue it.
223 // Otherwise stage it for delivery.
224 if (message.required_token_size() > 0) {
225 int supported_token_count = 0;
226 for (const TokenObservation& token : message.required_token()) {
227 if (SupportedTokenMedium(token)) {
228 if (!queued_messages_by_token_.HasKey(token.token_id())) {
229 queued_messages_by_token_.Add(
230 token.token_id(), RepeatedPtrField<SubscribedMessage>());
232 RepeatedPtrField<SubscribedMessage>* queued_messages =
233 queued_messages_by_token_.GetMutableValue(token.token_id());
234 DCHECK(queued_messages);
235 queued_messages->Add()->CopyFrom(message);
236 supported_token_count++;
240 if (supported_token_count > 0) {
241 DVLOG(3) << "Queued message under " << supported_token_count
242 << "token(s).";
243 } else {
244 VLOG(2) << "Discarded message that requires one of "
245 << message.required_token_size()
246 << " token(s), all on unsupported mediums.";
248 } else {
249 immediate_message_count++;
250 for (const std::string& subscription_id : message.subscription_id()) {
251 messages_by_subscription[subscription_id].push_back(
252 message.published_message());
257 // Send the messages for each subscription.
258 DVLOG(3) << "Dispatching " << immediate_message_count << "message(s) for "
259 << messages_by_subscription.size() << " subscription(s).";
260 for (const auto& map_entry : messages_by_subscription) {
261 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
262 // it in here and get rid of the app id registry from the main API class.
263 const std::string& subscription = map_entry.first;
264 const std::vector<Message>& messages = map_entry.second;
265 delegate_->HandleMessages(std::string(), subscription, messages);
269 } // namespace copresence