1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/copresence/copresence_manager_impl.h"
10 #include "base/bind.h"
11 #include "base/strings/stringprintf.h"
12 #include "base/time/time.h"
13 #include "base/timer/timer.h"
14 #include "components/copresence/copresence_state_impl.h"
15 #include "components/copresence/handlers/directive_handler_impl.h"
16 #include "components/copresence/handlers/gcm_handler_impl.h"
17 #include "components/copresence/proto/rpcs.pb.h"
18 #include "components/copresence/public/whispernet_client.h"
19 #include "components/copresence/rpc/rpc_handler.h"
21 using google::protobuf::RepeatedPtrField
;
// Period of the repeating timer that invokes PollForMessages().
25 const int kPollTimerIntervalMs
= 3000; // milliseconds.
// Period of the repeating timer that invokes AudioCheck().
26 const int kAudioCheckIntervalMs
= 1000; // milliseconds.
// Converted with base::TimeDelta::FromSeconds() and passed to the
// queued_messages_by_token_ constructor.
28 const int kQueuedMessageTimeout
= 10; // seconds.
// NOTE(review): not referenced on any line visible in this excerpt;
// presumably the capacity limit of queued_messages_by_token_ -- TODO confirm.
29 const int kMaxQueuedMessages
= 1000;
33 namespace copresence
{
// Walks every TokenSignals entry of |token| and tests whether its medium is
// AUDIO_ULTRASOUND_PASSBAND or AUDIO_AUDIBLE_DTMF.
// NOTE(review): the statements that follow the condition (the returns and the
// closing braces) are missing from this excerpt. Presumably the function
// returns true on the first matching signal and false when none matches --
// confirm against the complete file.
35 bool SupportedTokenMedium(const TokenObservation
& token
) {
36 for (const TokenSignals
& signals
: token
.signals()) {
37 if (signals
.medium() == AUDIO_ULTRASOUND_PASSBAND
||
38 signals
.medium() == AUDIO_AUDIBLE_DTMF
)
// Constructs the manager around |delegate|.
//
// The visible initializers set up: the cancelable Whispernet-init callback
// (bound to WhispernetInitComplete), the CopresenceStateImpl, the directive
// handler (which reports back through CopresenceStateImpl::UpdateDirectives),
// the two repeating timers, and the timed queue of pre-sent messages.
// The visible body then asks the delegate's Whispernet client to initialize,
// builds a DispatchMessages callback, creates the GCM handler only when the
// delegate supplies a GCM driver, and creates the RPC handler.
//
// All base::Unretained(this) / base::Unretained(state_.get()) bindings rely
// on the lifetime arguments given in the inline comments below.
// NOTE(review): several original lines (remaining initializer arguments, the
// tail of the handler constructor calls, closing braces) are missing from
// this excerpt, so the full argument lists cannot be documented from here.
47 CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate
* delegate
)
48 : delegate_(delegate
),
49 whispernet_init_callback_(base::Bind(
50 &CopresenceManagerImpl::WhispernetInitComplete
,
51 // This callback gets cancelled when we are destroyed.
52 base::Unretained(this))),
54 state_(new CopresenceStateImpl
),
55 directive_handler_(new DirectiveHandlerImpl(
56 // The directive handler and its descendants
57 // will be destructed before the CopresenceState instance.
58 base::Bind(&CopresenceStateImpl::UpdateDirectives
,
59 base::Unretained(state_
.get())))),
60 poll_timer_(new base::RepeatingTimer
<CopresenceManagerImpl
>),
61 audio_check_timer_(new base::RepeatingTimer
<CopresenceManagerImpl
>),
62 queued_messages_by_token_(
63 base::TimeDelta::FromSeconds(kQueuedMessageTimeout
),
// A Whispernet client is required; initialization completes asynchronously
// through whispernet_init_callback_.
66 DCHECK(delegate_
->GetWhispernetClient());
67 delegate_
->GetWhispernetClient()->Initialize(
68 whispernet_init_callback_
.callback());
70 MessagesCallback messages_callback
= base::Bind(
71 &CopresenceManagerImpl::DispatchMessages
,
72 // This will only be passed to objects that we own.
73 base::Unretained(this));
// The GCM handler is optional: it is only created when a GCM driver exists.
75 if (delegate
->GetGCMDriver())
76 gcm_handler_
.reset(new GCMHandlerImpl(delegate
->GetGCMDriver(),
77 directive_handler_
.get(),
80 rpc_handler_
.reset(new RpcHandler(delegate
,
82 directive_handler_
.get(),
// Cancels the Whispernet-init callback. That callback is bound with
// base::Unretained(this) in the constructor, so it must not run once this
// object is being destroyed.
87 CopresenceManagerImpl::~CopresenceManagerImpl() {
88 whispernet_init_callback_
.Cancel();
// Accessor returning a CopresenceState pointer.
// NOTE(review): the body is missing from this excerpt; presumably it returns
// state_.get() -- confirm against the complete file.
91 CopresenceState
* CopresenceManagerImpl::state() {
95 // Sends a private copy of |request| to the RPC handler along with |app_id|,
// |auth_token| and |callback|. The function itself returns nothing.
// NOTE(review): this comment previously read "Returns false if any operations
// were malformed", which does not match the void return type; presumably
// failures are reported through |callback| -- confirm.
96 void CopresenceManagerImpl::ExecuteReportRequest(
97 const ReportRequest
& request
,
98 const std::string
& app_id
,
99 const std::string
& auth_token
,
100 const StatusCallback
& callback
) {
101 // If initialization has failed, reject all requests.
// NOTE(review): the statements implementing the rejection above are missing
// from this excerpt.
107 // We'll need to modify the ReportRequest, so we make our own copy to send.
108 scoped_ptr
<ReportRequest
> request_copy(new ReportRequest(request
));
// Ownership of the copy is transferred to the RPC handler via Pass().
109 rpc_handler_
->SendReportRequest(
110 request_copy
.Pass(), app_id
, auth_token
, callback
);
114 // Private functions.
// Completion callback for Whispernet initialization (bound in the
// constructor). |success| reports whether initialization worked.
//
// Visible success-path work: start the directive handler with the Whispernet
// client and ReceivedTokens as its token callback, then start the polling
// timer (PollForMessages) and the audio check timer (AudioCheck).
// Visible failure-path work: log an error.
// NOTE(review): the if/else lines that select between these paths, and any
// other failure handling, are missing from this excerpt.
116 void CopresenceManagerImpl::WhispernetInitComplete(bool success
) {
118 DVLOG(3) << "Whispernet initialized successfully.";
120 directive_handler_
->Start(delegate_
->GetWhispernetClient(),
121 base::Bind(&CopresenceManagerImpl::ReceivedTokens
,
122 base::Unretained(this)));
// Both timers are owned by this object and call back into it.
125 poll_timer_
->Start(FROM_HERE
,
126 base::TimeDelta::FromMilliseconds(kPollTimerIntervalMs
),
127 base::Bind(&CopresenceManagerImpl::PollForMessages
,
128 base::Unretained(this)));
129 audio_check_timer_
->Start(
130 FROM_HERE
, base::TimeDelta::FromMilliseconds(kAudioCheckIntervalMs
),
131 base::Bind(&CopresenceManagerImpl::AudioCheck
, base::Unretained(this)));
133 LOG(ERROR
) << "Whispernet initialization failed!";
// Token callback handed to the directive handler in WhispernetInitComplete().
//
// First forwards all of |tokens| to the RPC handler. Then, for each token:
// logs it, records it in state_, and, if messages were queued under that
// token id, clears their required tokens, delivers them through
// DispatchMessages() and erases the queue entry.
// NOTE(review): the lines that construct the local |token| passed to
// UpdateReceivedToken(), and the closing braces, are missing from this
// excerpt.
138 void CopresenceManagerImpl::ReceivedTokens(
139 const std::vector
<AudioToken
>& tokens
) {
140 rpc_handler_
->ReportTokens(tokens
);
// NOTE(review): the loop variable is declared by value, so every AudioToken
// is copied per iteration; `const AudioToken&` would avoid the copy.
142 for (const AudioToken audio_token
: tokens
) {
143 const std::string
& token_id
= audio_token
.token
;
144 DVLOG(3) << "Heard token: " << token_id
;
146 // Update the CopresenceState.
149 audio_token
.audible
? AUDIO_AUDIBLE_DTMF
: AUDIO_ULTRASOUND_PASSBAND
,
151 state_
->UpdateReceivedToken(token
);
153 // Deliver messages that were pre-sent on this token.
154 if (queued_messages_by_token_
.HasKey(token_id
)) {
155 // Not const because we have to remove the required tokens for delivery.
156 // We're going to delete this whole vector at the end anyway.
157 RepeatedPtrField
<SubscribedMessage
>* messages
=
158 queued_messages_by_token_
.GetMutableValue(token_id
);
159 DCHECK_GT(messages
->size(), 0)
160 << "Empty entry in queued_messages_by_token_";
162 // These messages still have their required tokens stored, and
163 // DispatchMessages() will still check for them. If we don't remove
164 // the tokens before delivery, we'll just end up re-queuing the message.
165 for (SubscribedMessage
& message
: *messages
)
166 message
.mutable_required_token()->Clear();
168 DVLOG(3) << "Delivering " << messages
->size()
169 << " message(s) pre-sent on token " << token_id
;
170 DispatchMessages(*messages
);
172 // The messages have been delivered, so we don't need to keep them
173 // in the queue. Note that the token will still be reported
174 // to the server (above), so we'll keep getting the message.
175 // But we can now drop our local copy of it.
176 int erase_count
= queued_messages_by_token_
.Erase(token_id
);
177 DCHECK_GT(erase_count
, 0);
// Timer callback (started in WhispernetInitComplete()) that reports
// AUDIO_FAIL to the delegate when the directive handler has a non-empty
// current token for a channel but IsAudioTokenHeard() is false for it.
// The AUDIBLE channel is tested first; INAUDIBLE is only tested when the
// AUDIBLE condition did not fire, so at most one status update is sent.
// NOTE(review): both branches issue the identical call, and the lines after
// the second call are missing from this excerpt.
182 void CopresenceManagerImpl::AudioCheck() {
183 if (!directive_handler_
->GetCurrentAudioToken(AUDIBLE
).empty() &&
184 !directive_handler_
->IsAudioTokenHeard(AUDIBLE
)) {
185 delegate_
->HandleStatusUpdate(AUDIO_FAIL
);
186 } else if (!directive_handler_
->GetCurrentAudioToken(INAUDIBLE
).empty() &&
187 !directive_handler_
->IsAudioTokenHeard(INAUDIBLE
)) {
188 delegate_
->HandleStatusUpdate(AUDIO_FAIL
);
192 // Report our currently playing tokens to the server.
// Timer callback started in WhispernetInitComplete(). Reads the current
// AUDIBLE and INAUDIBLE tokens from the directive handler, wraps each
// non-empty one in an AudioToken (second argument true for audible, false
// for inaudible) and passes the resulting list to the RPC handler.
// ReportTokens() is called on the visible lines without any emptiness check
// on |tokens|.
193 void CopresenceManagerImpl::PollForMessages() {
194 const std::string
& audible_token
=
195 directive_handler_
->GetCurrentAudioToken(AUDIBLE
);
196 const std::string
& inaudible_token
=
197 directive_handler_
->GetCurrentAudioToken(INAUDIBLE
);
199 std::vector
<AudioToken
> tokens
;
200 if (!audible_token
.empty())
201 tokens
.push_back(AudioToken(audible_token
, true));
202 if (!inaudible_token
.empty())
203 tokens
.push_back(AudioToken(inaudible_token
, false));
206 rpc_handler_
->ReportTokens(tokens
);
// Routes each entry of |messages| either to the token queue or to the
// delegate.
//
// A message that lists required tokens is copied into
// queued_messages_by_token_ once for every required token whose medium
// passes SupportedTokenMedium(); ReceivedTokens() delivers it later. A
// message with no required tokens has its published_message() appended to
// the list of every subscription id it names. Finally each per-subscription
// list is passed to delegate_->HandleMessages().
// NOTE(review): the statement after the empty-input check, the else
// branches, the tail of one log statement and the closing braces are missing
// from this excerpt; the branch structure described here follows the
// comments at original lines 219-220.
209 void CopresenceManagerImpl::DispatchMessages(
210 const RepeatedPtrField
<SubscribedMessage
>& messages
) {
211 if (messages
.size() == 0)
214 // Index the messages by subscription id.
215 std::map
<std::string
, std::vector
<Message
>> messages_by_subscription
;
216 DVLOG(3) << "Processing " << messages
.size() << " received message(s).";
217 int immediate_message_count
= 0;
218 for (const SubscribedMessage
& message
: messages
) {
219 // If tokens are required for this message, queue it.
220 // Otherwise stage it for delivery.
221 if (message
.required_token_size() > 0) {
222 int supported_token_count
= 0;
223 for (const TokenObservation
& token
: message
.required_token()) {
224 if (SupportedTokenMedium(token
)) {
// Create the queue entry for this token id on first use.
225 if (!queued_messages_by_token_
.HasKey(token
.token_id())) {
226 queued_messages_by_token_
.Add(
227 token
.token_id(), RepeatedPtrField
<SubscribedMessage
>());
229 RepeatedPtrField
<SubscribedMessage
>* queued_messages
=
230 queued_messages_by_token_
.GetMutableValue(token
.token_id());
231 DCHECK(queued_messages
);
// The whole message, including its required tokens, is stored.
232 queued_messages
->Add()->CopyFrom(message
);
233 supported_token_count
++;
237 if (supported_token_count
> 0) {
238 DVLOG(3) << "Queued message under " << supported_token_count
241 VLOG(2) << "Discarded message that requires one of "
242 << message
.required_token_size()
243 << " token(s), all on unsupported mediums.";
246 immediate_message_count
++;
247 for (const std::string
& subscription_id
: message
.subscription_id()) {
248 messages_by_subscription
[subscription_id
].push_back(
249 message
.published_message());
254 // Send the messages for each subscription.
// NOTE(review): the literal "message(s) for " has no leading space, so the
// count and the word run together in the log output (e.g. "3message(s)").
255 DVLOG(3) << "Dispatching " << immediate_message_count
<< "message(s) for "
256 << messages_by_subscription
.size() << " subscription(s).";
257 for (const auto& map_entry
: messages_by_subscription
) {
258 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
259 // it in here and get rid of the app id registry from the main API class.
260 const std::string
& subscription
= map_entry
.first
;
261 const std::vector
<Message
>& messages
= map_entry
.second
;
// An empty string is passed as the first argument (see the TODO above).
262 delegate_
->HandleMessages(std::string(), subscription
, messages
);
266 } // namespace copresence