Release 2023.11.16
[yt-dlp.git] / test / test_socks.py
blobd8ac88dad5846751756988d5a1bd8d7f3dc36265
1 #!/usr/bin/env python3
2 # Allow direct execution
3 import os
4 import sys
5 import threading
6 import unittest
8 import pytest
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
12 import abc
13 import contextlib
14 import enum
15 import functools
16 import http.server
17 import json
18 import random
19 import socket
20 import struct
21 import time
22 from socketserver import (
23 BaseRequestHandler,
24 StreamRequestHandler,
25 ThreadingTCPServer,
28 from test.helper import http_server_port
29 from yt_dlp.networking import Request
30 from yt_dlp.networking.exceptions import ProxyError, TransportError
31 from yt_dlp.socks import (
32 SOCKS4_REPLY_VERSION,
33 SOCKS4_VERSION,
34 SOCKS5_USER_AUTH_SUCCESS,
35 SOCKS5_USER_AUTH_VERSION,
36 SOCKS5_VERSION,
37 Socks5AddressType,
38 Socks5Auth,
# Status byte the test SOCKS5 proxy sends in its username/password reply
# when the credentials supplied by the client do not match the configured ones.
SOCKS5_USER_AUTH_FAILURE = 0x1
class Socks4CD(enum.IntEnum):
    """Result codes a SOCKS4 proxy places in the second byte of its reply."""

    REQUEST_GRANTED = 0x5A
    REQUEST_REJECTED_OR_FAILED = 0x5B
    REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 0x5C
    REQUEST_REJECTED_DIFFERENT_USERID = 0x5D
class Socks5Reply(enum.IntEnum):
    """Values for the reply (``REP``) field of a SOCKS5 server response."""

    SUCCEEDED = 0
    GENERAL_FAILURE = 1
    CONNECTION_NOT_ALLOWED = 2
    NETWORK_UNREACHABLE = 3
    HOST_UNREACHABLE = 4
    CONNECTION_REFUSED = 5
    TTL_EXPIRED = 6
    COMMAND_NOT_SUPPORTED = 7
    ADDRESS_TYPE_NOT_SUPPORTED = 8
class SocksTestRequestHandler(BaseRequestHandler):
    """Request handler that is handed the details of the SOCKS handshake.

    The extra ``socks_info`` keyword is stored on the instance; all other
    arguments are forwarded untouched to the next handler class.
    """

    def __init__(self, *args, socks_info=None, **kwargs):
        # Store first: socketserver's BaseRequestHandler runs handle() from
        # inside its constructor, so the attribute has to exist before delegating.
        self.socks_info = socks_info
        super(SocksTestRequestHandler, self).__init__(*args, **kwargs)
class SocksProxyHandler(BaseRequestHandler):
    """Common base of the test SOCKS proxy handlers.

    Keeps the handler class to hand the connection to once the handshake is
    done, together with the options controlling the proxy's behaviour.
    """

    def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
        self.request_handler_class = request_handler_class
        # Any falsy value (None, empty dict) is replaced by a fresh empty dict
        self.socks_kwargs = socks_server_kwargs if socks_server_kwargs else {}
        super().__init__(*args, **kwargs)
class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Minimal SOCKS5 proxy for tests.

    It performs the method negotiation, the optional username/password
    sub-negotiation and parses the client's request, but never connects
    anywhere: after sending a canned reply it hands the client socket to
    ``request_handler_class`` together with a ``socks_info`` dict describing
    what the client asked for.

    Options read from ``socks_kwargs``:
        sleep: seconds to wait before starting the handshake
        auth:  ``(username, password)`` the client must present; when unset,
               no authentication is required
        reply: reply code to send back (defaults to ``Socks5Reply.SUCCEEDED``)
    """

    # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
    # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929

    def _refuse_auth_methods(self):
        """Tell the client that none of its offered methods is acceptable and drop it."""
        self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
        self.server.close_request(self.request)

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)

        # Method negotiation: version, number of methods, then the method bytes
        version, nmethods = self.connection.recv(2)
        assert version == SOCKS5_VERSION
        methods = list(self.connection.recv(nmethods))

        auth = self.socks_kwargs.get('auth')

        if auth is not None:
            # Credentials are configured, so username/password is the only method we accept
            if Socks5Auth.AUTH_USER_PASS not in methods:
                self._refuse_auth_methods()
                return

            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))

            _, user_len = struct.unpack('!BB', self.connection.recv(2))
            username = self.connection.recv(user_len).decode()
            pass_len = ord(self.connection.recv(1))
            password = self.connection.recv(pass_len).decode()

            if username == auth[0] and password == auth[1]:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
            else:
                self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
                self.server.close_request(self.request)
                return

        elif Socks5Auth.AUTH_NONE in methods:
            # No credentials configured: never select username/password here, even if
            # the client offers it, because there is nothing to verify them against.
            self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
        else:
            self._refuse_auth_methods()
            return

        version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
        socks_info = {
            'version': version,
            'auth_methods': methods,
            'command': command,
            'client_address': self.client_address,
            'ipv4_address': None,
            'domain_address': None,
            'ipv6_address': None,
        }
        if address_type == Socks5AddressType.ATYP_IPV4:
            socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
            # The domain name is prefixed by a single length byte
            domain_len = ord(self.connection.recv(1))
            socks_info['domain_address'] = self.connection.recv(domain_len).decode()
        elif address_type == Socks5AddressType.ATYP_IPV6:
            socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
        else:
            # Unknown address type: drop the client and stop, rather than
            # continuing to read from and write to the closed socket.
            self.server.close_request(self.request)
            return

        socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(struct.pack(
            '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))

        # Constructing the handler serves the tunnelled request on the same socket
        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
    """Minimal SOCKS4/SOCKS4a proxy for tests.

    It parses the client's request and checks the user id, but never connects
    anywhere: after sending a canned reply it hands the client socket to
    ``request_handler_class`` together with a ``socks_info`` dict describing
    what the client asked for.

    Options read from ``socks_kwargs``:
        sleep:    seconds to wait before reading the request
        user_id:  user id the client must send (defaults to the empty string)
        cd_reply: result code to send back (defaults to ``Socks4CD.REQUEST_GRANTED``)
    """

    # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
    # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol

    def _read_until_null(self):
        """Read one NUL-terminated field from the client, without the terminator.

        Reading also stops when the peer closes the connection: recv() then
        keeps returning b'', which would otherwise make this loop forever.
        """
        received = []
        while True:
            byte = self.connection.recv(1)
            if not byte or byte == b'\x00':
                break
            received.append(byte)
        return b''.join(received)

    def handle(self):
        sleep = self.socks_kwargs.get('sleep')
        if sleep:
            time.sleep(sleep)
        socks_info = {
            'version': SOCKS4_VERSION,
            'command': None,
            'client_address': self.client_address,
            'ipv4_address': None,
            'port': None,
            'domain_address': None,
        }
        version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
        socks_info['port'] = dest_port
        socks_info['command'] = command
        if version != SOCKS4_VERSION:
            self.server.close_request(self.request)
            return
        use_remote_dns = False
        # A destination of the form 0.0.0.x (x != 0) means the host name is sent
        # after the user id and must be resolved by the proxy (SOCKS4a)
        if 0x0 < dest_ip <= 0xFF:
            use_remote_dns = True
        else:
            socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack('!I', dest_ip))

        user_id = self._read_until_null().decode()
        if user_id != (self.socks_kwargs.get('user_id') or ''):
            self.connection.sendall(struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
            self.server.close_request(self.request)
            return

        if use_remote_dns:
            socks_info['domain_address'] = self._read_until_null().decode()

        # dummy response, the returned IP is just a placeholder
        self.connection.sendall(
            struct.pack(
                '!BBHI', SOCKS4_REPLY_VERSION,
                self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))

        # Constructing the handler serves the tunnelled request on the same socket
        self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
class IPv6ThreadingTCPServer(ThreadingTCPServer):
    """ThreadingTCPServer whose listening socket uses the IPv6 address family."""

    address_family = socket.AF_INET6
class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
    """HTTP endpoint served behind the test SOCKS proxy.

    ``GET /socks_info`` answers with the handshake details as JSON; no
    response is written for any other path.
    """

    def do_GET(self):
        if self.path != '/socks_info':
            return

        body = json.dumps(self.socks_info.copy())
        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body.encode())
@contextlib.contextmanager
def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
    """Run a test SOCKS proxy in a background thread for the duration of the context.

    @param socks_server_class    proxy handler class; instantiated per connection as
                                 socks_server_class(request_handler, socks_server_kwargs, ...)
    @param request_handler       handler class the proxy hands the connection to
    @param bind_ip               address to listen on (default 127.0.0.1); an address
                                 without a '.' is treated as IPv6
    @param socks_server_kwargs   options passed through to the proxy handler
    @yields                      'host:port' of the proxy, with IPv6 hosts in brackets
    """
    bind_address = bind_ip or '127.0.0.1'
    is_ipv4 = '.' in bind_address
    server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
    # Built outside the try block: if binding fails there is nothing to tear down
    # (the server closes its own socket), and the bind error must reach the caller
    # instead of being replaced by a failure in the cleanup code.
    server = server_type(
        (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
    server_thread = None
    serving = False
    try:
        server_port = http_server_port(server)
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        serving = True
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        # shutdown() waits for serve_forever() to finish, so it must only be
        # called once the serving thread has actually been started
        if serving:
            server.shutdown()
        server.server_close()
        if serving:
            server_thread.join(2.0)
class SocksProxyTestContext(abc.ABC):
    """Abstract description of how to query ``socks_info`` through a test proxy.

    Subclasses set REQUEST_HANDLER_CLASS to the handler that serves the
    tunnelled connection and implement socks_info_request().
    """

    REQUEST_HANDLER_CLASS = None

    def socks_server(self, server_class, *args, **kwargs):
        """Start a proxy of the given class that hands off to REQUEST_HANDLER_CLASS"""
        handler_class = self.REQUEST_HANDLER_CLASS
        return socks_server(server_class, handler_class, *args, **kwargs)

    @abc.abstractmethod
    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """Send a request with the given handler and return the resulting socks_info dict"""
class HTTPSocksTestProxyContext(SocksProxyTestContext):
    """Test context that fetches ``socks_info`` with a plain HTTP GET."""

    REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler

    def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request /socks_info from the target via `handler` and return the decoded JSON"""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'http://{host}:{port}/socks_info', **req_kwargs)
        handler.validate(request)
        raw_body = handler.send(request).read()
        return json.loads(raw_body.decode())
# Maps the parameter given to the `ctx` fixture to the context class it instantiates
CTX_MAP = {
    'http': HTTPSocksTestProxyContext,
}
@pytest.fixture(scope='module')
def ctx(request):
    """Build the test context registered in CTX_MAP under the fixture parameter"""
    context_class = CTX_MAP[request.param]
    return context_class()
class TestSocks4Proxy:
    """Request handler behaviour when talking to a SOCKS4 / SOCKS4a proxy."""

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks4_no_auth(self, handler, ctx):
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler) as proxy_address:
            info = ctx.socks_info_request(rh, proxies={'all': f'socks4://{proxy_address}'})
            assert info['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks4_auth(self, handler, ctx):
        with handler() as rh, ctx.socks_server(Socks4ProxyHandler, user_id='user') as proxy_address:
            # Without a user id the proxy rejects the request
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks4://{proxy_address}'})
            info = ctx.socks_info_request(rh, proxies={'all': f'socks4://user:@{proxy_address}'})
            assert info['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks4a_ipv4_target(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler) as proxy_address:
            proxies = {'all': f'socks4a://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['version'] == 4
                # The target must arrive either as an IP or as a host name, not both
                sent_as_ip = info['ipv4_address'] == '127.0.0.1'
                sent_as_domain = info['domain_address'] == '127.0.0.1'
                assert sent_as_ip != sent_as_domain

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks4a_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler) as proxy_address:
            proxies = {'all': f'socks4a://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='localhost')
                assert info['version'] == 4
                assert info['ipv4_address'] is None
                assert info['domain_address'] == 'localhost'

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_ipv4_client_source_address(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler) as proxy_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            proxies = {'all': f'socks4://{proxy_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                info = ctx.socks_info_request(rh)
                assert info['client_address'][0] == source_address
                assert info['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    @pytest.mark.parametrize('reply_code', [
        Socks4CD.REQUEST_REJECTED_OR_FAILED,
        Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
        Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
    ])
    def test_socks4_errors(self, handler, ctx, reply_code):
        with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as proxy_address:
            proxies = {'all': f'socks4://{proxy_address}'}
            with handler(proxies=proxies) as rh, pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_ipv6_socks4_proxy(self, handler, ctx):
        with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as proxy_address:
            proxies = {'all': f'socks4://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['client_address'][0] == '::1'
                assert info['ipv4_address'] == '127.0.0.1'
                assert info['version'] == 4

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_timeout(self, handler, ctx):
        # The proxy waits longer than the client timeout before answering
        with ctx.socks_server(Socks4ProxyHandler, sleep=2) as proxy_address:
            proxies = {'all': f'socks4://{proxy_address}'}
            with handler(proxies=proxies, timeout=0.5) as rh, pytest.raises(TransportError):
                ctx.socks_info_request(rh)
class TestSocks5Proxy:
    """Request handler behaviour when talking to a SOCKS5 / SOCKS5h proxy."""

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks5_no_auth(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as proxy_address:
            proxies = {'all': f'socks5://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh)
                assert info['auth_methods'] == [0x0]
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks5_user_pass(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as proxy_address, handler() as rh:
            # Without credentials the proxy refuses the offered auth methods
            with pytest.raises(ProxyError):
                ctx.socks_info_request(rh, proxies={'all': f'socks5://{proxy_address}'})

            info = ctx.socks_info_request(rh, proxies={'all': f'socks5://test:testpass@{proxy_address}'})

            assert info['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
            assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks5_ipv4_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as proxy_address:
            proxies = {'all': f'socks5://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['ipv4_address'] == '127.0.0.1'
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks5_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as proxy_address:
            proxies = {'all': f'socks5://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='localhost')
                # localhost must have been resolved to exactly one of the loopback addresses
                resolved_ipv4 = info['ipv4_address'] == '127.0.0.1'
                resolved_ipv6 = info['ipv6_address'] == '::1'
                assert resolved_ipv4 != resolved_ipv6
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks5h_domain_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as proxy_address:
            proxies = {'all': f'socks5h://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='localhost')
                assert info['ipv4_address'] is None
                assert info['domain_address'] == 'localhost'
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks5h_ip_target(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as proxy_address:
            proxies = {'all': f'socks5h://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['ipv4_address'] == '127.0.0.1'
                assert info['domain_address'] is None
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_socks5_ipv6_destination(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as proxy_address:
            proxies = {'all': f'socks5://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='[::1]')
                assert info['ipv6_address'] == '::1'
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_ipv6_socks5_proxy(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as proxy_address:
            proxies = {'all': f'socks5://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.socks_info_request(rh, target_domain='127.0.0.1')
                assert info['client_address'][0] == '::1'
                assert info['ipv4_address'] == '127.0.0.1'
                assert info['version'] == 5

    # XXX: is there any feasible way of testing IPv6 source addresses?
    # Same would go for non-proxy source_address test...
    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    def test_ipv4_client_source_address(self, handler, ctx):
        with ctx.socks_server(Socks5ProxyHandler) as proxy_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            proxies = {'all': f'socks5://{proxy_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                info = ctx.socks_info_request(rh)
                assert info['client_address'][0] == source_address
                assert info['version'] == 5

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http'), ('Requests', 'http')], indirect=True)
    @pytest.mark.parametrize('reply_code', [
        Socks5Reply.GENERAL_FAILURE,
        Socks5Reply.CONNECTION_NOT_ALLOWED,
        Socks5Reply.NETWORK_UNREACHABLE,
        Socks5Reply.HOST_UNREACHABLE,
        Socks5Reply.CONNECTION_REFUSED,
        Socks5Reply.TTL_EXPIRED,
        Socks5Reply.COMMAND_NOT_SUPPORTED,
        Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
    ])
    def test_socks5_errors(self, handler, ctx, reply_code):
        with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as proxy_address:
            proxies = {'all': f'socks5://{proxy_address}'}
            with handler(proxies=proxies) as rh, pytest.raises(ProxyError):
                ctx.socks_info_request(rh)

    @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
    def test_timeout(self, handler, ctx):
        # The proxy waits longer than the client timeout before answering
        with ctx.socks_server(Socks5ProxyHandler, sleep=2) as proxy_address:
            proxies = {'all': f'socks5://{proxy_address}'}
            with handler(proxies=proxies, timeout=1) as rh, pytest.raises(TransportError):
                ctx.socks_info_request(rh)
# Entry point for running this file directly.
# NOTE(review): the test classes in this file are plain pytest-style classes, not
# unittest.TestCase subclasses, so unittest.main() presumably collects none of them;
# confirm whether direct execution is still expected to run anything.
if __name__ == '__main__':
    unittest.main()