5 # Kees Bos - Added tests for XML-RPC
6 # Dimitris Moraitis - Anon ciphersuites
7 # Marcelo Fernandez - Added test for NPN
8 # Martin von Loewis - python 3 port
11 # See the LICENSE file for legal information regarding use of this file.
12 from __future__
import print_function
20 from BaseHTTPServer
import HTTPServer
21 from SimpleHTTPServer
import SimpleHTTPRequestHandler
23 from http
.server
import HTTPServer
, SimpleHTTPRequestHandler
25 from tlslite
import TLSConnection
, Fault
, HandshakeSettings
, \
26 X509
, X509CertChain
, IMAP4_TLS
, VerifierDB
, Session
, SessionCache
, \
27 parsePEMKey
, constants
, \
28 AlertDescription
, HTTPTLSConnection
, TLSSocketServerMixIn
, \
29 POP3_TLS
, m2cryptoLoaded
, pycryptoLoaded
, gmpyLoaded
, tackpyLoaded
, \
32 from tlslite
.errors
import *
33 from tlslite
.utils
.cryptomath
import prngName
38 from xmlrpc
import client
as xmlrpclib
42 from tack
.structures
.Tack
import Tack
47 def printUsage(s
=None):
49 crypto
= "M2Crypto/OpenSSL"
51 crypto
= "Python crypto"
53 print("ERROR: %s" % s
)
54 print("""\ntls.py version %s (using %s)
57 server HOST:PORT DIRECTORY
59 client HOST:PORT DIRECTORY
60 """ % (__version__
, crypto
))
64 def testConnClient(conn
):
67 b100
= os
.urandom(100)
68 b1000
= os
.urandom(1000)
73 assert(conn
.read(min=1, max=1) == b1
)
74 assert(conn
.read(min=10, max=10) == b10
)
75 assert(conn
.read(min=100, max=100) == b100
)
76 assert(conn
.read(min=1000, max=1000) == b1000
)
78 def clientTestCmd(argv
):
83 #Split address into hostname/port tuple
84 address
= address
.split(":")
85 address
= ( address
[0], int(address
[1]) )
88 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
89 if hasattr(sock
, 'settimeout'): #It's a python 2.3 feature
92 c
= TLSConnection(sock
)
99 print("Test 0 - anonymous handshake")
100 connection
= connect()
101 connection
.handshakeClientAnonymous()
102 testConnClient(connection
)
105 print("Test 1 - good X509 (plus SNI)")
106 connection
= connect()
107 connection
.handshakeClientCert(serverName
=address
[0])
108 testConnClient(connection
)
109 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
110 assert(connection
.session
.serverName
== address
[0])
113 print("Test 1.a - good X509, SSLv3")
114 connection
= connect()
115 settings
= HandshakeSettings()
116 settings
.minVersion
= (3,0)
117 settings
.maxVersion
= (3,0)
118 connection
.handshakeClientCert(settings
=settings
)
119 testConnClient(connection
)
120 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
123 print("Test 1.b - good X509, RC4-MD5")
124 connection
= connect()
125 settings
= HandshakeSettings()
126 settings
.macNames
= ["md5"]
127 connection
.handshakeClientCert(settings
=settings
)
128 testConnClient(connection
)
129 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
130 assert(connection
.session
.cipherSuite
== constants
.CipherSuite
.TLS_RSA_WITH_RC4_128_MD5
)
135 settings
= HandshakeSettings()
136 settings
.useExperimentalTackExtension
= True
138 print("Test 2.a - good X.509, TACK")
139 connection
= connect()
140 connection
.handshakeClientCert(settings
=settings
)
141 assert(connection
.session
.tackExt
.tacks
[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
142 assert(connection
.session
.tackExt
.activation_flags
== 1)
143 testConnClient(connection
)
146 print("Test 2.b - good X.509, TACK unrelated to cert chain")
147 connection
= connect()
149 connection
.handshakeClientCert(settings
=settings
)
151 except TLSLocalAlert
as alert
:
152 if alert
.description
!= AlertDescription
.illegal_parameter
:
156 print("Test 3 - good SRP")
157 connection
= connect()
158 connection
.handshakeClientSRP("test", "password")
159 testConnClient(connection
)
162 print("Test 4 - SRP faults")
163 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
164 connection
= connect()
165 connection
.fault
= fault
167 connection
.handshakeClientSRP("test", "password")
168 print(" Good Fault %s" % (Fault
.faultNames
[fault
]))
169 except TLSFaultError
as e
:
170 print(" BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
)))
173 print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
174 settings
= HandshakeSettings()
175 settings
.minVersion
= (3,1)
176 settings
.maxVersion
= (3,1)
177 connection
= connect()
178 connection
.handshakeClientSRP("test", "password", settings
=settings
)
179 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
180 testConnClient(connection
)
183 print("Test 7 - X.509 with SRP faults")
184 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
185 connection
= connect()
186 connection
.fault
= fault
188 connection
.handshakeClientSRP("test", "password")
189 print(" Good Fault %s" % (Fault
.faultNames
[fault
]))
190 except TLSFaultError
as e
:
191 print(" BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
)))
194 print("Test 11 - X.509 faults")
195 for fault
in Fault
.clientNoAuthFaults
+ Fault
.genericFaults
:
196 connection
= connect()
197 connection
.fault
= fault
199 connection
.handshakeClientCert()
200 print(" Good Fault %s" % (Fault
.faultNames
[fault
]))
201 except TLSFaultError
as e
:
202 print(" BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
)))
205 print("Test 14 - good mutual X509")
206 x509Cert
= X509().parse(open(os
.path
.join(dir, "clientX509Cert.pem")).read())
207 x509Chain
= X509CertChain([x509Cert
])
208 s
= open(os
.path
.join(dir, "clientX509Key.pem")).read()
209 x509Key
= parsePEMKey(s
, private
=True)
211 connection
= connect()
212 connection
.handshakeClientCert(x509Chain
, x509Key
)
213 testConnClient(connection
)
214 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
217 print("Test 14.a - good mutual X509, SSLv3")
218 connection
= connect()
219 settings
= HandshakeSettings()
220 settings
.minVersion
= (3,0)
221 settings
.maxVersion
= (3,0)
222 connection
.handshakeClientCert(x509Chain
, x509Key
, settings
=settings
)
223 testConnClient(connection
)
224 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
227 print("Test 15 - mutual X.509 faults")
228 for fault
in Fault
.clientCertFaults
+ Fault
.genericFaults
:
229 connection
= connect()
230 connection
.fault
= fault
232 connection
.handshakeClientCert(x509Chain
, x509Key
)
233 print(" Good Fault %s" % (Fault
.faultNames
[fault
]))
234 except TLSFaultError
as e
:
235 print(" BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
)))
238 print("Test 18 - good SRP, prepare to resume... (plus SNI)")
239 connection
= connect()
240 connection
.handshakeClientSRP("test", "password", serverName
=address
[0])
241 testConnClient(connection
)
243 session
= connection
.session
245 print("Test 19 - resumption (plus SNI)")
246 connection
= connect()
247 connection
.handshakeClientSRP("test", "garbage", serverName
=address
[0],
249 testConnClient(connection
)
250 #Don't close! -- see below
252 print("Test 20 - invalidated resumption (plus SNI)")
253 connection
.sock
.close() #Close the socket without a close_notify!
254 connection
= connect()
256 connection
.handshakeClientSRP("test", "garbage",
257 serverName
=address
[0], session
=session
)
259 except TLSRemoteAlert
as alert
:
260 if alert
.description
!= AlertDescription
.bad_record_mac
:
264 print("Test 21 - HTTPS test X.509")
265 address
= address
[0], address
[1]+1
266 if hasattr(socket
, "timeout"):
267 timeoutEx
= socket
.timeout
269 timeoutEx
= socket
.error
273 htmlBody
= bytearray(open(os
.path
.join(dir, "index.html")).read(), "utf-8")
276 checker
=Checker(x509Fingerprint
=fingerprint
)
277 h
= HTTPTLSConnection(\
278 address
[0], address
[1], checker
=checker
)
280 h
.request("GET", "/index.html")
282 assert(r
.status
== 200)
283 b
= bytearray(r
.read())
284 assert(b
== htmlBody
)
285 fingerprint
= h
.tlsSession
.serverCertChain
.getFingerprint()
290 print("timeout, retrying...")
293 address
= address
[0], address
[1]+1
297 implementations
.append("openssl")
299 implementations
.append("pycrypto")
300 implementations
.append("python")
302 print("Test 22 - different ciphers, TLSv1.0")
303 for implementation
in implementations
:
304 for cipher
in ["aes128", "aes256", "rc4"]:
306 print("Test 22:", end
=' ')
307 connection
= connect()
309 settings
= HandshakeSettings()
310 settings
.cipherNames
= [cipher
]
311 settings
.cipherImplementations
= [implementation
, "python"]
312 settings
.minVersion
= (3,1)
313 settings
.maxVersion
= (3,1)
314 connection
.handshakeClientCert(settings
=settings
)
315 testConnClient(connection
)
316 print("%s %s" % (connection
.getCipherName(), connection
.getCipherImplementation()))
319 print("Test 23 - throughput test")
320 for implementation
in implementations
:
321 for cipher
in ["aes128gcm", "aes128", "aes256", "3des", "rc4"]:
322 if cipher
== "3des" and implementation
not in ("openssl", "pycrypto"):
324 if cipher
== "aes128gcm" and implementation
not in ("pycrypto", "python"):
327 print("Test 23:", end
=' ')
328 connection
= connect()
330 settings
= HandshakeSettings()
331 settings
.cipherNames
= [cipher
]
332 settings
.cipherImplementations
= [implementation
, "python"]
333 connection
.handshakeClientCert(settings
=settings
)
334 print("%s %s:" % (connection
.getCipherName(), connection
.getCipherImplementation()), end
=' ')
336 startTime
= time
.clock()
337 connection
.write(b
"hello"*10000)
338 h
= connection
.read(min=50000, max=50000)
339 stopTime
= time
.clock()
340 if stopTime
-startTime
:
341 print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime
-startTime
)))
343 print("100K exchanged very fast")
345 assert(h
== b
"hello"*10000)
348 print("Test 24.a - Next-Protocol Client Negotiation")
349 connection
= connect()
350 connection
.handshakeClientCert(nextProtos
=[b
"http/1.1"])
351 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
352 assert(connection
.next_proto
== b
'http/1.1')
355 print("Test 24.b - Next-Protocol Client Negotiation")
356 connection
= connect()
357 connection
.handshakeClientCert(nextProtos
=[b
"spdy/2", b
"http/1.1"])
358 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
359 assert(connection
.next_proto
== b
'spdy/2')
362 print("Test 24.c - Next-Protocol Client Negotiation")
363 connection
= connect()
364 connection
.handshakeClientCert(nextProtos
=[b
"spdy/2", b
"http/1.1"])
365 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
366 assert(connection
.next_proto
== b
'spdy/2')
369 print("Test 24.d - Next-Protocol Client Negotiation")
370 connection
= connect()
371 connection
.handshakeClientCert(nextProtos
=[b
"spdy/3", b
"spdy/2", b
"http/1.1"])
372 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
373 assert(connection
.next_proto
== b
'spdy/2')
376 print("Test 24.e - Next-Protocol Client Negotiation")
377 connection
= connect()
378 connection
.handshakeClientCert(nextProtos
=[b
"spdy/3", b
"spdy/2", b
"http/1.1"])
379 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
380 assert(connection
.next_proto
== b
'spdy/3')
383 print("Test 24.f - Next-Protocol Client Negotiation")
384 connection
= connect()
385 connection
.handshakeClientCert(nextProtos
=[b
"http/1.1"])
386 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
387 assert(connection
.next_proto
== b
'http/1.1')
390 print("Test 24.g - Next-Protocol Client Negotiation")
391 connection
= connect()
392 connection
.handshakeClientCert(nextProtos
=[b
"spdy/2", b
"http/1.1"])
393 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
394 assert(connection
.next_proto
== b
'spdy/2')
397 print('Test 25 - good standard XMLRPC https client')
398 time
.sleep(2) # Hack for lack of ability to set timeout here
399 address
= address
[0], address
[1]+1
400 server
= xmlrpclib
.Server('https://%s:%s' % address
)
401 assert server
.add(1,2) == 3
402 assert server
.pow(2,4) == 16
404 print('Test 26 - good tlslite XMLRPC client')
405 transport
= XMLRPCTransport(ignoreAbruptClose
=True)
406 server
= xmlrpclib
.Server('https://%s:%s' % address
, transport
)
407 assert server
.add(1,2) == 3
408 assert server
.pow(2,4) == 16
410 print('Test 27 - good XMLRPC ignored protocol')
411 server
= xmlrpclib
.Server('http://%s:%s' % address
, transport
)
412 assert server
.add(1,2) == 3
413 assert server
.pow(2,4) == 16
415 print("Test 28 - Internet servers test")
417 i
= IMAP4_TLS("cyrus.andrew.cmu.edu")
418 i
.login("anonymous", "anonymous@anonymous.net")
420 print("Test 28: IMAP4 good")
421 p
= POP3_TLS("pop.gmail.com")
423 print("Test 29: POP3 good")
424 except socket
.error
as e
:
425 print("Non-critical error: socket error trying to reach internet server: ", e
)
428 print("Test succeeded")
434 def testConnServer(connection
):
437 s
= connection
.read()
445 def serverTestCmd(argv
):
450 #Split address into hostname/port tuple
451 address
= address
.split(":")
452 address
= ( address
[0], int(address
[1]) )
455 lsock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
460 return TLSConnection(lsock
.accept()[0])
462 x509Cert
= X509().parse(open(os
.path
.join(dir, "serverX509Cert.pem")).read())
463 x509Chain
= X509CertChain([x509Cert
])
464 s
= open(os
.path
.join(dir, "serverX509Key.pem")).read()
465 x509Key
= parsePEMKey(s
, private
=True)
467 print("Test 0 - Anonymous server handshake")
468 connection
= connect()
469 connection
.handshakeServer(anon
=True)
470 testConnServer(connection
)
473 print("Test 1 - good X.509")
474 connection
= connect()
475 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
476 assert(connection
.session
.serverName
== address
[0])
477 testConnServer(connection
)
480 print("Test 1.a - good X.509, SSL v3")
481 connection
= connect()
482 settings
= HandshakeSettings()
483 settings
.minVersion
= (3,0)
484 settings
.maxVersion
= (3,0)
485 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, settings
=settings
)
486 testConnServer(connection
)
489 print("Test 1.b - good X.509, RC4-MD5")
490 connection
= connect()
491 settings
= HandshakeSettings()
492 settings
.macNames
= ["sha", "md5"]
493 settings
.cipherNames
= ["rc4"]
494 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, settings
=settings
)
495 testConnServer(connection
)
499 tack
= Tack
.createFromPem(open("./TACK1.pem", "rU").read())
500 tackUnrelated
= Tack
.createFromPem(open("./TACKunrelated.pem", "rU").read())
502 settings
= HandshakeSettings()
503 settings
.useExperimentalTackExtension
= True
505 print("Test 2.a - good X.509, TACK")
506 connection
= connect()
507 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
508 tacks
=[tack
], activationFlags
=1, settings
=settings
)
509 testConnServer(connection
)
512 print("Test 2.b - good X.509, TACK unrelated to cert chain")
513 connection
= connect()
515 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
516 tacks
=[tackUnrelated
], settings
=settings
)
518 except TLSRemoteAlert
as alert
:
519 if alert
.description
!= AlertDescription
.illegal_parameter
:
522 print("Test 3 - good SRP")
523 verifierDB
= VerifierDB()
525 entry
= VerifierDB
.makeVerifier("test", "password", 1536)
526 verifierDB
["test"] = entry
528 connection
= connect()
529 connection
.handshakeServer(verifierDB
=verifierDB
)
530 testConnServer(connection
)
533 print("Test 4 - SRP faults")
534 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
535 connection
= connect()
536 connection
.fault
= fault
538 connection
.handshakeServer(verifierDB
=verifierDB
)
544 print("Test 6 - good SRP: with X.509 cert")
545 connection
= connect()
546 connection
.handshakeServer(verifierDB
=verifierDB
, \
547 certChain
=x509Chain
, privateKey
=x509Key
)
548 testConnServer(connection
)
551 print("Test 7 - X.509 with SRP faults")
552 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
553 connection
= connect()
554 connection
.fault
= fault
556 connection
.handshakeServer(verifierDB
=verifierDB
, \
557 certChain
=x509Chain
, privateKey
=x509Key
)
563 print("Test 11 - X.509 faults")
564 for fault
in Fault
.clientNoAuthFaults
+ Fault
.genericFaults
:
565 connection
= connect()
566 connection
.fault
= fault
568 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
574 print("Test 14 - good mutual X.509")
575 connection
= connect()
576 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True)
577 testConnServer(connection
)
578 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
581 print("Test 14a - good mutual X.509, SSLv3")
582 connection
= connect()
583 settings
= HandshakeSettings()
584 settings
.minVersion
= (3,0)
585 settings
.maxVersion
= (3,0)
586 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True, settings
=settings
)
587 testConnServer(connection
)
588 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
591 print("Test 15 - mutual X.509 faults")
592 for fault
in Fault
.clientCertFaults
+ Fault
.genericFaults
:
593 connection
= connect()
594 connection
.fault
= fault
596 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True)
602 print("Test 18 - good SRP, prepare to resume")
603 sessionCache
= SessionCache()
604 connection
= connect()
605 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
606 assert(connection
.session
.serverName
== address
[0])
607 testConnServer(connection
)
610 print("Test 19 - resumption")
611 connection
= connect()
612 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
613 assert(connection
.session
.serverName
== address
[0])
614 testConnServer(connection
)
615 #Don't close! -- see next test
617 print("Test 20 - invalidated resumption")
619 connection
.read(min=1, max=1)
620 assert() #Client is going to close the socket without a close_notify
621 except TLSAbruptCloseError
as e
:
623 connection
= connect()
625 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
626 except TLSLocalAlert
as alert
:
627 if alert
.description
!= AlertDescription
.bad_record_mac
:
631 print("Test 21 - HTTPS test X.509")
633 #Close the current listening socket
636 #Create and run an HTTP Server using TLSSocketServerMixIn
637 class MyHTTPServer(TLSSocketServerMixIn
,
639 def handshake(self
, tlsConnection
):
640 tlsConnection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
644 address
= address
[0], address
[1]+1
645 httpd
= MyHTTPServer(address
, SimpleHTTPRequestHandler
)
647 httpd
.handle_request()
651 #Re-connect the listening socket
652 lsock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
653 address
= address
[0], address
[1]+1
659 implementations
.append("openssl")
661 implementations
.append("pycrypto")
662 implementations
.append("python")
664 print("Test 22 - different ciphers")
665 for implementation
in ["python"] * len(implementations
):
666 for cipher
in ["aes128", "aes256", "rc4"]:
668 print("Test 22:", end
=' ')
669 connection
= connect()
671 settings
= HandshakeSettings()
672 settings
.cipherNames
= [cipher
]
673 settings
.cipherImplementations
= [implementation
, "python"]
675 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
677 print(connection
.getCipherName(), connection
.getCipherImplementation())
678 testConnServer(connection
)
681 print("Test 23 - throughput test")
682 for implementation
in implementations
:
683 for cipher
in ["aes128gcm", "aes128", "aes256", "3des", "rc4"]:
684 if cipher
== "3des" and implementation
not in ("openssl", "pycrypto"):
686 if cipher
== "aes128gcm" and implementation
not in ("pycrypto", "python"):
689 print("Test 23:", end
=' ')
690 connection
= connect()
692 settings
= HandshakeSettings()
693 settings
.cipherNames
= [cipher
]
694 settings
.cipherImplementations
= [implementation
, "python"]
696 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
698 print(connection
.getCipherName(), connection
.getCipherImplementation())
699 h
= connection
.read(min=50000, max=50000)
700 assert(h
== b
"hello"*10000)
704 print("Test 24.a - Next-Protocol Server Negotiation")
705 connection
= connect()
706 settings
= HandshakeSettings()
707 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
708 settings
=settings
, nextProtos
=[b
"http/1.1"])
709 testConnServer(connection
)
712 print("Test 24.b - Next-Protocol Server Negotiation")
713 connection
= connect()
714 settings
= HandshakeSettings()
715 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
716 settings
=settings
, nextProtos
=[b
"spdy/2", b
"http/1.1"])
717 testConnServer(connection
)
720 print("Test 24.c - Next-Protocol Server Negotiation")
721 connection
= connect()
722 settings
= HandshakeSettings()
723 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
724 settings
=settings
, nextProtos
=[b
"http/1.1", b
"spdy/2"])
725 testConnServer(connection
)
728 print("Test 24.d - Next-Protocol Server Negotiation")
729 connection
= connect()
730 settings
= HandshakeSettings()
731 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
732 settings
=settings
, nextProtos
=[b
"spdy/2", b
"http/1.1"])
733 testConnServer(connection
)
736 print("Test 24.e - Next-Protocol Server Negotiation")
737 connection
= connect()
738 settings
= HandshakeSettings()
739 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
740 settings
=settings
, nextProtos
=[b
"http/1.1", b
"spdy/2", b
"spdy/3"])
741 testConnServer(connection
)
744 print("Test 24.f - Next-Protocol Server Negotiation")
745 connection
= connect()
746 settings
= HandshakeSettings()
747 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
748 settings
=settings
, nextProtos
=[b
"spdy/3", b
"spdy/2"])
749 testConnServer(connection
)
752 print("Test 24.g - Next-Protocol Server Negotiation")
753 connection
= connect()
754 settings
= HandshakeSettings()
755 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
756 settings
=settings
, nextProtos
=[])
757 testConnServer(connection
)
760 print("Tests 25-27 - XMLRPXC server")
761 address
= address
[0], address
[1]+1
762 class Server(TLSXMLRPCServer
):
764 def handshake(self
, tlsConnection
):
766 tlsConnection
.handshakeServer(certChain
=x509Chain
,
768 sessionCache
=sessionCache
)
769 tlsConnection
.ignoreAbruptClose
= True
771 except TLSError
as error
:
772 print("Handshake failure:", str(error
))
776 def pow(self
, x
, y
): return pow(x
, y
)
777 def add(self
, x
, y
): return x
+ y
779 server
= Server(address
)
780 server
.register_instance(MyFuncs())
781 #sa = server.socket.getsockname()
782 #print "Serving HTTPS on", sa[0], "port", sa[1]
784 server
.handle_request()
786 print("Test succeeded")
if __name__ == '__main__':
    # Command-line entry point.  The first argument selects the test role
    # and may be abbreviated to any non-empty prefix of "client" or
    # "server" (e.g. "c", "cli", "serv"); the remaining arguments are
    # handed to the selected command unchanged.
    if len(sys.argv) < 2:
        printUsage("Missing command")
    else:
        command = sys.argv[1]
        # An empty string is a prefix of every word, so it must be rejected
        # explicitly; otherwise `tls.py ""` would silently run the client.
        if command and "client".startswith(command):
            clientTestCmd(sys.argv[2:])
        elif command and "server".startswith(command):
            serverTestCmd(sys.argv[2:])
        else:
            printUsage("Unknown command: %s" % command)