2 # Allow direct execution
10 sys
.path
.insert(0, os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))))
22 from socketserver
import (
28 from test
.helper
import http_server_port
, verify_address_availability
29 from yt_dlp
.networking
import Request
30 from yt_dlp
.networking
.exceptions
import ProxyError
, TransportError
31 from yt_dlp
.socks
import (
34 SOCKS5_USER_AUTH_SUCCESS
,
35 SOCKS5_USER_AUTH_VERSION
,
41 SOCKS5_USER_AUTH_FAILURE
= 0x1
44 class Socks4CD(enum
.IntEnum
):
46 REQUEST_REJECTED_OR_FAILED
= 91
47 REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD
= 92
48 REQUEST_REJECTED_DIFFERENT_USERID
= 93
51 class Socks5Reply(enum
.IntEnum
):
54 CONNECTION_NOT_ALLOWED
= 0x2
55 NETWORK_UNREACHABLE
= 0x3
56 HOST_UNREACHABLE
= 0x4
57 CONNECTION_REFUSED
= 0x5
59 COMMAND_NOT_SUPPORTED
= 0x7
60 ADDRESS_TYPE_NOT_SUPPORTED
= 0x8
63 class SocksTestRequestHandler(BaseRequestHandler
):
65 def __init__(self
, *args
, socks_info
=None, **kwargs
):
66 self
.socks_info
= socks_info
67 super().__init
__(*args
, **kwargs
)
70 class SocksProxyHandler(BaseRequestHandler
):
71 def __init__(self
, request_handler_class
, socks_server_kwargs
, *args
, **kwargs
):
72 self
.socks_kwargs
= socks_server_kwargs
or {}
73 self
.request_handler_class
= request_handler_class
74 super().__init
__(*args
, **kwargs
)
77 class Socks5ProxyHandler(StreamRequestHandler
, SocksProxyHandler
):
79 # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
80 # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
83 sleep
= self
.socks_kwargs
.get('sleep')
86 version
, nmethods
= self
.connection
.recv(2)
87 assert version
== SOCKS5_VERSION
88 methods
= list(self
.connection
.recv(nmethods
))
90 auth
= self
.socks_kwargs
.get('auth')
92 if auth
is not None and Socks5Auth
.AUTH_USER_PASS
not in methods
:
93 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NO_ACCEPTABLE
))
94 self
.server
.close_request(self
.request
)
97 elif Socks5Auth
.AUTH_USER_PASS
in methods
:
98 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_USER_PASS
))
100 _
, user_len
= struct
.unpack('!BB', self
.connection
.recv(2))
101 username
= self
.connection
.recv(user_len
).decode()
102 pass_len
= ord(self
.connection
.recv(1))
103 password
= self
.connection
.recv(pass_len
).decode()
105 if username
== auth
[0] and password
== auth
[1]:
106 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_USER_AUTH_VERSION
, SOCKS5_USER_AUTH_SUCCESS
))
108 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_USER_AUTH_VERSION
, SOCKS5_USER_AUTH_FAILURE
))
109 self
.server
.close_request(self
.request
)
112 elif Socks5Auth
.AUTH_NONE
in methods
:
113 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NONE
))
115 self
.connection
.sendall(struct
.pack('!BB', SOCKS5_VERSION
, Socks5Auth
.AUTH_NO_ACCEPTABLE
))
116 self
.server
.close_request(self
.request
)
119 version
, command
, _
, address_type
= struct
.unpack('!BBBB', self
.connection
.recv(4))
122 'auth_methods': methods
,
124 'client_address': self
.client_address
,
125 'ipv4_address': None,
126 'domain_address': None,
127 'ipv6_address': None,
129 if address_type
== Socks5AddressType
.ATYP_IPV4
:
130 socks_info
['ipv4_address'] = socket
.inet_ntoa(self
.connection
.recv(4))
131 elif address_type
== Socks5AddressType
.ATYP_DOMAINNAME
:
132 socks_info
['domain_address'] = self
.connection
.recv(ord(self
.connection
.recv(1))).decode()
133 elif address_type
== Socks5AddressType
.ATYP_IPV6
:
134 socks_info
['ipv6_address'] = socket
.inet_ntop(socket
.AF_INET6
, self
.connection
.recv(16))
136 self
.server
.close_request(self
.request
)
138 socks_info
['port'] = struct
.unpack('!H', self
.connection
.recv(2))[0]
140 # dummy response, the returned IP is just a placeholder
141 self
.connection
.sendall(struct
.pack(
142 '!BBBBIH', SOCKS5_VERSION
, self
.socks_kwargs
.get('reply', Socks5Reply
.SUCCEEDED
), 0x0, 0x1, 0x7f000001, 40000))
144 self
.request_handler_class(self
.request
, self
.client_address
, self
.server
, socks_info
=socks_info
)
147 class Socks4ProxyHandler(StreamRequestHandler
, SocksProxyHandler
):
149 # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
150 # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
152 def _read_until_null(self
):
153 return b
''.join(iter(functools
.partial(self
.connection
.recv
, 1), b
'\x00'))
156 sleep
= self
.socks_kwargs
.get('sleep')
160 'version': SOCKS4_VERSION
,
162 'client_address': self
.client_address
,
163 'ipv4_address': None,
165 'domain_address': None,
167 version
, command
, dest_port
, dest_ip
= struct
.unpack('!BBHI', self
.connection
.recv(8))
168 socks_info
['port'] = dest_port
169 socks_info
['command'] = command
170 if version
!= SOCKS4_VERSION
:
171 self
.server
.close_request(self
.request
)
173 use_remote_dns
= False
174 if 0x0 < dest_ip
<= 0xFF:
175 use_remote_dns
= True
177 socks_info
['ipv4_address'] = socket
.inet_ntoa(struct
.pack('!I', dest_ip
))
179 user_id
= self
._read
_until
_null
().decode()
180 if user_id
!= (self
.socks_kwargs
.get('user_id') or ''):
181 self
.connection
.sendall(struct
.pack(
182 '!BBHI', SOCKS4_REPLY_VERSION
, Socks4CD
.REQUEST_REJECTED_DIFFERENT_USERID
, 0x00, 0x00000000))
183 self
.server
.close_request(self
.request
)
187 socks_info
['domain_address'] = self
._read
_until
_null
().decode()
189 # dummy response, the returned IP is just a placeholder
190 self
.connection
.sendall(
192 '!BBHI', SOCKS4_REPLY_VERSION
,
193 self
.socks_kwargs
.get('cd_reply', Socks4CD
.REQUEST_GRANTED
), 40000, 0x7f000001))
195 self
.request_handler_class(self
.request
, self
.client_address
, self
.server
, socks_info
=socks_info
)
198 class IPv6ThreadingTCPServer(ThreadingTCPServer
):
199 address_family
= socket
.AF_INET6
202 class SocksHTTPTestRequestHandler(http
.server
.BaseHTTPRequestHandler
, SocksTestRequestHandler
):
204 if self
.path
== '/socks_info':
205 payload
= json
.dumps(self
.socks_info
.copy())
206 self
.send_response(200)
207 self
.send_header('Content-Type', 'application/json; charset=utf-8')
208 self
.send_header('Content-Length', str(len(payload
)))
210 self
.wfile
.write(payload
.encode())
213 class SocksWebSocketTestRequestHandler(SocksTestRequestHandler
):
215 import websockets
.sync
.server
216 protocol
= websockets
.ServerProtocol()
217 connection
= websockets
.sync
.server
.ServerConnection(socket
=self
.request
, protocol
=protocol
, close_timeout
=0)
218 connection
.handshake()
219 connection
.send(json
.dumps(self
.socks_info
))
223 @contextlib.contextmanager
224 def socks_server(socks_server_class
, request_handler
, bind_ip
=None, **socks_server_kwargs
):
225 server
= server_thread
= None
227 bind_address
= bind_ip
or '127.0.0.1'
228 server_type
= ThreadingTCPServer
if '.' in bind_address
else IPv6ThreadingTCPServer
229 server
= server_type(
230 (bind_address
, 0), functools
.partial(socks_server_class
, request_handler
, socks_server_kwargs
))
231 server_port
= http_server_port(server
)
232 server_thread
= threading
.Thread(target
=server
.serve_forever
)
233 server_thread
.daemon
= True
234 server_thread
.start()
235 if '.' not in bind_address
:
236 yield f
'[{bind_address}]:{server_port}'
238 yield f
'{bind_address}:{server_port}'
241 server
.server_close()
242 server_thread
.join(2.0)
245 class SocksProxyTestContext(abc
.ABC
):
246 REQUEST_HANDLER_CLASS
= None
248 def socks_server(self
, server_class
, *args
, **kwargs
):
249 return socks_server(server_class
, self
.REQUEST_HANDLER_CLASS
, *args
, **kwargs
)
252 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
) -> dict:
253 """return a dict of socks_info"""
256 class HTTPSocksTestProxyContext(SocksProxyTestContext
):
257 REQUEST_HANDLER_CLASS
= SocksHTTPTestRequestHandler
259 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
):
260 request
= Request(f
'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs
)
261 handler
.validate(request
)
262 return json
.loads(handler
.send(request
).read().decode())
265 class WebSocketSocksTestProxyContext(SocksProxyTestContext
):
266 REQUEST_HANDLER_CLASS
= SocksWebSocketTestRequestHandler
268 def socks_info_request(self
, handler
, target_domain
=None, target_port
=None, **req_kwargs
):
269 request
= Request(f
'ws://{target_domain or "127.0.0.1"}:{target_port or "40000"}', **req_kwargs
)
270 handler
.validate(request
)
271 ws
= handler
.send(request
)
272 ws
.send('socks_info')
273 socks_info
= ws
.recv()
275 return json
.loads(socks_info
)
279 'http': HTTPSocksTestProxyContext
,
280 'ws': WebSocketSocksTestProxyContext
,
284 @pytest.fixture(scope
='module')
286 return CTX_MAP
[request
.param
]()
289 @pytest.mark
.parametrize(
292 ('Requests', 'http'),
293 ('Websockets', 'ws'),
294 ('CurlCFFI', 'http'),
296 class TestSocks4Proxy
:
297 def test_socks4_no_auth(self
, handler
, ctx
):
298 with
handler() as rh
:
299 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
300 response
= ctx
.socks_info_request(
301 rh
, proxies
={'all': f
'socks4://{server_address}'})
302 assert response
['version'] == 4
304 def test_socks4_auth(self
, handler
, ctx
):
305 with
handler() as rh
:
306 with ctx
.socks_server(Socks4ProxyHandler
, user_id
='user') as server_address
:
307 with pytest
.raises(ProxyError
):
308 ctx
.socks_info_request(rh
, proxies
={'all': f
'socks4://{server_address}'})
309 response
= ctx
.socks_info_request(
310 rh
, proxies
={'all': f
'socks4://user:@{server_address}'})
311 assert response
['version'] == 4
313 def test_socks4a_ipv4_target(self
, handler
, ctx
):
314 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
315 with
handler(proxies
={'all': f
'socks4a://{server_address}'}) as rh
:
316 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
317 assert response
['version'] == 4
318 assert (response
['ipv4_address'] == '127.0.0.1') != (response
['domain_address'] == '127.0.0.1')
320 def test_socks4a_domain_target(self
, handler
, ctx
):
321 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
322 with
handler(proxies
={'all': f
'socks4a://{server_address}'}) as rh
:
323 response
= ctx
.socks_info_request(rh
, target_domain
='localhost')
324 assert response
['version'] == 4
325 assert response
['ipv4_address'] is None
326 assert response
['domain_address'] == 'localhost'
328 def test_ipv4_client_source_address(self
, handler
, ctx
):
329 with ctx
.socks_server(Socks4ProxyHandler
) as server_address
:
330 source_address
= f
'127.0.0.{random.randint(5, 255)}'
331 verify_address_availability(source_address
)
332 with
handler(proxies
={'all': f
'socks4://{server_address}'},
333 source_address
=source_address
) as rh
:
334 response
= ctx
.socks_info_request(rh
)
335 assert response
['client_address'][0] == source_address
336 assert response
['version'] == 4
338 @pytest.mark
.parametrize('reply_code', [
339 Socks4CD
.REQUEST_REJECTED_OR_FAILED
,
340 Socks4CD
.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD
,
341 Socks4CD
.REQUEST_REJECTED_DIFFERENT_USERID
,
343 def test_socks4_errors(self
, handler
, ctx
, reply_code
):
344 with ctx
.socks_server(Socks4ProxyHandler
, cd_reply
=reply_code
) as server_address
:
345 with
handler(proxies
={'all': f
'socks4://{server_address}'}) as rh
:
346 with pytest
.raises(ProxyError
):
347 ctx
.socks_info_request(rh
)
349 def test_ipv6_socks4_proxy(self
, handler
, ctx
):
350 with ctx
.socks_server(Socks4ProxyHandler
, bind_ip
='::1') as server_address
:
351 with
handler(proxies
={'all': f
'socks4://{server_address}'}) as rh
:
352 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
353 assert response
['client_address'][0] == '::1'
354 assert response
['ipv4_address'] == '127.0.0.1'
355 assert response
['version'] == 4
357 def test_timeout(self
, handler
, ctx
):
358 with ctx
.socks_server(Socks4ProxyHandler
, sleep
=2) as server_address
:
359 with
handler(proxies
={'all': f
'socks4://{server_address}'}, timeout
=0.5) as rh
:
360 with pytest
.raises(TransportError
):
361 ctx
.socks_info_request(rh
)
364 @pytest.mark
.parametrize(
367 ('Requests', 'http'),
368 ('Websockets', 'ws'),
369 ('CurlCFFI', 'http'),
371 class TestSocks5Proxy
:
373 def test_socks5_no_auth(self
, handler
, ctx
):
374 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
375 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
376 response
= ctx
.socks_info_request(rh
)
377 assert response
['auth_methods'] == [0x0]
378 assert response
['version'] == 5
380 def test_socks5_user_pass(self
, handler
, ctx
):
381 with ctx
.socks_server(Socks5ProxyHandler
, auth
=('test', 'testpass')) as server_address
:
382 with
handler() as rh
:
383 with pytest
.raises(ProxyError
):
384 ctx
.socks_info_request(rh
, proxies
={'all': f
'socks5://{server_address}'})
386 response
= ctx
.socks_info_request(
387 rh
, proxies
={'all': f
'socks5://test:testpass@{server_address}'})
389 assert response
['auth_methods'] == [Socks5Auth
.AUTH_NONE
, Socks5Auth
.AUTH_USER_PASS
]
390 assert response
['version'] == 5
392 def test_socks5_ipv4_target(self
, handler
, ctx
):
393 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
394 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
395 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
396 assert response
['ipv4_address'] == '127.0.0.1'
397 assert response
['version'] == 5
399 def test_socks5_domain_target(self
, handler
, ctx
):
400 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
401 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
402 response
= ctx
.socks_info_request(rh
, target_domain
='localhost')
403 assert (response
['ipv4_address'] == '127.0.0.1') != (response
['ipv6_address'] == '::1')
404 assert response
['version'] == 5
406 def test_socks5h_domain_target(self
, handler
, ctx
):
407 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
408 with
handler(proxies
={'all': f
'socks5h://{server_address}'}) as rh
:
409 response
= ctx
.socks_info_request(rh
, target_domain
='localhost')
410 assert response
['ipv4_address'] is None
411 assert response
['domain_address'] == 'localhost'
412 assert response
['version'] == 5
414 def test_socks5h_ip_target(self
, handler
, ctx
):
415 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
416 with
handler(proxies
={'all': f
'socks5h://{server_address}'}) as rh
:
417 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
418 assert response
['ipv4_address'] == '127.0.0.1'
419 assert response
['domain_address'] is None
420 assert response
['version'] == 5
422 def test_socks5_ipv6_destination(self
, handler
, ctx
):
423 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
424 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
425 response
= ctx
.socks_info_request(rh
, target_domain
='[::1]')
426 assert response
['ipv6_address'] == '::1'
427 assert response
['version'] == 5
429 def test_ipv6_socks5_proxy(self
, handler
, ctx
):
430 with ctx
.socks_server(Socks5ProxyHandler
, bind_ip
='::1') as server_address
:
431 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
432 response
= ctx
.socks_info_request(rh
, target_domain
='127.0.0.1')
433 assert response
['client_address'][0] == '::1'
434 assert response
['ipv4_address'] == '127.0.0.1'
435 assert response
['version'] == 5
437 # XXX: is there any feasible way of testing IPv6 source addresses?
438 # Same would go for non-proxy source_address test...
439 def test_ipv4_client_source_address(self
, handler
, ctx
):
440 with ctx
.socks_server(Socks5ProxyHandler
) as server_address
:
441 source_address
= f
'127.0.0.{random.randint(5, 255)}'
442 verify_address_availability(source_address
)
443 with
handler(proxies
={'all': f
'socks5://{server_address}'}, source_address
=source_address
) as rh
:
444 response
= ctx
.socks_info_request(rh
)
445 assert response
['client_address'][0] == source_address
446 assert response
['version'] == 5
448 @pytest.mark
.parametrize('reply_code', [
449 Socks5Reply
.GENERAL_FAILURE
,
450 Socks5Reply
.CONNECTION_NOT_ALLOWED
,
451 Socks5Reply
.NETWORK_UNREACHABLE
,
452 Socks5Reply
.HOST_UNREACHABLE
,
453 Socks5Reply
.CONNECTION_REFUSED
,
454 Socks5Reply
.TTL_EXPIRED
,
455 Socks5Reply
.COMMAND_NOT_SUPPORTED
,
456 Socks5Reply
.ADDRESS_TYPE_NOT_SUPPORTED
,
458 def test_socks5_errors(self
, handler
, ctx
, reply_code
):
459 with ctx
.socks_server(Socks5ProxyHandler
, reply
=reply_code
) as server_address
:
460 with
handler(proxies
={'all': f
'socks5://{server_address}'}) as rh
:
461 with pytest
.raises(ProxyError
):
462 ctx
.socks_info_request(rh
)
464 def test_timeout(self
, handler
, ctx
):
465 with ctx
.socks_server(Socks5ProxyHandler
, sleep
=2) as server_address
:
466 with
handler(proxies
={'all': f
'socks5://{server_address}'}, timeout
=1) as rh
:
467 with pytest
.raises(TransportError
):
468 ctx
.socks_info_request(rh
)
471 if __name__
== '__main__':