Updating trunk VERSION from 2139.0 to 2140.0
[chromium-blink-merge.git] / components / copresence / rpc / rpc_handler.cc
blob50cd3625d05b9b306f986e490721e2d220eca8e2
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/copresence/rpc/rpc_handler.h"
7 #include <map>
9 #include "base/bind.h"
10 #include "base/command_line.h"
11 #include "base/guid.h"
12 #include "base/logging.h"
13 #include "base/strings/string_util.h"
15 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
16 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
17 // we fix this with an #undef.
18 #include "base/time/time.h"
19 #if defined(OS_WIN)
20 #undef DeviceCapabilities
21 #endif
23 #include "components/copresence/copresence_switches.h"
24 #include "components/copresence/handlers/directive_handler.h"
25 #include "components/copresence/proto/codes.pb.h"
26 #include "components/copresence/proto/data.pb.h"
27 #include "components/copresence/proto/rpcs.pb.h"
28 #include "components/copresence/public/copresence_delegate.h"
29 #include "net/http/http_status_code.h"
31 // TODO(ckehoe): Return error messages for bad requests.
33 namespace copresence {
35 using google::protobuf::MessageLite;
36 using google::protobuf::RepeatedPtrField;
38 const char RpcHandler::kReportRequestRpcName[] = "report";
40 namespace {
42 // UrlSafe is defined as:
43 // '/' represented by a '_' and '+' represented by a '-'
44 // TODO(rkc): Move this to the wrapper.
45 std::string ToUrlSafe(std::string token) {
46 base::ReplaceChars(token, "+", "-", &token);
47 base::ReplaceChars(token, "/", "_", &token);
48 return token;
// Expiry time, in milliseconds, handed to the invalid audio token cache
// (10 minutes).
const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;

// Size limit handed to the invalid audio token cache.
const int kMaxInvalidTokens = 10000;

// RPC name used when registering this device with the server.
const char kRegisterDeviceRpcName[] = "registerdevice";

// Server used when switches::kCopresenceServer is not given on the command
// line.
const char kDefaultCopresenceServer[] =
    "https://www.googleapis.com/copresence/v2/copresence";
57 // Logging
59 // Checks for a copresence error. If there is one, logs it and returns true.
60 bool CopresenceErrorLogged(const Status& status) {
61 if (status.code() != OK) {
62 LOG(ERROR) << "Copresence error code " << status.code()
63 << (status.message().empty() ? std::string() :
64 ": " + status.message());
66 return status.code() != OK;
69 void LogIfErrorStatus(const util::error::Code& code,
70 const std::string& context) {
71 LOG_IF(ERROR, code != util::error::OK)
72 << context << " error " << code << ". See "
73 << "cs/google3/util/task/codes.proto for more info.";
76 // If any errors occurred, logs them and returns true.
77 bool ReportErrorLogged(const ReportResponse& response) {
78 bool result = CopresenceErrorLogged(response.header().status());
80 // The Report fails or succeeds as a unit. If any responses had errors,
81 // the header will too. Thus we don't need to propagate individual errors.
82 if (response.has_update_signals_response())
83 LogIfErrorStatus(response.update_signals_response().status(), "Update");
84 if (response.has_manage_messages_response())
85 LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
86 if (response.has_manage_subscriptions_response()) {
87 LogIfErrorStatus(response.manage_subscriptions_response().status(),
88 "Subscribe");
91 return result;
94 // Request construction
95 // TODO(ckehoe): Move these into a separate file?
97 template <typename T>
98 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
99 if (msg.has_token_exchange_strategy() &&
100 msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
101 return msg.token_exchange_strategy().broadcast_scan_configuration();
103 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
106 // This method will extract token exchange strategies
107 // from the publishes and subscribes in a report request.
108 // TODO(ckehoe): Delete this when the server supports
109 // BroadcastScanConfiguration.
110 BroadcastScanConfiguration ExtractTokenExchangeStrategy(
111 const ReportRequest& request) {
112 bool broadcast_only = false;
113 bool scan_only = false;
115 // Strategies for publishes.
116 if (request.has_manage_messages_request()) {
117 const RepeatedPtrField<PublishedMessage>& messages =
118 request.manage_messages_request().message_to_publish();
119 for (int i = 0; i < messages.size(); ++i) {
120 BroadcastScanConfiguration config =
121 GetBroadcastScanConfig(messages.Get(i));
122 broadcast_only = broadcast_only || config == BROADCAST_ONLY;
123 scan_only = scan_only || config == SCAN_ONLY;
124 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
125 return BROADCAST_AND_SCAN;
129 // Strategies for subscriptions.
130 if (request.has_manage_subscriptions_request()) {
131 const RepeatedPtrField<Subscription> subscriptions =
132 request.manage_subscriptions_request().subscription();
133 for (int i = 0; i < subscriptions.size(); ++i) {
134 BroadcastScanConfiguration config =
135 GetBroadcastScanConfig(subscriptions.Get(i));
136 broadcast_only = broadcast_only || config == BROADCAST_ONLY;
137 scan_only = scan_only || config == SCAN_ONLY;
138 if (config == BROADCAST_AND_SCAN || (broadcast_only && scan_only))
139 return BROADCAST_AND_SCAN;
143 if (broadcast_only)
144 return BROADCAST_ONLY;
145 if (scan_only)
146 return SCAN_ONLY;
148 // If nothing else is specified, default to both broadcast and scan.
149 return BROADCAST_AND_SCAN;
152 // TODO(rkc): Fix this hack once the server supports setting strategies per
153 // operation.
154 bool ExtractIsAudibleStrategy(const ReportRequest& request) {
155 if (request.has_manage_messages_request()) {
156 const RepeatedPtrField<PublishedMessage> messages =
157 request.manage_messages_request().message_to_publish();
158 for (int i = 0; i < messages.size(); ++i) {
159 const PublishedMessage& msg = messages.Get(i);
160 if (msg.has_token_exchange_strategy() &&
161 msg.token_exchange_strategy().has_use_audible() &&
162 msg.token_exchange_strategy().use_audible()) {
163 return true;
167 return false;
170 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
171 scoped_ptr<DeviceState> state(new DeviceState);
173 TokenTechnology* token_technology =
174 state->mutable_capabilities()->add_token_technology();
175 token_technology->set_medium(AUDIO_ULTRASOUND_PASSBAND);
176 if (ExtractIsAudibleStrategy(request))
177 token_technology->set_medium(AUDIO_AUDIBLE_DTMF);
179 BroadcastScanConfiguration config =
180 ExtractTokenExchangeStrategy(request);
181 if (config == BROADCAST_ONLY || config == BROADCAST_AND_SCAN)
182 token_technology->add_instruction_type(TRANSMIT);
183 if (config == SCAN_ONLY || config == BROADCAST_AND_SCAN)
184 token_technology->add_instruction_type(RECEIVE);
186 return state.Pass();
189 // TODO(ckehoe): We're keeping this code in a separate function for now
190 // because we get a version string from Chrome, but the proto expects
191 // an int64 version. We should probably change the version proto
192 // to handle a more detailed version.
193 ClientVersion* CreateVersion(const std::string& client,
194 const std::string& version_name) {
195 ClientVersion* version = new ClientVersion;
197 version->set_client(client);
198 version->set_version_name(version_name);
200 return version;
203 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
204 TokenObservation* token_observation =
205 request->mutable_update_signals_request()->add_token_observation();
206 token_observation->set_token_id(ToUrlSafe(token.token));
208 TokenSignals* signals = token_observation->add_signals();
209 signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
210 : AUDIO_ULTRASOUND_PASSBAND);
211 signals->set_observed_time_millis(base::Time::Now().ToJsTime());
214 } // namespace
216 // Public methods
218 RpcHandler::RpcHandler(CopresenceDelegate* delegate)
219 : delegate_(delegate),
220 invalid_audio_token_cache_(
221 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
222 kMaxInvalidTokens),
223 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
224 base::Unretained(this))) {}
226 RpcHandler::~RpcHandler() {
227 for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
228 post != pending_posts_.end(); ++post) {
229 delete *post;
232 if (delegate_ && delegate_->GetWhispernetClient()) {
233 delegate_->GetWhispernetClient()->RegisterTokensCallback(
234 WhispernetClient::TokensCallback());
238 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
239 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
240 DCHECK(device_id_.empty());
242 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
243 Identity* identity =
244 request->mutable_device_identifiers()->mutable_registrant();
245 identity->set_type(CHROME);
246 identity->set_chrome_id(base::GenerateGUID());
247 SendServerRequest(
248 kRegisterDeviceRpcName,
249 std::string(),
250 request.Pass(),
251 base::Bind(&RpcHandler::RegisterResponseHandler,
252 // On destruction, this request will be cancelled.
253 base::Unretained(this),
254 init_done_callback));
257 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
258 SendReportRequest(request.Pass(), std::string(), StatusCallback());
261 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
262 const std::string& app_id,
263 const StatusCallback& status_callback) {
264 DCHECK(request.get());
265 DCHECK(!device_id_.empty())
266 << "RpcHandler::Initialize() must complete successfully "
267 << "before other RpcHandler methods are called.";
269 DVLOG(3) << "Sending report request to server.";
271 // If we are unpublishing or unsubscribing, we need to stop those publish or
272 // subscribes right away, we don't need to wait for the server to tell us.
273 ProcessRemovedOperations(*request);
275 request->mutable_update_signals_request()->set_allocated_state(
276 GetDeviceCapabilities(*request).release());
278 AddPlayingTokens(request.get());
280 // TODO(ckehoe): Currently the server supports only BROADCAST_AND_SCAN.
281 // Remove this once b/16715253 is fixed.
282 if (request->has_manage_messages_request()) {
283 RepeatedPtrField<PublishedMessage>* messages = request
284 ->mutable_manage_messages_request()->mutable_message_to_publish();
285 for (int i = 0; i < messages->size(); ++i) {
286 messages->Mutable(i)->mutable_token_exchange_strategy()
287 ->set_broadcast_scan_configuration(BROADCAST_AND_SCAN);
290 if (request->has_manage_subscriptions_request()) {
291 RepeatedPtrField<Subscription>* subscriptions =
292 request->mutable_manage_subscriptions_request()->mutable_subscription();
293 for (int i = 0; i < subscriptions->size(); ++i) {
294 subscriptions->Mutable(i)->mutable_token_exchange_strategy()
295 ->set_broadcast_scan_configuration(BROADCAST_AND_SCAN);
299 SendServerRequest(kReportRequestRpcName,
300 app_id,
301 request.Pass(),
302 // On destruction, this request will be cancelled.
303 base::Bind(&RpcHandler::ReportResponseHandler,
304 base::Unretained(this),
305 status_callback));
308 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
309 DCHECK(!tokens.empty());
311 scoped_ptr<ReportRequest> request(new ReportRequest);
312 for (size_t i = 0; i < tokens.size(); ++i) {
313 if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
314 continue;
315 DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
316 AddTokenToRequest(request.get(), tokens[i]);
318 SendReportRequest(request.Pass());
321 void RpcHandler::ConnectToWhispernet() {
322 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
324 // |directive_handler_| will be destructed with us, so unretained is safe.
325 directive_handler_.reset(new DirectiveHandler);
326 directive_handler_->Initialize(
327 base::Bind(&WhispernetClient::DecodeSamples,
328 base::Unretained(whispernet_client)),
329 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
330 base::Unretained(this)));
332 whispernet_client->RegisterTokensCallback(
333 base::Bind(&RpcHandler::ReportTokens,
334 // On destruction, this callback will be disconnected.
335 base::Unretained(this)));
338 // Private methods
340 void RpcHandler::RegisterResponseHandler(
341 const SuccessCallback& init_done_callback,
342 HttpPost* completed_post,
343 int http_status_code,
344 const std::string& response_data) {
345 if (completed_post) {
346 int elements_erased = pending_posts_.erase(completed_post);
347 DCHECK(elements_erased);
348 delete completed_post;
351 if (http_status_code != net::HTTP_OK) {
352 init_done_callback.Run(false);
353 return;
356 RegisterDeviceResponse response;
357 if (!response.ParseFromString(response_data)) {
358 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
359 init_done_callback.Run(false);
360 return;
363 if (CopresenceErrorLogged(response.header().status()))
364 return;
365 device_id_ = response.registered_device_id();
366 DCHECK(!device_id_.empty());
367 DVLOG(2) << "Device registration successful: id " << device_id_;
368 init_done_callback.Run(true);
371 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
372 HttpPost* completed_post,
373 int http_status_code,
374 const std::string& response_data) {
375 if (completed_post) {
376 int elements_erased = pending_posts_.erase(completed_post);
377 DCHECK(elements_erased);
378 delete completed_post;
381 if (http_status_code != net::HTTP_OK) {
382 if (!status_callback.is_null())
383 status_callback.Run(FAIL);
384 return;
387 DVLOG(3) << "Received ReportResponse.";
388 ReportResponse response;
389 if (!response.ParseFromString(response_data)) {
390 LOG(ERROR) << "Invalid ReportResponse";
391 if (!status_callback.is_null())
392 status_callback.Run(FAIL);
393 return;
396 if (ReportErrorLogged(response)) {
397 if (!status_callback.is_null())
398 status_callback.Run(FAIL);
399 return;
402 const RepeatedPtrField<MessageResult>& message_results =
403 response.manage_messages_response().published_message_result();
404 for (int i = 0; i < message_results.size(); ++i) {
405 DVLOG(2) << "Published message with id "
406 << message_results.Get(i).published_message_id();
409 const RepeatedPtrField<SubscriptionResult>& subscription_results =
410 response.manage_subscriptions_response().subscription_result();
411 for (int i = 0; i < subscription_results.size(); ++i) {
412 DVLOG(2) << "Created subscription with id "
413 << subscription_results.Get(i).subscription_id();
416 if (response.has_update_signals_response()) {
417 const UpdateSignalsResponse& update_response =
418 response.update_signals_response();
419 DispatchMessages(update_response.message());
421 if (directive_handler_.get()) {
422 for (int i = 0; i < update_response.directive_size(); ++i)
423 directive_handler_->AddDirective(update_response.directive(i));
424 } else {
425 DVLOG(1) << "No directive handler.";
428 const RepeatedPtrField<Token>& tokens = update_response.token();
429 for (int i = 0; i < tokens.size(); ++i) {
430 switch (tokens.Get(i).status()) {
431 case VALID:
432 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
433 // short TTL (like 10s) and send it up with every report request.
434 // Then we'll still get messages while we're waiting to hear it again.
435 VLOG(1) << "Got valid token " << tokens.Get(i).id();
436 break;
437 case INVALID:
438 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
439 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
440 break;
441 default:
442 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
443 << tokens.Get(i).status();
448 // TODO(ckehoe): Return a more detailed status response.
449 if (!status_callback.is_null())
450 status_callback.Run(SUCCESS);
453 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
454 // Remove unpublishes.
455 if (request.has_manage_messages_request()) {
456 const RepeatedPtrField<std::string>& unpublishes =
457 request.manage_messages_request().id_to_unpublish();
458 for (int i = 0; i < unpublishes.size(); ++i)
459 directive_handler_->RemoveDirectives(unpublishes.Get(i));
462 // Remove unsubscribes.
463 if (request.has_manage_subscriptions_request()) {
464 const RepeatedPtrField<std::string>& unsubscribes =
465 request.manage_subscriptions_request().id_to_unsubscribe();
466 for (int i = 0; i < unsubscribes.size(); ++i)
467 directive_handler_->RemoveDirectives(unsubscribes.Get(i));
471 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
472 if (!directive_handler_)
473 return;
475 const std::string& audible_token = directive_handler_->CurrentAudibleToken();
476 const std::string& inaudible_token =
477 directive_handler_->CurrentInaudibleToken();
479 if (!audible_token.empty())
480 AddTokenToRequest(request, AudioToken(audible_token, true));
481 if (!inaudible_token.empty())
482 AddTokenToRequest(request, AudioToken(inaudible_token, false));
485 void RpcHandler::DispatchMessages(
486 const RepeatedPtrField<SubscribedMessage>& messages) {
487 if (messages.size() == 0)
488 return;
490 // Index the messages by subscription id.
491 std::map<std::string, std::vector<Message> > messages_by_subscription;
492 DVLOG(3) << "Dispatching " << messages.size() << " messages";
493 for (int m = 0; m < messages.size(); ++m) {
494 const RepeatedPtrField<std::string>& subscription_ids =
495 messages.Get(m).subscription_id();
496 for (int s = 0; s < subscription_ids.size(); ++s) {
497 messages_by_subscription[subscription_ids.Get(s)].push_back(
498 messages.Get(m).published_message());
502 // Send the messages for each subscription.
503 for (std::map<std::string, std::vector<Message> >::const_iterator
504 subscription = messages_by_subscription.begin();
505 subscription != messages_by_subscription.end();
506 ++subscription) {
507 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
508 // it in here and get rid of the app id registry from the main API class.
509 delegate_->HandleMessages("", subscription->first, subscription->second);
513 RequestHeader* RpcHandler::CreateRequestHeader(
514 const std::string& client_name) const {
515 RequestHeader* header = new RequestHeader;
517 header->set_allocated_framework_version(CreateVersion(
518 "Chrome", delegate_->GetPlatformVersionString()));
519 if (!client_name.empty()) {
520 header->set_allocated_client_version(
521 CreateVersion(client_name, std::string()));
523 header->set_current_time_millis(base::Time::Now().ToJsTime());
524 header->set_registered_device_id(device_id_);
526 DeviceFingerprint* fingerprint = new DeviceFingerprint;
527 fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
528 fingerprint->set_type(CHROME_PLATFORM_TYPE);
529 header->set_allocated_device_fingerprint(fingerprint);
531 return header;
534 template <class T>
535 void RpcHandler::SendServerRequest(
536 const std::string& rpc_name,
537 const std::string& app_id,
538 scoped_ptr<T> request,
539 const PostCleanupCallback& response_handler) {
540 request->set_allocated_header(CreateRequestHeader(app_id));
541 server_post_callback_.Run(delegate_->GetRequestContext(),
542 rpc_name,
543 make_scoped_ptr<MessageLite>(request.release()),
544 response_handler);
547 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
548 const std::string& rpc_name,
549 scoped_ptr<MessageLite> request_proto,
550 const PostCleanupCallback& callback) {
551 // Create the base URL to call.
552 CommandLine* command_line = CommandLine::ForCurrentProcess();
553 const std::string copresence_server_host =
554 command_line->HasSwitch(switches::kCopresenceServer) ?
555 command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
556 kDefaultCopresenceServer;
558 // Create the request and keep a pointer until it completes.
559 HttpPost* http_post = new HttpPost(
560 url_context_getter,
561 copresence_server_host,
562 rpc_name,
563 command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
564 delegate_->GetAPIKey(),
565 *request_proto);
567 http_post->Start(base::Bind(callback, http_post));
568 pending_posts_.insert(http_post);
571 void RpcHandler::AudioDirectiveListToWhispernetConnector(
572 const std::string& token,
573 bool audible,
574 const WhispernetClient::SamplesCallback& samples_callback) {
575 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
576 if (whispernet_client) {
577 whispernet_client->RegisterSamplesCallback(samples_callback);
578 whispernet_client->EncodeToken(token, audible);
582 } // namespace copresence