11 import SimpleHTTPServer
15 from cryptoIDlib
.api
import *
16 cryptoIDlibLoaded
= True
18 cryptoIDlibLoaded
= False
20 if __name__
!= "__main__":
21 raise "This must be run as a command, not used as a module!"
24 #from tlslite.constants import AlertDescription, Fault
26 #from tlslite.utils.jython_compat import formatExceptionTrace
27 #from tlslite.X509 import X509, X509CertChain
29 from tlslite
.api
import *
31 def parsePrivateKey(s
):
33 return parsePEMKey(s
, private
=True)
36 return parseXMLKey(s
, private
=True)
39 def clientTest(address
, dir):
41 #Split address into hostname/port tuple
42 address
= address
.split(":")
44 address
.append("4443")
45 address
= ( address
[0], int(address
[1]) )
48 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
49 if hasattr(sock
, 'settimeout'): #It's a python 2.3 feature
52 c
= TLSConnection(sock
)
59 print "Test 1 - good shared key"
60 connection
= connect()
61 connection
.handshakeClientSharedKey("shared", "key")
63 connection
.sock
.close()
65 print "Test 2 - shared key faults"
66 for fault
in Fault
.clientSharedKeyFaults
+ Fault
.genericFaults
:
67 connection
= connect()
68 connection
.fault
= fault
70 connection
.handshakeClientSharedKey("shared", "key")
71 print " Good Fault %s" % (Fault
.faultNames
[fault
])
72 except TLSFaultError
, e
:
73 print " BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
))
75 connection
.sock
.close()
77 print "Test 3 - good SRP"
78 connection
= connect()
79 connection
.handshakeClientSRP("test", "password")
82 print "Test 4 - SRP faults"
83 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
84 connection
= connect()
85 connection
.fault
= fault
87 connection
.handshakeClientSRP("test", "password")
88 print " Good Fault %s" % (Fault
.faultNames
[fault
])
89 except TLSFaultError
, e
:
90 print " BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
))
92 connection
.sock
.close()
94 print "Test 5 - good SRP: unknown_srp_username idiom"
96 return ("test", "password")
97 connection
= connect()
98 connection
.handshakeClientUnknown(srpCallback
=srpCallback
)
100 connection
.sock
.close()
102 print "Test 6 - good SRP: with X.509 certificate"
103 connection
= connect()
104 connection
.handshakeClientSRP("test", "password")
105 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
107 connection
.sock
.close()
109 print "Test 7 - X.509 with SRP faults"
110 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
111 connection
= connect()
112 connection
.fault
= fault
114 connection
.handshakeClientSRP("test", "password")
115 print " Good Fault %s" % (Fault
.faultNames
[fault
])
116 except TLSFaultError
, e
:
117 print " BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
))
119 connection
.sock
.close()
121 if cryptoIDlibLoaded
:
122 print "Test 8 - good SRP: with cryptoID certificate chain"
123 connection
= connect()
124 connection
.handshakeClientSRP("test", "password")
125 assert(isinstance(connection
.session
.serverCertChain
, CertChain
))
126 if not (connection
.session
.serverCertChain
.validate()):
127 print connection
.session
.serverCertChain
.validate(listProblems
=True)
130 connection
.sock
.close()
132 print "Test 9 - CryptoID with SRP faults"
133 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
134 connection
= connect()
135 connection
.fault
= fault
137 connection
.handshakeClientSRP("test", "password")
138 print " Good Fault %s" % (Fault
.faultNames
[fault
])
139 except TLSFaultError
, e
:
140 print " BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
))
142 connection
.sock
.close()
144 print "Test 10 - good X509"
145 connection
= connect()
146 connection
.handshakeClientCert()
147 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
149 connection
.sock
.close()
151 print "Test 10.a - good X509, SSLv3"
152 connection
= connect()
153 settings
= HandshakeSettings()
154 settings
.minVersion
= (3,0)
155 settings
.maxVersion
= (3,0)
156 connection
.handshakeClientCert(settings
=settings
)
157 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
159 connection
.sock
.close()
161 print "Test 11 - X.509 faults"
162 for fault
in Fault
.clientNoAuthFaults
+ Fault
.genericFaults
:
163 connection
= connect()
164 connection
.fault
= fault
166 connection
.handshakeClientCert()
167 print " Good Fault %s" % (Fault
.faultNames
[fault
])
168 except TLSFaultError
, e
:
169 print " BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
))
171 connection
.sock
.close()
173 if cryptoIDlibLoaded
:
174 print "Test 12 - good cryptoID"
175 connection
= connect()
176 connection
.handshakeClientCert()
177 assert(isinstance(connection
.session
.serverCertChain
, CertChain
))
178 assert(connection
.session
.serverCertChain
.validate())
180 connection
.sock
.close()
182 print "Test 13 - cryptoID faults"
183 for fault
in Fault
.clientNoAuthFaults
+ Fault
.genericFaults
:
184 connection
= connect()
185 connection
.fault
= fault
187 connection
.handshakeClientCert()
188 print " Good Fault %s" % (Fault
.faultNames
[fault
])
189 except TLSFaultError
, e
:
190 print " BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
))
192 connection
.sock
.close()
194 print "Test 14 - good mutual X509"
195 x509Cert
= X509().parse(open(os
.path
.join(dir, "clientX509Cert.pem")).read())
196 x509Chain
= X509CertChain([x509Cert
])
197 s
= open(os
.path
.join(dir, "clientX509Key.pem")).read()
198 x509Key
= parsePEMKey(s
, private
=True)
200 connection
= connect()
201 connection
.handshakeClientCert(x509Chain
, x509Key
)
202 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
204 connection
.sock
.close()
206 print "Test 14.a - good mutual X509, SSLv3"
207 connection
= connect()
208 settings
= HandshakeSettings()
209 settings
.minVersion
= (3,0)
210 settings
.maxVersion
= (3,0)
211 connection
.handshakeClientCert(x509Chain
, x509Key
, settings
=settings
)
212 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
214 connection
.sock
.close()
216 print "Test 15 - mutual X.509 faults"
217 for fault
in Fault
.clientCertFaults
+ Fault
.genericFaults
:
218 connection
= connect()
219 connection
.fault
= fault
221 connection
.handshakeClientCert(x509Chain
, x509Key
)
222 print " Good Fault %s" % (Fault
.faultNames
[fault
])
223 except TLSFaultError
, e
:
224 print " BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
))
226 connection
.sock
.close()
228 if cryptoIDlibLoaded
:
229 print "Test 16 - good mutual cryptoID"
230 cryptoIDChain
= CertChain().parse(open(os
.path
.join(dir, "serverCryptoIDChain.xml"), "r").read())
231 cryptoIDKey
= parseXMLKey(open(os
.path
.join(dir, "serverCryptoIDKey.xml"), "r").read(), private
=True)
233 connection
= connect()
234 connection
.handshakeClientCert(cryptoIDChain
, cryptoIDKey
)
235 assert(isinstance(connection
.session
.serverCertChain
, CertChain
))
236 assert(connection
.session
.serverCertChain
.validate())
238 connection
.sock
.close()
240 print "Test 17 - mutual cryptoID faults"
241 for fault
in Fault
.clientCertFaults
+ Fault
.genericFaults
:
242 connection
= connect()
243 connection
.fault
= fault
245 connection
.handshakeClientCert(cryptoIDChain
, cryptoIDKey
)
246 print " Good Fault %s" % (Fault
.faultNames
[fault
])
247 except TLSFaultError
, e
:
248 print " BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
))
250 connection
.sock
.close()
252 print "Test 18 - good SRP, prepare to resume..."
253 connection
= connect()
254 connection
.handshakeClientSRP("test", "password")
256 connection
.sock
.close()
257 session
= connection
.session
259 print "Test 19 - resumption"
260 connection
= connect()
261 connection
.handshakeClientSRP("test", "garbage", session
=session
)
262 #Don't close! -- see below
264 print "Test 20 - invalidated resumption"
265 connection
.sock
.close() #Close the socket without a close_notify!
266 connection
= connect()
268 connection
.handshakeClientSRP("test", "garbage", session
=session
)
270 except TLSRemoteAlert
, alert
:
271 if alert
.description
!= AlertDescription
.bad_record_mac
:
273 connection
.sock
.close()
275 print "Test 21 - HTTPS test X.509"
276 address
= address
[0], address
[1]+1
277 if hasattr(socket
, "timeout"):
278 timeoutEx
= socket
.timeout
280 timeoutEx
= socket
.error
284 htmlBody
= open(os
.path
.join(dir, "index.html")).read()
287 h
= HTTPTLSConnection(\
288 address
[0], address
[1], x509Fingerprint
=fingerprint
)
290 h
.request("GET", "/index.html")
292 assert(r
.status
== 200)
294 assert(s
== htmlBody
)
295 fingerprint
= h
.tlsSession
.serverCertChain
.getFingerprint()
300 print "timeout, retrying..."
303 if cryptoIDlibLoaded
:
304 print "Test 21a - HTTPS test SRP+cryptoID"
305 address
= address
[0], address
[1]+1
306 if hasattr(socket
, "timeout"):
307 timeoutEx
= socket
.timeout
309 timeoutEx
= socket
.error
312 time
.sleep(2) #Time to generate key and cryptoID
313 htmlBody
= open(os
.path
.join(dir, "index.html")).read()
317 h
= HTTPTLSConnection(\
318 address
[0], address
[1],
319 username
="test", password
="password",
320 cryptoID
=fingerprint
, protocol
=protocol
)
322 h
.request("GET", "/index.html")
324 assert(r
.status
== 200)
326 assert(s
== htmlBody
)
327 fingerprint
= h
.tlsSession
.serverCertChain
.cryptoID
329 protocol
= "urn:whatever"
333 print "timeout, retrying..."
336 address
= address
[0], address
[1]+1
340 implementations
.append("cryptlib")
342 implementations
.append("openssl")
344 implementations
.append("pycrypto")
345 implementations
.append("python")
347 print "Test 22 - different ciphers"
348 for implementation
in implementations
:
349 for cipher
in ["aes128", "aes256", "rc4"]:
352 connection
= connect()
354 settings
= HandshakeSettings()
355 settings
.cipherNames
= [cipher
]
356 settings
.cipherImplementations
= [implementation
, "python"]
357 connection
.handshakeClientSharedKey("shared", "key", settings
=settings
)
358 print ("%s %s" % (connection
.getCipherName(), connection
.getCipherImplementation()))
360 connection
.write("hello")
361 h
= connection
.read(min=5, max=5)
364 connection
.sock
.close()
366 print "Test 23 - throughput test"
367 for implementation
in implementations
:
368 for cipher
in ["aes128", "aes256", "3des", "rc4"]:
369 if cipher
== "3des" and implementation
not in ("openssl", "cryptlib", "pycrypto"):
373 connection
= connect()
375 settings
= HandshakeSettings()
376 settings
.cipherNames
= [cipher
]
377 settings
.cipherImplementations
= [implementation
, "python"]
378 connection
.handshakeClientSharedKey("shared", "key", settings
=settings
)
379 print ("%s %s:" % (connection
.getCipherName(), connection
.getCipherImplementation())),
381 startTime
= time
.clock()
382 connection
.write("hello"*10000)
383 h
= connection
.read(min=50000, max=50000)
384 stopTime
= time
.clock()
385 print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime
-startTime
))
387 assert(h
== "hello"*10000)
389 connection
.sock
.close()
391 print "Test 24 - Internet servers test"
393 i
= IMAP4_TLS("cyrus.andrew.cmu.edu")
394 i
.login("anonymous", "anonymous@anonymous.net")
396 print "Test 24: IMAP4 good"
397 p
= POP3_TLS("pop.gmail.com")
399 print "Test 24: POP3 good"
400 except socket
.error
, e
:
401 print "Non-critical error: socket error trying to reach internet server: ", e
404 print "Test succeeded"
409 def serverTest(address
, dir):
410 #Split address into hostname/port tuple
411 address
= address
.split(":")
413 address
.append("4443")
414 address
= ( address
[0], int(address
[1]) )
417 lsock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
422 return TLSConnection(lsock
.accept()[0])
424 print "Test 1 - good shared key"
425 sharedKeyDB
= SharedKeyDB()
426 sharedKeyDB
["shared"] = "key"
427 sharedKeyDB
["shared2"] = "key2"
428 connection
= connect()
429 connection
.handshakeServer(sharedKeyDB
=sharedKeyDB
)
431 connection
.sock
.close()
433 print "Test 2 - shared key faults"
434 for fault
in Fault
.clientSharedKeyFaults
+ Fault
.genericFaults
:
435 connection
= connect()
436 connection
.fault
= fault
438 connection
.handshakeServer(sharedKeyDB
=sharedKeyDB
)
442 connection
.sock
.close()
444 print "Test 3 - good SRP"
445 #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB"))
447 verifierDB
= VerifierDB()
449 entry
= VerifierDB
.makeVerifier("test", "password", 1536)
450 verifierDB
["test"] = entry
452 connection
= connect()
453 connection
.handshakeServer(verifierDB
=verifierDB
)
455 connection
.sock
.close()
457 print "Test 4 - SRP faults"
458 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
459 connection
= connect()
460 connection
.fault
= fault
462 connection
.handshakeServer(verifierDB
=verifierDB
)
466 connection
.sock
.close()
468 print "Test 5 - good SRP: unknown_srp_username idiom"
469 connection
= connect()
470 connection
.handshakeServer(verifierDB
=verifierDB
)
472 connection
.sock
.close()
474 print "Test 6 - good SRP: with X.509 cert"
475 x509Cert
= X509().parse(open(os
.path
.join(dir, "serverX509Cert.pem")).read())
476 x509Chain
= X509CertChain([x509Cert
])
477 s
= open(os
.path
.join(dir, "serverX509Key.pem")).read()
478 x509Key
= parsePEMKey(s
, private
=True)
480 connection
= connect()
481 connection
.handshakeServer(verifierDB
=verifierDB
, \
482 certChain
=x509Chain
, privateKey
=x509Key
)
484 connection
.sock
.close()
486 print "Test 7 - X.509 with SRP faults"
487 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
488 connection
= connect()
489 connection
.fault
= fault
491 connection
.handshakeServer(verifierDB
=verifierDB
, \
492 certChain
=x509Chain
, privateKey
=x509Key
)
496 connection
.sock
.close()
498 if cryptoIDlibLoaded
:
499 print "Test 8 - good SRP: with cryptoID certs"
500 cryptoIDChain
= CertChain().parse(open(os
.path
.join(dir, "serverCryptoIDChain.xml"), "r").read())
501 cryptoIDKey
= parseXMLKey(open(os
.path
.join(dir, "serverCryptoIDKey.xml"), "r").read(), private
=True)
502 connection
= connect()
503 connection
.handshakeServer(verifierDB
=verifierDB
, \
504 certChain
=cryptoIDChain
, privateKey
=cryptoIDKey
)
506 connection
.sock
.close()
508 print "Test 9 - cryptoID with SRP faults"
509 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
510 connection
= connect()
511 connection
.fault
= fault
513 connection
.handshakeServer(verifierDB
=verifierDB
, \
514 certChain
=cryptoIDChain
, privateKey
=cryptoIDKey
)
518 connection
.sock
.close()
520 print "Test 10 - good X.509"
521 connection
= connect()
522 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
524 connection
.sock
.close()
526 print "Test 10.a - good X.509, SSL v3"
527 connection
= connect()
528 settings
= HandshakeSettings()
529 settings
.minVersion
= (3,0)
530 settings
.maxVersion
= (3,0)
531 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, settings
=settings
)
533 connection
.sock
.close()
535 print "Test 11 - X.509 faults"
536 for fault
in Fault
.clientNoAuthFaults
+ Fault
.genericFaults
:
537 connection
= connect()
538 connection
.fault
= fault
540 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
544 connection
.sock
.close()
546 if cryptoIDlibLoaded
:
547 print "Test 12 - good cryptoID"
548 connection
= connect()
549 connection
.handshakeServer(certChain
=cryptoIDChain
, privateKey
=cryptoIDKey
)
551 connection
.sock
.close()
553 print "Test 13 - cryptoID faults"
554 for fault
in Fault
.clientNoAuthFaults
+ Fault
.genericFaults
:
555 connection
= connect()
556 connection
.fault
= fault
558 connection
.handshakeServer(certChain
=cryptoIDChain
, privateKey
=cryptoIDKey
)
562 connection
.sock
.close()
564 print "Test 14 - good mutual X.509"
565 connection
= connect()
566 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True)
567 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
569 connection
.sock
.close()
571 print "Test 14a - good mutual X.509, SSLv3"
572 connection
= connect()
573 settings
= HandshakeSettings()
574 settings
.minVersion
= (3,0)
575 settings
.maxVersion
= (3,0)
576 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True, settings
=settings
)
577 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
579 connection
.sock
.close()
581 print "Test 15 - mutual X.509 faults"
582 for fault
in Fault
.clientCertFaults
+ Fault
.genericFaults
:
583 connection
= connect()
584 connection
.fault
= fault
586 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True)
590 connection
.sock
.close()
592 if cryptoIDlibLoaded
:
593 print "Test 16 - good mutual cryptoID"
594 connection
= connect()
595 connection
.handshakeServer(certChain
=cryptoIDChain
, privateKey
=cryptoIDKey
, reqCert
=True)
596 assert(isinstance(connection
.session
.serverCertChain
, CertChain
))
597 assert(connection
.session
.serverCertChain
.validate())
599 connection
.sock
.close()
601 print "Test 17 - mutual cryptoID faults"
602 for fault
in Fault
.clientCertFaults
+ Fault
.genericFaults
:
603 connection
= connect()
604 connection
.fault
= fault
606 connection
.handshakeServer(certChain
=cryptoIDChain
, privateKey
=cryptoIDKey
, reqCert
=True)
610 connection
.sock
.close()
612 print "Test 18 - good SRP, prepare to resume"
613 sessionCache
= SessionCache()
614 connection
= connect()
615 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
617 connection
.sock
.close()
619 print "Test 19 - resumption"
620 connection
= connect()
621 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
622 #Don't close! -- see next test
624 print "Test 20 - invalidated resumption"
626 connection
.read(min=1, max=1)
627 assert() #Client is going to close the socket without a close_notify
628 except TLSAbruptCloseError
, e
:
630 connection
= connect()
632 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
633 except TLSLocalAlert
, alert
:
634 if alert
.description
!= AlertDescription
.bad_record_mac
:
636 connection
.sock
.close()
638 print "Test 21 - HTTPS test X.509"
640 #Close the current listening socket
643 #Create and run an HTTP Server using TLSSocketServerMixIn
644 class MyHTTPServer(TLSSocketServerMixIn
,
645 BaseHTTPServer
.HTTPServer
):
646 def handshake(self
, tlsConnection
):
647 tlsConnection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
651 address
= address
[0], address
[1]+1
652 httpd
= MyHTTPServer(address
, SimpleHTTPServer
.SimpleHTTPRequestHandler
)
654 httpd
.handle_request()
658 if cryptoIDlibLoaded
:
659 print "Test 21a - HTTPS test SRP+cryptoID"
661 #Create and run an HTTP Server using TLSSocketServerMixIn
662 class MyHTTPServer(TLSSocketServerMixIn
,
663 BaseHTTPServer
.HTTPServer
):
664 def handshake(self
, tlsConnection
):
665 tlsConnection
.handshakeServer(certChain
=cryptoIDChain
, privateKey
=cryptoIDKey
,
666 verifierDB
=verifierDB
)
670 address
= address
[0], address
[1]+1
671 httpd
= MyHTTPServer(address
, SimpleHTTPServer
.SimpleHTTPRequestHandler
)
673 httpd
.handle_request()
677 #Re-connect the listening socket
678 lsock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
679 address
= address
[0], address
[1]+1
684 return TLSConnection(lsock
.accept()[0])
688 implementations
.append("cryptlib")
690 implementations
.append("openssl")
692 implementations
.append("pycrypto")
693 implementations
.append("python")
695 print "Test 22 - different ciphers"
696 for implementation
in ["python"] * len(implementations
):
697 for cipher
in ["aes128", "aes256", "rc4"]:
700 connection
= connect()
702 settings
= HandshakeSettings()
703 settings
.cipherNames
= [cipher
]
704 settings
.cipherImplementations
= [implementation
, "python"]
706 connection
.handshakeServer(sharedKeyDB
=sharedKeyDB
, settings
=settings
)
707 print connection
.getCipherName(), connection
.getCipherImplementation()
708 h
= connection
.read(min=5, max=5)
712 connection
.sock
.close()
714 print "Test 23 - throughput test"
715 for implementation
in implementations
:
716 for cipher
in ["aes128", "aes256", "3des", "rc4"]:
717 if cipher
== "3des" and implementation
not in ("openssl", "cryptlib", "pycrypto"):
721 connection
= connect()
723 settings
= HandshakeSettings()
724 settings
.cipherNames
= [cipher
]
725 settings
.cipherImplementations
= [implementation
, "python"]
727 connection
.handshakeServer(sharedKeyDB
=sharedKeyDB
, settings
=settings
)
728 print connection
.getCipherName(), connection
.getCipherImplementation()
729 h
= connection
.read(min=50000, max=50000)
730 assert(h
== "hello"*10000)
733 connection
.sock
.close()
735 print "Test succeeded"
748 if len(sys
.argv
) == 1 or (len(sys
.argv
)==2 and sys
.argv
[1].lower().endswith("help")):
750 print "Version: 0.3.8"
752 print "RNG: %s" % prngName
756 print " cryptlib_py : Loaded"
758 print " cryptlib_py : Not Loaded"
760 print " M2Crypto : Loaded"
762 print " M2Crypto : Not Loaded"
764 print " pycrypto : Loaded"
766 print " pycrypto : Not Loaded"
768 print " GMPY : Loaded"
770 print " GMPY : Not Loaded"
771 if cryptoIDlibLoaded
:
772 print " cryptoIDlib : Loaded"
774 print " cryptoIDlib : Not Loaded"
778 print " clientcert <server> [<chain> <key>]"
779 print " clientsharedkey <server> <user> <pass>"
780 print " clientsrp <server> <user> <pass>"
781 print " clienttest <server> <dir>"
783 print " serversrp <server> <verifierDB>"
784 print " servercert <server> <chain> <key> [req]"
785 print " serversrpcert <server> <verifierDB> <chain> <key>"
786 print " serversharedkey <server> <sharedkeyDB>"
787 print " servertest <server> <dir>"
790 cmd
= sys
.argv
[1].lower()
793 def __init__(self
, argv
):
795 def get(self
, index
):
796 if len(self
.argv
)<=index
:
797 raise SyntaxError("Not enough arguments")
798 return self
.argv
[index
]
799 def getLast(self
, index
):
800 if len(self
.argv
)>index
+1:
801 raise SyntaxError("Too many arguments")
802 return self
.get(index
)
804 args
= Args(sys
.argv
)
806 def reformatDocString(s
):
807 lines
= s
.splitlines()
810 newLines
.append(" " + line
.strip())
811 return "\n".join(newLines
)
814 if cmd
== "clienttest":
815 address
= args
.get(2)
816 dir = args
.getLast(3)
817 clientTest(address
, dir)
820 elif cmd
.startswith("client"):
821 address
= args
.get(2)
823 #Split address into hostname/port tuple
824 address
= address
.split(":")
826 address
.append("4443")
827 address
= ( address
[0], int(address
[1]) )
831 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
832 if hasattr(sock
, "settimeout"):
834 sock
.connect(address
)
836 #Instantiate TLSConnections
837 return TLSConnection(sock
)
840 if cmd
== "clientsrp":
841 username
= args
.get(3)
842 password
= args
.getLast(4)
843 connection
= connect()
845 connection
.handshakeClientSRP(username
, password
)
846 elif cmd
== "clientsharedkey":
847 username
= args
.get(3)
848 password
= args
.getLast(4)
849 connection
= connect()
851 connection
.handshakeClientSharedKey(username
, password
)
852 elif cmd
== "clientcert":
855 if len(sys
.argv
) > 3:
856 certFilename
= args
.get(3)
857 keyFilename
= args
.getLast(4)
859 s1
= open(certFilename
, "rb").read()
860 s2
= open(keyFilename
, "rb").read()
862 #Try to create cryptoID cert chain
863 if cryptoIDlibLoaded
:
865 certChain
= CertChain().parse(s1
)
866 privateKey
= parsePrivateKey(s2
)
871 #Try to create X.509 cert chain
875 certChain
= X509CertChain([x509
])
876 privateKey
= parsePrivateKey(s2
)
878 connection
= connect()
880 connection
.handshakeClientCert(certChain
, privateKey
)
882 raise SyntaxError("Unknown command")
884 except TLSLocalAlert
, a
:
885 if a
.description
== AlertDescription
.bad_record_mac
:
886 if cmd
== "clientsharedkey":
887 print "Bad sharedkey password"
890 elif a
.description
== AlertDescription
.user_canceled
:
895 except TLSRemoteAlert
, a
:
896 if a
.description
== AlertDescription
.unknown_srp_username
:
897 if cmd
== "clientsrp":
898 print "Unknown username"
901 elif a
.description
== AlertDescription
.bad_record_mac
:
902 if cmd
== "clientsrp":
903 print "Bad username or password"
906 elif a
.description
== AlertDescription
.handshake_failure
:
907 print "Unable to negotiate mutually acceptable parameters"
913 print "Handshake success"
914 print " Handshake time: %.4f seconds" % (stop
- start
)
915 print " Version: %s.%s" % connection
.version
916 print " Cipher: %s %s" % (connection
.getCipherName(), connection
.getCipherImplementation())
917 if connection
.session
.srpUsername
:
918 print " Client SRP username: %s" % connection
.session
.srpUsername
919 if connection
.session
.sharedKeyUsername
:
920 print " Client shared key username: %s" % connection
.session
.sharedKeyUsername
921 if connection
.session
.clientCertChain
:
922 print " Client fingerprint: %s" % connection
.session
.clientCertChain
.getFingerprint()
923 if connection
.session
.serverCertChain
:
924 print " Server fingerprint: %s" % connection
.session
.serverCertChain
.getFingerprint()
926 connection
.sock
.close()
928 elif cmd
.startswith("server"):
929 address
= args
.get(2)
931 #Split address into hostname/port tuple
932 address
= address
.split(":")
934 address
.append("4443")
935 address
= ( address
[0], int(address
[1]) )
937 verifierDBFilename
= None
938 sharedKeyDBFilename
= None
944 if cmd
== "serversrp":
945 verifierDBFilename
= args
.getLast(3)
946 elif cmd
== "servercert":
947 certFilename
= args
.get(3)
948 keyFilename
= args
.get(4)
950 req
= args
.getLast(5)
951 if req
.lower() != "req":
954 elif cmd
== "serversrpcert":
955 verifierDBFilename
= args
.get(3)
956 certFilename
= args
.get(4)
957 keyFilename
= args
.getLast(5)
958 elif cmd
== "serversharedkey":
959 sharedKeyDBFilename
= args
.getLast(3)
960 elif cmd
== "servertest":
961 address
= args
.get(2)
962 dir = args
.getLast(3)
963 serverTest(address
, dir)
967 if verifierDBFilename
:
968 verifierDB
= VerifierDB(verifierDBFilename
)
972 if sharedKeyDBFilename
:
973 sharedKeyDB
= SharedKeyDB(sharedKeyDBFilename
)
979 s1
= open(certFilename
, "rb").read()
980 s2
= open(keyFilename
, "rb").read()
982 #Try to create cryptoID cert chain
983 if cryptoIDlibLoaded
:
985 certChain
= CertChain().parse(s1
)
986 privateKey
= parsePrivateKey(s2
)
991 #Try to create X.509 cert chain
995 certChain
= X509CertChain([x509
])
996 privateKey
= parsePrivateKey(s2
)
1000 #Create handler function - performs handshake, then echos all bytes received
1003 connection
= TLSConnection(sock
)
1004 settings
= HandshakeSettings()
1005 connection
.handshakeServer(sharedKeyDB
=sharedKeyDB
, verifierDB
=verifierDB
, \
1006 certChain
=certChain
, privateKey
=privateKey
, \
1007 reqCert
=reqCert
, settings
=settings
)
1008 print "Handshake success"
1009 print " Version: %s.%s" % connection
.version
1010 print " Cipher: %s %s" % (connection
.getCipherName(), connection
.getCipherImplementation())
1011 if connection
.session
.srpUsername
:
1012 print " Client SRP username: %s" % connection
.session
.srpUsername
1013 if connection
.session
.sharedKeyUsername
:
1014 print " Client shared key username: %s" % connection
.session
.sharedKeyUsername
1015 if connection
.session
.clientCertChain
:
1016 print " Client fingerprint: %s" % connection
.session
.clientCertChain
.getFingerprint()
1017 if connection
.session
.serverCertChain
:
1018 print " Server fingerprint: %s" % connection
.session
.serverCertChain
.getFingerprint()
1022 newS
= connection
.read()
1029 except TLSLocalAlert
, a
:
1030 if a
.description
== AlertDescription
.unknown_srp_username
:
1031 print "Unknown SRP username"
1032 elif a
.description
== AlertDescription
.bad_record_mac
:
1033 if cmd
== "serversrp" or cmd
== "serversrpcert":
1034 print "Bad SRP password for:", connection
.allegedSrpUsername
1037 elif a
.description
== AlertDescription
.handshake_failure
:
1038 print "Unable to negotiate mutually acceptable parameters"
1041 except TLSRemoteAlert
, a
:
1042 if a
.description
== AlertDescription
.bad_record_mac
:
1043 if cmd
== "serversharedkey":
1044 print "Bad sharedkey password for:", connection
.allegedSharedKeyUsername
1047 elif a
.description
== AlertDescription
.user_canceled
:
1048 print "Handshake cancelled"
1049 elif a
.description
== AlertDescription
.handshake_failure
:
1050 print "Unable to negotiate mutually acceptable parameters"
1051 elif a
.description
== AlertDescription
.close_notify
:
1056 #Run multi-threaded server
1057 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1061 (newsock
, cliAddress
) = sock
.accept()
1062 thread
.start_new_thread(handler
, (newsock
,))
1066 print "Bad command: '%s'" % cmd
1067 except TLSRemoteAlert
, a
: