Extend the enterprise policy for SSL overrides to the "danger" command
[chromium-blink-merge.git] / third_party / tlslite / tests / tlstest.py
blob7985d232821ea136806bcb8518ba6df033af57b2
1 #!/usr/bin/env python
3 # Authors:
4 # Trevor Perrin
5 # Kees Bos - Added tests for XML-RPC
6 # Dimitris Moraitis - Anon ciphersuites
7 # Marcelo Fernandez - Added test for NPN
8 # Martin von Loewis - python 3 port
11 # See the LICENSE file for legal information regarding use of this file.
12 from __future__ import print_function
13 import sys
14 import os
15 import os.path
16 import socket
17 import time
18 import getopt
19 try:
20 from BaseHTTPServer import HTTPServer
21 from SimpleHTTPServer import SimpleHTTPRequestHandler
22 except ImportError:
23 from http.server import HTTPServer, SimpleHTTPRequestHandler
25 from tlslite import TLSConnection, Fault, HandshakeSettings, \
26 X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
27 parsePEMKey, constants, \
28 AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
29 POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
30 Checker, __version__
32 from tlslite.errors import *
33 from tlslite.utils.cryptomath import prngName
34 try:
35 import xmlrpclib
36 except ImportError:
37 # Python 3
38 from xmlrpc import client as xmlrpclib
39 from tlslite import *
41 try:
42 from tack.structures.Tack import Tack
44 except ImportError:
45 pass
47 def printUsage(s=None):
48 if m2cryptoLoaded:
49 crypto = "M2Crypto/OpenSSL"
50 else:
51 crypto = "Python crypto"
52 if s:
53 print("ERROR: %s" % s)
54 print("""\ntls.py version %s (using %s)
56 Commands:
57 server HOST:PORT DIRECTORY
59 client HOST:PORT DIRECTORY
60 """ % (__version__, crypto))
61 sys.exit(-1)
64 def testConnClient(conn):
65 b1 = os.urandom(1)
66 b10 = os.urandom(10)
67 b100 = os.urandom(100)
68 b1000 = os.urandom(1000)
69 conn.write(b1)
70 conn.write(b10)
71 conn.write(b100)
72 conn.write(b1000)
73 assert(conn.read(min=1, max=1) == b1)
74 assert(conn.read(min=10, max=10) == b10)
75 assert(conn.read(min=100, max=100) == b100)
76 assert(conn.read(min=1000, max=1000) == b1000)
78 def clientTestCmd(argv):
80 address = argv[0]
81 dir = argv[1]
83 #Split address into hostname/port tuple
84 address = address.split(":")
85 address = ( address[0], int(address[1]) )
87 def connect():
88 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
89 if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
90 sock.settimeout(5)
91 sock.connect(address)
92 c = TLSConnection(sock)
93 return c
95 test = 0
97 badFault = False
99 print("Test 0 - anonymous handshake")
100 connection = connect()
101 connection.handshakeClientAnonymous()
102 testConnClient(connection)
103 connection.close()
105 print("Test 1 - good X509 (plus SNI)")
106 connection = connect()
107 connection.handshakeClientCert(serverName=address[0])
108 testConnClient(connection)
109 assert(isinstance(connection.session.serverCertChain, X509CertChain))
110 assert(connection.session.serverName == address[0])
111 connection.close()
113 print("Test 1.a - good X509, SSLv3")
114 connection = connect()
115 settings = HandshakeSettings()
116 settings.minVersion = (3,0)
117 settings.maxVersion = (3,0)
118 connection.handshakeClientCert(settings=settings)
119 testConnClient(connection)
120 assert(isinstance(connection.session.serverCertChain, X509CertChain))
121 connection.close()
123 print("Test 1.b - good X509, RC4-MD5")
124 connection = connect()
125 settings = HandshakeSettings()
126 settings.macNames = ["md5"]
127 connection.handshakeClientCert(settings=settings)
128 testConnClient(connection)
129 assert(isinstance(connection.session.serverCertChain, X509CertChain))
130 assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
131 connection.close()
133 if tackpyLoaded:
135 settings = HandshakeSettings()
136 settings.useExperimentalTackExtension = True
138 print("Test 2.a - good X.509, TACK")
139 connection = connect()
140 connection.handshakeClientCert(settings=settings)
141 assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
142 assert(connection.session.tackExt.activation_flags == 1)
143 testConnClient(connection)
144 connection.close()
146 print("Test 2.b - good X.509, TACK unrelated to cert chain")
147 connection = connect()
148 try:
149 connection.handshakeClientCert(settings=settings)
150 assert(False)
151 except TLSLocalAlert as alert:
152 if alert.description != AlertDescription.illegal_parameter:
153 raise
154 connection.close()
156 print("Test 3 - good SRP")
157 connection = connect()
158 connection.handshakeClientSRP("test", "password")
159 testConnClient(connection)
160 connection.close()
162 print("Test 4 - SRP faults")
163 for fault in Fault.clientSrpFaults + Fault.genericFaults:
164 connection = connect()
165 connection.fault = fault
166 try:
167 connection.handshakeClientSRP("test", "password")
168 print(" Good Fault %s" % (Fault.faultNames[fault]))
169 except TLSFaultError as e:
170 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
171 badFault = True
173 print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
174 settings = HandshakeSettings()
175 settings.minVersion = (3,1)
176 settings.maxVersion = (3,1)
177 connection = connect()
178 connection.handshakeClientSRP("test", "password", settings=settings)
179 assert(isinstance(connection.session.serverCertChain, X509CertChain))
180 testConnClient(connection)
181 connection.close()
183 print("Test 7 - X.509 with SRP faults")
184 for fault in Fault.clientSrpFaults + Fault.genericFaults:
185 connection = connect()
186 connection.fault = fault
187 try:
188 connection.handshakeClientSRP("test", "password")
189 print(" Good Fault %s" % (Fault.faultNames[fault]))
190 except TLSFaultError as e:
191 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
192 badFault = True
194 print("Test 11 - X.509 faults")
195 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
196 connection = connect()
197 connection.fault = fault
198 try:
199 connection.handshakeClientCert()
200 print(" Good Fault %s" % (Fault.faultNames[fault]))
201 except TLSFaultError as e:
202 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
203 badFault = True
205 print("Test 14 - good mutual X509")
206 x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
207 x509Chain = X509CertChain([x509Cert])
208 s = open(os.path.join(dir, "clientX509Key.pem")).read()
209 x509Key = parsePEMKey(s, private=True)
211 connection = connect()
212 connection.handshakeClientCert(x509Chain, x509Key)
213 testConnClient(connection)
214 assert(isinstance(connection.session.serverCertChain, X509CertChain))
215 connection.close()
217 print("Test 14.a - good mutual X509, SSLv3")
218 connection = connect()
219 settings = HandshakeSettings()
220 settings.minVersion = (3,0)
221 settings.maxVersion = (3,0)
222 connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
223 testConnClient(connection)
224 assert(isinstance(connection.session.serverCertChain, X509CertChain))
225 connection.close()
227 print("Test 15 - mutual X.509 faults")
228 for fault in Fault.clientCertFaults + Fault.genericFaults:
229 connection = connect()
230 connection.fault = fault
231 try:
232 connection.handshakeClientCert(x509Chain, x509Key)
233 print(" Good Fault %s" % (Fault.faultNames[fault]))
234 except TLSFaultError as e:
235 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
236 badFault = True
238 print("Test 18 - good SRP, prepare to resume... (plus SNI)")
239 connection = connect()
240 connection.handshakeClientSRP("test", "password", serverName=address[0])
241 testConnClient(connection)
242 connection.close()
243 session = connection.session
245 print("Test 19 - resumption (plus SNI)")
246 connection = connect()
247 connection.handshakeClientSRP("test", "garbage", serverName=address[0],
248 session=session)
249 testConnClient(connection)
250 #Don't close! -- see below
252 print("Test 20 - invalidated resumption (plus SNI)")
253 connection.sock.close() #Close the socket without a close_notify!
254 connection = connect()
255 try:
256 connection.handshakeClientSRP("test", "garbage",
257 serverName=address[0], session=session)
258 assert(False)
259 except TLSRemoteAlert as alert:
260 if alert.description != AlertDescription.bad_record_mac:
261 raise
262 connection.close()
264 print("Test 21 - HTTPS test X.509")
265 address = address[0], address[1]+1
266 if hasattr(socket, "timeout"):
267 timeoutEx = socket.timeout
268 else:
269 timeoutEx = socket.error
270 while 1:
271 try:
272 time.sleep(2)
273 htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8")
274 fingerprint = None
275 for y in range(2):
276 checker =Checker(x509Fingerprint=fingerprint)
277 h = HTTPTLSConnection(\
278 address[0], address[1], checker=checker)
279 for x in range(3):
280 h.request("GET", "/index.html")
281 r = h.getresponse()
282 assert(r.status == 200)
283 b = bytearray(r.read())
284 assert(b == htmlBody)
285 fingerprint = h.tlsSession.serverCertChain.getFingerprint()
286 assert(fingerprint)
287 time.sleep(2)
288 break
289 except timeoutEx:
290 print("timeout, retrying...")
291 pass
293 address = address[0], address[1]+1
295 implementations = []
296 if m2cryptoLoaded:
297 implementations.append("openssl")
298 if pycryptoLoaded:
299 implementations.append("pycrypto")
300 implementations.append("python")
302 print("Test 22 - different ciphers, TLSv1.0")
303 for implementation in implementations:
304 for cipher in ["aes128", "aes256", "rc4"]:
306 print("Test 22:", end=' ')
307 connection = connect()
309 settings = HandshakeSettings()
310 settings.cipherNames = [cipher]
311 settings.cipherImplementations = [implementation, "python"]
312 settings.minVersion = (3,1)
313 settings.maxVersion = (3,1)
314 connection.handshakeClientCert(settings=settings)
315 testConnClient(connection)
316 print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
317 connection.close()
319 print("Test 23 - throughput test")
320 for implementation in implementations:
321 for cipher in ["aes128gcm", "aes128", "aes256", "3des", "rc4"]:
322 if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
323 continue
324 if cipher == "aes128gcm" and implementation not in ("pycrypto", "python"):
325 continue
327 print("Test 23:", end=' ')
328 connection = connect()
330 settings = HandshakeSettings()
331 settings.cipherNames = [cipher]
332 settings.cipherImplementations = [implementation, "python"]
333 connection.handshakeClientCert(settings=settings)
334 print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')
336 startTime = time.clock()
337 connection.write(b"hello"*10000)
338 h = connection.read(min=50000, max=50000)
339 stopTime = time.clock()
340 if stopTime-startTime:
341 print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)))
342 else:
343 print("100K exchanged very fast")
345 assert(h == b"hello"*10000)
346 connection.close()
348 print("Test 24.a - Next-Protocol Client Negotiation")
349 connection = connect()
350 connection.handshakeClientCert(nextProtos=[b"http/1.1"])
351 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
352 assert(connection.next_proto == b'http/1.1')
353 connection.close()
355 print("Test 24.b - Next-Protocol Client Negotiation")
356 connection = connect()
357 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
358 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
359 assert(connection.next_proto == b'spdy/2')
360 connection.close()
362 print("Test 24.c - Next-Protocol Client Negotiation")
363 connection = connect()
364 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
365 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
366 assert(connection.next_proto == b'spdy/2')
367 connection.close()
369 print("Test 24.d - Next-Protocol Client Negotiation")
370 connection = connect()
371 connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
372 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
373 assert(connection.next_proto == b'spdy/2')
374 connection.close()
376 print("Test 24.e - Next-Protocol Client Negotiation")
377 connection = connect()
378 connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
379 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
380 assert(connection.next_proto == b'spdy/3')
381 connection.close()
383 print("Test 24.f - Next-Protocol Client Negotiation")
384 connection = connect()
385 connection.handshakeClientCert(nextProtos=[b"http/1.1"])
386 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
387 assert(connection.next_proto == b'http/1.1')
388 connection.close()
390 print("Test 24.g - Next-Protocol Client Negotiation")
391 connection = connect()
392 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
393 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
394 assert(connection.next_proto == b'spdy/2')
395 connection.close()
397 print('Test 25 - good standard XMLRPC https client')
398 time.sleep(2) # Hack for lack of ability to set timeout here
399 address = address[0], address[1]+1
400 server = xmlrpclib.Server('https://%s:%s' % address)
401 assert server.add(1,2) == 3
402 assert server.pow(2,4) == 16
404 print('Test 26 - good tlslite XMLRPC client')
405 transport = XMLRPCTransport(ignoreAbruptClose=True)
406 server = xmlrpclib.Server('https://%s:%s' % address, transport)
407 assert server.add(1,2) == 3
408 assert server.pow(2,4) == 16
410 print('Test 27 - good XMLRPC ignored protocol')
411 server = xmlrpclib.Server('http://%s:%s' % address, transport)
412 assert server.add(1,2) == 3
413 assert server.pow(2,4) == 16
415 print("Test 28 - Internet servers test")
416 try:
417 i = IMAP4_TLS("cyrus.andrew.cmu.edu")
418 i.login("anonymous", "anonymous@anonymous.net")
419 i.logout()
420 print("Test 28: IMAP4 good")
421 p = POP3_TLS("pop.gmail.com")
422 p.quit()
423 print("Test 29: POP3 good")
424 except socket.error as e:
425 print("Non-critical error: socket error trying to reach internet server: ", e)
427 if not badFault:
428 print("Test succeeded")
429 else:
430 print("Test failed")
434 def testConnServer(connection):
435 count = 0
436 while 1:
437 s = connection.read()
438 count += len(s)
439 if len(s) == 0:
440 break
441 connection.write(s)
442 if count == 1111:
443 break
445 def serverTestCmd(argv):
447 address = argv[0]
448 dir = argv[1]
450 #Split address into hostname/port tuple
451 address = address.split(":")
452 address = ( address[0], int(address[1]) )
454 #Connect to server
455 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
456 lsock.bind(address)
457 lsock.listen(5)
459 def connect():
460 return TLSConnection(lsock.accept()[0])
462 x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
463 x509Chain = X509CertChain([x509Cert])
464 s = open(os.path.join(dir, "serverX509Key.pem")).read()
465 x509Key = parsePEMKey(s, private=True)
467 print("Test 0 - Anonymous server handshake")
468 connection = connect()
469 connection.handshakeServer(anon=True)
470 testConnServer(connection)
471 connection.close()
473 print("Test 1 - good X.509")
474 connection = connect()
475 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
476 assert(connection.session.serverName == address[0])
477 testConnServer(connection)
478 connection.close()
480 print("Test 1.a - good X.509, SSL v3")
481 connection = connect()
482 settings = HandshakeSettings()
483 settings.minVersion = (3,0)
484 settings.maxVersion = (3,0)
485 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
486 testConnServer(connection)
487 connection.close()
489 print("Test 1.b - good X.509, RC4-MD5")
490 connection = connect()
491 settings = HandshakeSettings()
492 settings.macNames = ["sha", "md5"]
493 settings.cipherNames = ["rc4"]
494 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
495 testConnServer(connection)
496 connection.close()
498 if tackpyLoaded:
499 tack = Tack.createFromPem(open("./TACK1.pem", "rU").read())
500 tackUnrelated = Tack.createFromPem(open("./TACKunrelated.pem", "rU").read())
502 settings = HandshakeSettings()
503 settings.useExperimentalTackExtension = True
505 print("Test 2.a - good X.509, TACK")
506 connection = connect()
507 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
508 tacks=[tack], activationFlags=1, settings=settings)
509 testConnServer(connection)
510 connection.close()
512 print("Test 2.b - good X.509, TACK unrelated to cert chain")
513 connection = connect()
514 try:
515 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
516 tacks=[tackUnrelated], settings=settings)
517 assert(False)
518 except TLSRemoteAlert as alert:
519 if alert.description != AlertDescription.illegal_parameter:
520 raise
522 print("Test 3 - good SRP")
523 verifierDB = VerifierDB()
524 verifierDB.create()
525 entry = VerifierDB.makeVerifier("test", "password", 1536)
526 verifierDB["test"] = entry
528 connection = connect()
529 connection.handshakeServer(verifierDB=verifierDB)
530 testConnServer(connection)
531 connection.close()
533 print("Test 4 - SRP faults")
534 for fault in Fault.clientSrpFaults + Fault.genericFaults:
535 connection = connect()
536 connection.fault = fault
537 try:
538 connection.handshakeServer(verifierDB=verifierDB)
539 assert()
540 except:
541 pass
542 connection.close()
544 print("Test 6 - good SRP: with X.509 cert")
545 connection = connect()
546 connection.handshakeServer(verifierDB=verifierDB, \
547 certChain=x509Chain, privateKey=x509Key)
548 testConnServer(connection)
549 connection.close()
551 print("Test 7 - X.509 with SRP faults")
552 for fault in Fault.clientSrpFaults + Fault.genericFaults:
553 connection = connect()
554 connection.fault = fault
555 try:
556 connection.handshakeServer(verifierDB=verifierDB, \
557 certChain=x509Chain, privateKey=x509Key)
558 assert()
559 except:
560 pass
561 connection.close()
563 print("Test 11 - X.509 faults")
564 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
565 connection = connect()
566 connection.fault = fault
567 try:
568 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
569 assert()
570 except:
571 pass
572 connection.close()
574 print("Test 14 - good mutual X.509")
575 connection = connect()
576 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
577 testConnServer(connection)
578 assert(isinstance(connection.session.serverCertChain, X509CertChain))
579 connection.close()
581 print("Test 14a - good mutual X.509, SSLv3")
582 connection = connect()
583 settings = HandshakeSettings()
584 settings.minVersion = (3,0)
585 settings.maxVersion = (3,0)
586 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
587 testConnServer(connection)
588 assert(isinstance(connection.session.serverCertChain, X509CertChain))
589 connection.close()
591 print("Test 15 - mutual X.509 faults")
592 for fault in Fault.clientCertFaults + Fault.genericFaults:
593 connection = connect()
594 connection.fault = fault
595 try:
596 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
597 assert()
598 except:
599 pass
600 connection.close()
602 print("Test 18 - good SRP, prepare to resume")
603 sessionCache = SessionCache()
604 connection = connect()
605 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
606 assert(connection.session.serverName == address[0])
607 testConnServer(connection)
608 connection.close()
610 print("Test 19 - resumption")
611 connection = connect()
612 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
613 assert(connection.session.serverName == address[0])
614 testConnServer(connection)
615 #Don't close! -- see next test
617 print("Test 20 - invalidated resumption")
618 try:
619 connection.read(min=1, max=1)
620 assert() #Client is going to close the socket without a close_notify
621 except TLSAbruptCloseError as e:
622 pass
623 connection = connect()
624 try:
625 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
626 except TLSLocalAlert as alert:
627 if alert.description != AlertDescription.bad_record_mac:
628 raise
629 connection.close()
631 print("Test 21 - HTTPS test X.509")
633 #Close the current listening socket
634 lsock.close()
636 #Create and run an HTTP Server using TLSSocketServerMixIn
637 class MyHTTPServer(TLSSocketServerMixIn,
638 HTTPServer):
639 def handshake(self, tlsConnection):
640 tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
641 return True
642 cd = os.getcwd()
643 os.chdir(dir)
644 address = address[0], address[1]+1
645 httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
646 for x in range(6):
647 httpd.handle_request()
648 httpd.server_close()
649 cd = os.chdir(cd)
651 #Re-connect the listening socket
652 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
653 address = address[0], address[1]+1
654 lsock.bind(address)
655 lsock.listen(5)
657 implementations = []
658 if m2cryptoLoaded:
659 implementations.append("openssl")
660 if pycryptoLoaded:
661 implementations.append("pycrypto")
662 implementations.append("python")
664 print("Test 22 - different ciphers")
665 for implementation in ["python"] * len(implementations):
666 for cipher in ["aes128", "aes256", "rc4"]:
668 print("Test 22:", end=' ')
669 connection = connect()
671 settings = HandshakeSettings()
672 settings.cipherNames = [cipher]
673 settings.cipherImplementations = [implementation, "python"]
675 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
676 settings=settings)
677 print(connection.getCipherName(), connection.getCipherImplementation())
678 testConnServer(connection)
679 connection.close()
681 print("Test 23 - throughput test")
682 for implementation in implementations:
683 for cipher in ["aes128gcm", "aes128", "aes256", "3des", "rc4"]:
684 if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
685 continue
686 if cipher == "aes128gcm" and implementation not in ("pycrypto", "python"):
687 continue
689 print("Test 23:", end=' ')
690 connection = connect()
692 settings = HandshakeSettings()
693 settings.cipherNames = [cipher]
694 settings.cipherImplementations = [implementation, "python"]
696 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
697 settings=settings)
698 print(connection.getCipherName(), connection.getCipherImplementation())
699 h = connection.read(min=50000, max=50000)
700 assert(h == b"hello"*10000)
701 connection.write(h)
702 connection.close()
704 print("Test 24.a - Next-Protocol Server Negotiation")
705 connection = connect()
706 settings = HandshakeSettings()
707 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
708 settings=settings, nextProtos=[b"http/1.1"])
709 testConnServer(connection)
710 connection.close()
712 print("Test 24.b - Next-Protocol Server Negotiation")
713 connection = connect()
714 settings = HandshakeSettings()
715 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
716 settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
717 testConnServer(connection)
718 connection.close()
720 print("Test 24.c - Next-Protocol Server Negotiation")
721 connection = connect()
722 settings = HandshakeSettings()
723 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
724 settings=settings, nextProtos=[b"http/1.1", b"spdy/2"])
725 testConnServer(connection)
726 connection.close()
728 print("Test 24.d - Next-Protocol Server Negotiation")
729 connection = connect()
730 settings = HandshakeSettings()
731 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
732 settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
733 testConnServer(connection)
734 connection.close()
736 print("Test 24.e - Next-Protocol Server Negotiation")
737 connection = connect()
738 settings = HandshakeSettings()
739 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
740 settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"])
741 testConnServer(connection)
742 connection.close()
744 print("Test 24.f - Next-Protocol Server Negotiation")
745 connection = connect()
746 settings = HandshakeSettings()
747 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
748 settings=settings, nextProtos=[b"spdy/3", b"spdy/2"])
749 testConnServer(connection)
750 connection.close()
752 print("Test 24.g - Next-Protocol Server Negotiation")
753 connection = connect()
754 settings = HandshakeSettings()
755 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
756 settings=settings, nextProtos=[])
757 testConnServer(connection)
758 connection.close()
760 print("Tests 25-27 - XMLRPXC server")
761 address = address[0], address[1]+1
762 class Server(TLSXMLRPCServer):
764 def handshake(self, tlsConnection):
765 try:
766 tlsConnection.handshakeServer(certChain=x509Chain,
767 privateKey=x509Key,
768 sessionCache=sessionCache)
769 tlsConnection.ignoreAbruptClose = True
770 return True
771 except TLSError as error:
772 print("Handshake failure:", str(error))
773 return False
775 class MyFuncs:
776 def pow(self, x, y): return pow(x, y)
777 def add(self, x, y): return x + y
779 server = Server(address)
780 server.register_instance(MyFuncs())
781 #sa = server.socket.getsockname()
782 #print "Serving HTTPS on", sa[0], "port", sa[1]
783 for i in range(6):
784 server.handle_request()
786 print("Test succeeded")
789 if __name__ == '__main__':
790 if len(sys.argv) < 2:
791 printUsage("Missing command")
792 elif sys.argv[1] == "client"[:len(sys.argv[1])]:
793 clientTestCmd(sys.argv[2:])
794 elif sys.argv[1] == "server"[:len(sys.argv[1])]:
795 serverTestCmd(sys.argv[2:])
796 else:
797 printUsage("Unknown command: %s" % sys.argv[1])