2 # Allow direct execution
10 sys
.path
.insert(0, os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))))
22 from socketserver
import (
28 from test
.helper
import http_server_port
, verify_address_availability
29 from yt_dlp
.networking
import Request
30 from yt_dlp
.networking
.exceptions
import ProxyError
, TransportError
31 from yt_dlp
.socks
import (
34 SOCKS5_USER_AUTH_SUCCESS
,
35 SOCKS5_USER_AUTH_VERSION
,
41 SOCKS5_USER_AUTH_FAILURE
= 0x1
44 class Socks4CD(enum
.IntEnum
):
46 REQUEST_REJECTED_OR_FAILED
= 91
47 REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD
= 92
48 REQUEST_REJECTED_DIFFERENT_USERID
= 93
51 class Socks5Reply(enum
.IntEnum
):
54 CONNECTION_NOT_ALLOWED
= 0x2
55 NETWORK_UNREACHABLE
= 0x3
56 HOST_UNREACHABLE
= 0x4
57 CONNECTION_REFUSED
= 0x5
59 COMMAND_NOT_SUPPORTED
= 0x7
60 ADDRESS_TYPE_NOT_SUPPORTED
= 0x8
63 class SocksTestRequestHandler(BaseRequestHandler
):
65 def __init__(self
, *args
, socks_info
=None, **kwargs
):
66 self
.socks_info
= socks_info
67 super().__init
__(*args
, **kwargs
)
70 class SocksProxyHandler(BaseRequestHandler
):
71 def __init__(self
, request_handler_class
, socks_server_kwargs
, *args
, **kwargs
):
72 self
.socks_kwargs
= socks_server_kwargs
or {}
73 self
.request_handler_class
= request_handler_class
74 super().__init
__(*args
, **kwargs
)
77 class Socks5ProxyHandler(StreamRequestHandler
, SocksProxyHandler
):
79 # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
80 # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
83 sleep
= self
.socks_kwargs
.get('sleep')
86 version
, nmethods
= self
.connection
.recv(2)
87 assert version
== SOCKS5_VERSION
88 methods
= list(self
.connection
.recv(nmethods
))
90 auth
= self
.socks_kwargs
.get('auth')
92 if auth
is not None and Socks5Auth
.AUTH_USER_PASS
not in methods
:
93 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NO_ACCEPTABLE
))
94 self
.server
.close_request(self
.request
)
97 elif Socks5Auth
.AUTH_USER_PASS
in methods
:
98 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_USER_PASS
))
100 _
, user_len
= struct
.unpack('!BB', self
.connection
.recv(2))
101 username
= self
.connection
.recv(user_len
).decode()
102 pass_len
= ord(self
.connection
.recv(1))
103 password
= self
.connection
.recv(pass_len
).decode()
105 if username
== auth
[0] and password
== auth
[1]:
106 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_USER_AUTH_VERSION
, SOCKS5_USER_AUTH_SUCCESS
))
108 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_USER_AUTH_VERSION
, SOCKS5_USER_AUTH_FAILURE
))
109 self
.server
.close_request(self
.request
)
112 elif Socks5Auth
.AUTH_NONE
in methods
:
113 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NONE
))
115 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NO_ACCEPTABLE
))
116 self
.server
.close_request(self
.request
)
119 version
, command
, _
, address_type
= struct
.unpack('!BBBB', self
.connection
.recv(4))
122 'auth_methods': methods
,
124 'client_address': self
.client_address
,
125 'ipv4_address': None,
126 'domain_address': None,
127 'ipv6_address': None,
129 if address_type
== Socks5AddressType
.ATYP_IPV4
:
130 socks_info
['ipv4_address'] = socket
.inet_ntoa(self
.connection
.recv(4))
131 elif address_type
== Socks5AddressType
.ATYP_DOMAINNAME
:
132 socks_info
['domain_address'] = self
.connection
.recv(ord(self
.connection
.recv(1))).decode()
133 elif address_type
== Socks5AddressType
.ATYP_IPV6
:
134 socks_info
['ipv6_address'] = socket
.inet_ntop(socket
.AF_INET6
, self
.connection
.recv(16))
136 self
.server
.close_request(self
.request
)
138 socks_info
['port'] = struct
.unpack('!H', self
.connection
.recv(2))[0]
140 # dummy response, the returned IP is just a placeholder
141 self
.connection
.sendall(struct
.pack(
142 '!BBBBIH', SOCKS5_VERSION
, self
.socks_kwargs
.get('reply', Socks5Reply
.SUCCEEDED
), 0x0, 0x1, 0x7f000001, 40000))
144 self
.request_handler_class(self
.request
, self
.client_address
, self
.server
, socks_info
=socks_info
)
147 class Socks4ProxyHandler(StreamRequestHandler
, SocksProxyHandler
):
149 # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
150 # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
152 def _read_until_null(self
):
153 return b
''.join(iter(functools
.partial(self
.connection
.recv
, 1), b
'\x00'))
156 sleep
= self
.socks_kwargs
.get('sleep')
160 'version': SOCKS4_VERSION
,
162 'client_address': self
.client_address
,
163 'ipv4_address': None,
165 'domain_address': None,
167 version
, command
, dest_port
, dest_ip
= struct
.unpack('!BBHI', self
.connection
.recv(8))
168 socks_info
['port'] = dest_port
169 socks_info
['command'] = command
170 if version
!= SOCKS4_VERSION
:
171 self
.server
.close_request(self
.request
)
173 use_remote_dns
= False
174 if 0x0 < dest_ip
<= 0xFF:
175 use_remote_dns
= True
177 socks_info
['ipv4_address'] = socket
.inet_ntoa(struct
.pack('!I', dest_ip
))
179 user_id
= self
._read
_until
_null
().decode()
180 if user_id
!= (self
.socks_kwargs
.get('user_id') or ''):
181 self
.connection
.sendall(struct
.pack(
182 '!BBHI', SOCKS4_REPLY_VERSION
, Socks4CD
.REQUEST_REJECTED_DIFFERENT_USERID
, 0x00, 0x00000000))
183 self
.server
.close_request(self
.request
)
187 socks_info
['domain_address'] = self
._read
_until
_null
().decode()
189 # dummy response, the returned IP is just a placeholder
190 self
.connection
.sendall(
192 '!BBHI', SOCKS4_REPLY_VERSION
,
193 self
.socks_kwargs
.get('cd_reply', Socks4CD
.REQUEST_GRANTED
), 40000, 0x7f000001))
195 self
.request_handler_class(self
.request
, self
.client_address
, self
.server
, socks_info
=socks_info
)
198 class IPv6ThreadingTCPServer(ThreadingTCPServer
):
199 address_family
= socket
.AF_INET6
202 class SocksHTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
, SocksTestRequestHandler
):
204 if self
.path
== '/socks_info':
205 payload
= json
.dumps(self
.socks_info
.copy())
206 self
.send_response(200)
207 self
.send_header('Content-Type', 'application/json; charset=utf-8')
208 self
.send_header('Content-Length', str(len(payload
)))
210 self
.wfile
.write(payload
.encode())
213 class SocksWebSocketTestRequestHandler(SocksTestRequestHandler
):
215 import websockets
.sync
.server
216 protocol
= websockets
.ServerProtocol()
217 connection
= websockets
.sync
.server
.ServerConnection(socket
=self
.request
, protocol
=protocol
, close_timeout
=0)
218 connection
.handshake()
219 for message
in connection
:
220 if message
== 'socks_info':
221 connection
.send(json
.dumps(self
.socks_info
))
225 @contextlib.contextmanager
226 def socks_server(socks_server_class
, request_handler
, bind_ip
=None, **socks_server_kwargs
):
227 server
= server_thread
= None
229 bind_address
= bind_ip
or '127.0.0.1'
230 server_type
= ThreadingTCPServer
if '.' in bind_address
else IPv6ThreadingTCPServer
231 server
= server_type(
232 (bind_address
, 0), functools
.partial(socks_server_class
, request_handler
, socks_server_kwargs
))
233 server_port
= http_server_port(server
)
234 server_thread
= threading
.Thread(target
=server
.serve_forever
)
235 server_thread
.daemon
= True
236 server_thread
.start()
237 if '.' not in bind_address
:
238 yield f
'[{bind_address}]:{server_port}'
240 yield f
'{bind_address}:{server_port}'
243 server
.server_close()
244 server_thread
.join(2.0)
247 class SocksProxyTestContext(abc
.ABC
):
248 REQUEST_HANDLER_CLASS
= None
250 def socks_server(self
, server_class
, *args
, **kwargs
):
251 return socks_server(server_class
, self
.REQUEST_HANDLER_CLASS
, *args
, **kwargs
)
254 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
) -> dict:
255 """return a dict of socks_info"""
258 class HTTPSocksTestProxyContext(SocksProxyTestContext
):
259 REQUEST_HANDLER_CLASS
= SocksHTTPTestRequestHandler
261 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
):
262 request
= Request(f
'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs
)
263 handler
.validate(request
)
264 return json
.loads(handler
.send(request
).read().decode())
267 class WebSocketSocksTestProxyContext(SocksProxyTestContext
):
268 REQUEST_HANDLER_CLASS
= SocksWebSocketTestRequestHandler
270 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
):
271 request
= Request(f
'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}', **req_kwargs
)
272 handler
.validate(request
)
273 ws
= handler
.send(request
)
274 ws
.send('socks_info')
275 socks_info
= ws
.recv()
277 return json
.loads(socks_info
)
281 'http': HTTPSocksTestProxyContext
,
282 'ws': WebSocketSocksTestProxyContext
,
286 @pytest.fixture(scope
='module')
288 return CTX_MAP
[request
.param
]()
291 @pytest.mark
.parametrize(
294 ('Requests', 'http'),
295 ('Websockets', 'ws'),
296 ('CurlCFFI', 'http'),
298 class TestSocks4Proxy
:
299 def test_socks4_no_auth(self
, handler
, ctx
):
300 with
handler() as rh
:
301 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
302 response
= ctx
.socks_info_request(
303 rh
, proxies
={'all': f
'socks4://{server_address}'})
304 assert response
['version'] == 4
306 def test_socks4_auth(self
, handler
, ctx
):
307 with
handler() as rh
:
308 with ctx
.socks_server(Socks4ProxyHandler
, user_id
='user') as server_address
:
309 with pytest
.raises(ProxyError
):
310 ctx
.socks_info_request(rh
, proxies
={'all': f
'socks4://{server_address}'})
311 response
= ctx
.socks_info_request(
312 rh
, proxies
={'all': f
'socks4://user:@{server_address}'})
313 assert response
['version'] == 4
315 def test_socks4a_ipv4_target(self
, handler
, ctx
):
316 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
317 with
handler(proxies
={'all': f
'socks4a://{server_address}'}) as rh
:
318 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
319 assert response
['version'] == 4
320 assert (response
['ipv4_address'] == '127.0.0.1') != (response
['domain_address'] == '127.0.0.1')
322 def test_socks4a_domain_target(self
, handler
, ctx
):
323 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
324 with
handler(proxies
={'all': f
'socks4a://{server_address}'}) as rh
:
325 response
= ctx
.socks_info_request(rh
, target_domain
='localhost')
326 assert response
['version'] == 4
327 assert response
['ipv4_address'] is None
328 assert response
['domain_address'] == 'localhost'
330 def test_ipv4_client_source_address(self
, handler
, ctx
):
331 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
332 source_address
= f
'127.0.0.{random.randint(5, 255)}'
333 verify_address_availability(source_address
)
334 with
handler(proxies
={'all': f
'socks4://{server_address}'},
335 source_address
=source_address
) as rh
:
336 response
= ctx
.socks_info_request(rh
)
337 assert response
['client_address'][0] == source_address
338 assert response
['version'] == 4
340 @pytest.mark
.parametrize('reply_code', [
341 Socks4CD
.REQUEST_REJECTED_OR_FAILED
,
342 Socks4CD
.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD
,
343 Socks4CD
.REQUEST_REJECTED_DIFFERENT_USERID
,
345 def test_socks4_errors(self
, handler
, ctx
, reply_code
):
346 with ctx
.socks_server(Socks4ProxyHandler
, cd_reply
=reply_code
) as server_address
:
347 with
handler(proxies
={'all': f
'socks4://{server_address}'}) as rh
:
348 with pytest
.raises(ProxyError
):
349 ctx
.socks_info_request(rh
)
351 def test_ipv6_socks4_proxy(self
, handler
, ctx
):
352 with ctx
.socks_server(Socks4ProxyHandler
, bind_ip
='::1') as server_address
:
353 with
handler(proxies
={'all': f
'socks4://{server_address}'}) as rh
:
354 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
355 assert response
['client_address'][0] == '::1'
356 assert response
['ipv4_address'] == '127.0.0.1'
357 assert response
['version'] == 4
359 def test_timeout(self
, handler
, ctx
):
360 with ctx
.socks_server(Socks4ProxyHandler
, sleep
=2) as server_address
:
361 with
handler(proxies
={'all': f
'socks4://{server_address}'}, timeout
=0.5) as rh
:
362 with pytest
.raises(TransportError
):
363 ctx
.socks_info_request(rh
)
366 @pytest.mark
.parametrize(
369 ('Requests', 'http'),
370 ('Websockets', 'ws'),
371 ('CurlCFFI', 'http'),
373 class TestSocks5Proxy
:
375 def test_socks5_no_auth(self
, handler
, ctx
):
376 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
377 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
378 response
= ctx
.socks_info_request(rh
)
379 assert response
['auth_methods'] == [0x0]
380 assert response
['version'] == 5
382 def test_socks5_user_pass(self
, handler
, ctx
):
383 with ctx
.socks_server(Socks5ProxyHandler
, auth
=('test', 'testpass')) as server_address
:
384 with
handler() as rh
:
385 with pytest
.raises(ProxyError
):
386 ctx
.socks_info_request(rh
, proxies
={'all': f
'socks5://{server_address}'})
388 response
= ctx
.socks_info_request(
389 rh
, proxies
={'all': f
'socks5://test:testpass@{server_address}'})
391 assert response
['auth_methods'] == [Socks5Auth
.AUTH_NONE
, Socks5Auth
.AUTH_USER_PASS
]
392 assert response
['version'] == 5
394 def test_socks5_ipv4_target(self
, handler
, ctx
):
395 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
396 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
397 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
398 assert response
['ipv4_address'] == '127.0.0.1'
399 assert response
['version'] == 5
401 def test_socks5_domain_target(self
, handler
, ctx
):
402 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
403 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
404 response
= ctx
.socks_info_request(rh
, target_domain
='localhost')
405 assert (response
['ipv4_address'] == '127.0.0.1') != (response
['ipv6_address'] == '::1')
406 assert response
['version'] == 5
408 def test_socks5h_domain_target(self
, handler
, ctx
):
409 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
410 with
handler(proxies
={'all': f
'socks5h://{server_address}'}) as rh
:
411 response
= ctx
.socks_info_request(rh
, target_domain
='localhost')
412 assert response
['ipv4_address'] is None
413 assert response
['domain_address'] == 'localhost'
414 assert response
['version'] == 5
416 def test_socks5h_ip_target(self
, handler
, ctx
):
417 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
418 with
handler(proxies
={'all': f
'socks5h://{server_address}'}) as rh
:
419 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
420 assert response
['ipv4_address'] == '127.0.0.1'
421 assert response
['domain_address'] is None
422 assert response
['version'] == 5
424 def test_socks5_ipv6_destination(self
, handler
, ctx
):
425 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
426 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
427 response
= ctx
.socks_info_request(rh
, target_domain
='[::1]')
428 assert response
['ipv6_address'] == '::1'
429 assert response
['version'] == 5
431 def test_ipv6_socks5_proxy(self
, handler
, ctx
):
432 with ctx
.socks_server(Socks5ProxyHandler
, bind_ip
='::1') as server_address
:
433 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
434 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
435 assert response
['client_address'][0] == '::1'
436 assert response
['ipv4_address'] == '127.0.0.1'
437 assert response
['version'] == 5
439 # XXX: is there any feasible way of testing IPv6 source addresses?
440 # Same would go for non-proxy source_address test...
441 def test_ipv4_client_source_address(self
, handler
, ctx
):
442 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
443 source_address
= f
'127.0.0.{random.randint(5, 255)}'
444 verify_address_availability(source_address
)
445 with
handler(proxies
={'all': f
'socks5://{server_address}'}, source_address
=source_address
) as rh
:
446 response
= ctx
.socks_info_request(rh
)
447 assert response
['client_address'][0] == source_address
448 assert response
['version'] == 5
450 @pytest.mark
.parametrize('reply_code', [
451 Socks5Reply
.GENERAL_FAILURE
,
452 Socks5Reply
.CONNECTION_NOT_ALLOWED
,
453 Socks5Reply
.NETWORK_UNREACHABLE
,
454 Socks5Reply
.HOST_UNREACHABLE
,
455 Socks5Reply
.CONNECTION_REFUSED
,
456 Socks5Reply
.TTL_EXPIRED
,
457 Socks5Reply
.COMMAND_NOT_SUPPORTED
,
458 Socks5Reply
.ADDRESS_TYPE_NOT_SUPPORTED
,
460 def test_socks5_errors(self
, handler
, ctx
, reply_code
):
461 with ctx
.socks_server(Socks5ProxyHandler
, reply
=reply_code
) as server_address
:
462 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
463 with pytest
.raises(ProxyError
):
464 ctx
.socks_info_request(rh
)
466 def test_timeout(self
, handler
, ctx
):
467 with ctx
.socks_server(Socks5ProxyHandler
, sleep
=2) as server_address
:
468 with
handler(proxies
={'all': f
'socks5://{server_address}'}, timeout
=1) as rh
:
469 with pytest
.raises(TransportError
):
470 ctx
.socks_info_request(rh
)
473 if __name__
== '__main__':