Roll src/third_party/WebKit bf18a82:a9cee16 (svn 185297:185304)
[chromium-blink-merge.git] / components / copresence / rpc / rpc_handler.cc
blob78bd95835f1989bb4b3c09dba344cbd407cbdb18
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/copresence/rpc/rpc_handler.h"
7 #include "base/bind.h"
8 #include "base/command_line.h"
9 #include "base/guid.h"
10 #include "base/logging.h"
11 #include "base/strings/string_util.h"
12 #include "base/strings/stringprintf.h"
14 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
15 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
16 // we fix this with an #undef.
17 #include "base/time/time.h"
18 #if defined(OS_WIN)
19 #undef DeviceCapabilities
20 #endif
22 #include "components/copresence/copresence_switches.h"
23 #include "components/copresence/handlers/directive_handler.h"
24 #include "components/copresence/handlers/gcm_handler.h"
25 #include "components/copresence/proto/codes.pb.h"
26 #include "components/copresence/proto/data.pb.h"
27 #include "components/copresence/proto/rpcs.pb.h"
28 #include "components/copresence/public/copresence_constants.h"
29 #include "components/copresence/public/copresence_delegate.h"
30 #include "components/copresence/rpc/http_post.h"
31 #include "net/http/http_status_code.h"
33 // TODO(ckehoe): Return error messages for bad requests.
35 namespace copresence {
37 using google::protobuf::MessageLite;
38 using google::protobuf::RepeatedPtrField;
40 const char RpcHandler::kReportRequestRpcName[] = "report";
42 namespace {
// Number of trailing auth token characters that LoggingStrForToken() includes
// in log output.
const int kTokenLoggingSuffix = 5;

// Arguments handed to |invalid_audio_token_cache_| in the RpcHandler
// constructor: a time span in milliseconds, and a maximum entry count.
const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;  // 10 minutes.
const int kMaxInvalidTokens = 10000;

// RPC name used for device registration requests.
const char kRegisterDeviceRpcName[] = "registerdevice";

// Server used when the switches::kCopresenceServer switch is absent.
const char kDefaultCopresenceServer[] =
    "https://www.googleapis.com/copresence/v2/copresence";
51 // UrlSafe is defined as:
52 // '/' represented by a '_' and '+' represented by a '-'
53 // TODO(rkc): Move this to the wrapper.
54 std::string ToUrlSafe(std::string token) {
55 base::ReplaceChars(token, "+", "-", &token);
56 base::ReplaceChars(token, "/", "_", &token);
57 return token;
61 // Logging
63 // Checks for a copresence error. If there is one, logs it and returns true.
64 bool IsErrorStatus(const Status& status) {
65 if (status.code() != OK) {
66 LOG(ERROR) << "Copresence error code " << status.code()
67 << (status.message().empty() ? "" : ": " + status.message());
69 return status.code() != OK;
72 void LogIfErrorStatus(const util::error::Code& code,
73 const std::string& context) {
74 LOG_IF(ERROR, code != util::error::OK)
75 << context << " error " << code << ". See "
76 << "cs/google3/util/task/codes.proto for more info.";
79 // If any errors occurred, logs them and returns true.
80 bool ReportErrorLogged(const ReportResponse& response) {
81 bool result = IsErrorStatus(response.header().status());
83 // The Report fails or succeeds as a unit. If any responses had errors,
84 // the header will too. Thus we don't need to propagate individual errors.
85 if (response.has_update_signals_response())
86 LogIfErrorStatus(response.update_signals_response().status(), "Update");
87 if (response.has_manage_messages_response())
88 LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
89 if (response.has_manage_subscriptions_response()) {
90 LogIfErrorStatus(response.manage_subscriptions_response().status(),
91 "Subscribe");
94 return result;
97 const std::string LoggingStrForToken(const std::string& auth_token) {
98 if (auth_token.empty())
99 return "anonymous";
101 std::string token_suffix = auth_token.substr(
102 auth_token.length() - kTokenLoggingSuffix, kTokenLoggingSuffix);
103 return base::StringPrintf("token ...%s", token_suffix.c_str());
107 // Request construction
108 // TODO(ckehoe): Move these into a separate file?
110 template <typename T>
111 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
112 if (msg.has_token_exchange_strategy() &&
113 msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
114 return msg.token_exchange_strategy().broadcast_scan_configuration();
116 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
119 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
120 scoped_ptr<DeviceState> state(new DeviceState);
122 TokenTechnology* ultrasound =
123 state->mutable_capabilities()->add_token_technology();
124 ultrasound->set_medium(AUDIO_ULTRASOUND_PASSBAND);
125 ultrasound->add_instruction_type(TRANSMIT);
126 ultrasound->add_instruction_type(RECEIVE);
128 TokenTechnology* audible =
129 state->mutable_capabilities()->add_token_technology();
130 audible->set_medium(AUDIO_AUDIBLE_DTMF);
131 audible->add_instruction_type(TRANSMIT);
132 audible->add_instruction_type(RECEIVE);
134 return state.Pass();
137 // TODO(ckehoe): We're keeping this code in a separate function for now
138 // because we get a version string from Chrome, but the proto expects
139 // an int64 version. We should probably change the version proto
140 // to handle a more detailed version.
141 ClientVersion* CreateVersion(const std::string& client,
142 const std::string& version_name) {
143 ClientVersion* version = new ClientVersion;
145 version->set_client(client);
146 version->set_version_name(version_name);
148 return version;
151 void AddTokenToRequest(const AudioToken& token, ReportRequest* request) {
152 TokenObservation* token_observation =
153 request->mutable_update_signals_request()->add_token_observation();
154 token_observation->set_token_id(ToUrlSafe(token.token));
156 TokenSignals* signals = token_observation->add_signals();
157 signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
158 : AUDIO_ULTRASOUND_PASSBAND);
159 signals->set_observed_time_millis(base::Time::Now().ToJsTime());
162 } // namespace
165 // Public functions.
167 RpcHandler::RpcHandler(CopresenceDelegate* delegate,
168 DirectiveHandler* directive_handler,
169 GCMHandler* gcm_handler,
170 const PostCallback& server_post_callback)
171 : delegate_(delegate),
172 directive_handler_(directive_handler),
173 gcm_handler_(gcm_handler),
174 server_post_callback_(server_post_callback),
175 invalid_audio_token_cache_(
176 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
177 kMaxInvalidTokens) {
178 DCHECK(delegate_);
179 DCHECK(directive_handler_);
180 // |gcm_handler_| is optional.
182 if (server_post_callback_.is_null()) {
183 server_post_callback_ =
184 base::Bind(&RpcHandler::SendHttpPost, base::Unretained(this));
187 if (gcm_handler_) {
188 gcm_handler_->GetGcmId(
189 base::Bind(&RpcHandler::RegisterGcmId, base::Unretained(this)));
193 RpcHandler::~RpcHandler() {
194 // Do not use |directive_handler_| or |gcm_handler_| here.
195 // They will already have been destructed.
196 for (HttpPost* post : pending_posts_)
197 delete post;
200 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
201 const std::string& app_id,
202 const std::string& auth_token,
203 const StatusCallback& status_callback) {
204 DCHECK(request.get());
206 // Check that we have a "device" registered for this auth token.
207 bool queue_request = true;
208 const auto& registration = device_id_by_auth_token_.find(auth_token);
209 if (registration == device_id_by_auth_token_.end()) {
210 // Not registered.
211 RegisterForToken(auth_token);
212 } else if (!registration->second.empty()) {
213 // Registration complete.
214 queue_request = false;
217 // We're not registered, or registration is in progress.
218 if (queue_request) {
219 pending_requests_queue_.push_back(new PendingRequest(
220 request.Pass(), app_id, auth_token, status_callback));
221 return;
224 DVLOG(3) << "Sending ReportRequest to server.";
226 // If we are unpublishing or unsubscribing, we need to stop those publish or
227 // subscribes right away, we don't need to wait for the server to tell us.
228 ProcessRemovedOperations(*request);
230 request->mutable_update_signals_request()->set_allocated_state(
231 GetDeviceCapabilities(*request).release());
233 AddPlayingTokens(request.get());
235 SendServerRequest(kReportRequestRpcName,
236 registration->second,
237 app_id,
238 auth_token,
239 request.Pass(),
240 // On destruction, this request will be cancelled.
241 base::Bind(&RpcHandler::ReportResponseHandler,
242 base::Unretained(this),
243 status_callback));
246 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
247 DCHECK(!tokens.empty());
249 if (device_id_by_auth_token_.empty()) {
250 VLOG(2) << "Skipping token reporting because no device IDs are registered";
251 return;
254 // Construct the ReportRequest.
255 ReportRequest request;
256 for (const AudioToken& token : tokens) {
257 if (invalid_audio_token_cache_.HasKey(ToUrlSafe(token.token)))
258 continue;
259 DVLOG(3) << "Sending token " << token.token << " to server under "
260 << device_id_by_auth_token_.size() << " device ID(s)";
261 AddTokenToRequest(token, &request);
264 // Report under all active tokens.
265 for (const auto& registration : device_id_by_auth_token_) {
266 SendReportRequest(make_scoped_ptr(new ReportRequest(request)),
267 registration.first);
272 // Private functions.
274 RpcHandler::PendingRequest::PendingRequest(scoped_ptr<ReportRequest> report,
275 const std::string& app_id,
276 const std::string& auth_token,
277 const StatusCallback& callback)
278 : report(report.Pass()),
279 app_id(app_id),
280 auth_token(auth_token),
281 callback(callback) {}
283 RpcHandler::PendingRequest::~PendingRequest() {}
285 void RpcHandler::RegisterForToken(const std::string& auth_token) {
286 DVLOG(2) << "Sending " << LoggingStrForToken(auth_token)
287 << " registration to server.";
289 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
291 // Add a GCM ID for authenticated registration, if we have one.
292 if (auth_token.empty() || gcm_id_.empty()) {
293 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
294 } else {
295 DVLOG(2) << "Registering GCM ID with " << LoggingStrForToken(auth_token);
296 request->mutable_push_service()->set_service(GCM);
297 request->mutable_push_service()->mutable_gcm_registration()
298 ->set_device_token(gcm_id_);
301 // Only identify as a Chrome device if we're in anonymous mode.
302 // Authenticated calls come from a "GAIA device".
303 if (auth_token.empty()) {
304 Identity* identity =
305 request->mutable_device_identifiers()->mutable_registrant();
306 identity->set_type(CHROME);
307 identity->set_chrome_id(base::GenerateGUID());
309 // Since we're generating a new "Chrome ID" here,
310 // we need to make sure this isn't a duplicate registration.
311 DCHECK_EQ(0u, device_id_by_auth_token_.count(std::string()))
312 << "Attempted anonymous re-registration";
315 bool gcm_pending = !auth_token.empty() && gcm_handler_ && gcm_id_.empty();
316 SendServerRequest(
317 kRegisterDeviceRpcName,
318 // This will have the side effect of populating an empty device ID
319 // for this auth token in the map. This is what we want,
320 // to mark registration as being in progress.
321 device_id_by_auth_token_[auth_token],
322 std::string(), // app ID
323 auth_token,
324 request.Pass(),
325 base::Bind(&RpcHandler::RegisterResponseHandler,
326 // On destruction, this request will be cancelled.
327 base::Unretained(this),
328 auth_token,
329 gcm_pending));
332 void RpcHandler::ProcessQueuedRequests(const std::string& auth_token) {
333 // Track requests that are not on this auth token.
334 ScopedVector<PendingRequest> still_pending_requests;
336 // If there is no device ID for this auth token, registration failed.
337 bool registration_failed =
338 (device_id_by_auth_token_.count(auth_token) == 0);
340 // We momentarily take ownership of all the pointers in the queue.
341 // They are either deleted here or passed on to a new queue.
342 for (PendingRequest* request : pending_requests_queue_) {
343 if (request->auth_token == auth_token) {
344 if (registration_failed) {
345 request->callback.Run(FAIL);
346 } else {
347 SendReportRequest(request->report.Pass(),
348 request->app_id,
349 request->auth_token,
350 request->callback);
352 delete request;
353 } else {
354 // The request is on a different auth token.
355 still_pending_requests.push_back(request);
359 // Only keep the requests that weren't processed.
360 // All the pointers in the queue are now spoken for.
361 pending_requests_queue_.weak_clear();
362 pending_requests_queue_ = still_pending_requests.Pass();
365 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
366 const std::string& auth_token) {
367 SendReportRequest(request.Pass(),
368 std::string(),
369 auth_token,
370 StatusCallback());
373 // Store a GCM ID and send it to the server if needed. The constructor passes
374 // this callback to the GCMHandler to receive the ID whenever it's ready.
375 // It may be returned immediately, if the ID is cached, or require a server
376 // round-trip. This ID must then be passed along to the copresence server.
377 // There are a few ways this can happen for each auth token:
379 // 1. The GCM ID is available when we first register, and is passed along
380 // with the RegisterDeviceRequest.
382 // 2. The GCM ID becomes available after the RegisterDeviceRequest has
383 // completed. Then the loop in this function will invoke RegisterForToken()
384 // again to pass on the ID.
386 // 3. The GCM ID becomes available after the RegisterDeviceRequest is sent,
387 // but before it completes. In this case, the gcm_pending flag is passed
388 // through to the RegisterResponseHandler, which invokes RegisterForToken()
389 // again to pass on the ID. The loop here must skip pending registrations,
390 // as the device ID will be empty.
392 // TODO(ckehoe): Add tests for these scenarios.
393 void RpcHandler::RegisterGcmId(const std::string& gcm_id) {
394 gcm_id_ = gcm_id;
395 if (!gcm_id.empty()) {
396 for (const auto& registration : device_id_by_auth_token_) {
397 const std::string& auth_token = registration.first;
398 const std::string& device_id = registration.second;
399 if (!auth_token.empty() && !device_id.empty())
400 RegisterForToken(auth_token);
405 void RpcHandler::RegisterResponseHandler(
406 const std::string& auth_token,
407 bool gcm_pending,
408 HttpPost* completed_post,
409 int http_status_code,
410 const std::string& response_data) {
411 if (completed_post) {
412 int elements_erased = pending_posts_.erase(completed_post);
413 DCHECK_GT(elements_erased, 0);
414 delete completed_post;
417 // Registration is no longer in progress.
418 // If it was successful, we'll update below.
419 device_id_by_auth_token_.erase(auth_token);
421 RegisterDeviceResponse response;
422 if (http_status_code != net::HTTP_OK) {
423 // TODO(ckehoe): Retry registration if appropriate.
424 LOG(ERROR) << LoggingStrForToken(auth_token)
425 << " device registration failed";
426 } else if (!response.ParseFromString(response_data)) {
427 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
428 } else if (!IsErrorStatus(response.header().status())) {
429 const std::string& device_id = response.registered_device_id();
430 DCHECK(!device_id.empty());
431 device_id_by_auth_token_[auth_token] = device_id;
432 DVLOG(2) << LoggingStrForToken(auth_token)
433 << " device registration successful. Id: " << device_id;
435 // If we have a GCM ID now, and didn't before, pass it on to the server.
436 if (gcm_pending && !gcm_id_.empty())
437 RegisterForToken(auth_token);
440 // Send or fail requests on this auth token.
441 ProcessQueuedRequests(auth_token);
444 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
445 HttpPost* completed_post,
446 int http_status_code,
447 const std::string& response_data) {
448 if (completed_post) {
449 int elements_erased = pending_posts_.erase(completed_post);
450 DCHECK(elements_erased);
451 delete completed_post;
454 if (http_status_code != net::HTTP_OK) {
455 if (!status_callback.is_null())
456 status_callback.Run(FAIL);
457 return;
460 DVLOG(3) << "Received ReportResponse.";
461 ReportResponse response;
462 if (!response.ParseFromString(response_data)) {
463 LOG(ERROR) << "Invalid ReportResponse";
464 if (!status_callback.is_null())
465 status_callback.Run(FAIL);
466 return;
469 if (ReportErrorLogged(response)) {
470 if (!status_callback.is_null())
471 status_callback.Run(FAIL);
472 return;
475 for (const MessageResult& result :
476 response.manage_messages_response().published_message_result()) {
477 DVLOG(2) << "Published message with id " << result.published_message_id();
480 for (const SubscriptionResult& result :
481 response.manage_subscriptions_response().subscription_result()) {
482 DVLOG(2) << "Created subscription with id " << result.subscription_id();
485 if (response.has_update_signals_response()) {
486 const UpdateSignalsResponse& update_response =
487 response.update_signals_response();
488 DispatchMessages(update_response.message());
490 for (const Directive& directive : update_response.directive())
491 directive_handler_->AddDirective(directive);
493 for (const Token& token : update_response.token()) {
494 switch (token.status()) {
495 case VALID:
496 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
497 // short TTL (like 10s) and send it up with every report request.
498 // Then we'll still get messages while we're waiting to hear it again.
499 VLOG(1) << "Got valid token " << token.id();
500 break;
501 case INVALID:
502 DVLOG(3) << "Discarding invalid token " << token.id();
503 invalid_audio_token_cache_.Add(token.id(), true);
504 break;
505 default:
506 DVLOG(2) << "Token " << token.id() << " has status code "
507 << token.status();
512 // TODO(ckehoe): Return a more detailed status response.
513 if (!status_callback.is_null())
514 status_callback.Run(SUCCESS);
517 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
518 // Remove unpublishes.
519 if (request.has_manage_messages_request()) {
520 for (const std::string& unpublish :
521 request.manage_messages_request().id_to_unpublish()) {
522 directive_handler_->RemoveDirectives(unpublish);
526 // Remove unsubscribes.
527 if (request.has_manage_subscriptions_request()) {
528 for (const std::string& unsubscribe :
529 request.manage_subscriptions_request().id_to_unsubscribe()) {
530 directive_handler_->RemoveDirectives(unsubscribe);
535 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
536 const std::string& audible_token =
537 directive_handler_->GetCurrentAudioToken(AUDIBLE);
538 const std::string& inaudible_token =
539 directive_handler_->GetCurrentAudioToken(INAUDIBLE);
541 if (!audible_token.empty())
542 AddTokenToRequest(AudioToken(audible_token, true), request);
543 if (!inaudible_token.empty())
544 AddTokenToRequest(AudioToken(inaudible_token, false), request);
547 void RpcHandler::DispatchMessages(
548 const RepeatedPtrField<SubscribedMessage>& messages) {
549 if (messages.size() == 0)
550 return;
552 // Index the messages by subscription id.
553 std::map<std::string, std::vector<Message>> messages_by_subscription;
554 DVLOG(3) << "Dispatching " << messages.size() << " messages";
555 for (const SubscribedMessage& message : messages) {
556 for (const std::string& subscription_id : message.subscription_id()) {
557 messages_by_subscription[subscription_id].push_back(
558 message.published_message());
562 // Send the messages for each subscription.
563 for (const auto& map_entry : messages_by_subscription) {
564 // TODO(ckehoe): Once we have the app ID from the server, we need to pass
565 // it in here and get rid of the app id registry from the main API class.
566 const std::string& subscription = map_entry.first;
567 const std::vector<Message>& messages = map_entry.second;
568 delegate_->HandleMessages(std::string(), subscription, messages);
572 // TODO(ckehoe): Pass in the version string and
573 // group this with the local functions up top.
574 RequestHeader* RpcHandler::CreateRequestHeader(
575 const std::string& client_name,
576 const std::string& device_id) const {
577 RequestHeader* header = new RequestHeader;
579 header->set_allocated_framework_version(CreateVersion(
580 "Chrome", delegate_->GetPlatformVersionString()));
581 if (!client_name.empty()) {
582 header->set_allocated_client_version(
583 CreateVersion(client_name, std::string()));
585 header->set_current_time_millis(base::Time::Now().ToJsTime());
586 if (!device_id.empty())
587 header->set_registered_device_id(device_id);
589 DeviceFingerprint* fingerprint = new DeviceFingerprint;
590 fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
591 fingerprint->set_type(CHROME_PLATFORM_TYPE);
592 header->set_allocated_device_fingerprint(fingerprint);
594 return header;
597 template <class T>
598 void RpcHandler::SendServerRequest(
599 const std::string& rpc_name,
600 const std::string& device_id,
601 const std::string& app_id,
602 const std::string& auth_token,
603 scoped_ptr<T> request,
604 const PostCleanupCallback& response_handler) {
605 request->set_allocated_header(CreateRequestHeader(app_id, device_id));
606 server_post_callback_.Run(delegate_->GetRequestContext(),
607 rpc_name,
608 delegate_->GetAPIKey(app_id),
609 auth_token,
610 make_scoped_ptr<MessageLite>(request.release()),
611 response_handler);
614 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
615 const std::string& rpc_name,
616 const std::string& api_key,
617 const std::string& auth_token,
618 scoped_ptr<MessageLite> request_proto,
619 const PostCleanupCallback& callback) {
620 // Create the base URL to call.
621 CommandLine* command_line = CommandLine::ForCurrentProcess();
622 const std::string copresence_server_host =
623 command_line->HasSwitch(switches::kCopresenceServer) ?
624 command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
625 kDefaultCopresenceServer;
627 // Create the request and keep a pointer until it completes.
628 HttpPost* http_post = new HttpPost(
629 url_context_getter,
630 copresence_server_host,
631 rpc_name,
632 api_key,
633 auth_token,
634 command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
635 *request_proto);
637 http_post->Start(base::Bind(callback, http_post));
638 pending_posts_.insert(http_post);
641 } // namespace copresence