Only grant permissions to new extensions from sync if they have the expected version
[chromium-blink-merge.git] / tools / telemetry / third_party / webpagereplay / certutils.py
blob33236e030af4c76ea0ba15b292c78113dbc39317
1 # Copyright 2014 Google Inc. All Rights Reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Routines to generate root and server certificates.
17 Certificate Naming Conventions:
18 ca_cert: crypto.X509 for the certificate authority (w/ both the pub &
19 priv keys)
20 cert: a crypto.X509 certificate (w/ just the pub key)
21 cert_str: a certificate string (w/ just the pub cert)
22 key: a private crypto.PKey (from ca or pem)
23 ca_cert_str: a certificae authority string (w/ both the pub & priv certs)
24 """
26 import logging
27 import os
28 import socket
29 import time
31 openssl_import_error = None
33 Error = None
34 SSL_METHOD = None
35 SysCallError = None
36 VERIFY_PEER = None
37 ZeroReturnError = None
38 FILETYPE_PEM = None
40 try:
41 from OpenSSL import crypto, SSL
43 Error = SSL.Error
44 SSL_METHOD = SSL.SSLv23_METHOD
45 SysCallError = SSL.SysCallError
46 VERIFY_PEER = SSL.VERIFY_PEER
47 ZeroReturnError = SSL.ZeroReturnError
48 FILETYPE_PEM = crypto.FILETYPE_PEM
49 except ImportError, e:
50 openssl_import_error = e
53 def get_ssl_context(method=SSL_METHOD):
54 # One of: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD
55 if openssl_import_error:
56 raise openssl_import_error # pylint: disable=raising-bad-type
57 return SSL.Context(method)
60 class WrappedConnection(object):
62 def __init__(self, obj):
63 self._wrapped_obj = obj
65 def __getattr__(self, attr):
66 if attr in self.__dict__:
67 return getattr(self, attr)
68 return getattr(self._wrapped_obj, attr)
70 def recv(self, buflen=1024, flags=0):
71 try:
72 return self._wrapped_obj.recv(buflen, flags)
73 except SSL.SysCallError, e:
74 if e.args[1] == 'Unexpected EOF':
75 return ''
76 raise
77 except SSL.ZeroReturnError:
78 return ''
81 def get_ssl_connection(context, connection):
82 return WrappedConnection(SSL.Connection(context, connection))
85 def load_privatekey(key, filetype=FILETYPE_PEM):
86 """Loads obj private key object from string."""
87 return crypto.load_privatekey(filetype, key)
90 def load_cert(cert_str, filetype=FILETYPE_PEM):
91 """Loads obj cert object from string."""
92 return crypto.load_certificate(filetype, cert_str)
95 def _dump_privatekey(key, filetype=FILETYPE_PEM):
96 """Dumps obj private key object to string."""
97 return crypto.dump_privatekey(filetype, key)
100 def _dump_cert(cert, filetype=FILETYPE_PEM):
101 """Dumps obj cert object to string."""
102 return crypto.dump_certificate(filetype, cert)
105 def generate_dummy_ca_cert(subject='_WebPageReplayCert'):
106 """Generates dummy certificate authority.
108 Args:
109 subject: a string representing the desired root cert issuer
110 Returns:
111 A tuple of the public key and the private key strings for the root
112 certificate
114 if openssl_import_error:
115 raise openssl_import_error # pylint: disable=raising-bad-type
117 key = crypto.PKey()
118 key.generate_key(crypto.TYPE_RSA, 1024)
120 ca_cert = crypto.X509()
121 ca_cert.set_serial_number(int(time.time()*10000))
122 ca_cert.set_version(2)
123 ca_cert.get_subject().CN = subject
124 ca_cert.get_subject().O = subject
125 ca_cert.gmtime_adj_notBefore(-60 * 60 * 24 * 365 * 2)
126 ca_cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 2)
127 ca_cert.set_issuer(ca_cert.get_subject())
128 ca_cert.set_pubkey(key)
129 ca_cert.add_extensions([
130 crypto.X509Extension('basicConstraints', True, 'CA:TRUE'),
131 crypto.X509Extension('nsCertType', True, 'sslCA'),
132 crypto.X509Extension('extendedKeyUsage', True,
133 ('serverAuth,clientAuth,emailProtection,'
134 'timeStamping,msCodeInd,msCodeCom,msCTLSign,'
135 'msSGC,msEFS,nsSGC')),
136 crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'),
137 crypto.X509Extension('subjectKeyIdentifier', False, 'hash',
138 subject=ca_cert),
140 ca_cert.sign(key, 'sha1')
141 key_str = _dump_privatekey(key)
142 ca_cert_str = _dump_cert(ca_cert)
143 return ca_cert_str, key_str
146 def get_host_cert(host, port=443):
147 """Contacts the host and returns its certificate."""
148 host_certs = []
149 def verify_cb(conn, cert, errnum, depth, ok):
150 host_certs.append(cert)
151 # Return True to indicates that the certificate was ok.
152 return True
154 context = SSL.Context(SSL.SSLv23_METHOD)
155 context.set_verify(SSL.VERIFY_PEER, verify_cb) # Demand a certificate
156 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
157 connection = SSL.Connection(context, s)
158 try:
159 connection.connect((host, port))
160 connection.send('')
161 except SSL.SysCallError:
162 pass
163 except socket.gaierror:
164 logging.debug('Host name is not valid')
165 finally:
166 connection.shutdown()
167 connection.close()
168 if not host_certs:
169 logging.warning('Unable to get host certificate from %s:%s', host, port)
170 return ''
171 return _dump_cert(host_certs[-1])
174 def write_dummy_ca_cert(ca_cert_str, key_str, cert_path):
175 """Writes four certificate files.
177 For example, if cert_path is "mycert.pem":
178 mycert.pem - CA plus private key
179 mycert-cert.pem - CA in PEM format
180 mycert-cert.cer - CA for Android
181 mycert-cert.p12 - CA in PKCS12 format for Windows devices
182 Args:
183 cert_path: path string such as "mycert.pem"
184 ca_cert_str: certificate string
185 key_str: private key string
187 dirname = os.path.dirname(cert_path)
188 if dirname and not os.path.exists(dirname):
189 os.makedirs(dirname)
191 root_path = os.path.splitext(cert_path)[0]
192 ca_cert_path = root_path + '-cert.pem'
193 android_cer_path = root_path + '-cert.cer'
194 windows_p12_path = root_path + '-cert.p12'
196 # Dump the CA plus private key
197 with open(cert_path, 'w') as f:
198 f.write(key_str)
199 f.write(ca_cert_str)
201 # Dump the certificate in PEM format
202 with open(ca_cert_path, 'w') as f:
203 f.write(ca_cert_str)
205 # Create a .cer file with the same contents for Android
206 with open(android_cer_path, 'w') as f:
207 f.write(ca_cert_str)
209 ca_cert = load_cert(ca_cert_str)
210 key = load_privatekey(key_str)
211 # Dump the certificate in PKCS12 format for Windows devices
212 with open(windows_p12_path, 'w') as f:
213 p12 = crypto.PKCS12()
214 p12.set_certificate(ca_cert)
215 p12.set_privatekey(key)
216 f.write(p12.export())
219 def generate_cert(root_ca_cert_str, server_cert_str, server_host):
220 """Generates a cert_str with the sni field in server_cert_str signed by the
221 root_ca_cert_str.
223 Args:
224 root_ca_cert_str: PEM formatted string representing the root cert
225 server_cert_str: PEM formatted string representing cert
226 server_host: host name to use if there is no server_cert_str
227 Returns:
228 a PEM formatted certificate string
230 if openssl_import_error:
231 raise openssl_import_error # pylint: disable=raising-bad-type
233 common_name = server_host
234 if server_cert_str:
235 cert = load_cert(server_cert_str)
236 common_name = cert.get_subject().commonName
237 else:
238 cert = crypto.X509()
240 ca_cert = load_cert(root_ca_cert_str)
241 key = load_privatekey(root_ca_cert_str)
243 req = crypto.X509Req()
244 req.get_subject().CN = common_name
245 req.set_pubkey(ca_cert.get_pubkey())
246 req.sign(key, 'sha1')
248 cert.gmtime_adj_notBefore(-60 * 60)
249 cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
250 cert.set_issuer(ca_cert.get_subject())
251 cert.set_subject(req.get_subject())
252 cert.set_serial_number(int(time.time()*10000))
253 cert.set_pubkey(req.get_pubkey())
254 cert.sign(key, 'sha1')
256 return _dump_cert(cert)