1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/copresence/copresence_manager_impl.h"
10 #include "base/bind.h"
11 #include "base/strings/stringprintf.h"
12 #include "base/time/time.h"
13 #include "base/timer/timer.h"
14 #include "components/audio_modem/public/whispernet_client.h"
15 #include "components/copresence/copresence_state_impl.h"
16 #include "components/copresence/handlers/directive_handler_impl.h"
17 #include "components/copresence/handlers/gcm_handler_impl.h"
18 #include "components/copresence/proto/rpcs.pb.h"
19 #include "components/copresence/rpc/rpc_handler.h"
// File-scope using-declarations so that the protobuf repeated-field container
// and the audio_modem token type / AUDIBLE / INAUDIBLE selectors can be named
// without qualification in the rest of this file.
21 using google::protobuf::RepeatedPtrField
;
23 using audio_modem::AUDIBLE
;
24 using audio_modem::AudioToken
;
25 using audio_modem::INAUDIBLE
;
// Period of poll_timer_, which invokes PollForMessages().
29 const int kPollTimerIntervalMs
= 3000; // milliseconds.
// Period of audio_check_timer_, which invokes AudioCheck().
30 const int kAudioCheckIntervalMs
= 1000; // milliseconds.
// Converted with base::TimeDelta::FromSeconds() and passed to the
// queued_messages_by_token_ constructor.
32 const int kQueuedMessageTimeout
= 10; // seconds.
// NOTE(review): no use of this constant is visible in this view; presumably it
// caps the number of entries in queued_messages_by_token_ - confirm.
33 const int kMaxQueuedMessages
= 1000;
37 namespace copresence
{
// Walks the signals attached to |token| and compares each signal's medium
// against the two audio mediums handled here: AUDIO_ULTRASOUND_PASSBAND and
// AUDIO_AUDIBLE_DTMF.
// NOTE(review): the statements producing the bool result are not visible in
// this view; presumably true on the first matching signal and false
// otherwise - confirm against the full file.
39 bool SupportedTokenMedium(const TokenObservation
& token
) {
40 for (const TokenSignals
& signals
: token
.signals()) {
41 if (signals
.medium() == AUDIO_ULTRASOUND_PASSBAND
||
42 signals
.medium() == AUDIO_AUDIBLE_DTMF
)
// Builds the manager around |delegate|. In order, the visible code:
//   - starts Whispernet initialization, with WhispernetInitComplete() as the
//     (cancelable) completion callback;
//   - binds DispatchMessages() as |messages_callback|;
//   - creates the GCM handler, but only if the delegate provides a GCM driver;
//   - creates the RPC handler;
//   - starts the directive handler with ReceivedTokens() as its token callback.
// All callbacks bind an unretained |this|; the inline comments below state why
// each one is considered safe.
// NOTE(review): several initializer and constructor-argument lines are not
// visible in this view, so the complete argument lists cannot be confirmed.
51 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate
* delegate
)
52 : delegate_(delegate
),
53 whispernet_init_callback_(base::Bind(
54 &CopresenceManagerImpl::WhispernetInitComplete
,
55 // This callback gets cancelled when we are destroyed.
56 base::Unretained(this))),
58 state_(new CopresenceStateImpl
),
59 directive_handler_(new DirectiveHandlerImpl(
60 // The directive handler and its descendants
61 // will be destructed before the CopresenceState instance.
62 base::Bind(&CopresenceStateImpl::UpdateDirectives
,
63 base::Unretained(state_
.get())))),
64 poll_timer_(new base::RepeatingTimer
<CopresenceManagerImpl
>),
65 audio_check_timer_(new base::RepeatingTimer
<CopresenceManagerImpl
>),
66 queued_messages_by_token_(
67 base::TimeDelta::FromSeconds(kQueuedMessageTimeout
),
70 DCHECK(delegate_
->GetWhispernetClient());
71 // TODO(ckehoe): Handle whispernet initialization in the whispernet component.
72 delegate_
->GetWhispernetClient()->Initialize(
73 whispernet_init_callback_
.callback());
75 MessagesCallback messages_callback
= base::Bind(
76 &CopresenceManagerImpl::DispatchMessages
,
77 // This will only be passed to objects that we own.
78 base::Unretained(this));
// The GCM handler is optional: it is only created when a GCM driver exists.
80 if (delegate
->GetGCMDriver())
81 gcm_handler_
.reset(new GCMHandlerImpl(delegate
->GetGCMDriver(),
82 directive_handler_
.get(),
85 rpc_handler_
.reset(new RpcHandler(delegate
,
87 directive_handler_
.get(),
91 directive_handler_
->Start(delegate_
->GetWhispernetClient(),
92 base::Bind(&CopresenceManagerImpl::ReceivedTokens
,
93 base::Unretained(this)));
// Cancels the Whispernet init callback. That callback was bound to an
// unretained |this| in the constructor, so it must not run once this object
// is gone.
96 CopresenceManagerImpl::~CopresenceManagerImpl() {
97 whispernet_init_callback_
.Cancel();
// Accessor returning a CopresenceState pointer.
// NOTE(review): the body is not visible in this view; presumably it exposes
// the state_ member created in the constructor - confirm.
100 CopresenceState
* CopresenceManagerImpl::state() {
104 // Sends a copy of |request| to the server through the RPC handler.
// |app_id|, |auth_token| and |callback| are forwarded unchanged. The function
// itself returns void, so any outcome can only be reported via |callback|.
105 void CopresenceManagerImpl::ExecuteReportRequest(
106 const ReportRequest
& request
,
107 const std::string
& app_id
,
108 const std::string
& auth_token
,
109 const StatusCallback
& callback
) {
110 // If initialization has failed, reject all requests.
// NOTE(review): the rejection logic itself is not visible in this view.
116 // We'll need to modify the ReportRequest, so we make our own copy to send.
117 scoped_ptr
<ReportRequest
> request_copy(new ReportRequest(request
));
// Ownership of the copy is transferred to the RPC handler via Pass().
118 rpc_handler_
->SendReportRequest(
119 request_copy
.Pass(), app_id
, auth_token
, callback
);
123 // Private functions.
// Completion callback for the Whispernet initialization started in the
// constructor. The visible code starts the two repeating timers
// (PollForMessages every kPollTimerIntervalMs, AudioCheck every
// kAudioCheckIntervalMs) and logs an error when initialization failed.
// NOTE(review): the branch on |success| is not visible in this view;
// presumably the timers are started only on success and the error is logged
// only on failure - confirm.
125 void CopresenceManagerImpl::WhispernetInitComplete(bool success
) {
127 DVLOG(3) << "Whispernet initialized successfully.";
128 poll_timer_
->Start(FROM_HERE
,
129 base::TimeDelta::FromMilliseconds(kPollTimerIntervalMs
),
130 base::Bind(&CopresenceManagerImpl::PollForMessages
,
131 base::Unretained(this)));
132 audio_check_timer_
->Start(
133 FROM_HERE
, base::TimeDelta::FromMilliseconds(kAudioCheckIntervalMs
),
134 base::Bind(&CopresenceManagerImpl::AudioCheck
, base::Unretained(this)));
136 LOG(ERROR
) << "Whispernet initialization failed!";
// Token callback registered with the directive handler in the constructor.
// For the batch of heard |tokens| it:
//   1. reports the tokens to the server through the RPC handler;
//   2. records each token in the CopresenceState;
//   3. delivers, then erases, any messages that were queued under that token.
// NOTE(review): the lines that build the TokenObservation passed to
// UpdateReceivedToken() are only partly visible in this view.
141 void CopresenceManagerImpl::ReceivedTokens(
142 const std::vector
<AudioToken
>& tokens
) {
143 rpc_handler_
->ReportTokens(tokens
);
// Iterate by const reference: iterating by value copied every AudioToken
// (including its token string) on each pass without any benefit.
145 for (const AudioToken& audio_token
: tokens
) {
146 const std::string
& token_id
= audio_token
.token
;
147 DVLOG(3) << "Heard token: " << token_id
;
149 // Update the CopresenceState.
152 audio_token
.audible
? AUDIO_AUDIBLE_DTMF
: AUDIO_ULTRASOUND_PASSBAND
,
154 state_
->UpdateReceivedToken(token
);
156 // Deliver messages that were pre-sent on this token.
157 if (queued_messages_by_token_
.HasKey(token_id
)) {
158 // Not const because we have to remove the required tokens for delivery.
159 // We're going to delete this whole vector at the end anyway.
160 RepeatedPtrField
<SubscribedMessage
>* messages
=
161 queued_messages_by_token_
.GetMutableValue(token_id
);
162 DCHECK_GT(messages
->size(), 0)
163 << "Empty entry in queued_messages_by_token_";
165 // These messages still have their required tokens stored, and
166 // DispatchMessages() will still check for them. If we don't remove
167 // the tokens before delivery, we'll just end up re-queuing the message.
168 for (SubscribedMessage
& message
: *messages
)
169 message
.mutable_required_token()->Clear();
171 DVLOG(3) << "Delivering " << messages
->size()
172 << " message(s) pre-sent on token " << token_id
;
173 DispatchMessages(*messages
);
175 // The messages have been delivered, so we don't need to keep them
176 // in the queue. Note that the token will still be reported
177 // to the server (above), so we'll keep getting the message.
178 // But we can now drop our local copy of it.
179 int erase_count
= queued_messages_by_token_
.Erase(token_id
);
180 DCHECK_GT(erase_count
, 0);
// Callback of audio_check_timer_. Reports AUDIO_FAIL to the delegate when the
// directive handler has a non-empty current token on a band but that token
// has not been heard on the same band. The audible band is checked first; the
// inaudible band is only checked in the else-branch, so at most one status
// update is sent per call.
185 void CopresenceManagerImpl::AudioCheck() {
186 if (!directive_handler_
->GetCurrentAudioToken(AUDIBLE
).empty() &&
187 !directive_handler_
->IsAudioTokenHeard(AUDIBLE
)) {
188 delegate_
->HandleStatusUpdate(AUDIO_FAIL
);
189 } else if (!directive_handler_
->GetCurrentAudioToken(INAUDIBLE
).empty() &&
190 !directive_handler_
->IsAudioTokenHeard(INAUDIBLE
)) {
191 delegate_
->HandleStatusUpdate(AUDIO_FAIL
);
195 // Report our currently playing tokens to the server.
// Callback of poll_timer_. Asks the directive handler for the current audible
// and inaudible tokens, wraps each non-empty one in an AudioToken (second
// constructor argument: true for audible, false for inaudible) and passes the
// resulting list to rpc_handler_->ReportTokens().
// NOTE(review): the line immediately before the ReportTokens() call is not
// visible in this view; it may guard against an empty list - confirm.
196 void CopresenceManagerImpl::PollForMessages() {
197 const std::string
& audible_token
=
198 directive_handler_
->GetCurrentAudioToken(AUDIBLE
);
199 const std::string
& inaudible_token
=
200 directive_handler_
->GetCurrentAudioToken(INAUDIBLE
);
202 std::vector
<AudioToken
> tokens
;
203 if (!audible_token
.empty())
204 tokens
.push_back(AudioToken(audible_token
, true));
205 if (!inaudible_token
.empty())
206 tokens
.push_back(AudioToken(inaudible_token
, false));
209 rpc_handler_
->ReportTokens(tokens
);
// Routes a batch of subscribed |messages|. Bound as |messages_callback| in the
// constructor and also called by ReceivedTokens() for previously queued
// messages.
//   - A message that lists required tokens is copied into
//     queued_messages_by_token_ under every required token whose medium is
//     supported (see SupportedTokenMedium()); per the log text below it is
//     discarded when all of its required tokens are on unsupported mediums.
//   - A message without required tokens is grouped by each of its
//     subscription ids and handed to the delegate via HandleMessages().
// NOTE(review): several closing/branch lines are not visible in this view;
// the branch structure described above is inferred from the visible
// conditions and log messages - confirm against the full file.
212 void CopresenceManagerImpl::DispatchMessages(
213 const RepeatedPtrField
<SubscribedMessage
>& messages
) {
214 if (messages
.size() == 0)
217 // Index the messages by subscription id.
218 std::map
<std::string
, std::vector
<Message
>> messages_by_subscription
;
219 DVLOG(3) << "Processing " << messages
.size() << " received message(s).";
220 int immediate_message_count
= 0;
221 for (const SubscribedMessage
& message
: messages
) {
222 // If tokens are required for this message, queue it.
223 // Otherwise stage it for delivery.
224 if (message
.required_token_size() > 0) {
225 int supported_token_count
= 0;
226 for (const TokenObservation
& token
: message
.required_token()) {
227 if (SupportedTokenMedium(token
)) {
// Create the per-token queue entry on first use.
228 if (!queued_messages_by_token_
.HasKey(token
.token_id())) {
229 queued_messages_by_token_
.Add(
230 token
.token_id(), RepeatedPtrField
<SubscribedMessage
>());
232 RepeatedPtrField
<SubscribedMessage
>* queued_messages
=
233 queued_messages_by_token_
.GetMutableValue(token
.token_id());
234 DCHECK(queued_messages
);
235 queued_messages
->Add()->CopyFrom(message
);
236 supported_token_count
++;
240 if (supported_token_count
> 0) {
241 DVLOG(3) << "Queued message under " << supported_token_count
244 VLOG(2) << "Discarded message that requires one of "
245 << message
.required_token_size()
246 << " token(s), all on unsupported mediums.";
249 immediate_message_count
++;
250 for (const std::string
& subscription_id
: message
.subscription_id()) {
251 messages_by_subscription
[subscription_id
].push_back(
252 message
.published_message());
257 // Send the messages for each subscription.
// The second literal needs a leading space; without it the count and the word
// run together ("Dispatching 3message(s) for ...").
258 DVLOG(3) << "Dispatching " << immediate_message_count
<< " message(s) for "
259 << messages_by_subscription
.size() << " subscription(s).";
260 for (const auto& map_entry
: messages_by_subscription
) {
261 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
262 // it in here and get rid of the app id registry from the main API class.
263 const std::string
& subscription
= map_entry
.first
;
// Named distinctly so that it does not shadow the |messages| parameter.
264 const std::vector
<Message
>& subscription_messages
= map_entry
.second
;
265 delegate_
->HandleMessages(std::string(), subscription
, subscription_messages
);
269 } // namespace copresence