Update mojo sdk to rev c02a28868825edfa57ab77947b8cb15e741c5598
[chromium-blink-merge.git] / components / proximity_auth / e2e_test / cryptauth.py
blob7373fa9fb8f2f6e498826766b3ee00b70af286b5
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 import httplib
6 import json
7 import logging
8 import pprint
9 import time
11 logger = logging.getLogger('proximity_auth.%s' % __name__)
13 _GOOGLE_APIS_URL = 'www.googleapis.com'
14 _REQUEST_PATH = '/cryptauth/v1/%s?alt=JSON'
16 class CryptAuthClient(object):
17 """ A client for making blocking CryptAuth API calls. """
19 def __init__(self, access_token, google_apis_url=_GOOGLE_APIS_URL):
20 self._access_token = access_token
21 self._google_apis_url = google_apis_url
23 def GetMyDevices(self):
24 """ Invokes the GetMyDevices API.
26 Returns:
27 A list of devices or None if the API call fails.
28 Each device is a dictionary of the deserialized JSON returned by
29 CryptAuth.
30 """
31 request_data = {
32 'approvedForUnlockRequired': False,
33 'allowStaleRead': False,
34 'invocationReason': 13 # REASON_MANUAL
36 response = self._SendRequest('deviceSync/getmydevices', request_data)
37 return response['devices'] if response is not None else None
39 def GetUnlockKey(self):
40 """
41 Returns:
42 The unlock key registered with CryptAuth if it exists or None.
43 The device is a dictionary of the deserialized JSON returned by CryptAuth.
44 """
45 devices = self.GetMyDevices()
46 if devices is None:
47 return None
49 for device in devices:
50 if device['unlockKey']:
51 return device
52 return None
54 def ToggleEasyUnlock(self, enable, public_key=''):
55 """ Calls the ToggleEasyUnlock API.
56 Args:
57 enable: True to designate the device specified by |public_key| as an
58 unlock key.
59 public_key: The public key of the device to toggle. Ignored if |enable| is
60 False, which toggles all unlock keys off.
61 Returns:
62 True upon success, else False.
63 """
64 request_data = { 'enable': enable, }
65 if not enable:
66 request_data['applyToAll'] = True
67 else:
68 request_data['publicKey'] = public_key
69 response = self._SendRequest('deviceSync/toggleeasyunlock', request_data)
70 return response is not None
72 def FindEligibleUnlockDevices(self, time_delta_millis=None):
73 """ Finds devices eligible to be an unlock key.
74 Args:
75 time_delta_millis: If specified, then only return eligible devices that
76 have contacted CryptAuth in the last time delta.
77 Returns:
78 A tuple containing two lists, one of eligible devices and the other of
79 ineligible devices.
80 Each device is a dictionary of the deserialized JSON returned by
81 CryptAuth.
82 """
83 request_data = {}
84 if time_delta_millis is not None:
85 request_data['maxLastUpdateTimeDeltaMillis'] = time_delta_millis * 1000;
87 response = self._SendRequest(
88 'deviceSync/findeligibleunlockdevices', request_data)
89 if response is None:
90 return None
91 eligibleDevices = (
92 response['eligibleDevices'] if 'eligibleDevices' in response else [])
93 ineligibleDevices = (
94 response['ineligibleDevices'] if (
95 'ineligibleDevices' in response) else [])
96 return eligibleDevices, ineligibleDevices
98 def PingPhones(self, timeout_secs=10):
99 """ Asks CryptAuth to ping registered phones and determine their status.
100 Args:
101 timeout_secs: The number of seconds to wait for phones to respond.
102 Returns:
103 A tuple containing two lists, one of eligible devices and the other of
104 ineligible devices.
105 Each device is a dictionary of the deserialized JSON returned by
106 CryptAuth.
108 response = self._SendRequest(
109 'deviceSync/senddevicesynctickle',
110 { 'tickleType': 'updateEnrollment' })
111 if response is None:
112 return None
113 # We wait for phones to update their status with CryptAuth.
114 logger.info('Waiting for %s seconds for phone status...' % timeout_secs)
115 time.sleep(timeout_secs)
116 return self.FindEligibleUnlockDevices(time_delta_millis=timeout_secs)
118 def _SendRequest(self, function_path, request_data):
119 """ Sends an HTTP request to CryptAuth and returns the deserialized
120 response.
122 conn = httplib.HTTPSConnection(self._google_apis_url)
123 path = _REQUEST_PATH % function_path
125 headers = {
126 'authorization': 'Bearer ' + self._access_token,
127 'Content-Type': 'application/json'
129 body = json.dumps(request_data)
130 logger.info('Making request to %s with body:\n%s' % (
131 path, pprint.pformat(request_data)))
132 conn.request('POST', path, body, headers)
134 response = conn.getresponse()
135 if response.status == 204:
136 return {}
137 if response.status != 200:
138 logger.warning('Request to %s failed: %s' % (path, response.status))
139 return None
140 return json.loads(response.read())