Release 2024.12.03
[yt-dlp.git] / test / test_socks.py
blobf601fc8a5e47ef40b6b8ca03c3279d39a491e990
1 #!/usr/bin/env python3
2 # Allow direct execution
3 import os
4 import sys
5 import threading
6 import unittest
8 import pytest
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
12 import abc
13 import contextlib
14 import enum
15 import functools
16 import http.server
17 import json
18 import random
19 import socket
20 import struct
21 import time
22 from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
28 from test.helper import http_server_port, verify_address_availability
29 from yt_dlp.networking import Request
30 from yt_dlp.networking.exceptions import ProxyError, TransportError
31 from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
# Status byte the test SOCKS5 proxy sends back when the username/password
# sub-negotiation (RFC 1929) is rejected.
SOCKS5_USER_AUTH_FAILURE = 0x01
class Socks4CD(enum.IntEnum):
    """Result codes a SOCKS4 server places in the second byte of its reply."""

    # The request was accepted.
    REQUEST_GRANTED = 90
    # Generic rejection / failure.
    REQUEST_REJECTED_OR_FAILED = 91
    # Rejected: the server could not reach identd on the client.
    REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
    # Rejected: the supplied user id did not match.
    REQUEST_REJECTED_DIFFERENT_USERID = 93
class Socks5Reply(enum.IntEnum):
    """Values for the reply (``REP``) byte of a SOCKS5 server response."""

    SUCCEEDED = 0x00
    GENERAL_FAILURE = 0x01
    CONNECTION_NOT_ALLOWED = 0x02
    NETWORK_UNREACHABLE = 0x03
    HOST_UNREACHABLE = 0x04
    CONNECTION_REFUSED = 0x05
    TTL_EXPIRED = 0x06
    COMMAND_NOT_SUPPORTED = 0x07
    ADDRESS_TYPE_NOT_SUPPORTED = 0x08
class SocksTestRequestHandler(BaseRequestHandler):
    """Request handler that carries the details of the SOCKS negotiation.

    The proxy handlers in this file instantiate it with a ``socks_info``
    keyword argument so that subclasses can report it back to the client.
    """

    def __init__(self, *args, socks_info=None, **kwargs):
        # Store the attribute before delegating: the base constructor runs
        # setup()/handle()/finish(), and handle() needs to read it.
        self.socks_info = socks_info
        super().__init__(*args, **kwargs)
class SocksProxyHandler(BaseRequestHandler):
    """Common base for the SOCKS4/SOCKS5 proxy handlers.

    :param request_handler_class: handler class to run on the connection once
        the SOCKS negotiation is complete.
    :param socks_server_kwargs: dict of per-server options read by the
        subclasses; a falsy value is replaced by an empty dict.

    The remaining positional/keyword arguments are forwarded unchanged to
    ``BaseRequestHandler``.
    """

    def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
        # Both attributes must exist before the base constructor runs, since
        # it calls handle() straight away.
        self.request_handler_class = request_handler_class
        self.socks_kwargs = socks_server_kwargs if socks_server_kwargs else {}
        super().__init__(*args, **kwargs)
class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Minimal SOCKS5 proxy endpoint for tests.

    It performs the method negotiation, optional username/password
    authentication and request parsing, then answers with a canned reply and
    hands the same socket to ``request_handler_class``. No outbound connection
    is ever made.

    Options read from ``socks_kwargs``:
      * ``sleep``: seconds to wait before reading anything from the client.
      * ``auth``: ``(username, password)`` pair; when set, username/password
        authentication is mandatory.
      * ``reply``: reply code to send back (defaults to ``Socks5Reply.SUCCEEDED``).
    """

    # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
    # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)

        # Greeting: version byte, method count, then the offered auth methods.
        version, nmethods = self.connection.recv(2)
        assert version == SOCKS5_VERSION
        methods = list(self.connection.recv(nmethods))

        auth = self.socks_kwargs.get('auth')

        if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
            # Credentials are required but the client did not offer user/pass.
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
            self.server.close_request(self.request)
            return

        elif auth is not None:
            # Only select user/pass when credentials are configured; otherwise
            # there would be nothing to compare against (auth is None).
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))

            _, user_len = struct.unpack('!BB', self.connection.recv(2))
            username = self.connection.recv(user_len).decode()
            pass_len = ord(self.connection.recv(1))
            password = self.connection.recv(pass_len).decode()

            if username == auth[0] and password == auth[1]:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
            else:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
                self.server.close_request(self.request)
                return

        elif Socks5Auth.AUTH_NONE in methods:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
        else:
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
            self.server.close_request(self.request)
            return

        # Request header: version, command, reserved byte, address type.
        version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
        socks_info = {
            'version': version,
            'auth_methods': methods,
            'command': command,
            'client_address': self.client_address,
            'ipv4_address': None,
            'domain_address': None,
            'ipv6_address': None,
        }
        if address_type == Socks5AddressType.ATYP_IPV4:
            socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
            # Domain names are prefixed with a single length byte.
            socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
        elif address_type == Socks5AddressType.ATYP_IPV6:
            socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
        else:
            # Unknown address type: drop the connection and stop here instead
            # of carrying on with a request that cannot be parsed.
            self.server.close_request(self.request)
            return

        socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(struct.pack(
            '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))

        # Instantiating the handler runs it on the already-negotiated socket.
        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Minimal SOCKS4/SOCKS4a proxy endpoint for tests.

    It parses the client's request, answers with a canned reply and hands the
    same socket to ``request_handler_class``. No outbound connection is ever
    made.

    Options read from ``socks_kwargs``:
      * ``sleep``: seconds to wait before reading anything from the client.
      * ``user_id``: user id the client must send (defaults to the empty string).
      * ``cd_reply``: result code to send back (defaults to
        ``Socks4CD.REQUEST_GRANTED``).
    """

    # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
    # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol

    def _read_until_null(self):
        """Return the bytes received before the next NUL byte (NUL is consumed).

        Reading also stops when the peer closes the connection, in which case
        whatever was received so far is returned.
        """
        received = []
        while True:
            byte = self.connection.recv(1)
            # recv() yields b'' forever once the peer has closed the socket;
            # treat that as end of input rather than looping indefinitely.
            if byte in (b'', b'\x00'):
                break
            received.append(byte)
        return b''.join(received)

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        socks_info = {
            'version': SOCKS4_VERSION,
            'command': None,
            'client_address': self.client_address,
            'ipv4_address': None,
            'port': None,
            'domain_address': None,
        }
        # Fixed-size request header: version, command, port, IPv4 address.
        version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
        socks_info['port'] = dest_port
        socks_info['command'] = command
        if version != SOCKS4_VERSION:
            self.server.close_request(self.request)
            return
        use_remote_dns = False
        # SOCKS4a: an address of the form 0.0.0.x (x != 0) means a hostname
        # follows the user id and the proxy is expected to resolve it.
        if 0x0 < dest_ip <= 0xFF:
            use_remote_dns = True
        else:
            socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack('!I', dest_ip))

        user_id = self._read_until_null().decode()
        if user_id != (self.socks_kwargs.get('user_id') or ''):
            self.connection.sendall(struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
            self.server.close_request(self.request)
            return

        if use_remote_dns:
            socks_info['domain_address'] = self._read_until_null().decode()

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(
            struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION,
                self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))

        # Instantiating the handler runs it on the already-negotiated socket.
        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
class IPv6ThreadingTCPServer(ThreadingTCPServer):
    """``ThreadingTCPServer`` variant whose listening socket is IPv6."""

    address_family = socket.AF_INET6
class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
    """HTTP handler that reports the SOCKS negotiation details as JSON."""

    def do_GET(self):
        """Serve ``socks_info`` as JSON on ``/socks_info``; 404 for anything else."""
        if self.path != '/socks_info':
            # Always answer, so a request for an unexpected path fails with a
            # clear status instead of an empty response.
            self.send_error(404)
            return

        # Encode once and size the response from the bytes actually written.
        payload = json.dumps(self.socks_info).encode()
        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
class SocksWebSocketTestRequestHandler(SocksTestRequestHandler):
    """WebSocket handler that sends ``socks_info`` as JSON when asked for it."""

    def handle(self):
        # Imported lazily so the module can be loaded without websockets installed.
        import websockets.sync.server

        ws_connection = websockets.sync.server.ServerConnection(
            socket=self.request, protocol=websockets.ServerProtocol(), close_timeout=0)
        ws_connection.handshake()
        for incoming in ws_connection:
            # Only the literal 'socks_info' message gets a reply.
            if incoming == 'socks_info':
                ws_connection.send(json.dumps(self.socks_info))
        ws_connection.close()
@contextlib.contextmanager
def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
    """Run a test SOCKS proxy on a background thread for the ``with`` body.

    :param socks_server_class: proxy handler class; it is instantiated per
        connection as ``socks_server_class(request_handler, socks_server_kwargs, ...)``.
    :param request_handler: handler class passed through to the proxy handler.
    :param bind_ip: address to bind to (defaults to ``127.0.0.1``). An address
        without a ``.`` is treated as IPv6.
    :param socks_server_kwargs: options forwarded to the proxy handler.

    Yields the ``host:port`` the server listens on, with IPv6 hosts wrapped in
    square brackets.
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
        server_port = http_server_port(server)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()
        # Only record the thread once it is running, so that cleanup never
        # calls shutdown() on a server whose serve_forever() was not started.
        server_thread = thread
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # Setup may have failed part-way; tear down only what was created so
        # the original exception is not replaced by an AttributeError.
        if server_thread is not None:
            server.shutdown()
        if server is not None:
            server.server_close()
        if server_thread is not None:
            server_thread.join(2.0)
class SocksProxyTestContext(abc.ABC):
    """Protocol-specific glue between the tests and the test SOCKS server.

    Subclasses set ``REQUEST_HANDLER_CLASS`` and implement
    ``socks_info_request``.
    """

    # Handler class run behind the proxy; overridden by subclasses.
    REQUEST_HANDLER_CLASS = None

    def socks_server(self, server_class, *args, **kwargs):
        """Start ``server_class`` via the module-level ``socks_server`` using this context's handler."""
        handler_class = self.REQUEST_HANDLER_CLASS
        return socks_server(server_class, handler_class, *args, **kwargs)

    @abc.abstractmethod
    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """Send a request with ``handler`` and return the reported socks_info as a dict."""
class HTTPSocksTestProxyContext(SocksProxyTestContext):
    """Context that fetches socks_info over plain HTTP."""

    REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """GET ``/socks_info`` from the target and return the decoded JSON body."""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'http://{host}:{port}/socks_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
class WebSocketSocksTestProxyContext(SocksProxyTestContext):
    """Context that fetches socks_info over a WebSocket connection."""

    REQUEST_HANDLER_CLASS = SocksWebSocketTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Send ``'socks_info'`` over a WebSocket and return the decoded JSON reply."""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'ws://{host}:{port}', **req_kwargs)
        handler.validate(request)
        websocket = handler.send(request)
        websocket.send('socks_info')
        raw_info = websocket.recv()
        websocket.close()
        return json.loads(raw_info)
# Maps the ``ctx`` parameter ids used in the parametrize decorators below to
# the context class that knows how to talk that protocol.
CTX_MAP = {
    'http': HTTPSocksTestProxyContext,
    'ws': WebSocketSocksTestProxyContext,
}


@pytest.fixture(scope='module')
def ctx(request):
    """Return a new context instance for the parametrized protocol id."""
    context_class = CTX_MAP[request.param]
    return context_class()
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Urllib', 'http'),
        ('Requests', 'http'),
        ('Websockets', 'ws'),
        ('CurlCFFI', 'http'),
    ], indirect=True)
class TestSocks4Proxy:
    """SOCKS4/SOCKS4a tests, run once per (request handler, protocol) pair.

    The ``handler`` fixture is not defined in this file.
    """

    def test_socks4_no_auth(self, handler, ctx):
        """A proxy given per request works without a user id."""
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            response = ctx.socks_info_request(rh, proxies=proxies)
            assert response['version'] == 4

    def test_socks4_auth(self, handler, ctx):
        """A missing user id is rejected; the matching one is accepted."""
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
            proxies = {'all': f'socks4://user:@{server_address}'}
            response = ctx.socks_info_request(rh, proxies=proxies)
            assert response['version'] == 4

    def test_socks4a_ipv4_target(self, handler, ctx):
        """An IP target arrives either as an IPv4 address or as a domain, not both."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4a://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['version'] == 4
                assert (response['ipv4_address'] == '127.0.0.1') != (response['domain_address'] == '127.0.0.1')

    def test_socks4a_domain_target(self, handler, ctx):
        """With socks4a the hostname is passed to the proxy unresolved."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            proxies = {'all': f'socks4a://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['version'] == 4
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'

    def test_ipv4_client_source_address(self, handler, ctx):
        """The proxy sees the connection coming from the configured source address."""
        with ctx.socks_server(Socks4ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 4

    @pytest.mark.parametrize('reply_code', [
        Socks4CD.REQUEST_REJECTED_OR_FAILED,
        Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
        Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
    ])
    def test_socks4_errors(self, handler, ctx, reply_code):
        """Every non-granted result code surfaces as ProxyError."""
        with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies) as rh, pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    def test_ipv6_socks4_proxy(self, handler, ctx):
        """The proxy itself may be reached over IPv6."""
        with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 4

    def test_timeout(self, handler, ctx):
        """A proxy slower than the handler timeout raises TransportError."""
        with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
            proxies = {'all': f'socks4://{server_address}'}
            with handler(proxies=proxies, timeout=0.5) as rh, pytest.raises(TransportError):
                ctx.socks_info_request(rh)
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Urllib', 'http'),
        ('Requests', 'http'),
        ('Websockets', 'ws'),
        ('CurlCFFI', 'http'),
    ], indirect=True)
class TestSocks5Proxy:
    """SOCKS5/SOCKS5h tests, run once per (request handler, protocol) pair.

    The ``handler`` fixture is not defined in this file.
    """

    def test_socks5_no_auth(self, handler, ctx):
        """Without credentials only the no-auth method is offered."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh)
                assert response['auth_methods'] == [0x0]
                assert response['version'] == 5

    def test_socks5_user_pass(self, handler, ctx):
        """Missing credentials are rejected; the correct ones are accepted."""
        with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address, handler() as rh:
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})

            proxies = {'all': f'socks5://test:testpass@{server_address}'}
            response = ctx.socks_info_request(rh, proxies=proxies)

            assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
            assert response['version'] == 5

    def test_socks5_ipv4_target(self, handler, ctx):
        """An IPv4 target is sent to the proxy as an IPv4 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    def test_socks5_domain_target(self, handler, ctx):
        """With socks5 the hostname is resolved locally to exactly one of IPv4/IPv6."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert (response['ipv4_address'] == '127.0.0.1') != (response['ipv6_address'] == '::1')
                assert response['version'] == 5

    def test_socks5h_domain_target(self, handler, ctx):
        """With socks5h the hostname is passed to the proxy unresolved."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5h://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='localhost')
                assert response['ipv4_address'] is None
                assert response['domain_address'] == 'localhost'
                assert response['version'] == 5

    def test_socks5h_ip_target(self, handler, ctx):
        """With socks5h an IP target is still sent as an address, not a domain."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5h://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['domain_address'] is None
                assert response['version'] == 5

    def test_socks5_ipv6_destination(self, handler, ctx):
        """An IPv6 literal target is sent to the proxy as an IPv6 address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='[::1]')
                assert response['ipv6_address'] == '::1'
                assert response['version'] == 5

    def test_ipv6_socks5_proxy(self, handler, ctx):
        """The proxy itself may be reached over IPv6."""
        with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh:
                response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert response['client_address'][0] == '::1'
                assert response['ipv4_address'] == '127.0.0.1'
                assert response['version'] == 5

    # XXX: is there any feasible way of testing IPv6 source addresses?
    # Same would go for non-proxy source_address test...
    def test_ipv4_client_source_address(self, handler, ctx):
        """The proxy sees the connection coming from the configured source address."""
        with ctx.socks_server(Socks5ProxyHandler) as server_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                response = ctx.socks_info_request(rh)
                assert response['client_address'][0] == source_address
                assert response['version'] == 5

    @pytest.mark.parametrize('reply_code', [
        Socks5Reply.GENERAL_FAILURE,
        Socks5Reply.CONNECTION_NOT_ALLOWED,
        Socks5Reply.NETWORK_UNREACHABLE,
        Socks5Reply.HOST_UNREACHABLE,
        Socks5Reply.CONNECTION_REFUSED,
        Socks5Reply.TTL_EXPIRED,
        Socks5Reply.COMMAND_NOT_SUPPORTED,
        Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
    ])
    def test_socks5_errors(self, handler, ctx, reply_code):
        """Every failure reply code surfaces as ProxyError."""
        with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies) as rh, pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    def test_timeout(self, handler, ctx):
        """A proxy slower than the handler timeout raises TransportError."""
        with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
            proxies = {'all': f'socks5://{server_address}'}
            with handler(proxies=proxies, timeout=1) as rh, pytest.raises(TransportError):
                ctx.socks_info_request(rh)
if __name__ == '__main__':
    # Entry point for direct execution of this file.
    # NOTE(review): the test classes visible in this file are pytest-style
    # (plain classes using fixtures and parametrize, not unittest.TestCase
    # subclasses), so unittest.main() presumably collects none of them;
    # confirm whether running via pytest is the intended path.
    unittest.main()