Add per-user preferences support.
[chromium-blink-merge.git] / third_party / tlslite / scripts / tls.py
blob48035ce2cd15c18503d2ff9cdde32d4096d68c3d
1 #!/usr/bin/env python
3 # Authors:
4 # Trevor Perrin
5 # Marcelo Fernandez - bugfix and NPN support
6 # Martin von Loewis - python 3 port
8 # See the LICENSE file for legal information regarding use of this file.
9 from __future__ import print_function
10 import sys
11 import os
12 import os.path
13 import socket
14 import time
15 import getopt
16 try:
17 import httplib
18 from SocketServer import *
19 from BaseHTTPServer import *
20 from SimpleHTTPServer import *
21 except ImportError:
22 # Python 3.x
23 from http import client as httplib
24 from socketserver import *
25 from http.server import *
27 if __name__ != "__main__":
28 raise "This must be run as a command, not used as a module!"
30 from tlslite.api import *
31 from tlslite import __version__
33 try:
34 from tack.structures.Tack import Tack
36 except ImportError:
37 pass
39 def printUsage(s=None):
40 if s:
41 print("ERROR: %s" % s)
43 print("")
44 print("Version: %s" % __version__)
45 print("")
46 print("RNG: %s" % prngName)
47 print("")
48 print("Modules:")
49 if tackpyLoaded:
50 print(" tackpy : Loaded")
51 else:
52 print(" tackpy : Not Loaded")
53 if m2cryptoLoaded:
54 print(" M2Crypto : Loaded")
55 else:
56 print(" M2Crypto : Not Loaded")
57 if pycryptoLoaded:
58 print(" pycrypto : Loaded")
59 else:
60 print(" pycrypto : Not Loaded")
61 if gmpyLoaded:
62 print(" GMPY : Loaded")
63 else:
64 print(" GMPY : Not Loaded")
66 print("")
67 print("""Commands:
69 server
70 [-k KEY] [-c CERT] [-t TACK] [-v VERIFIERDB] [-d DIR]
71 [--reqcert] HOST:PORT
73 client
74 [-k KEY] [-c CERT] [-u USER] [-p PASS]
75 HOST:PORT
76 """)
77 sys.exit(-1)
79 def printError(s):
80 """Print error message and exit"""
81 sys.stderr.write("ERROR: %s\n" % s)
82 sys.exit(-1)
85 def handleArgs(argv, argString, flagsList=[]):
86 # Convert to getopt argstring format:
87 # Add ":" after each arg, ie "abc" -> "a:b:c:"
88 getOptArgString = ":".join(argString) + ":"
89 try:
90 opts, argv = getopt.getopt(argv, getOptArgString, flagsList)
91 except getopt.GetoptError as e:
92 printError(e)
93 # Default values if arg not present
94 privateKey = None
95 certChain = None
96 username = None
97 password = None
98 tacks = None
99 verifierDB = None
100 reqCert = False
101 directory = None
103 for opt, arg in opts:
104 if opt == "-k":
105 s = open(arg, "rb").read()
106 privateKey = parsePEMKey(s, private=True)
107 elif opt == "-c":
108 s = open(arg, "rb").read()
109 x509 = X509()
110 x509.parse(s)
111 certChain = X509CertChain([x509])
112 elif opt == "-u":
113 username = arg
114 elif opt == "-p":
115 password = arg
116 elif opt == "-t":
117 if tackpyLoaded:
118 s = open(arg, "rU").read()
119 tacks = Tack.createFromPemList(s)
120 elif opt == "-v":
121 verifierDB = VerifierDB(arg)
122 verifierDB.open()
123 elif opt == "-d":
124 directory = arg
125 elif opt == "--reqcert":
126 reqCert = True
127 else:
128 assert(False)
130 if not argv:
131 printError("Missing address")
132 if len(argv)>1:
133 printError("Too many arguments")
134 #Split address into hostname/port tuple
135 address = argv[0]
136 address = address.split(":")
137 if len(address) != 2:
138 raise SyntaxError("Must specify <host>:<port>")
139 address = ( address[0], int(address[1]) )
141 # Populate the return list
142 retList = [address]
143 if "k" in argString:
144 retList.append(privateKey)
145 if "c" in argString:
146 retList.append(certChain)
147 if "u" in argString:
148 retList.append(username)
149 if "p" in argString:
150 retList.append(password)
151 if "t" in argString:
152 retList.append(tacks)
153 if "v" in argString:
154 retList.append(verifierDB)
155 if "d" in argString:
156 retList.append(directory)
157 if "reqcert" in flagsList:
158 retList.append(reqCert)
159 return retList
162 def printGoodConnection(connection, seconds):
163 print(" Handshake time: %.3f seconds" % seconds)
164 print(" Version: %s" % connection.getVersionName())
165 print(" Cipher: %s %s" % (connection.getCipherName(),
166 connection.getCipherImplementation()))
167 if connection.session.srpUsername:
168 print(" Client SRP username: %s" % connection.session.srpUsername)
169 if connection.session.clientCertChain:
170 print(" Client X.509 SHA1 fingerprint: %s" %
171 connection.session.clientCertChain.getFingerprint())
172 if connection.session.serverCertChain:
173 print(" Server X.509 SHA1 fingerprint: %s" %
174 connection.session.serverCertChain.getFingerprint())
175 if connection.session.serverName:
176 print(" SNI: %s" % connection.session.serverName)
177 if connection.session.tackExt:
178 if connection.session.tackInHelloExt:
179 emptyStr = "\n (via TLS Extension)"
180 else:
181 emptyStr = "\n (via TACK Certificate)"
182 print(" TACK: %s" % emptyStr)
183 print(str(connection.session.tackExt))
184 print(" Next-Protocol Negotiated: %s" % connection.next_proto)
187 def clientCmd(argv):
188 (address, privateKey, certChain, username, password) = \
189 handleArgs(argv, "kcup")
191 if (certChain and not privateKey) or (not certChain and privateKey):
192 raise SyntaxError("Must specify CERT and KEY together")
193 if (username and not password) or (not username and password):
194 raise SyntaxError("Must specify USER with PASS")
195 if certChain and username:
196 raise SyntaxError("Can use SRP or client cert for auth, not both")
198 #Connect to server
199 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
200 sock.settimeout(5)
201 sock.connect(address)
202 connection = TLSConnection(sock)
204 settings = HandshakeSettings()
205 settings.useExperimentalTackExtension = True
207 try:
208 start = time.clock()
209 if username and password:
210 connection.handshakeClientSRP(username, password,
211 settings=settings, serverName=address[0])
212 else:
213 connection.handshakeClientCert(certChain, privateKey,
214 settings=settings, serverName=address[0])
215 stop = time.clock()
216 print("Handshake success")
217 except TLSLocalAlert as a:
218 if a.description == AlertDescription.user_canceled:
219 print(str(a))
220 else:
221 raise
222 sys.exit(-1)
223 except TLSRemoteAlert as a:
224 if a.description == AlertDescription.unknown_psk_identity:
225 if username:
226 print("Unknown username")
227 else:
228 raise
229 elif a.description == AlertDescription.bad_record_mac:
230 if username:
231 print("Bad username or password")
232 else:
233 raise
234 elif a.description == AlertDescription.handshake_failure:
235 print("Unable to negotiate mutually acceptable parameters")
236 else:
237 raise
238 sys.exit(-1)
239 printGoodConnection(connection, stop-start)
240 connection.close()
243 def serverCmd(argv):
244 (address, privateKey, certChain, tacks,
245 verifierDB, directory, reqCert) = handleArgs(argv, "kctbvd", ["reqcert"])
248 if (certChain and not privateKey) or (not certChain and privateKey):
249 raise SyntaxError("Must specify CERT and KEY together")
250 if tacks and not certChain:
251 raise SyntaxError("Must specify CERT with Tacks")
253 print("I am an HTTPS test server, I will listen on %s:%d" %
254 (address[0], address[1]))
255 if directory:
256 os.chdir(directory)
257 print("Serving files from %s" % os.getcwd())
259 if certChain and privateKey:
260 print("Using certificate and private key...")
261 if verifierDB:
262 print("Using verifier DB...")
263 if tacks:
264 print("Using Tacks...")
266 #############
267 sessionCache = SessionCache()
269 class MyHTTPServer(ThreadingMixIn, TLSSocketServerMixIn, HTTPServer):
270 def handshake(self, connection):
271 print("About to handshake...")
272 activationFlags = 0
273 if tacks:
274 if len(tacks) == 1:
275 activationFlags = 1
276 elif len(tacks) == 2:
277 activationFlags = 3
279 try:
280 start = time.clock()
281 settings = HandshakeSettings()
282 settings.useExperimentalTackExtension=True
283 connection.handshakeServer(certChain=certChain,
284 privateKey=privateKey,
285 verifierDB=verifierDB,
286 tacks=tacks,
287 activationFlags=activationFlags,
288 sessionCache=sessionCache,
289 settings=settings,
290 nextProtos=[b"http/1.1"])
291 # As an example (does not work here):
292 #nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
293 stop = time.clock()
294 except TLSRemoteAlert as a:
295 if a.description == AlertDescription.user_canceled:
296 print(str(a))
297 return False
298 else:
299 raise
300 except TLSLocalAlert as a:
301 if a.description == AlertDescription.unknown_psk_identity:
302 if username:
303 print("Unknown username")
304 return False
305 else:
306 raise
307 elif a.description == AlertDescription.bad_record_mac:
308 if username:
309 print("Bad username or password")
310 return False
311 else:
312 raise
313 elif a.description == AlertDescription.handshake_failure:
314 print("Unable to negotiate mutually acceptable parameters")
315 return False
316 else:
317 raise
319 connection.ignoreAbruptClose = True
320 printGoodConnection(connection, stop-start)
321 return True
323 httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
324 httpd.serve_forever()
327 if __name__ == '__main__':
328 if len(sys.argv) < 2:
329 printUsage("Missing command")
330 elif sys.argv[1] == "client"[:len(sys.argv[1])]:
331 clientCmd(sys.argv[2:])
332 elif sys.argv[1] == "server"[:len(sys.argv[1])]:
333 serverCmd(sys.argv[2:])
334 else:
335 printUsage("Unknown command: %s" % sys.argv[1])