Extension syncing: Introduce a NeedsSync pref
[chromium-blink-merge.git] / components / copresence / rpc / rpc_handler.cc
blob22be6e2e5372ce90bea23fa84d279a4ac80b8b2b
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/copresence/rpc/rpc_handler.h"
7 #include "base/bind.h"
8 #include "base/command_line.h"
9 #include "base/logging.h"
10 #include "base/strings/string_util.h"
11 #include "base/strings/stringprintf.h"
13 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
14 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
15 // we fix this with an #undef.
16 #include "base/time/time.h"
17 #if defined(OS_WIN)
18 #undef DeviceCapabilities
19 #endif
21 #include "components/audio_modem/public/audio_modem_types.h"
22 #include "components/copresence/copresence_state_impl.h"
23 #include "components/copresence/copresence_switches.h"
24 #include "components/copresence/handlers/directive_handler.h"
25 #include "components/copresence/handlers/gcm_handler.h"
26 #include "components/copresence/proto/codes.pb.h"
27 #include "components/copresence/proto/data.pb.h"
28 #include "components/copresence/proto/rpcs.pb.h"
29 #include "components/copresence/public/copresence_constants.h"
30 #include "components/copresence/public/copresence_delegate.h"
31 #include "components/copresence/rpc/http_post.h"
32 #include "net/http/http_status_code.h"
34 using google::protobuf::MessageLite;
36 using audio_modem::AUDIBLE;
37 using audio_modem::AudioToken;
38 using audio_modem::INAUDIBLE;
40 // TODO(ckehoe): Return error messages for bad requests.
42 namespace copresence {
44 const char RpcHandler::kReportRequestRpcName[] = "report";
46 namespace {
// Number of trailing auth token characters that LoggingStrForToken() includes
// in log output.
const int kTokenLoggingSuffix = 5;

// Expiry time (in minutes) and maximum size passed to the invalid audio token
// cache when the RpcHandler is constructed.
const int kInvalidTokenExpiryTimeMinutes = 10;
const int kMaxInvalidTokens = 10000;

// RPC name used by RegisterDevice().
const char kRegisterDeviceRpcName[] = "registerdevice";

// Server used by SendHttpPost() unless the kCopresenceServer switch is given.
const char kDefaultCopresenceServer[] =
    "https://www.googleapis.com/copresence/v2/copresence";
55 // UrlSafe is defined as:
56 // '/' represented by a '_' and '+' represented by a '-'
57 // TODO(rkc): Move this to the wrapper.
58 std::string ToUrlSafe(std::string token) {
59 base::ReplaceChars(token, "+", "-", &token);
60 base::ReplaceChars(token, "/", "_", &token);
61 return token;
64 // Logging
66 // Checks for a copresence error. If there is one, logs it and returns true.
67 bool IsErrorStatus(const Status& status) {
68 if (status.code() != OK) {
69 LOG(ERROR) << "Copresence error code " << status.code()
70 << (status.message().empty() ? "" : ": " + status.message());
72 return status.code() != OK;
75 void LogIfErrorStatus(const util::error::Code& code,
76 const std::string& context) {
77 LOG_IF(ERROR, code != util::error::OK)
78 << context << " error " << code << ". See "
79 << "cs/google3/util/task/codes.proto for more info.";
82 // If any errors occurred, logs them and returns true.
83 bool ReportErrorLogged(const ReportResponse& response) {
84 bool result = IsErrorStatus(response.header().status());
86 // The Report fails or succeeds as a unit. If any responses had errors,
87 // the header will too. Thus we don't need to propagate individual errors.
88 if (response.has_update_signals_response())
89 LogIfErrorStatus(response.update_signals_response().status(), "Update");
90 if (response.has_manage_messages_response())
91 LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
92 if (response.has_manage_subscriptions_response()) {
93 LogIfErrorStatus(response.manage_subscriptions_response().status(),
94 "Subscribe");
97 return result;
100 const std::string LoggingStrForToken(const std::string& auth_token) {
101 if (auth_token.empty())
102 return "anonymous";
104 std::string token_suffix = auth_token.substr(
105 auth_token.length() - kTokenLoggingSuffix, kTokenLoggingSuffix);
106 return "token ..." + token_suffix;
110 // Request construction
112 template <typename T>
113 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
114 if (msg.has_token_exchange_strategy() &&
115 msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
116 return msg.token_exchange_strategy().broadcast_scan_configuration();
118 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
121 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
122 scoped_ptr<DeviceState> state(new DeviceState);
124 TokenTechnology* ultrasound =
125 state->mutable_capabilities()->add_token_technology();
126 ultrasound->set_medium(AUDIO_ULTRASOUND_PASSBAND);
127 ultrasound->add_instruction_type(TRANSMIT);
128 ultrasound->add_instruction_type(RECEIVE);
130 TokenTechnology* audible =
131 state->mutable_capabilities()->add_token_technology();
132 audible->set_medium(AUDIO_AUDIBLE_DTMF);
133 audible->add_instruction_type(TRANSMIT);
134 audible->add_instruction_type(RECEIVE);
136 return state.Pass();
139 // TODO(ckehoe): We're keeping this code in a separate function for now
140 // because we get a version string from Chrome, but the proto expects
141 // an int64 version. We should probably change the version proto
142 // to handle a more detailed version.
143 ClientVersion* CreateVersion(const std::string& client,
144 const std::string& version_name) {
145 ClientVersion* version = new ClientVersion;
146 version->set_client(client);
147 version->set_version_name(version_name);
148 return version;
151 void AddTokenToRequest(const AudioToken& token, ReportRequest* request) {
152 TokenObservation* token_observation =
153 request->mutable_update_signals_request()->add_token_observation();
154 token_observation->set_token_id(ToUrlSafe(token.token));
156 TokenSignals* signals = token_observation->add_signals();
157 signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
158 : AUDIO_ULTRASOUND_PASSBAND);
159 signals->set_observed_time_millis(base::Time::Now().ToJsTime());
162 } // namespace
165 // Public functions.
167 RpcHandler::RpcHandler(CopresenceDelegate* delegate,
168 DirectiveHandler* directive_handler,
169 CopresenceStateImpl* state,
170 GCMHandler* gcm_handler,
171 const MessagesCallback& new_messages_callback,
172 const PostCallback& server_post_callback)
173 : delegate_(delegate),
174 directive_handler_(directive_handler),
175 state_(state),
176 gcm_handler_(gcm_handler),
177 new_messages_callback_(new_messages_callback),
178 server_post_callback_(server_post_callback),
179 invalid_audio_token_cache_(
180 base::TimeDelta::FromMinutes(kInvalidTokenExpiryTimeMinutes),
181 kMaxInvalidTokens) {
182 DCHECK(delegate_);
183 DCHECK(directive_handler_);
184 // |gcm_handler_| is optional.
186 if (server_post_callback_.is_null()) {
187 server_post_callback_ =
188 base::Bind(&RpcHandler::SendHttpPost, base::Unretained(this));
191 if (gcm_handler_) {
192 gcm_handler_->GetGcmId(
193 base::Bind(&RpcHandler::RegisterGcmId, base::Unretained(this)));
197 RpcHandler::~RpcHandler() {
198 // TODO(ckehoe): Cancel the GCM callback?
199 for (HttpPost* post : pending_posts_)
200 delete post;
203 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
204 const std::string& app_id,
205 const std::string& auth_token,
206 const StatusCallback& status_callback) {
207 DCHECK(request.get());
209 // Check that the app, if any, has some kind of authentication token.
210 // Don't allow it to piggyback on Chrome's credentials.
211 if (!app_id.empty() && delegate_->GetAPIKey(app_id).empty() &&
212 auth_token.empty()) {
213 LOG(ERROR) << "App " << app_id << " has no API key or auth token";
214 status_callback.Run(FAIL);
215 return;
218 // Store just one auth token since we should have only one account
219 // per instance of the copresence component.
220 // TODO(ckehoe): We may eventually need to support multiple auth tokens.
221 const bool authenticated = !auth_token.empty();
222 if (authenticated && auth_token != auth_token_) {
223 LOG_IF(ERROR, !auth_token_.empty())
224 << "Overwriting old auth token: " << LoggingStrForToken(auth_token);
225 auth_token_ = auth_token;
228 // Check that we have a "device" registered for this authentication state.
229 bool queue_request;
230 const std::string device_id = delegate_->GetDeviceId(authenticated);
231 if (device_id.empty()) {
232 queue_request = true;
233 if (pending_registrations_.count(authenticated) == 0)
234 RegisterDevice(authenticated);
235 // else, registration is already in progress.
236 } else {
237 queue_request = false;
240 // We're not registered, or registration is in progress.
241 if (queue_request) {
242 pending_requests_queue_.push_back(new PendingRequest(
243 request.Pass(), app_id, authenticated, status_callback));
244 return;
247 DVLOG(3) << "Sending ReportRequest to server.";
249 // If we are unpublishing or unsubscribing, we need to stop those publish or
250 // subscribes right away, we don't need to wait for the server to tell us.
251 ProcessRemovedOperations(*request);
253 request->mutable_update_signals_request()->set_allocated_state(
254 GetDeviceCapabilities(*request).release());
256 AddPlayingTokens(request.get());
258 request->set_allocated_header(CreateRequestHeader(app_id, device_id));
259 server_post_callback_.Run(delegate_->GetRequestContext(),
260 kReportRequestRpcName,
261 delegate_->GetAPIKey(app_id),
262 auth_token,
263 make_scoped_ptr<MessageLite>(request.release()),
264 // On destruction, this request will be cancelled.
265 base::Bind(&RpcHandler::ReportResponseHandler,
266 base::Unretained(this),
267 status_callback));
270 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
271 DCHECK(!tokens.empty());
273 scoped_ptr<ReportRequest> request(new ReportRequest);
274 for (const AudioToken& token : tokens) {
275 if (invalid_audio_token_cache_.HasKey(ToUrlSafe(token.token)))
276 continue;
277 DVLOG(3) << "Sending token " << token.token << " to server";
278 AddTokenToRequest(token, request.get());
281 ReportOnAllDevices(request.Pass());
285 // Private functions.
287 RpcHandler::PendingRequest::PendingRequest(scoped_ptr<ReportRequest> report,
288 const std::string& app_id,
289 bool authenticated,
290 const StatusCallback& callback)
291 : report(report.Pass()),
292 app_id(app_id),
293 authenticated(authenticated),
294 callback(callback) {}
296 RpcHandler::PendingRequest::~PendingRequest() {}
298 void RpcHandler::RegisterDevice(const bool authenticated) {
299 DVLOG(2) << "Sending " << (authenticated ? "authenticated" : "anonymous")
300 << " registration to server.";
302 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
304 // Add a GCM ID for authenticated registration, if we have one.
305 if (!authenticated || gcm_id_.empty()) {
306 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
307 } else {
308 DVLOG(2) << "Registering GCM ID with " << LoggingStrForToken(auth_token_);
309 request->mutable_push_service()->set_service(GCM);
310 request->mutable_push_service()->mutable_gcm_registration()
311 ->set_device_token(gcm_id_);
314 // Only identify as a Chrome device if we're in anonymous mode.
315 // Authenticated calls come from a "GAIA device".
316 if (!authenticated) {
317 // Make sure this isn't a duplicate anonymous registration.
318 // Duplicate authenticated registrations are allowed, to update the GCM ID.
319 DCHECK(delegate_->GetDeviceId(false).empty())
320 << "Attempted anonymous re-registration";
322 Identity* identity =
323 request->mutable_device_identifiers()->mutable_registrant();
324 identity->set_type(CHROME);
327 bool gcm_pending = authenticated && gcm_handler_ && gcm_id_.empty();
328 pending_registrations_.insert(authenticated);
329 request->set_allocated_header(CreateRequestHeader(
330 // The device is empty on first registration.
331 // When re-registering to pass on the GCM ID, it will be present.
332 std::string(), delegate_->GetDeviceId(authenticated)));
333 if (authenticated)
334 DCHECK(!auth_token_.empty());
335 server_post_callback_.Run(delegate_->GetRequestContext(),
336 kRegisterDeviceRpcName,
337 std::string(),
338 authenticated ? auth_token_ : std::string(),
339 make_scoped_ptr<MessageLite>(request.release()),
340 // On destruction, this request will be cancelled.
341 base::Bind(&RpcHandler::RegisterResponseHandler,
342 base::Unretained(this),
343 authenticated,
344 gcm_pending));
347 void RpcHandler::ProcessQueuedRequests(const bool authenticated) {
348 // Track requests that are not in this auth state.
349 ScopedVector<PendingRequest> still_pending_requests;
351 // If there is no device ID for this auth state, registration failed.
352 bool registration_failed = delegate_->GetDeviceId(authenticated).empty();
354 // We momentarily take ownership of all the pointers in the queue.
355 // They are either deleted here or passed on to a new queue.
356 for (PendingRequest* request : pending_requests_queue_) {
357 if (request->authenticated == authenticated) {
358 if (registration_failed) {
359 request->callback.Run(FAIL);
360 } else {
361 if (request->authenticated)
362 DCHECK(!auth_token_.empty());
363 SendReportRequest(request->report.Pass(),
364 request->app_id,
365 request->authenticated ? auth_token_ : std::string(),
366 request->callback);
368 delete request;
369 } else {
370 // The request is in a different auth state.
371 still_pending_requests.push_back(request);
375 // Only keep the requests that weren't processed.
376 // All the pointers in the queue are now spoken for.
377 pending_requests_queue_.weak_clear();
378 pending_requests_queue_ = still_pending_requests.Pass();
381 void RpcHandler::ReportOnAllDevices(scoped_ptr<ReportRequest> request) {
382 std::vector<bool> auth_states;
383 if (!auth_token_.empty() && !delegate_->GetDeviceId(true).empty())
384 auth_states.push_back(true);
385 if (!delegate_->GetDeviceId(false).empty())
386 auth_states.push_back(false);
387 if (auth_states.empty()) {
388 VLOG(2) << "Skipping reporting because no device IDs are registered";
389 return;
392 for (bool authenticated : auth_states) {
393 SendReportRequest(make_scoped_ptr(new ReportRequest(*request)),
394 std::string(),
395 authenticated ? auth_token_ : std::string(),
396 StatusCallback());
400 // Store a GCM ID and send it to the server if needed. The constructor passes
401 // this callback to the GCMHandler to receive the ID whenever it's ready.
402 // It may be returned immediately, if the ID is cached, or require a server
403 // round-trip. This ID must then be passed along to the copresence server.
404 // There are a few ways this can happen:
406 // 1. The GCM ID is available when we first register, and is passed along
407 // with the RegisterDeviceRequest.
409 // 2. The GCM ID becomes available after the RegisterDeviceRequest has
410 // completed. Then this function will invoke RegisterDevice()
411 // again to pass on the ID.
413 // 3. The GCM ID becomes available after the RegisterDeviceRequest is sent,
414 // but before it completes. In this case, the gcm_pending flag is passed
415 // through to the RegisterResponseHandler, which invokes RegisterDevice()
416 // again to pass on the ID. This function must skip pending registrations,
417 // as the device ID will be empty.
419 // TODO(ckehoe): Add tests for these scenarios.
420 void RpcHandler::RegisterGcmId(const std::string& gcm_id) {
421 gcm_id_ = gcm_id;
422 if (!gcm_id.empty()) {
423 const std::string& device_id = delegate_->GetDeviceId(true);
424 if (!auth_token_.empty() && !device_id.empty())
425 RegisterDevice(true);
429 void RpcHandler::RegisterResponseHandler(
430 bool authenticated,
431 bool gcm_pending,
432 HttpPost* completed_post,
433 int http_status_code,
434 const std::string& response_data) {
435 if (completed_post) {
436 size_t elements_erased = pending_posts_.erase(completed_post);
437 DCHECK_GT(elements_erased, 0u);
438 delete completed_post;
441 size_t registrations_completed = pending_registrations_.erase(authenticated);
442 DCHECK_GT(registrations_completed, 0u);
444 RegisterDeviceResponse response;
445 const std::string token_str =
446 LoggingStrForToken(authenticated ? auth_token_ : std::string());
447 if (http_status_code != net::HTTP_OK) {
448 // TODO(ckehoe): Retry registration if appropriate.
449 LOG(ERROR) << token_str << " device registration failed";
450 } else if (!response.ParseFromString(response_data)) {
451 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
452 } else if (!IsErrorStatus(response.header().status())) {
453 const std::string& device_id = response.registered_device_id();
454 DCHECK(!device_id.empty());
455 delegate_->SaveDeviceId(authenticated, device_id);
456 DVLOG(2) << token_str << " device registration successful. Id: "
457 << device_id;
459 // If we have a GCM ID now, and didn't before, pass it on to the server.
460 if (gcm_pending && !gcm_id_.empty())
461 RegisterDevice(authenticated);
464 // Send or fail requests on this auth token.
465 ProcessQueuedRequests(authenticated);
468 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
469 HttpPost* completed_post,
470 int http_status_code,
471 const std::string& response_data) {
472 if (completed_post) {
473 size_t elements_erased = pending_posts_.erase(completed_post);
474 DCHECK_GT(elements_erased, 0u);
475 delete completed_post;
478 if (http_status_code != net::HTTP_OK) {
479 if (!status_callback.is_null())
480 status_callback.Run(FAIL);
481 return;
484 DVLOG(3) << "Received ReportResponse.";
485 ReportResponse response;
486 if (!response.ParseFromString(response_data)) {
487 LOG(ERROR) << "Invalid ReportResponse";
488 if (!status_callback.is_null())
489 status_callback.Run(FAIL);
490 return;
493 if (ReportErrorLogged(response)) {
494 if (!status_callback.is_null())
495 status_callback.Run(FAIL);
496 return;
499 for (const MessageResult& result :
500 response.manage_messages_response().published_message_result()) {
501 DVLOG(2) << "Published message with id " << result.published_message_id();
504 for (const SubscriptionResult& result :
505 response.manage_subscriptions_response().subscription_result()) {
506 DVLOG(2) << "Created subscription with id " << result.subscription_id();
509 if (response.has_update_signals_response()) {
510 const UpdateSignalsResponse& update_response =
511 response.update_signals_response();
512 new_messages_callback_.Run(update_response.message());
514 for (const Directive& directive : update_response.directive())
515 directive_handler_->AddDirective(directive);
517 for (const Token& token : update_response.token()) {
518 if (state_)
519 state_->UpdateTokenStatus(token.id(), token.status());
520 switch (token.status()) {
521 case VALID:
522 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
523 // short TTL (like 10s) and send it up with every report request.
524 // Then we'll still get messages while we're waiting to hear it again.
525 VLOG(1) << "Got valid token " << token.id();
526 break;
527 case INVALID:
528 DVLOG(3) << "Discarding invalid token " << token.id();
529 invalid_audio_token_cache_.Add(token.id(), true);
530 break;
531 default:
532 DVLOG(2) << "Token " << token.id() << " has status code "
533 << token.status();
538 // TODO(ckehoe): Return a more detailed status response.
539 if (!status_callback.is_null())
540 status_callback.Run(SUCCESS);
543 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
544 // Remove unpublishes.
545 if (request.has_manage_messages_request()) {
546 for (const std::string& unpublish :
547 request.manage_messages_request().id_to_unpublish()) {
548 directive_handler_->RemoveDirectives(unpublish);
552 // Remove unsubscribes.
553 if (request.has_manage_subscriptions_request()) {
554 for (const std::string& unsubscribe :
555 request.manage_subscriptions_request().id_to_unsubscribe()) {
556 directive_handler_->RemoveDirectives(unsubscribe);
561 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
562 const std::string& audible_token =
563 directive_handler_->GetCurrentAudioToken(AUDIBLE);
564 const std::string& inaudible_token =
565 directive_handler_->GetCurrentAudioToken(INAUDIBLE);
567 if (!audible_token.empty())
568 AddTokenToRequest(AudioToken(audible_token, true), request);
569 if (!inaudible_token.empty())
570 AddTokenToRequest(AudioToken(inaudible_token, false), request);
573 // TODO(ckehoe): Pass in the version string and
574 // group this with the local functions up top.
575 RequestHeader* RpcHandler::CreateRequestHeader(
576 const std::string& app_id,
577 const std::string& device_id) const {
578 RequestHeader* header = new RequestHeader;
580 header->set_allocated_framework_version(CreateVersion(
581 "Chrome", delegate_->GetPlatformVersionString()));
582 if (!app_id.empty())
583 header->set_allocated_client_version(CreateVersion(app_id, std::string()));
584 header->set_current_time_millis(base::Time::Now().ToJsTime());
585 if (!device_id.empty())
586 header->set_registered_device_id(device_id);
588 DeviceFingerprint* fingerprint = new DeviceFingerprint;
589 fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
590 fingerprint->set_type(CHROME_PLATFORM_TYPE);
591 header->set_allocated_device_fingerprint(fingerprint);
593 return header;
596 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
597 const std::string& rpc_name,
598 const std::string& api_key,
599 const std::string& auth_token,
600 scoped_ptr<MessageLite> request_proto,
601 const PostCleanupCallback& callback) {
602 // Create the base URL to call.
603 base::CommandLine* command_line = base::CommandLine::ForCurrentProcess();
604 const std::string copresence_server_host =
605 command_line->HasSwitch(switches::kCopresenceServer) ?
606 command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
607 kDefaultCopresenceServer;
609 // Create the request and keep a pointer until it completes.
610 HttpPost* http_post = new HttpPost(
611 url_context_getter,
612 copresence_server_host,
613 rpc_name,
614 api_key,
615 auth_token,
616 command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
617 *request_proto);
619 http_post->Start(base::Bind(callback, http_post));
620 pending_posts_.insert(http_post);
623 } // namespace copresence