5 # Kees Bos - Added tests for XML-RPC
6 # Dimitris Moraitis - Anon ciphersuites
7 # Marcelo Fernandez - Added test for NPN
8 # Martin von Loewis - python 3 port
11 # See the LICENSE file for legal information regarding use of this file.
12 from __future__
import print_function
20 from BaseHTTPServer
import HTTPServer
21 from SimpleHTTPServer
import SimpleHTTPRequestHandler
23 from http
.server
import HTTPServer
, SimpleHTTPRequestHandler
25 from tlslite
import TLSConnection
, Fault
, HandshakeSettings
, \
26 X509
, X509CertChain
, IMAP4_TLS
, VerifierDB
, Session
, SessionCache
, \
27 parsePEMKey
, constants
, \
28 AlertDescription
, HTTPTLSConnection
, TLSSocketServerMixIn
, \
29 POP3_TLS
, m2cryptoLoaded
, pycryptoLoaded
, gmpyLoaded
, tackpyLoaded
, \
32 from tlslite
.errors
import *
33 from tlslite
.utils
.cryptomath
import prngName
38 from xmlrpc
import client
as xmlrpclib
42 from tack
.structures
.Tack
import Tack
47 def printUsage(s
=None):
49 crypto
= "M2Crypto/OpenSSL"
51 crypto
= "Python crypto"
53 print("ERROR: %s" % s
)
54 print("""\ntls.py version %s (using %s)
57 server HOST:PORT DIRECTORY
59 client HOST:PORT DIRECTORY
60 """ % (__version__
, crypto
))
64 def testConnClient(conn
):
67 b100
= os
.urandom(100)
68 b1000
= os
.urandom(1000)
73 assert(conn
.read(min=1, max=1) == b1
)
74 assert(conn
.read(min=10, max=10) == b10
)
75 assert(conn
.read(min=100, max=100) == b100
)
76 assert(conn
.read(min=1000, max=1000) == b1000
)
78 def clientTestCmd(argv
):
83 #Split address into hostname/port tuple
84 address
= address
.split(":")
85 address
= ( address
[0], int(address
[1]) )
88 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
89 if hasattr(sock
, 'settimeout'): #It's a python 2.3 feature
92 c
= TLSConnection(sock
)
99 print("Test 0 - anonymous handshake")
100 connection
= connect()
101 connection
.handshakeClientAnonymous()
102 testConnClient(connection
)
105 print("Test 1 - good X509 (plus SNI)")
106 connection
= connect()
107 connection
.handshakeClientCert(serverName
=address
[0])
108 testConnClient(connection
)
109 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
110 assert(connection
.session
.serverName
== address
[0])
113 print("Test 1.a - good X509, SSLv3")
114 connection
= connect()
115 settings
= HandshakeSettings()
116 settings
.minVersion
= (3,0)
117 settings
.maxVersion
= (3,0)
118 connection
.handshakeClientCert(settings
=settings
)
119 testConnClient(connection
)
120 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
123 print("Test 1.b - good X509, RC4-MD5")
124 connection
= connect()
125 settings
= HandshakeSettings()
126 settings
.macNames
= ["md5"]
127 connection
.handshakeClientCert(settings
=settings
)
128 testConnClient(connection
)
129 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
130 assert(connection
.session
.cipherSuite
== constants
.CipherSuite
.TLS_RSA_WITH_RC4_128_MD5
)
135 settings
= HandshakeSettings()
136 settings
.useExperimentalTackExtension
= True
138 print("Test 2.a - good X.509, TACK")
139 connection
= connect()
140 connection
.handshakeClientCert(settings
=settings
)
141 assert(connection
.session
.tackExt
.tacks
[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
142 assert(connection
.session
.tackExt
.activation_flags
== 1)
143 testConnClient(connection
)
146 print("Test 2.b - good X.509, TACK unrelated to cert chain")
147 connection
= connect()
149 connection
.handshakeClientCert(settings
=settings
)
151 except TLSLocalAlert
as alert
:
152 if alert
.description
!= AlertDescription
.illegal_parameter
:
156 print("Test 3 - good SRP")
157 connection
= connect()
158 connection
.handshakeClientSRP("test", "password")
159 testConnClient(connection
)
162 print("Test 4 - SRP faults")
163 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
164 connection
= connect()
165 connection
.fault
= fault
167 connection
.handshakeClientSRP("test", "password")
168 print(" Good Fault %s" % (Fault
.faultNames
[fault
]))
169 except TLSFaultError
as e
:
170 print(" BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
)))
173 print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
174 settings
= HandshakeSettings()
175 settings
.minVersion
= (3,1)
176 settings
.maxVersion
= (3,1)
177 connection
= connect()
178 connection
.handshakeClientSRP("test", "password", settings
=settings
)
179 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
180 testConnClient(connection
)
183 print("Test 7 - X.509 with SRP faults")
184 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
185 connection
= connect()
186 connection
.fault
= fault
188 connection
.handshakeClientSRP("test", "password")
189 print(" Good Fault %s" % (Fault
.faultNames
[fault
]))
190 except TLSFaultError
as e
:
191 print(" BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
)))
194 print("Test 11 - X.509 faults")
195 for fault
in Fault
.clientNoAuthFaults
+ Fault
.genericFaults
:
196 connection
= connect()
197 connection
.fault
= fault
199 connection
.handshakeClientCert()
200 print(" Good Fault %s" % (Fault
.faultNames
[fault
]))
201 except TLSFaultError
as e
:
202 print(" BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
)))
205 print("Test 14 - good mutual X509")
206 x509Cert
= X509().parse(open(os
.path
.join(dir, "clientX509Cert.pem")).read())
207 x509Chain
= X509CertChain([x509Cert
])
208 s
= open(os
.path
.join(dir, "clientX509Key.pem")).read()
209 x509Key
= parsePEMKey(s
, private
=True)
211 connection
= connect()
212 connection
.handshakeClientCert(x509Chain
, x509Key
)
213 testConnClient(connection
)
214 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
217 print("Test 14.a - good mutual X509, SSLv3")
218 connection
= connect()
219 settings
= HandshakeSettings()
220 settings
.minVersion
= (3,0)
221 settings
.maxVersion
= (3,0)
222 connection
.handshakeClientCert(x509Chain
, x509Key
, settings
=settings
)
223 testConnClient(connection
)
224 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
227 print("Test 15 - mutual X.509 faults")
228 for fault
in Fault
.clientCertFaults
+ Fault
.genericFaults
:
229 connection
= connect()
230 connection
.fault
= fault
232 connection
.handshakeClientCert(x509Chain
, x509Key
)
233 print(" Good Fault %s" % (Fault
.faultNames
[fault
]))
234 except TLSFaultError
as e
:
235 print(" BAD FAULT %s: %s" % (Fault
.faultNames
[fault
], str(e
)))
238 print("Test 18 - good SRP, prepare to resume... (plus SNI)")
239 connection
= connect()
240 connection
.handshakeClientSRP("test", "password", serverName
=address
[0])
241 testConnClient(connection
)
243 session
= connection
.session
245 print("Test 19 - resumption (plus SNI)")
246 connection
= connect()
247 connection
.handshakeClientSRP("test", "garbage", serverName
=address
[0],
249 testConnClient(connection
)
250 #Don't close! -- see below
252 print("Test 20 - invalidated resumption (plus SNI)")
253 connection
.sock
.close() #Close the socket without a close_notify!
254 connection
= connect()
256 connection
.handshakeClientSRP("test", "garbage",
257 serverName
=address
[0], session
=session
)
259 except TLSRemoteAlert
as alert
:
260 if alert
.description
!= AlertDescription
.bad_record_mac
:
264 print("Test 21 - HTTPS test X.509")
265 address
= address
[0], address
[1]+1
266 if hasattr(socket
, "timeout"):
267 timeoutEx
= socket
.timeout
269 timeoutEx
= socket
.error
273 htmlBody
= bytearray(open(os
.path
.join(dir, "index.html")).read(), "utf-8")
276 checker
=Checker(x509Fingerprint
=fingerprint
)
277 h
= HTTPTLSConnection(\
278 address
[0], address
[1], checker
=checker
)
280 h
.request("GET", "/index.html")
282 assert(r
.status
== 200)
283 b
= bytearray(r
.read())
284 assert(b
== htmlBody
)
285 fingerprint
= h
.tlsSession
.serverCertChain
.getFingerprint()
290 print("timeout, retrying...")
293 address
= address
[0], address
[1]+1
297 implementations
.append("openssl")
299 implementations
.append("pycrypto")
300 implementations
.append("python")
302 print("Test 22 - different ciphers, TLSv1.0")
303 for implementation
in implementations
:
304 for cipher
in ["aes128", "aes256", "rc4"]:
306 print("Test 22:", end
=' ')
307 connection
= connect()
309 settings
= HandshakeSettings()
310 settings
.cipherNames
= [cipher
]
311 settings
.cipherImplementations
= [implementation
, "python"]
312 settings
.minVersion
= (3,1)
313 settings
.maxVersion
= (3,1)
314 connection
.handshakeClientCert(settings
=settings
)
315 testConnClient(connection
)
316 print("%s %s" % (connection
.getCipherName(), connection
.getCipherImplementation()))
319 print("Test 23 - throughput test")
320 for implementation
in implementations
:
321 for cipher
in ["aes128", "aes256", "3des", "rc4"]:
322 if cipher
== "3des" and implementation
not in ("openssl", "pycrypto"):
325 print("Test 23:", end
=' ')
326 connection
= connect()
328 settings
= HandshakeSettings()
329 settings
.cipherNames
= [cipher
]
330 settings
.cipherImplementations
= [implementation
, "python"]
331 connection
.handshakeClientCert(settings
=settings
)
332 print("%s %s:" % (connection
.getCipherName(), connection
.getCipherImplementation()), end
=' ')
334 startTime
= time
.clock()
335 connection
.write(b
"hello"*10000)
336 h
= connection
.read(min=50000, max=50000)
337 stopTime
= time
.clock()
338 if stopTime
-startTime
:
339 print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime
-startTime
)))
341 print("100K exchanged very fast")
343 assert(h
== b
"hello"*10000)
346 print("Test 24.a - Next-Protocol Client Negotiation")
347 connection
= connect()
348 connection
.handshakeClientCert(nextProtos
=[b
"http/1.1"])
349 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
350 assert(connection
.next_proto
== b
'http/1.1')
353 print("Test 24.b - Next-Protocol Client Negotiation")
354 connection
= connect()
355 connection
.handshakeClientCert(nextProtos
=[b
"spdy/2", b
"http/1.1"])
356 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
357 assert(connection
.next_proto
== b
'spdy/2')
360 print("Test 24.c - Next-Protocol Client Negotiation")
361 connection
= connect()
362 connection
.handshakeClientCert(nextProtos
=[b
"spdy/2", b
"http/1.1"])
363 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
364 assert(connection
.next_proto
== b
'spdy/2')
367 print("Test 24.d - Next-Protocol Client Negotiation")
368 connection
= connect()
369 connection
.handshakeClientCert(nextProtos
=[b
"spdy/3", b
"spdy/2", b
"http/1.1"])
370 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
371 assert(connection
.next_proto
== b
'spdy/2')
374 print("Test 24.e - Next-Protocol Client Negotiation")
375 connection
= connect()
376 connection
.handshakeClientCert(nextProtos
=[b
"spdy/3", b
"spdy/2", b
"http/1.1"])
377 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
378 assert(connection
.next_proto
== b
'spdy/3')
381 print("Test 24.f - Next-Protocol Client Negotiation")
382 connection
= connect()
383 connection
.handshakeClientCert(nextProtos
=[b
"http/1.1"])
384 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
385 assert(connection
.next_proto
== b
'http/1.1')
388 print("Test 24.g - Next-Protocol Client Negotiation")
389 connection
= connect()
390 connection
.handshakeClientCert(nextProtos
=[b
"spdy/2", b
"http/1.1"])
391 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
392 assert(connection
.next_proto
== b
'spdy/2')
395 print('Test 25 - good standard XMLRPC https client')
396 time
.sleep(2) # Hack for lack of ability to set timeout here
397 address
= address
[0], address
[1]+1
398 server
= xmlrpclib
.Server('https://%s:%s' % address
)
399 assert server
.add(1,2) == 3
400 assert server
.pow(2,4) == 16
402 print('Test 26 - good tlslite XMLRPC client')
403 transport
= XMLRPCTransport(ignoreAbruptClose
=True)
404 server
= xmlrpclib
.Server('https://%s:%s' % address
, transport
)
405 assert server
.add(1,2) == 3
406 assert server
.pow(2,4) == 16
408 print('Test 27 - good XMLRPC ignored protocol')
409 server
= xmlrpclib
.Server('http://%s:%s' % address
, transport
)
410 assert server
.add(1,2) == 3
411 assert server
.pow(2,4) == 16
413 print("Test 28 - Internet servers test")
415 i
= IMAP4_TLS("cyrus.andrew.cmu.edu")
416 i
.login("anonymous", "anonymous@anonymous.net")
418 print("Test 28: IMAP4 good")
419 p
= POP3_TLS("pop.gmail.com")
421 print("Test 29: POP3 good")
422 except socket
.error
as e
:
423 print("Non-critical error: socket error trying to reach internet server: ", e
)
426 print("Test succeeded")
432 def testConnServer(connection
):
435 s
= connection
.read()
443 def serverTestCmd(argv
):
448 #Split address into hostname/port tuple
449 address
= address
.split(":")
450 address
= ( address
[0], int(address
[1]) )
453 lsock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
458 return TLSConnection(lsock
.accept()[0])
460 x509Cert
= X509().parse(open(os
.path
.join(dir, "serverX509Cert.pem")).read())
461 x509Chain
= X509CertChain([x509Cert
])
462 s
= open(os
.path
.join(dir, "serverX509Key.pem")).read()
463 x509Key
= parsePEMKey(s
, private
=True)
465 print("Test 0 - Anonymous server handshake")
466 connection
= connect()
467 connection
.handshakeServer(anon
=True)
468 testConnServer(connection
)
471 print("Test 1 - good X.509")
472 connection
= connect()
473 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
474 assert(connection
.session
.serverName
== address
[0])
475 testConnServer(connection
)
478 print("Test 1.a - good X.509, SSL v3")
479 connection
= connect()
480 settings
= HandshakeSettings()
481 settings
.minVersion
= (3,0)
482 settings
.maxVersion
= (3,0)
483 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, settings
=settings
)
484 testConnServer(connection
)
487 print("Test 1.b - good X.509, RC4-MD5")
488 connection
= connect()
489 settings
= HandshakeSettings()
490 settings
.macNames
= ["sha", "md5"]
491 settings
.cipherNames
= ["rc4"]
492 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, settings
=settings
)
493 testConnServer(connection
)
497 tack
= Tack
.createFromPem(open("./TACK1.pem", "rU").read())
498 tackUnrelated
= Tack
.createFromPem(open("./TACKunrelated.pem", "rU").read())
500 settings
= HandshakeSettings()
501 settings
.useExperimentalTackExtension
= True
503 print("Test 2.a - good X.509, TACK")
504 connection
= connect()
505 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
506 tacks
=[tack
], activationFlags
=1, settings
=settings
)
507 testConnServer(connection
)
510 print("Test 2.b - good X.509, TACK unrelated to cert chain")
511 connection
= connect()
513 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
514 tacks
=[tackUnrelated
], settings
=settings
)
516 except TLSRemoteAlert
as alert
:
517 if alert
.description
!= AlertDescription
.illegal_parameter
:
520 print("Test 3 - good SRP")
521 verifierDB
= VerifierDB()
523 entry
= VerifierDB
.makeVerifier("test", "password", 1536)
524 verifierDB
["test"] = entry
526 connection
= connect()
527 connection
.handshakeServer(verifierDB
=verifierDB
)
528 testConnServer(connection
)
531 print("Test 4 - SRP faults")
532 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
533 connection
= connect()
534 connection
.fault
= fault
536 connection
.handshakeServer(verifierDB
=verifierDB
)
542 print("Test 6 - good SRP: with X.509 cert")
543 connection
= connect()
544 connection
.handshakeServer(verifierDB
=verifierDB
, \
545 certChain
=x509Chain
, privateKey
=x509Key
)
546 testConnServer(connection
)
549 print("Test 7 - X.509 with SRP faults")
550 for fault
in Fault
.clientSrpFaults
+ Fault
.genericFaults
:
551 connection
= connect()
552 connection
.fault
= fault
554 connection
.handshakeServer(verifierDB
=verifierDB
, \
555 certChain
=x509Chain
, privateKey
=x509Key
)
561 print("Test 11 - X.509 faults")
562 for fault
in Fault
.clientNoAuthFaults
+ Fault
.genericFaults
:
563 connection
= connect()
564 connection
.fault
= fault
566 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
572 print("Test 14 - good mutual X.509")
573 connection
= connect()
574 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True)
575 testConnServer(connection
)
576 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
579 print("Test 14a - good mutual X.509, SSLv3")
580 connection
= connect()
581 settings
= HandshakeSettings()
582 settings
.minVersion
= (3,0)
583 settings
.maxVersion
= (3,0)
584 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True, settings
=settings
)
585 testConnServer(connection
)
586 assert(isinstance(connection
.session
.serverCertChain
, X509CertChain
))
589 print("Test 15 - mutual X.509 faults")
590 for fault
in Fault
.clientCertFaults
+ Fault
.genericFaults
:
591 connection
= connect()
592 connection
.fault
= fault
594 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
, reqCert
=True)
600 print("Test 18 - good SRP, prepare to resume")
601 sessionCache
= SessionCache()
602 connection
= connect()
603 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
604 assert(connection
.session
.serverName
== address
[0])
605 testConnServer(connection
)
608 print("Test 19 - resumption")
609 connection
= connect()
610 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
611 assert(connection
.session
.serverName
== address
[0])
612 testConnServer(connection
)
613 #Don't close! -- see next test
615 print("Test 20 - invalidated resumption")
617 connection
.read(min=1, max=1)
618 assert() #Client is going to close the socket without a close_notify
619 except TLSAbruptCloseError
as e
:
621 connection
= connect()
623 connection
.handshakeServer(verifierDB
=verifierDB
, sessionCache
=sessionCache
)
624 except TLSLocalAlert
as alert
:
625 if alert
.description
!= AlertDescription
.bad_record_mac
:
629 print("Test 21 - HTTPS test X.509")
631 #Close the current listening socket
634 #Create and run an HTTP Server using TLSSocketServerMixIn
635 class MyHTTPServer(TLSSocketServerMixIn
,
637 def handshake(self
, tlsConnection
):
638 tlsConnection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
)
642 address
= address
[0], address
[1]+1
643 httpd
= MyHTTPServer(address
, SimpleHTTPRequestHandler
)
645 httpd
.handle_request()
649 #Re-connect the listening socket
650 lsock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
651 address
= address
[0], address
[1]+1
657 implementations
.append("openssl")
659 implementations
.append("pycrypto")
660 implementations
.append("python")
662 print("Test 22 - different ciphers")
663 for implementation
in ["python"] * len(implementations
):
664 for cipher
in ["aes128", "aes256", "rc4"]:
666 print("Test 22:", end
=' ')
667 connection
= connect()
669 settings
= HandshakeSettings()
670 settings
.cipherNames
= [cipher
]
671 settings
.cipherImplementations
= [implementation
, "python"]
673 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
675 print(connection
.getCipherName(), connection
.getCipherImplementation())
676 testConnServer(connection
)
679 print("Test 23 - throughput test")
680 for implementation
in implementations
:
681 for cipher
in ["aes128", "aes256", "3des", "rc4"]:
682 if cipher
== "3des" and implementation
not in ("openssl", "pycrypto"):
685 print("Test 23:", end
=' ')
686 connection
= connect()
688 settings
= HandshakeSettings()
689 settings
.cipherNames
= [cipher
]
690 settings
.cipherImplementations
= [implementation
, "python"]
692 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
694 print(connection
.getCipherName(), connection
.getCipherImplementation())
695 h
= connection
.read(min=50000, max=50000)
696 assert(h
== b
"hello"*10000)
700 print("Test 24.a - Next-Protocol Server Negotiation")
701 connection
= connect()
702 settings
= HandshakeSettings()
703 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
704 settings
=settings
, nextProtos
=[b
"http/1.1"])
705 testConnServer(connection
)
708 print("Test 24.b - Next-Protocol Server Negotiation")
709 connection
= connect()
710 settings
= HandshakeSettings()
711 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
712 settings
=settings
, nextProtos
=[b
"spdy/2", b
"http/1.1"])
713 testConnServer(connection
)
716 print("Test 24.c - Next-Protocol Server Negotiation")
717 connection
= connect()
718 settings
= HandshakeSettings()
719 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
720 settings
=settings
, nextProtos
=[b
"http/1.1", b
"spdy/2"])
721 testConnServer(connection
)
724 print("Test 24.d - Next-Protocol Server Negotiation")
725 connection
= connect()
726 settings
= HandshakeSettings()
727 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
728 settings
=settings
, nextProtos
=[b
"spdy/2", b
"http/1.1"])
729 testConnServer(connection
)
732 print("Test 24.e - Next-Protocol Server Negotiation")
733 connection
= connect()
734 settings
= HandshakeSettings()
735 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
736 settings
=settings
, nextProtos
=[b
"http/1.1", b
"spdy/2", b
"spdy/3"])
737 testConnServer(connection
)
740 print("Test 24.f - Next-Protocol Server Negotiation")
741 connection
= connect()
742 settings
= HandshakeSettings()
743 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
744 settings
=settings
, nextProtos
=[b
"spdy/3", b
"spdy/2"])
745 testConnServer(connection
)
748 print("Test 24.g - Next-Protocol Server Negotiation")
749 connection
= connect()
750 settings
= HandshakeSettings()
751 connection
.handshakeServer(certChain
=x509Chain
, privateKey
=x509Key
,
752 settings
=settings
, nextProtos
=[])
753 testConnServer(connection
)
756 print("Tests 25-27 - XMLRPXC server")
757 address
= address
[0], address
[1]+1
758 class Server(TLSXMLRPCServer
):
760 def handshake(self
, tlsConnection
):
762 tlsConnection
.handshakeServer(certChain
=x509Chain
,
764 sessionCache
=sessionCache
)
765 tlsConnection
.ignoreAbruptClose
= True
767 except TLSError
as error
:
768 print("Handshake failure:", str(error
))
772 def pow(self
, x
, y
): return pow(x
, y
)
773 def add(self
, x
, y
): return x
+ y
775 server
= Server(address
)
776 server
.register_instance(MyFuncs())
777 #sa = server.socket.getsockname()
778 #print "Serving HTTPS on", sa[0], "port", sa[1]
780 server
.handle_request()
782 print("Test succeeded")
if __name__ == '__main__':
    # Command-line entry point: dispatch on the first argument.
    # Any non-empty prefix of "client" or "server" selects that command
    # (for example "c", "cli" or "serv"); the remaining arguments are
    # handed to the selected test driver unchanged.
    if len(sys.argv) < 2:
        printUsage("Missing command")
    else:
        command = sys.argv[1]
        # The empty string is a prefix of every word, so it must be
        # rejected explicitly; otherwise an empty argument would silently
        # start the client tests instead of being reported as unknown.
        if command and "client".startswith(command):
            clientTestCmd(sys.argv[2:])
        elif command and "server".startswith(command):
            serverTestCmd(sys.argv[2:])
        else:
            printUsage("Unknown command: %s" % command)