Separate Simple Backend creation from initialization.
[chromium-blink-merge.git] / third_party / tlslite / scripts / tls.py
blobfa2c663f3b06bff1a2567e240e0eb5a110f2416b
1 #! python
3 import sys
4 import os
5 import os.path
6 import socket
7 import thread
8 import time
9 import httplib
10 import BaseHTTPServer
11 import SimpleHTTPServer
#cryptoIDlib is an optional dependency.  cryptoIDlibLoaded gates every
#cryptoID test and the cryptoID certificate-chain parsing further down.
try:
    from cryptoIDlib.api import *
    cryptoIDlibLoaded = True
except ImportError:
    #Only "package cannot be imported" means "not available"; a bare except
    #here would also swallow KeyboardInterrupt/SystemExit and hide real bugs.
    cryptoIDlibLoaded = False
#This file is a command-line script with module-level side effects, so it
#refuses to be imported.  A real exception instance is raised: string
#exceptions are rejected by Python 2.6+ with an unrelated TypeError, which
#would hide the message below.
if __name__ != "__main__":
    raise ImportError("This must be run as a command, not used as a module!")
23 #import tlslite
24 #from tlslite.constants import AlertDescription, Fault
26 #from tlslite.utils.jython_compat import formatExceptionTrace
27 #from tlslite.X509 import X509, X509CertChain
29 from tlslite.api import *
31 def parsePrivateKey(s):
32 try:
33 return parsePEMKey(s, private=True)
34 except Exception, e:
35 print e
36 return parseXMLKey(s, private=True)
39 def clientTest(address, dir):
41 #Split address into hostname/port tuple
42 address = address.split(":")
43 if len(address)==1:
44 address.append("4443")
45 address = ( address[0], int(address[1]) )
47 def connect():
48 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
49 if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
50 sock.settimeout(5)
51 sock.connect(address)
52 c = TLSConnection(sock)
53 return c
55 test = 0
57 badFault = False
59 print "Test 1 - good shared key"
60 connection = connect()
61 connection.handshakeClientSharedKey("shared", "key")
62 connection.close()
63 connection.sock.close()
65 print "Test 2 - shared key faults"
66 for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
67 connection = connect()
68 connection.fault = fault
69 try:
70 connection.handshakeClientSharedKey("shared", "key")
71 print " Good Fault %s" % (Fault.faultNames[fault])
72 except TLSFaultError, e:
73 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
74 badFault = True
75 connection.sock.close()
77 print "Test 3 - good SRP"
78 connection = connect()
79 connection.handshakeClientSRP("test", "password")
80 connection.close()
82 print "Test 4 - SRP faults"
83 for fault in Fault.clientSrpFaults + Fault.genericFaults:
84 connection = connect()
85 connection.fault = fault
86 try:
87 connection.handshakeClientSRP("test", "password")
88 print " Good Fault %s" % (Fault.faultNames[fault])
89 except TLSFaultError, e:
90 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
91 badFault = True
92 connection.sock.close()
94 print "Test 5 - good SRP: unknown_srp_username idiom"
95 def srpCallback():
96 return ("test", "password")
97 connection = connect()
98 connection.handshakeClientUnknown(srpCallback=srpCallback)
99 connection.close()
100 connection.sock.close()
102 print "Test 6 - good SRP: with X.509 certificate"
103 connection = connect()
104 connection.handshakeClientSRP("test", "password")
105 assert(isinstance(connection.session.serverCertChain, X509CertChain))
106 connection.close()
107 connection.sock.close()
109 print "Test 7 - X.509 with SRP faults"
110 for fault in Fault.clientSrpFaults + Fault.genericFaults:
111 connection = connect()
112 connection.fault = fault
113 try:
114 connection.handshakeClientSRP("test", "password")
115 print " Good Fault %s" % (Fault.faultNames[fault])
116 except TLSFaultError, e:
117 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
118 badFault = True
119 connection.sock.close()
121 if cryptoIDlibLoaded:
122 print "Test 8 - good SRP: with cryptoID certificate chain"
123 connection = connect()
124 connection.handshakeClientSRP("test", "password")
125 assert(isinstance(connection.session.serverCertChain, CertChain))
126 if not (connection.session.serverCertChain.validate()):
127 print connection.session.serverCertChain.validate(listProblems=True)
129 connection.close()
130 connection.sock.close()
132 print "Test 9 - CryptoID with SRP faults"
133 for fault in Fault.clientSrpFaults + Fault.genericFaults:
134 connection = connect()
135 connection.fault = fault
136 try:
137 connection.handshakeClientSRP("test", "password")
138 print " Good Fault %s" % (Fault.faultNames[fault])
139 except TLSFaultError, e:
140 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
141 badFault = True
142 connection.sock.close()
144 print "Test 10 - good X509"
145 connection = connect()
146 connection.handshakeClientCert()
147 assert(isinstance(connection.session.serverCertChain, X509CertChain))
148 connection.close()
149 connection.sock.close()
151 print "Test 10.a - good X509, SSLv3"
152 connection = connect()
153 settings = HandshakeSettings()
154 settings.minVersion = (3,0)
155 settings.maxVersion = (3,0)
156 connection.handshakeClientCert(settings=settings)
157 assert(isinstance(connection.session.serverCertChain, X509CertChain))
158 connection.close()
159 connection.sock.close()
161 print "Test 11 - X.509 faults"
162 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
163 connection = connect()
164 connection.fault = fault
165 try:
166 connection.handshakeClientCert()
167 print " Good Fault %s" % (Fault.faultNames[fault])
168 except TLSFaultError, e:
169 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
170 badFault = True
171 connection.sock.close()
173 if cryptoIDlibLoaded:
174 print "Test 12 - good cryptoID"
175 connection = connect()
176 connection.handshakeClientCert()
177 assert(isinstance(connection.session.serverCertChain, CertChain))
178 assert(connection.session.serverCertChain.validate())
179 connection.close()
180 connection.sock.close()
182 print "Test 13 - cryptoID faults"
183 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
184 connection = connect()
185 connection.fault = fault
186 try:
187 connection.handshakeClientCert()
188 print " Good Fault %s" % (Fault.faultNames[fault])
189 except TLSFaultError, e:
190 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
191 badFault = True
192 connection.sock.close()
194 print "Test 14 - good mutual X509"
195 x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
196 x509Chain = X509CertChain([x509Cert])
197 s = open(os.path.join(dir, "clientX509Key.pem")).read()
198 x509Key = parsePEMKey(s, private=True)
200 connection = connect()
201 connection.handshakeClientCert(x509Chain, x509Key)
202 assert(isinstance(connection.session.serverCertChain, X509CertChain))
203 connection.close()
204 connection.sock.close()
206 print "Test 14.a - good mutual X509, SSLv3"
207 connection = connect()
208 settings = HandshakeSettings()
209 settings.minVersion = (3,0)
210 settings.maxVersion = (3,0)
211 connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
212 assert(isinstance(connection.session.serverCertChain, X509CertChain))
213 connection.close()
214 connection.sock.close()
216 print "Test 15 - mutual X.509 faults"
217 for fault in Fault.clientCertFaults + Fault.genericFaults:
218 connection = connect()
219 connection.fault = fault
220 try:
221 connection.handshakeClientCert(x509Chain, x509Key)
222 print " Good Fault %s" % (Fault.faultNames[fault])
223 except TLSFaultError, e:
224 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
225 badFault = True
226 connection.sock.close()
228 if cryptoIDlibLoaded:
229 print "Test 16 - good mutual cryptoID"
230 cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
231 cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
233 connection = connect()
234 connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
235 assert(isinstance(connection.session.serverCertChain, CertChain))
236 assert(connection.session.serverCertChain.validate())
237 connection.close()
238 connection.sock.close()
240 print "Test 17 - mutual cryptoID faults"
241 for fault in Fault.clientCertFaults + Fault.genericFaults:
242 connection = connect()
243 connection.fault = fault
244 try:
245 connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
246 print " Good Fault %s" % (Fault.faultNames[fault])
247 except TLSFaultError, e:
248 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
249 badFault = True
250 connection.sock.close()
252 print "Test 18 - good SRP, prepare to resume..."
253 connection = connect()
254 connection.handshakeClientSRP("test", "password")
255 connection.close()
256 connection.sock.close()
257 session = connection.session
259 print "Test 19 - resumption"
260 connection = connect()
261 connection.handshakeClientSRP("test", "garbage", session=session)
262 #Don't close! -- see below
264 print "Test 20 - invalidated resumption"
265 connection.sock.close() #Close the socket without a close_notify!
266 connection = connect()
267 try:
268 connection.handshakeClientSRP("test", "garbage", session=session)
269 assert()
270 except TLSRemoteAlert, alert:
271 if alert.description != AlertDescription.bad_record_mac:
272 raise
273 connection.sock.close()
275 print "Test 21 - HTTPS test X.509"
276 address = address[0], address[1]+1
277 if hasattr(socket, "timeout"):
278 timeoutEx = socket.timeout
279 else:
280 timeoutEx = socket.error
281 while 1:
282 try:
283 time.sleep(2)
284 htmlBody = open(os.path.join(dir, "index.html")).read()
285 fingerprint = None
286 for y in range(2):
287 h = HTTPTLSConnection(\
288 address[0], address[1], x509Fingerprint=fingerprint)
289 for x in range(3):
290 h.request("GET", "/index.html")
291 r = h.getresponse()
292 assert(r.status == 200)
293 s = r.read()
294 assert(s == htmlBody)
295 fingerprint = h.tlsSession.serverCertChain.getFingerprint()
296 assert(fingerprint)
297 time.sleep(2)
298 break
299 except timeoutEx:
300 print "timeout, retrying..."
301 pass
303 if cryptoIDlibLoaded:
304 print "Test 21a - HTTPS test SRP+cryptoID"
305 address = address[0], address[1]+1
306 if hasattr(socket, "timeout"):
307 timeoutEx = socket.timeout
308 else:
309 timeoutEx = socket.error
310 while 1:
311 try:
312 time.sleep(2) #Time to generate key and cryptoID
313 htmlBody = open(os.path.join(dir, "index.html")).read()
314 fingerprint = None
315 protocol = None
316 for y in range(2):
317 h = HTTPTLSConnection(\
318 address[0], address[1],
319 username="test", password="password",
320 cryptoID=fingerprint, protocol=protocol)
321 for x in range(3):
322 h.request("GET", "/index.html")
323 r = h.getresponse()
324 assert(r.status == 200)
325 s = r.read()
326 assert(s == htmlBody)
327 fingerprint = h.tlsSession.serverCertChain.cryptoID
328 assert(fingerprint)
329 protocol = "urn:whatever"
330 time.sleep(2)
331 break
332 except timeoutEx:
333 print "timeout, retrying..."
334 pass
336 address = address[0], address[1]+1
338 implementations = []
339 if cryptlibpyLoaded:
340 implementations.append("cryptlib")
341 if m2cryptoLoaded:
342 implementations.append("openssl")
343 if pycryptoLoaded:
344 implementations.append("pycrypto")
345 implementations.append("python")
347 print "Test 22 - different ciphers"
348 for implementation in implementations:
349 for cipher in ["aes128", "aes256", "rc4"]:
351 print "Test 22:",
352 connection = connect()
354 settings = HandshakeSettings()
355 settings.cipherNames = [cipher]
356 settings.cipherImplementations = [implementation, "python"]
357 connection.handshakeClientSharedKey("shared", "key", settings=settings)
358 print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
360 connection.write("hello")
361 h = connection.read(min=5, max=5)
362 assert(h == "hello")
363 connection.close()
364 connection.sock.close()
366 print "Test 23 - throughput test"
367 for implementation in implementations:
368 for cipher in ["aes128", "aes256", "3des", "rc4"]:
369 if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
370 continue
372 print "Test 23:",
373 connection = connect()
375 settings = HandshakeSettings()
376 settings.cipherNames = [cipher]
377 settings.cipherImplementations = [implementation, "python"]
378 connection.handshakeClientSharedKey("shared", "key", settings=settings)
379 print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())),
381 startTime = time.clock()
382 connection.write("hello"*10000)
383 h = connection.read(min=50000, max=50000)
384 stopTime = time.clock()
385 print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime))
387 assert(h == "hello"*10000)
388 connection.close()
389 connection.sock.close()
391 print "Test 24 - Internet servers test"
392 try:
393 i = IMAP4_TLS("cyrus.andrew.cmu.edu")
394 i.login("anonymous", "anonymous@anonymous.net")
395 i.logout()
396 print "Test 24: IMAP4 good"
397 p = POP3_TLS("pop.gmail.com")
398 p.quit()
399 print "Test 24: POP3 good"
400 except socket.error, e:
401 print "Non-critical error: socket error trying to reach internet server: ", e
403 if not badFault:
404 print "Test succeeded"
405 else:
406 print "Test failed"
409 def serverTest(address, dir):
410 #Split address into hostname/port tuple
411 address = address.split(":")
412 if len(address)==1:
413 address.append("4443")
414 address = ( address[0], int(address[1]) )
416 #Connect to server
417 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
418 lsock.bind(address)
419 lsock.listen(5)
421 def connect():
422 return TLSConnection(lsock.accept()[0])
424 print "Test 1 - good shared key"
425 sharedKeyDB = SharedKeyDB()
426 sharedKeyDB["shared"] = "key"
427 sharedKeyDB["shared2"] = "key2"
428 connection = connect()
429 connection.handshakeServer(sharedKeyDB=sharedKeyDB)
430 connection.close()
431 connection.sock.close()
433 print "Test 2 - shared key faults"
434 for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
435 connection = connect()
436 connection.fault = fault
437 try:
438 connection.handshakeServer(sharedKeyDB=sharedKeyDB)
439 assert()
440 except:
441 pass
442 connection.sock.close()
444 print "Test 3 - good SRP"
445 #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB"))
446 #verifierDB.open()
447 verifierDB = VerifierDB()
448 verifierDB.create()
449 entry = VerifierDB.makeVerifier("test", "password", 1536)
450 verifierDB["test"] = entry
452 connection = connect()
453 connection.handshakeServer(verifierDB=verifierDB)
454 connection.close()
455 connection.sock.close()
457 print "Test 4 - SRP faults"
458 for fault in Fault.clientSrpFaults + Fault.genericFaults:
459 connection = connect()
460 connection.fault = fault
461 try:
462 connection.handshakeServer(verifierDB=verifierDB)
463 assert()
464 except:
465 pass
466 connection.sock.close()
468 print "Test 5 - good SRP: unknown_srp_username idiom"
469 connection = connect()
470 connection.handshakeServer(verifierDB=verifierDB)
471 connection.close()
472 connection.sock.close()
474 print "Test 6 - good SRP: with X.509 cert"
475 x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
476 x509Chain = X509CertChain([x509Cert])
477 s = open(os.path.join(dir, "serverX509Key.pem")).read()
478 x509Key = parsePEMKey(s, private=True)
480 connection = connect()
481 connection.handshakeServer(verifierDB=verifierDB, \
482 certChain=x509Chain, privateKey=x509Key)
483 connection.close()
484 connection.sock.close()
486 print "Test 7 - X.509 with SRP faults"
487 for fault in Fault.clientSrpFaults + Fault.genericFaults:
488 connection = connect()
489 connection.fault = fault
490 try:
491 connection.handshakeServer(verifierDB=verifierDB, \
492 certChain=x509Chain, privateKey=x509Key)
493 assert()
494 except:
495 pass
496 connection.sock.close()
498 if cryptoIDlibLoaded:
499 print "Test 8 - good SRP: with cryptoID certs"
500 cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
501 cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
502 connection = connect()
503 connection.handshakeServer(verifierDB=verifierDB, \
504 certChain=cryptoIDChain, privateKey=cryptoIDKey)
505 connection.close()
506 connection.sock.close()
508 print "Test 9 - cryptoID with SRP faults"
509 for fault in Fault.clientSrpFaults + Fault.genericFaults:
510 connection = connect()
511 connection.fault = fault
512 try:
513 connection.handshakeServer(verifierDB=verifierDB, \
514 certChain=cryptoIDChain, privateKey=cryptoIDKey)
515 assert()
516 except:
517 pass
518 connection.sock.close()
520 print "Test 10 - good X.509"
521 connection = connect()
522 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
523 connection.close()
524 connection.sock.close()
526 print "Test 10.a - good X.509, SSL v3"
527 connection = connect()
528 settings = HandshakeSettings()
529 settings.minVersion = (3,0)
530 settings.maxVersion = (3,0)
531 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
532 connection.close()
533 connection.sock.close()
535 print "Test 11 - X.509 faults"
536 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
537 connection = connect()
538 connection.fault = fault
539 try:
540 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
541 assert()
542 except:
543 pass
544 connection.sock.close()
546 if cryptoIDlibLoaded:
547 print "Test 12 - good cryptoID"
548 connection = connect()
549 connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
550 connection.close()
551 connection.sock.close()
553 print "Test 13 - cryptoID faults"
554 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
555 connection = connect()
556 connection.fault = fault
557 try:
558 connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
559 assert()
560 except:
561 pass
562 connection.sock.close()
564 print "Test 14 - good mutual X.509"
565 connection = connect()
566 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
567 assert(isinstance(connection.session.serverCertChain, X509CertChain))
568 connection.close()
569 connection.sock.close()
571 print "Test 14a - good mutual X.509, SSLv3"
572 connection = connect()
573 settings = HandshakeSettings()
574 settings.minVersion = (3,0)
575 settings.maxVersion = (3,0)
576 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
577 assert(isinstance(connection.session.serverCertChain, X509CertChain))
578 connection.close()
579 connection.sock.close()
581 print "Test 15 - mutual X.509 faults"
582 for fault in Fault.clientCertFaults + Fault.genericFaults:
583 connection = connect()
584 connection.fault = fault
585 try:
586 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
587 assert()
588 except:
589 pass
590 connection.sock.close()
592 if cryptoIDlibLoaded:
593 print "Test 16 - good mutual cryptoID"
594 connection = connect()
595 connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
596 assert(isinstance(connection.session.serverCertChain, CertChain))
597 assert(connection.session.serverCertChain.validate())
598 connection.close()
599 connection.sock.close()
601 print "Test 17 - mutual cryptoID faults"
602 for fault in Fault.clientCertFaults + Fault.genericFaults:
603 connection = connect()
604 connection.fault = fault
605 try:
606 connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
607 assert()
608 except:
609 pass
610 connection.sock.close()
612 print "Test 18 - good SRP, prepare to resume"
613 sessionCache = SessionCache()
614 connection = connect()
615 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
616 connection.close()
617 connection.sock.close()
619 print "Test 19 - resumption"
620 connection = connect()
621 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
622 #Don't close! -- see next test
624 print "Test 20 - invalidated resumption"
625 try:
626 connection.read(min=1, max=1)
627 assert() #Client is going to close the socket without a close_notify
628 except TLSAbruptCloseError, e:
629 pass
630 connection = connect()
631 try:
632 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
633 except TLSLocalAlert, alert:
634 if alert.description != AlertDescription.bad_record_mac:
635 raise
636 connection.sock.close()
638 print "Test 21 - HTTPS test X.509"
640 #Close the current listening socket
641 lsock.close()
643 #Create and run an HTTP Server using TLSSocketServerMixIn
644 class MyHTTPServer(TLSSocketServerMixIn,
645 BaseHTTPServer.HTTPServer):
646 def handshake(self, tlsConnection):
647 tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
648 return True
649 cd = os.getcwd()
650 os.chdir(dir)
651 address = address[0], address[1]+1
652 httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
653 for x in range(6):
654 httpd.handle_request()
655 httpd.server_close()
656 cd = os.chdir(cd)
658 if cryptoIDlibLoaded:
659 print "Test 21a - HTTPS test SRP+cryptoID"
661 #Create and run an HTTP Server using TLSSocketServerMixIn
662 class MyHTTPServer(TLSSocketServerMixIn,
663 BaseHTTPServer.HTTPServer):
664 def handshake(self, tlsConnection):
665 tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey,
666 verifierDB=verifierDB)
667 return True
668 cd = os.getcwd()
669 os.chdir(dir)
670 address = address[0], address[1]+1
671 httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
672 for x in range(6):
673 httpd.handle_request()
674 httpd.server_close()
675 cd = os.chdir(cd)
677 #Re-connect the listening socket
678 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
679 address = address[0], address[1]+1
680 lsock.bind(address)
681 lsock.listen(5)
683 def connect():
684 return TLSConnection(lsock.accept()[0])
686 implementations = []
687 if cryptlibpyLoaded:
688 implementations.append("cryptlib")
689 if m2cryptoLoaded:
690 implementations.append("openssl")
691 if pycryptoLoaded:
692 implementations.append("pycrypto")
693 implementations.append("python")
695 print "Test 22 - different ciphers"
696 for implementation in ["python"] * len(implementations):
697 for cipher in ["aes128", "aes256", "rc4"]:
699 print "Test 22:",
700 connection = connect()
702 settings = HandshakeSettings()
703 settings.cipherNames = [cipher]
704 settings.cipherImplementations = [implementation, "python"]
706 connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
707 print connection.getCipherName(), connection.getCipherImplementation()
708 h = connection.read(min=5, max=5)
709 assert(h == "hello")
710 connection.write(h)
711 connection.close()
712 connection.sock.close()
714 print "Test 23 - throughput test"
715 for implementation in implementations:
716 for cipher in ["aes128", "aes256", "3des", "rc4"]:
717 if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
718 continue
720 print "Test 23:",
721 connection = connect()
723 settings = HandshakeSettings()
724 settings.cipherNames = [cipher]
725 settings.cipherImplementations = [implementation, "python"]
727 connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
728 print connection.getCipherName(), connection.getCipherImplementation()
729 h = connection.read(min=50000, max=50000)
730 assert(h == "hello"*10000)
731 connection.write(h)
732 connection.close()
733 connection.sock.close()
735 print "Test succeeded"
748 if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")):
749 print ""
750 print "Version: 0.3.8"
751 print ""
752 print "RNG: %s" % prngName
753 print ""
754 print "Modules:"
755 if cryptlibpyLoaded:
756 print " cryptlib_py : Loaded"
757 else:
758 print " cryptlib_py : Not Loaded"
759 if m2cryptoLoaded:
760 print " M2Crypto : Loaded"
761 else:
762 print " M2Crypto : Not Loaded"
763 if pycryptoLoaded:
764 print " pycrypto : Loaded"
765 else:
766 print " pycrypto : Not Loaded"
767 if gmpyLoaded:
768 print " GMPY : Loaded"
769 else:
770 print " GMPY : Not Loaded"
771 if cryptoIDlibLoaded:
772 print " cryptoIDlib : Loaded"
773 else:
774 print " cryptoIDlib : Not Loaded"
775 print ""
776 print "Commands:"
777 print ""
778 print " clientcert <server> [<chain> <key>]"
779 print " clientsharedkey <server> <user> <pass>"
780 print " clientsrp <server> <user> <pass>"
781 print " clienttest <server> <dir>"
782 print ""
783 print " serversrp <server> <verifierDB>"
784 print " servercert <server> <chain> <key> [req]"
785 print " serversrpcert <server> <verifierDB> <chain> <key>"
786 print " serversharedkey <server> <sharedkeyDB>"
787 print " servertest <server> <dir>"
788 sys.exit()
#The command word (e.g. "clientsrp", "servertest"), lower-cased so that it is
#matched case-insensitively below
cmd = sys.argv[1].lower()
class Args:
    """Checked positional access to a command-line argument list."""

    def __init__(self, argv):
        #The full argument list; indices passed to get/getLast refer to it
        self.argv = argv

    def get(self, index):
        """Return argv[index]; raise SyntaxError if there are too few arguments."""
        if index < len(self.argv):
            return self.argv[index]
        raise SyntaxError("Not enough arguments")

    def getLast(self, index):
        """Return argv[index], which must also be the final argument.

        Raises SyntaxError if anything follows it, or if it is missing.
        """
        if index + 1 < len(self.argv):
            raise SyntaxError("Too many arguments")
        return self.get(index)
#Checked accessor over the whole command line; the command word is index 1
#and its operands start at index 2
args = Args(sys.argv)
def reformatDocString(s):
    """Return s with each line stripped and re-prefixed with a single space.

    Lines are split with splitlines() and re-joined with "\\n", so the result
    has no trailing newline.
    """
    return "\n".join([" " + text.strip() for text in s.splitlines()])
813 try:
814 if cmd == "clienttest":
815 address = args.get(2)
816 dir = args.getLast(3)
817 clientTest(address, dir)
818 sys.exit()
820 elif cmd.startswith("client"):
821 address = args.get(2)
823 #Split address into hostname/port tuple
824 address = address.split(":")
825 if len(address)==1:
826 address.append("4443")
827 address = ( address[0], int(address[1]) )
829 def connect():
830 #Connect to server
831 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
832 if hasattr(sock, "settimeout"):
833 sock.settimeout(5)
834 sock.connect(address)
836 #Instantiate TLSConnections
837 return TLSConnection(sock)
839 try:
840 if cmd == "clientsrp":
841 username = args.get(3)
842 password = args.getLast(4)
843 connection = connect()
844 start = time.clock()
845 connection.handshakeClientSRP(username, password)
846 elif cmd == "clientsharedkey":
847 username = args.get(3)
848 password = args.getLast(4)
849 connection = connect()
850 start = time.clock()
851 connection.handshakeClientSharedKey(username, password)
852 elif cmd == "clientcert":
853 certChain = None
854 privateKey = None
855 if len(sys.argv) > 3:
856 certFilename = args.get(3)
857 keyFilename = args.getLast(4)
859 s1 = open(certFilename, "rb").read()
860 s2 = open(keyFilename, "rb").read()
862 #Try to create cryptoID cert chain
863 if cryptoIDlibLoaded:
864 try:
865 certChain = CertChain().parse(s1)
866 privateKey = parsePrivateKey(s2)
867 except:
868 certChain = None
869 privateKey = None
871 #Try to create X.509 cert chain
872 if not certChain:
873 x509 = X509()
874 x509.parse(s1)
875 certChain = X509CertChain([x509])
876 privateKey = parsePrivateKey(s2)
878 connection = connect()
879 start = time.clock()
880 connection.handshakeClientCert(certChain, privateKey)
881 else:
882 raise SyntaxError("Unknown command")
884 except TLSLocalAlert, a:
885 if a.description == AlertDescription.bad_record_mac:
886 if cmd == "clientsharedkey":
887 print "Bad sharedkey password"
888 else:
889 raise
890 elif a.description == AlertDescription.user_canceled:
891 print str(a)
892 else:
893 raise
894 sys.exit()
895 except TLSRemoteAlert, a:
896 if a.description == AlertDescription.unknown_srp_username:
897 if cmd == "clientsrp":
898 print "Unknown username"
899 else:
900 raise
901 elif a.description == AlertDescription.bad_record_mac:
902 if cmd == "clientsrp":
903 print "Bad username or password"
904 else:
905 raise
906 elif a.description == AlertDescription.handshake_failure:
907 print "Unable to negotiate mutually acceptable parameters"
908 else:
909 raise
910 sys.exit()
912 stop = time.clock()
913 print "Handshake success"
914 print " Handshake time: %.4f seconds" % (stop - start)
915 print " Version: %s.%s" % connection.version
916 print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
917 if connection.session.srpUsername:
918 print " Client SRP username: %s" % connection.session.srpUsername
919 if connection.session.sharedKeyUsername:
920 print " Client shared key username: %s" % connection.session.sharedKeyUsername
921 if connection.session.clientCertChain:
922 print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
923 if connection.session.serverCertChain:
924 print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
925 connection.close()
926 connection.sock.close()
928 elif cmd.startswith("server"):
929 address = args.get(2)
931 #Split address into hostname/port tuple
932 address = address.split(":")
933 if len(address)==1:
934 address.append("4443")
935 address = ( address[0], int(address[1]) )
937 verifierDBFilename = None
938 sharedKeyDBFilename = None
939 certFilename = None
940 keyFilename = None
941 sharedKeyDB = None
942 reqCert = False
944 if cmd == "serversrp":
945 verifierDBFilename = args.getLast(3)
946 elif cmd == "servercert":
947 certFilename = args.get(3)
948 keyFilename = args.get(4)
949 if len(sys.argv)>=6:
950 req = args.getLast(5)
951 if req.lower() != "req":
952 raise SyntaxError()
953 reqCert = True
954 elif cmd == "serversrpcert":
955 verifierDBFilename = args.get(3)
956 certFilename = args.get(4)
957 keyFilename = args.getLast(5)
958 elif cmd == "serversharedkey":
959 sharedKeyDBFilename = args.getLast(3)
960 elif cmd == "servertest":
961 address = args.get(2)
962 dir = args.getLast(3)
963 serverTest(address, dir)
964 sys.exit()
966 verifierDB = None
967 if verifierDBFilename:
968 verifierDB = VerifierDB(verifierDBFilename)
969 verifierDB.open()
971 sharedKeyDB = None
972 if sharedKeyDBFilename:
973 sharedKeyDB = SharedKeyDB(sharedKeyDBFilename)
974 sharedKeyDB.open()
976 certChain = None
977 privateKey = None
978 if certFilename:
979 s1 = open(certFilename, "rb").read()
980 s2 = open(keyFilename, "rb").read()
982 #Try to create cryptoID cert chain
983 if cryptoIDlibLoaded:
984 try:
985 certChain = CertChain().parse(s1)
986 privateKey = parsePrivateKey(s2)
987 except:
988 certChain = None
989 privateKey = None
991 #Try to create X.509 cert chain
992 if not certChain:
993 x509 = X509()
994 x509.parse(s1)
995 certChain = X509CertChain([x509])
996 privateKey = parsePrivateKey(s2)
1000 #Create handler function - performs handshake, then echos all bytes received
1001 def handler(sock):
1002 try:
1003 connection = TLSConnection(sock)
1004 settings = HandshakeSettings()
1005 connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \
1006 certChain=certChain, privateKey=privateKey, \
1007 reqCert=reqCert, settings=settings)
1008 print "Handshake success"
1009 print " Version: %s.%s" % connection.version
1010 print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
1011 if connection.session.srpUsername:
1012 print " Client SRP username: %s" % connection.session.srpUsername
1013 if connection.session.sharedKeyUsername:
1014 print " Client shared key username: %s" % connection.session.sharedKeyUsername
1015 if connection.session.clientCertChain:
1016 print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
1017 if connection.session.serverCertChain:
1018 print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
1020 s = ""
1021 while 1:
1022 newS = connection.read()
1023 if not newS:
1024 break
1025 s += newS
1026 if s[-1]=='\n':
1027 connection.write(s)
1028 s = ""
1029 except TLSLocalAlert, a:
1030 if a.description == AlertDescription.unknown_srp_username:
1031 print "Unknown SRP username"
1032 elif a.description == AlertDescription.bad_record_mac:
1033 if cmd == "serversrp" or cmd == "serversrpcert":
1034 print "Bad SRP password for:", connection.allegedSrpUsername
1035 else:
1036 raise
1037 elif a.description == AlertDescription.handshake_failure:
1038 print "Unable to negotiate mutually acceptable parameters"
1039 else:
1040 raise
1041 except TLSRemoteAlert, a:
1042 if a.description == AlertDescription.bad_record_mac:
1043 if cmd == "serversharedkey":
1044 print "Bad sharedkey password for:", connection.allegedSharedKeyUsername
1045 else:
1046 raise
1047 elif a.description == AlertDescription.user_canceled:
1048 print "Handshake cancelled"
1049 elif a.description == AlertDescription.handshake_failure:
1050 print "Unable to negotiate mutually acceptable parameters"
1051 elif a.description == AlertDescription.close_notify:
1052 pass
1053 else:
1054 raise
1056 #Run multi-threaded server
1057 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1058 sock.bind(address)
1059 sock.listen(5)
1060 while 1:
1061 (newsock, cliAddress) = sock.accept()
1062 thread.start_new_thread(handler, (newsock,))
1065 else:
1066 print "Bad command: '%s'" % cmd
1067 except TLSRemoteAlert, a:
1068 print str(a)
1069 raise