1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/copresence/rpc/rpc_handler.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
15 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
16 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
17 // we fix this with an #undef.
18 #include "base/time/time.h"
20 #undef DeviceCapabilities
23 #include "components/copresence/copresence_switches.h"
24 #include "components/copresence/handlers/directive_handler.h"
25 #include "components/copresence/proto/codes.pb.h"
26 #include "components/copresence/proto/data.pb.h"
27 #include "components/copresence/proto/rpcs.pb.h"
28 #include "components/copresence/public/copresence_delegate.h"
29 #include "net/http/http_status_code.h"
31 // TODO(ckehoe): Return error messages for bad requests.
33 namespace copresence
{
35 using google::protobuf::MessageLite
;
36 using google::protobuf::RepeatedPtrField
;
38 const char RpcHandler::kReportRequestRpcName
[] = "report";
42 // UrlSafe is defined as:
43 // '/' represented by a '_' and '+' represented by a '-'
44 // TODO(rkc): Move this to the wrapper.
45 std::string
ToUrlSafe(std::string token
) {
46 base::ReplaceChars(token
, "+", "-", &token
);
47 base::ReplaceChars(token
, "/", "_", &token
);
// Duration handed to |invalid_audio_token_cache_| when it is constructed.
const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;  // 10 minutes.
// NOTE(review): presumably the capacity of the invalid token cache; confirm
// against the RpcHandler constructor.
const int kMaxInvalidTokens = 10000;
// RPC name used by RpcHandler::Initialize() to register the device.
const char kRegisterDeviceRpcName[] = "registerdevice";
// Server used by SendHttpPost() when switches::kCopresenceServer is not set.
const char kDefaultCopresenceServer[] =
    "https://www.googleapis.com/copresence/v2/copresence";
59 // Checks for a copresence error. If there is one, logs it and returns true.
60 bool CopresenceErrorLogged(const Status
& status
) {
61 if (status
.code() != OK
) {
62 LOG(ERROR
) << "Copresence error code " << status
.code()
63 << (status
.message().empty() ? std::string() :
64 ": " + status
.message());
66 return status
.code() != OK
;
69 void LogIfErrorStatus(const util::error::Code
& code
,
70 const std::string
& context
) {
71 LOG_IF(ERROR
, code
!= util::error::OK
)
72 << context
<< " error " << code
<< ". See "
73 << "cs/google3/util/task/codes.proto for more info.";
76 // If any errors occurred, logs them and returns true.
77 bool ReportErrorLogged(const ReportResponse
& response
) {
78 bool result
= CopresenceErrorLogged(response
.header().status());
80 // The Report fails or succeeds as a unit. If any responses had errors,
81 // the header will too. Thus we don't need to propagate individual errors.
82 if (response
.has_update_signals_response())
83 LogIfErrorStatus(response
.update_signals_response().status(), "Update");
84 if (response
.has_manage_messages_response())
85 LogIfErrorStatus(response
.manage_messages_response().status(), "Publish");
86 if (response
.has_manage_subscriptions_response()) {
87 LogIfErrorStatus(response
.manage_subscriptions_response().status(),
94 // Request construction
95 // TODO(ckehoe): Move these into a separate file?
98 BroadcastScanConfiguration
GetBroadcastScanConfig(const T
& msg
) {
99 if (msg
.has_token_exchange_strategy() &&
100 msg
.token_exchange_strategy().has_broadcast_scan_configuration()) {
101 return msg
.token_exchange_strategy().broadcast_scan_configuration();
103 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN
;
106 // This method will extract token exchange strategies
107 // from the publishes and subscribes in a report request.
108 // TODO(ckehoe): Delete this when the server supports
109 // BroadcastScanConfiguration.
110 BroadcastScanConfiguration
ExtractTokenExchangeStrategy(
111 const ReportRequest
& request
) {
112 bool broadcast_only
= false;
113 bool scan_only
= false;
115 // Strategies for publishes.
116 if (request
.has_manage_messages_request()) {
117 const RepeatedPtrField
<PublishedMessage
>& messages
=
118 request
.manage_messages_request().message_to_publish();
119 for (int i
= 0; i
< messages
.size(); ++i
) {
120 BroadcastScanConfiguration config
=
121 GetBroadcastScanConfig(messages
.Get(i
));
122 broadcast_only
= broadcast_only
|| config
== BROADCAST_ONLY
;
123 scan_only
= scan_only
|| config
== SCAN_ONLY
;
124 if (config
== BROADCAST_AND_SCAN
|| (broadcast_only
&& scan_only
))
125 return BROADCAST_AND_SCAN
;
129 // Strategies for subscriptions.
130 if (request
.has_manage_subscriptions_request()) {
131 const RepeatedPtrField
<Subscription
> subscriptions
=
132 request
.manage_subscriptions_request().subscription();
133 for (int i
= 0; i
< subscriptions
.size(); ++i
) {
134 BroadcastScanConfiguration config
=
135 GetBroadcastScanConfig(subscriptions
.Get(i
));
136 broadcast_only
= broadcast_only
|| config
== BROADCAST_ONLY
;
137 scan_only
= scan_only
|| config
== SCAN_ONLY
;
138 if (config
== BROADCAST_AND_SCAN
|| (broadcast_only
&& scan_only
))
139 return BROADCAST_AND_SCAN
;
144 return BROADCAST_ONLY
;
148 // If nothing else is specified, default to both broadcast and scan.
149 return BROADCAST_AND_SCAN
;
152 // TODO(rkc): Fix this hack once the server supports setting strategies per
154 bool ExtractIsAudibleStrategy(const ReportRequest
& request
) {
155 if (request
.has_manage_messages_request()) {
156 const RepeatedPtrField
<PublishedMessage
> messages
=
157 request
.manage_messages_request().message_to_publish();
158 for (int i
= 0; i
< messages
.size(); ++i
) {
159 const PublishedMessage
& msg
= messages
.Get(i
);
160 if (msg
.has_token_exchange_strategy() &&
161 msg
.token_exchange_strategy().has_use_audible() &&
162 msg
.token_exchange_strategy().use_audible()) {
170 scoped_ptr
<DeviceState
> GetDeviceCapabilities(const ReportRequest
& request
) {
171 scoped_ptr
<DeviceState
> state(new DeviceState
);
173 TokenTechnology
* token_technology
=
174 state
->mutable_capabilities()->add_token_technology();
175 token_technology
->set_medium(AUDIO_ULTRASOUND_PASSBAND
);
176 if (ExtractIsAudibleStrategy(request
))
177 token_technology
->set_medium(AUDIO_AUDIBLE_DTMF
);
179 BroadcastScanConfiguration config
=
180 ExtractTokenExchangeStrategy(request
);
181 if (config
== BROADCAST_ONLY
|| config
== BROADCAST_AND_SCAN
)
182 token_technology
->add_instruction_type(TRANSMIT
);
183 if (config
== SCAN_ONLY
|| config
== BROADCAST_AND_SCAN
)
184 token_technology
->add_instruction_type(RECEIVE
);
189 // TODO(ckehoe): We're keeping this code in a separate function for now
190 // because we get a version string from Chrome, but the proto expects
191 // an int64 version. We should probably change the version proto
192 // to handle a more detailed version.
193 ClientVersion
* CreateVersion(const std::string
& client
,
194 const std::string
& version_name
) {
195 ClientVersion
* version
= new ClientVersion
;
197 version
->set_client(client
);
198 version
->set_version_name(version_name
);
203 void AddTokenToRequest(ReportRequest
* request
, const AudioToken
& token
) {
204 TokenObservation
* token_observation
=
205 request
->mutable_update_signals_request()->add_token_observation();
206 token_observation
->set_token_id(ToUrlSafe(token
.token
));
208 TokenSignals
* signals
= token_observation
->add_signals();
209 signals
->set_medium(token
.audible
? AUDIO_AUDIBLE_DTMF
210 : AUDIO_ULTRASOUND_PASSBAND
);
211 signals
->set_observed_time_millis(base::Time::Now().ToJsTime());
218 RpcHandler::RpcHandler(CopresenceDelegate
* delegate
)
219 : delegate_(delegate
),
220 invalid_audio_token_cache_(
221 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs
),
223 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost
,
224 base::Unretained(this))) {}
226 RpcHandler::~RpcHandler() {
227 for (std::set
<HttpPost
*>::iterator post
= pending_posts_
.begin();
228 post
!= pending_posts_
.end(); ++post
) {
232 if (delegate_
&& delegate_
->GetWhispernetClient()) {
233 delegate_
->GetWhispernetClient()->RegisterTokensCallback(
234 WhispernetClient::TokensCallback());
238 void RpcHandler::Initialize(const SuccessCallback
& init_done_callback
) {
239 scoped_ptr
<RegisterDeviceRequest
> request(new RegisterDeviceRequest
);
240 DCHECK(device_id_
.empty());
242 request
->mutable_push_service()->set_service(PUSH_SERVICE_NONE
);
244 request
->mutable_device_identifiers()->mutable_registrant();
245 identity
->set_type(CHROME
);
246 identity
->set_chrome_id(base::GenerateGUID());
248 kRegisterDeviceRpcName
,
251 base::Bind(&RpcHandler::RegisterResponseHandler
,
252 // On destruction, this request will be cancelled.
253 base::Unretained(this),
254 init_done_callback
));
257 void RpcHandler::SendReportRequest(scoped_ptr
<ReportRequest
> request
) {
258 SendReportRequest(request
.Pass(), std::string(), StatusCallback());
261 void RpcHandler::SendReportRequest(scoped_ptr
<ReportRequest
> request
,
262 const std::string
& app_id
,
263 const StatusCallback
& status_callback
) {
264 DCHECK(request
.get());
265 DCHECK(!device_id_
.empty())
266 << "RpcHandler::Initialize() must complete successfully "
267 << "before other RpcHandler methods are called.";
269 DVLOG(3) << "Sending report request to server.";
271 // If we are unpublishing or unsubscribing, we need to stop those publish or
272 // subscribes right away, we don't need to wait for the server to tell us.
273 ProcessRemovedOperations(*request
);
275 request
->mutable_update_signals_request()->set_allocated_state(
276 GetDeviceCapabilities(*request
).release());
278 AddPlayingTokens(request
.get());
280 // TODO(ckehoe): Currently the server supports only BROADCAST_AND_SCAN.
281 // Remove this once b/16715253 is fixed.
282 if (request
->has_manage_messages_request()) {
283 RepeatedPtrField
<PublishedMessage
>* messages
= request
284 ->mutable_manage_messages_request()->mutable_message_to_publish();
285 for (int i
= 0; i
< messages
->size(); ++i
) {
286 messages
->Mutable(i
)->mutable_token_exchange_strategy()
287 ->set_broadcast_scan_configuration(BROADCAST_AND_SCAN
);
290 if (request
->has_manage_subscriptions_request()) {
291 RepeatedPtrField
<Subscription
>* subscriptions
=
292 request
->mutable_manage_subscriptions_request()->mutable_subscription();
293 for (int i
= 0; i
< subscriptions
->size(); ++i
) {
294 subscriptions
->Mutable(i
)->mutable_token_exchange_strategy()
295 ->set_broadcast_scan_configuration(BROADCAST_AND_SCAN
);
299 SendServerRequest(kReportRequestRpcName
,
302 // On destruction, this request will be cancelled.
303 base::Bind(&RpcHandler::ReportResponseHandler
,
304 base::Unretained(this),
308 void RpcHandler::ReportTokens(const std::vector
<AudioToken
>& tokens
) {
309 DCHECK(!tokens
.empty());
311 scoped_ptr
<ReportRequest
> request(new ReportRequest
);
312 for (size_t i
= 0; i
< tokens
.size(); ++i
) {
313 if (invalid_audio_token_cache_
.HasKey(ToUrlSafe(tokens
[i
].token
)))
315 DVLOG(3) << "Sending token " << tokens
[i
].token
<< " to server.";
316 AddTokenToRequest(request
.get(), tokens
[i
]);
318 SendReportRequest(request
.Pass());
321 void RpcHandler::ConnectToWhispernet() {
322 WhispernetClient
* whispernet_client
= delegate_
->GetWhispernetClient();
324 // |directive_handler_| will be destructed with us, so unretained is safe.
325 directive_handler_
.reset(new DirectiveHandler
);
326 directive_handler_
->Initialize(
327 base::Bind(&WhispernetClient::DecodeSamples
,
328 base::Unretained(whispernet_client
)),
329 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector
,
330 base::Unretained(this)));
332 whispernet_client
->RegisterTokensCallback(
333 base::Bind(&RpcHandler::ReportTokens
,
334 // On destruction, this callback will be disconnected.
335 base::Unretained(this)));
340 void RpcHandler::RegisterResponseHandler(
341 const SuccessCallback
& init_done_callback
,
342 HttpPost
* completed_post
,
343 int http_status_code
,
344 const std::string
& response_data
) {
345 if (completed_post
) {
346 int elements_erased
= pending_posts_
.erase(completed_post
);
347 DCHECK(elements_erased
);
348 delete completed_post
;
351 if (http_status_code
!= net::HTTP_OK
) {
352 init_done_callback
.Run(false);
356 RegisterDeviceResponse response
;
357 if (!response
.ParseFromString(response_data
)) {
358 LOG(ERROR
) << "Invalid RegisterDeviceResponse:\n" << response_data
;
359 init_done_callback
.Run(false);
363 if (CopresenceErrorLogged(response
.header().status()))
365 device_id_
= response
.registered_device_id();
366 DCHECK(!device_id_
.empty());
367 DVLOG(2) << "Device registration successful: id " << device_id_
;
368 init_done_callback
.Run(true);
371 void RpcHandler::ReportResponseHandler(const StatusCallback
& status_callback
,
372 HttpPost
* completed_post
,
373 int http_status_code
,
374 const std::string
& response_data
) {
375 if (completed_post
) {
376 int elements_erased
= pending_posts_
.erase(completed_post
);
377 DCHECK(elements_erased
);
378 delete completed_post
;
381 if (http_status_code
!= net::HTTP_OK
) {
382 if (!status_callback
.is_null())
383 status_callback
.Run(FAIL
);
387 DVLOG(3) << "Received ReportResponse.";
388 ReportResponse response
;
389 if (!response
.ParseFromString(response_data
)) {
390 LOG(ERROR
) << "Invalid ReportResponse";
391 if (!status_callback
.is_null())
392 status_callback
.Run(FAIL
);
396 if (ReportErrorLogged(response
)) {
397 if (!status_callback
.is_null())
398 status_callback
.Run(FAIL
);
402 const RepeatedPtrField
<MessageResult
>& message_results
=
403 response
.manage_messages_response().published_message_result();
404 for (int i
= 0; i
< message_results
.size(); ++i
) {
405 DVLOG(2) << "Published message with id "
406 << message_results
.Get(i
).published_message_id();
409 const RepeatedPtrField
<SubscriptionResult
>& subscription_results
=
410 response
.manage_subscriptions_response().subscription_result();
411 for (int i
= 0; i
< subscription_results
.size(); ++i
) {
412 DVLOG(2) << "Created subscription with id "
413 << subscription_results
.Get(i
).subscription_id();
416 if (response
.has_update_signals_response()) {
417 const UpdateSignalsResponse
& update_response
=
418 response
.update_signals_response();
419 DispatchMessages(update_response
.message());
421 if (directive_handler_
.get()) {
422 for (int i
= 0; i
< update_response
.directive_size(); ++i
)
423 directive_handler_
->AddDirective(update_response
.directive(i
));
425 DVLOG(1) << "No directive handler.";
428 const RepeatedPtrField
<Token
>& tokens
= update_response
.token();
429 for (int i
= 0; i
< tokens
.size(); ++i
) {
430 switch (tokens
.Get(i
).status()) {
432 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
433 // short TTL (like 10s) and send it up with every report request.
434 // Then we'll still get messages while we're waiting to hear it again.
435 VLOG(1) << "Got valid token " << tokens
.Get(i
).id();
438 DVLOG(3) << "Discarding invalid token " << tokens
.Get(i
).id();
439 invalid_audio_token_cache_
.Add(tokens
.Get(i
).id(), true);
442 DVLOG(2) << "Token " << tokens
.Get(i
).id() << " has status code "
443 << tokens
.Get(i
).status();
448 // TODO(ckehoe): Return a more detailed status response.
449 if (!status_callback
.is_null())
450 status_callback
.Run(SUCCESS
);
453 void RpcHandler::ProcessRemovedOperations(const ReportRequest
& request
) {
454 // Remove unpublishes.
455 if (request
.has_manage_messages_request()) {
456 const RepeatedPtrField
<std::string
>& unpublishes
=
457 request
.manage_messages_request().id_to_unpublish();
458 for (int i
= 0; i
< unpublishes
.size(); ++i
)
459 directive_handler_
->RemoveDirectives(unpublishes
.Get(i
));
462 // Remove unsubscribes.
463 if (request
.has_manage_subscriptions_request()) {
464 const RepeatedPtrField
<std::string
>& unsubscribes
=
465 request
.manage_subscriptions_request().id_to_unsubscribe();
466 for (int i
= 0; i
< unsubscribes
.size(); ++i
)
467 directive_handler_
->RemoveDirectives(unsubscribes
.Get(i
));
471 void RpcHandler::AddPlayingTokens(ReportRequest
* request
) {
472 if (!directive_handler_
)
475 const std::string
& audible_token
= directive_handler_
->CurrentAudibleToken();
476 const std::string
& inaudible_token
=
477 directive_handler_
->CurrentInaudibleToken();
479 if (!audible_token
.empty())
480 AddTokenToRequest(request
, AudioToken(audible_token
, true));
481 if (!inaudible_token
.empty())
482 AddTokenToRequest(request
, AudioToken(inaudible_token
, false));
485 void RpcHandler::DispatchMessages(
486 const RepeatedPtrField
<SubscribedMessage
>& messages
) {
487 if (messages
.size() == 0)
490 // Index the messages by subscription id.
491 std::map
<std::string
, std::vector
<Message
> > messages_by_subscription
;
492 DVLOG(3) << "Dispatching " << messages
.size() << " messages";
493 for (int m
= 0; m
< messages
.size(); ++m
) {
494 const RepeatedPtrField
<std::string
>& subscription_ids
=
495 messages
.Get(m
).subscription_id();
496 for (int s
= 0; s
< subscription_ids
.size(); ++s
) {
497 messages_by_subscription
[subscription_ids
.Get(s
)].push_back(
498 messages
.Get(m
).published_message());
502 // Send the messages for each subscription.
503 for (std::map
<std::string
, std::vector
<Message
> >::const_iterator
504 subscription
= messages_by_subscription
.begin();
505 subscription
!= messages_by_subscription
.end();
507 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
508 // it in here and get rid of the app id registry from the main API class.
509 delegate_
->HandleMessages("", subscription
->first
, subscription
->second
);
513 RequestHeader
* RpcHandler::CreateRequestHeader(
514 const std::string
& client_name
) const {
515 RequestHeader
* header
= new RequestHeader
;
517 header
->set_allocated_framework_version(CreateVersion(
518 "Chrome", delegate_
->GetPlatformVersionString()));
519 if (!client_name
.empty()) {
520 header
->set_allocated_client_version(
521 CreateVersion(client_name
, std::string()));
523 header
->set_current_time_millis(base::Time::Now().ToJsTime());
524 header
->set_registered_device_id(device_id_
);
526 DeviceFingerprint
* fingerprint
= new DeviceFingerprint
;
527 fingerprint
->set_platform_version(delegate_
->GetPlatformVersionString());
528 fingerprint
->set_type(CHROME_PLATFORM_TYPE
);
529 header
->set_allocated_device_fingerprint(fingerprint
);
535 void RpcHandler::SendServerRequest(
536 const std::string
& rpc_name
,
537 const std::string
& app_id
,
538 scoped_ptr
<T
> request
,
539 const PostCleanupCallback
& response_handler
) {
540 request
->set_allocated_header(CreateRequestHeader(app_id
));
541 server_post_callback_
.Run(delegate_
->GetRequestContext(),
543 make_scoped_ptr
<MessageLite
>(request
.release()),
547 void RpcHandler::SendHttpPost(net::URLRequestContextGetter
* url_context_getter
,
548 const std::string
& rpc_name
,
549 scoped_ptr
<MessageLite
> request_proto
,
550 const PostCleanupCallback
& callback
) {
551 // Create the base URL to call.
552 CommandLine
* command_line
= CommandLine::ForCurrentProcess();
553 const std::string copresence_server_host
=
554 command_line
->HasSwitch(switches::kCopresenceServer
) ?
555 command_line
->GetSwitchValueASCII(switches::kCopresenceServer
) :
556 kDefaultCopresenceServer
;
558 // Create the request and keep a pointer until it completes.
559 HttpPost
* http_post
= new HttpPost(
561 copresence_server_host
,
563 command_line
->GetSwitchValueASCII(switches::kCopresenceTracingToken
),
564 delegate_
->GetAPIKey(),
567 http_post
->Start(base::Bind(callback
, http_post
));
568 pending_posts_
.insert(http_post
);
571 void RpcHandler::AudioDirectiveListToWhispernetConnector(
572 const std::string
& token
,
574 const WhispernetClient::SamplesCallback
& samples_callback
) {
575 WhispernetClient
* whispernet_client
= delegate_
->GetWhispernetClient();
576 if (whispernet_client
) {
577 whispernet_client
->RegisterSamplesCallback(samples_callback
);
578 whispernet_client
->EncodeToken(token
, audible
);
582 } // namespace copresence