Blink roll 25b6bd3a7a131ffe68d809546ad1a20707915cdc:3a503f41ae42e5b79cfcd2ff10e65afde...
[chromium-blink-merge.git] / third_party / tlslite / tests / tlstest.py
blobfa1b13fa65d3248748de662d75a136ecedf39cba
1 #!/usr/bin/env python
3 # Authors:
4 # Trevor Perrin
5 # Kees Bos - Added tests for XML-RPC
6 # Dimitris Moraitis - Anon ciphersuites
7 # Marcelo Fernandez - Added test for NPN
8 # Martin von Loewis - python 3 port
11 # See the LICENSE file for legal information regarding use of this file.
12 from __future__ import print_function
13 import sys
14 import os
15 import os.path
16 import socket
17 import time
18 import getopt
19 try:
20 from BaseHTTPServer import HTTPServer
21 from SimpleHTTPServer import SimpleHTTPRequestHandler
22 except ImportError:
23 from http.server import HTTPServer, SimpleHTTPRequestHandler
25 from tlslite import TLSConnection, Fault, HandshakeSettings, \
26 X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
27 parsePEMKey, constants, \
28 AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
29 POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
30 Checker, __version__
32 from tlslite.errors import *
33 from tlslite.utils.cryptomath import prngName
34 try:
35 import xmlrpclib
36 except ImportError:
37 # Python 3
38 from xmlrpc import client as xmlrpclib
39 from tlslite import *
41 try:
42 from tack.structures.Tack import Tack
44 except ImportError:
45 pass
def printUsage(s=None):
    """Print an optional error plus the usage banner, then exit.

    s -- optional error message; when truthy it is printed first as
         "ERROR: <s>".

    The banner reports the tlslite version and which crypto backend is in
    use (M2Crypto/OpenSSL when m2cryptoLoaded is set, pure Python otherwise).
    This function never returns: it always ends with sys.exit(-1).
    """
    if m2cryptoLoaded:
        crypto = "M2Crypto/OpenSSL"
    else:
        crypto = "Python crypto"
    if s:
        print("ERROR: %s" % s)
    # The banner names this script (tlstest.py), not the tls.py tool.
    print("""\ntlstest.py version %s (using %s)

Commands:
  server HOST:PORT DIRECTORY

  client HOST:PORT DIRECTORY
""" % (__version__, crypto))
    sys.exit(-1)
64 def testConnClient(conn):
65 b1 = os.urandom(1)
66 b10 = os.urandom(10)
67 b100 = os.urandom(100)
68 b1000 = os.urandom(1000)
69 conn.write(b1)
70 conn.write(b10)
71 conn.write(b100)
72 conn.write(b1000)
73 assert(conn.read(min=1, max=1) == b1)
74 assert(conn.read(min=10, max=10) == b10)
75 assert(conn.read(min=100, max=100) == b100)
76 assert(conn.read(min=1000, max=1000) == b1000)
def clientTestCmd(argv):
    """Run the client half of the test suite against a running test server.

    argv[0] -- server address as "HOST:PORT"
    argv[1] -- directory containing clientX509Cert.pem, clientX509Key.pem
               and index.html

    The tests are numbered to pair up with the ones in serverTestCmd and are
    run in the same order.  Ports are used as follows: PORT for tests 0-20,
    PORT+1 for the HTTPS test (21), PORT+2 for tests 22-24 and PORT+3 for
    the XML-RPC tests (25-27).

    Prints "Test succeeded" unless a fault-injection test raised
    TLSFaultError, in which case "Test failed" is printed.  Any other
    failure propagates as an exception (usually AssertionError).
    """
    if len(argv) < 2:
        # Show usage instead of failing with a bare IndexError below.
        printUsage("Missing argument")

    address = argv[0]
    testDir = argv[1]

    #Split address into hostname/port tuple
    address = address.split(":")
    address = ( address[0], int(address[1]) )

    def connect():
        # 'address' is looked up at call time, so later port bumps apply.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
            sock.settimeout(5)
        sock.connect(address)
        c = TLSConnection(sock)
        return c

    def readFile(name):
        # Read a file from the test directory and close it promptly.
        with open(os.path.join(testDir, name)) as f:
            return f.read()

    def checkFaults(faults, doHandshake):
        # Run doHandshake on a fresh connection for every injected fault.
        # Returns True if any of them raised TLSFaultError.
        sawBadFault = False
        for fault in faults:
            connection = connect()
            connection.fault = fault
            try:
                doHandshake(connection)
                print(" Good Fault %s" % (Fault.faultNames[fault]))
            except TLSFaultError as e:
                print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
                sawBadFault = True
        return sawBadFault

    badFault = False

    print("Test 0 - anonymous handshake")
    connection = connect()
    connection.handshakeClientAnonymous()
    testConnClient(connection)
    connection.close()

    print("Test 1 - good X509 (plus SNI)")
    connection = connect()
    connection.handshakeClientCert(serverName=address[0])
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    assert(connection.session.serverName == address[0])
    connection.close()

    print("Test 1.a - good X509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeClientCert(settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 1.b - good X509, RC4-MD5")
    connection = connect()
    settings = HandshakeSettings()
    settings.macNames = ["md5"]
    connection.handshakeClientCert(settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
    connection.close()

    if tackpyLoaded:

        settings = HandshakeSettings()
        settings.useExperimentalTackExtension = True

        print("Test 2.a - good X.509, TACK")
        connection = connect()
        connection.handshakeClientCert(settings=settings)
        assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
        assert(connection.session.tackExt.activation_flags == 1)
        testConnClient(connection)
        connection.close()

        print("Test 2.b - good X.509, TACK unrelated to cert chain")
        connection = connect()
        try:
            connection.handshakeClientCert(settings=settings)
            assert(False)
        except TLSLocalAlert as alert:
            if alert.description != AlertDescription.illegal_parameter:
                raise
        connection.close()

    print("Test 3 - good SRP")
    connection = connect()
    connection.handshakeClientSRP("test", "password")
    testConnClient(connection)
    connection.close()

    print("Test 4 - SRP faults")
    if checkFaults(Fault.clientSrpFaults + Fault.genericFaults,
                   lambda c: c.handshakeClientSRP("test", "password")):
        badFault = True

    print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
    settings = HandshakeSettings()
    settings.minVersion = (3,1)
    settings.maxVersion = (3,1)
    connection = connect()
    connection.handshakeClientSRP("test", "password", settings=settings)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    testConnClient(connection)
    connection.close()

    print("Test 7 - X.509 with SRP faults")
    if checkFaults(Fault.clientSrpFaults + Fault.genericFaults,
                   lambda c: c.handshakeClientSRP("test", "password")):
        badFault = True

    print("Test 11 - X.509 faults")
    if checkFaults(Fault.clientNoAuthFaults + Fault.genericFaults,
                   lambda c: c.handshakeClientCert()):
        badFault = True

    print("Test 14 - good mutual X509")
    x509Cert = X509().parse(readFile("clientX509Cert.pem"))
    x509Chain = X509CertChain([x509Cert])
    s = readFile("clientX509Key.pem")
    x509Key = parsePEMKey(s, private=True)

    connection = connect()
    connection.handshakeClientCert(x509Chain, x509Key)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 14.a - good mutual X509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 15 - mutual X.509 faults")
    if checkFaults(Fault.clientCertFaults + Fault.genericFaults,
                   lambda c: c.handshakeClientCert(x509Chain, x509Key)):
        badFault = True

    print("Test 18 - good SRP, prepare to resume... (plus SNI)")
    connection = connect()
    connection.handshakeClientSRP("test", "password", serverName=address[0])
    testConnClient(connection)
    connection.close()
    session = connection.session

    print("Test 19 - resumption (plus SNI)")
    connection = connect()
    connection.handshakeClientSRP("test", "garbage", serverName=address[0],
                                  session=session)
    testConnClient(connection)
    #Don't close! -- see below

    print("Test 20 - invalidated resumption (plus SNI)")
    connection.sock.close() #Close the socket without a close_notify!
    connection = connect()
    try:
        connection.handshakeClientSRP("test", "garbage",
                                      serverName=address[0], session=session)
        assert(False)
    except TLSRemoteAlert as alert:
        if alert.description != AlertDescription.bad_record_mac:
            raise
    connection.close()

    print("Test 21 - HTTPS test X.509")
    address = address[0], address[1]+1
    if hasattr(socket, "timeout"):
        timeoutEx = socket.timeout
    else:
        timeoutEx = socket.error
    # Retry the whole exchange for as long as it times out.
    while True:
        try:
            time.sleep(2)
            htmlBody = bytearray(readFile("index.html"), "utf-8")
            fingerprint = None
            # Second pass pins the fingerprint learned during the first.
            for y in range(2):
                checker = Checker(x509Fingerprint=fingerprint)
                h = HTTPTLSConnection(
                        address[0], address[1], checker=checker)
                for x in range(3):
                    h.request("GET", "/index.html")
                    r = h.getresponse()
                    assert(r.status == 200)
                    b = bytearray(r.read())
                    assert(b == htmlBody)
                fingerprint = h.tlsSession.serverCertChain.getFingerprint()
                assert(fingerprint)
            time.sleep(2)
            break
        except timeoutEx:
            print("timeout, retrying...")

    address = address[0], address[1]+1

    implementations = []
    if m2cryptoLoaded:
        implementations.append("openssl")
    if pycryptoLoaded:
        implementations.append("pycrypto")
    implementations.append("python")

    print("Test 22 - different ciphers, TLSv1.0")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "rc4"]:

            print("Test 22:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            settings.minVersion = (3,1)
            settings.maxVersion = (3,1)
            connection.handshakeClientCert(settings=settings)
            testConnClient(connection)
            print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
            connection.close()

    # time.clock() was removed in Python 3.8; use perf_counter when the
    # interpreter has it and fall back to time.time() otherwise.
    timer = getattr(time, "perf_counter", time.time)

    print("Test 23 - throughput test")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "3des", "rc4"]:
            if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
                continue

            print("Test 23:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            connection.handshakeClientCert(settings=settings)
            print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')

            startTime = timer()
            connection.write(b"hello"*10000)
            h = connection.read(min=50000, max=50000)
            elapsed = timer() - startTime
            if elapsed:
                print("100K exchanged at rate of %d bytes/sec" % int(100000/elapsed))
            else:
                print("100K exchanged very fast")

            assert(h == b"hello"*10000)
            connection.close()

    # (label, protocols offered by this client, protocol expected to win)
    nextProtoCases = [
        ("a", [b"http/1.1"], b'http/1.1'),
        ("b", [b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("c", [b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("d", [b"spdy/3", b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("e", [b"spdy/3", b"spdy/2", b"http/1.1"], b'spdy/3'),
        ("f", [b"http/1.1"], b'http/1.1'),
        ("g", [b"spdy/2", b"http/1.1"], b'spdy/2'),
    ]
    for label, offered, expected in nextProtoCases:
        print("Test 24.%s - Next-Protocol Client Negotiation" % label)
        connection = connect()
        connection.handshakeClientCert(nextProtos=offered)
        assert(connection.next_proto == expected)
        connection.close()

    print('Test 25 - good standard XMLRPC https client')
    time.sleep(2) # Hack for lack of ability to set timeout here
    address = address[0], address[1]+1
    server = xmlrpclib.Server('https://%s:%s' % address)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print('Test 26 - good tlslite XMLRPC client')
    transport = XMLRPCTransport(ignoreAbruptClose=True)
    server = xmlrpclib.Server('https://%s:%s' % address, transport)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print('Test 27 - good XMLRPC ignored protocol')
    server = xmlrpclib.Server('http://%s:%s' % address, transport)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print("Test 28 - Internet servers test")
    try:
        i = IMAP4_TLS("cyrus.andrew.cmu.edu")
        i.login("anonymous", "anonymous@anonymous.net")
        i.logout()
        print("Test 28: IMAP4 good")
        p = POP3_TLS("pop.gmail.com")
        p.quit()
        print("Test 29: POP3 good")
    except socket.error as e:
        print("Non-critical error: socket error trying to reach internet server: ", e)

    if not badFault:
        print("Test succeeded")
    else:
        print("Test failed")
432 def testConnServer(connection):
433 count = 0
434 while 1:
435 s = connection.read()
436 count += len(s)
437 if len(s) == 0:
438 break
439 connection.write(s)
440 if count == 1111:
441 break
def serverTestCmd(argv):
    """Run the server half of the test suite, one accepted connection per test.

    argv[0] -- address to listen on, as "HOST:PORT"
    argv[1] -- directory containing serverX509Cert.pem and serverX509Key.pem;
               it is also the directory served during the HTTPS test

    The tests are numbered to pair up with the ones in clientTestCmd and are
    run in the same order.  Ports are used as follows: PORT for tests 0-20,
    PORT+1 for the HTTPS server (21), PORT+2 for tests 22-24 and PORT+3 for
    the XML-RPC server (25-27).

    The TACK tests (only run when tackpyLoaded is set) read ./TACK1.pem and
    ./TACKunrelated.pem relative to the current working directory.

    Prints "Test succeeded" after the last test; failures propagate as
    exceptions.
    """
    if len(argv) < 2:
        # Show usage instead of failing with a bare IndexError below.
        printUsage("Missing argument")

    address = argv[0]
    testDir = argv[1]

    #Split address into hostname/port tuple
    address = address.split(":")
    address = ( address[0], int(address[1]) )

    #Create the listening socket
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.bind(address)
    lsock.listen(5)

    def connect():
        # 'lsock' is looked up at call time, so the socket re-created after
        # the HTTPS test is picked up automatically.
        return TLSConnection(lsock.accept()[0])

    def readFile(path, mode="r"):
        # Read a whole file and close it promptly.
        with open(path, mode) as f:
            return f.read()

    def runFaultedHandshakes(faults, **handshakeArgs):
        # One server handshake per fault the client injects; each handshake
        # is expected to fail.
        for fault in faults:
            connection = connect()
            connection.fault = fault
            try:
                connection.handshakeServer(**handshakeArgs)
                assert(False)
            except Exception:
                # NOTE(review): as in the original, this also swallows the
                # AssertionError raised when the handshake unexpectedly
                # succeeds, so these loops can never fail.  Narrowed from a
                # bare except so Ctrl-C and SystemExit still get through.
                pass
            connection.close()

    x509Cert = X509().parse(readFile(os.path.join(testDir, "serverX509Cert.pem")))
    x509Chain = X509CertChain([x509Cert])
    s = readFile(os.path.join(testDir, "serverX509Key.pem"))
    x509Key = parsePEMKey(s, private=True)

    print("Test 0 - Anonymous server handshake")
    connection = connect()
    connection.handshakeServer(anon=True)
    testConnServer(connection)
    connection.close()

    print("Test 1 - good X.509")
    connection = connect()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    connection.close()

    print("Test 1.a - good X.509, SSL v3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
    testConnServer(connection)
    connection.close()

    print("Test 1.b - good X.509, RC4-MD5")
    connection = connect()
    settings = HandshakeSettings()
    settings.macNames = ["sha", "md5"]
    settings.cipherNames = ["rc4"]
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
    testConnServer(connection)
    connection.close()

    if tackpyLoaded:
        # Plain "r" rather than "rU": the "U" flag was removed in
        # Python 3.11 and open() raises ValueError for it there.
        tack = Tack.createFromPem(readFile("./TACK1.pem", "r"))
        tackUnrelated = Tack.createFromPem(readFile("./TACKunrelated.pem", "r"))

        settings = HandshakeSettings()
        settings.useExperimentalTackExtension = True

        print("Test 2.a - good X.509, TACK")
        connection = connect()
        connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
            tacks=[tack], activationFlags=1, settings=settings)
        testConnServer(connection)
        connection.close()

        print("Test 2.b - good X.509, TACK unrelated to cert chain")
        connection = connect()
        try:
            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                tacks=[tackUnrelated], settings=settings)
            assert(False)
        except TLSRemoteAlert as alert:
            if alert.description != AlertDescription.illegal_parameter:
                raise

    print("Test 3 - good SRP")
    verifierDB = VerifierDB()
    verifierDB.create()
    entry = VerifierDB.makeVerifier("test", "password", 1536)
    verifierDB["test"] = entry

    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB)
    testConnServer(connection)
    connection.close()

    print("Test 4 - SRP faults")
    runFaultedHandshakes(Fault.clientSrpFaults + Fault.genericFaults,
                         verifierDB=verifierDB)

    print("Test 6 - good SRP: with X.509 cert")
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB,
                               certChain=x509Chain, privateKey=x509Key)
    testConnServer(connection)
    connection.close()

    print("Test 7 - X.509 with SRP faults")
    runFaultedHandshakes(Fault.clientSrpFaults + Fault.genericFaults,
                         verifierDB=verifierDB,
                         certChain=x509Chain, privateKey=x509Key)

    print("Test 11 - X.509 faults")
    runFaultedHandshakes(Fault.clientNoAuthFaults + Fault.genericFaults,
                         certChain=x509Chain, privateKey=x509Key)

    print("Test 14 - good mutual X.509")
    connection = connect()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
    testConnServer(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 14a - good mutual X.509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
    testConnServer(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 15 - mutual X.509 faults")
    runFaultedHandshakes(Fault.clientCertFaults + Fault.genericFaults,
                         certChain=x509Chain, privateKey=x509Key, reqCert=True)

    print("Test 18 - good SRP, prepare to resume")
    sessionCache = SessionCache()
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    connection.close()

    print("Test 19 - resumption")
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    #Don't close! -- see next test

    print("Test 20 - invalidated resumption")
    try:
        connection.read(min=1, max=1)
        assert(False) #Client is going to close the socket without a close_notify
    except TLSAbruptCloseError as e:
        pass
    connection = connect()
    try:
        connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    except TLSLocalAlert as alert:
        if alert.description != AlertDescription.bad_record_mac:
            raise
    connection.close()

    print("Test 21 - HTTPS test X.509")

    #Close the current listening socket
    lsock.close()

    #Create and run an HTTP Server using TLSSocketServerMixIn
    class MyHTTPServer(TLSSocketServerMixIn,
                       HTTPServer):
        def handshake(self, tlsConnection):
            tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
            return True
    cd = os.getcwd()
    os.chdir(testDir)
    try:
        address = address[0], address[1]+1
        httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
        # The client issues 2 connections x 3 GET requests.
        for x in range(6):
            httpd.handle_request()
        httpd.server_close()
    finally:
        # Always restore the working directory, even if a request fails.
        os.chdir(cd)

    #Re-connect the listening socket
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    address = address[0], address[1]+1
    lsock.bind(address)
    lsock.listen(5)

    implementations = []
    if m2cryptoLoaded:
        implementations.append("openssl")
    if pycryptoLoaded:
        implementations.append("pycrypto")
    implementations.append("python")

    print("Test 22 - different ciphers")
    # Same number of rounds as there are implementations, but the server
    # side always uses the "python" implementation here.
    for implementation in ["python"] * len(implementations):
        for cipher in ["aes128", "aes256", "rc4"]:

            print("Test 22:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]

            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                       settings=settings)
            print(connection.getCipherName(), connection.getCipherImplementation())
            testConnServer(connection)
            connection.close()

    print("Test 23 - throughput test")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "3des", "rc4"]:
            if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
                continue

            print("Test 23:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]

            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                       settings=settings)
            print(connection.getCipherName(), connection.getCipherImplementation())
            h = connection.read(min=50000, max=50000)
            assert(h == b"hello"*10000)
            connection.write(h)
            connection.close()

    # (label, protocols advertised by the server for that sub-test)
    nextProtoCases = [
        ("a", [b"http/1.1"]),
        ("b", [b"spdy/2", b"http/1.1"]),
        ("c", [b"http/1.1", b"spdy/2"]),
        ("d", [b"spdy/2", b"http/1.1"]),
        ("e", [b"http/1.1", b"spdy/2", b"spdy/3"]),
        ("f", [b"spdy/3", b"spdy/2"]),
        ("g", []),
    ]
    for label, advertised in nextProtoCases:
        print("Test 24.%s - Next-Protocol Server Negotiation" % label)
        connection = connect()
        settings = HandshakeSettings()
        connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                   settings=settings, nextProtos=advertised)
        testConnServer(connection)
        connection.close()

    print("Tests 25-27 - XMLRPXC server")
    address = address[0], address[1]+1
    class Server(TLSXMLRPCServer):

        def handshake(self, tlsConnection):
            try:
                tlsConnection.handshakeServer(certChain=x509Chain,
                                              privateKey=x509Key,
                                              sessionCache=sessionCache)
                tlsConnection.ignoreAbruptClose = True
                return True
            except TLSError as error:
                print("Handshake failure:", str(error))
                return False

    class MyFuncs:
        def pow(self, x, y): return pow(x, y)
        def add(self, x, y): return x + y

    server = Server(address)
    server.register_instance(MyFuncs())
    # Three client tests, two calls each.
    for i in range(6):
        server.handle_request()

    print("Test succeeded")
if __name__ == '__main__':
    # Dispatch on the first argument.  Any prefix of a command name selects
    # that command, with "client" checked before "server".
    if len(sys.argv) < 2:
        printUsage("Missing command")
    else:
        command = sys.argv[1]
        remaining = sys.argv[2:]
        for commandName, handler in (("client", clientTestCmd),
                                     ("server", serverTestCmd)):
            if commandName.startswith(command):
                handler(remaining)
                break
        else:
            printUsage("Unknown command: %s" % command)