[ie/youtube] Add age-gate workaround for some embeddable videos (#11821)
[yt-dlp.git] / test / test_networking.py
blobd96624af18ec34532d4676b9cb5802b74aec8655
1 #!/usr/bin/env python3
3 # Allow direct execution
4 import os
5 import sys
7 import pytest
9 from yt_dlp.networking.common import Features, DEFAULT_TIMEOUT
11 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
13 import gzip
14 import http.client
15 import http.cookiejar
16 import http.server
17 import io
18 import logging
19 import pathlib
20 import random
21 import ssl
22 import tempfile
23 import threading
24 import time
25 import urllib.error
26 import urllib.request
27 import warnings
28 import zlib
29 from email.message import Message
30 from http.cookiejar import CookieJar
32 from test.helper import (
33 FakeYDL,
34 http_server_port,
35 validate_and_send,
36 verify_address_availability,
38 from yt_dlp.cookies import YoutubeDLCookieJar
39 from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
40 from yt_dlp.networking import (
41 HEADRequest,
42 PUTRequest,
43 Request,
44 RequestDirector,
45 RequestHandler,
46 Response,
48 from yt_dlp.networking._urllib import UrllibRH
49 from yt_dlp.networking.exceptions import (
50 CertificateVerifyError,
51 HTTPError,
52 IncompleteRead,
53 NoSupportingHandlers,
54 ProxyError,
55 RequestError,
56 SSLError,
57 TransportError,
58 UnsupportedRequest,
60 from yt_dlp.networking.impersonate import (
61 ImpersonateRequestHandler,
62 ImpersonateTarget,
64 from yt_dlp.utils import YoutubeDLError
65 from yt_dlp.utils._utils import _YDLLogger as FakeLogger
66 from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
68 TEST_DIR = os.path.dirname(os.path.abspath(__file__))
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler used by the local HTTP/HTTPS test servers.

    Every ``do_*`` method routes on ``self.path`` and answers with a canned
    response (redirects, header echoes, encoded payloads, deliberately
    malformed responses) that the request handler tests assert against.
    """
    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    def log_message(self, format, *args):
        """Do nothing, so handled requests are not logged."""

    def _reply(self, status, body, *headers):
        """Send ``status``, the given ``(name, value)`` headers in order,
        a Content-Length matching ``body``, and then ``body`` itself.

        Nothing is written after the headers when ``body`` is empty.
        """
        self.send_response(status)
        for name, value in headers:
            self.send_header(name, value)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        if body:
            self.wfile.write(body)

    @staticmethod
    def _gzip(data):
        """Return ``data`` compressed as a gzip stream."""
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb') as f:
            f.write(data)
        return buf.getvalue()

    def _headers(self):
        """Echo the received request headers back as the response body."""
        self._reply(200, str(self.headers).encode(), ('Content-Type', 'application/json'))

    def _redirect(self):
        """Redirect to /method using the status code embedded in the path (/redirect_<code>)."""
        self._reply(int(self.path[len('/redirect_'):]), b'', ('Location', '/method'))

    def _method(self, method, payload=None):
        """Report ``method`` in a ``Method`` response header and echo ``payload``, if any."""
        body = payload or b''
        self.send_response(200)
        # Content-Length is deliberately sent before the Method header
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Method', method)
        self.end_headers()
        if body:
            self.wfile.write(body)

    def _status(self, status):
        """Send a small HTML page with the given status code."""
        self._reply(
            int(status), f'<html>{status} NOT FOUND</html>'.encode(),
            ('Content-Type', 'text/html; charset=utf-8'))

    def _read_data(self):
        """Read the request body as announced by Content-Length; empty if the header is absent."""
        length = self.headers.get('Content-Length')
        if length is None:
            return b''
        return self.rfile.read(int(length))

    def do_POST(self):
        # The body is always consumed, whichever route ends up answering
        echoed = self._read_data() + str(self.headers).encode()
        path = self.path
        if path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('POST', echoed)
        elif path.startswith('/headers'):
            self._headers()
        else:
            self._status(404)

    def do_HEAD(self):
        path = self.path
        if path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('HEAD')
        else:
            self._status(404)

    def do_PUT(self):
        # The body is always consumed, whichever route ends up answering
        echoed = self._read_data() + str(self.headers).encode()
        path = self.path
        if path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('PUT', echoed)
        else:
            self._status(404)

    def do_GET(self):
        path = self.path
        html_type = ('Content-Type', 'text/html; charset=utf-8')
        video_page = b'<html><video src="/vid.mp4" /></html>'

        if path in ('/video.html', '/%E4%B8%AD%E6%96%87.html', '/%c7%9f'):
            self._reply(200, video_page, html_type)
        elif path == '/vid.mp4':
            self._reply(200, b'\x00\x00\x00\x00\x20\x66\x74[video]', ('Content-Type', 'video/mp4'))
        elif path.startswith('/redirect_loop'):
            # Redirects to itself
            self._reply(301, b'', ('Location', path))
        elif path == '/redirect_dotsegments':
            # redirect to /headers but with dot segments before
            self._reply(301, b'', ('Location', '/a/b/./../../headers'))
        elif path == '/redirect_dotsegments_absolute':
            # redirect to /headers but with dot segments before - absolute url
            self._reply(
                301, b'',
                ('Location', f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers'))
        elif path.startswith('/redirect_'):
            self._redirect()
        elif path.startswith('/method'):
            self._method('GET', str(self.headers).encode())
        elif path.startswith('/headers'):
            self._headers()
        elif path.startswith('/308-to-headers'):
            # redirect to "localhost" for testing cookie redirection handling
            self._reply(
                308, b'',
                ('Location', f'http://localhost:{self.connection.getsockname()[1]}/headers'))
        elif path == '/trailing_garbage':
            # Valid gzip stream followed by bytes that are not part of it
            compressed = self._gzip(video_page) + b'trailing garbage'
            self._reply(200, compressed, html_type, ('Content-Encoding', 'gzip'))
        elif path == '/302-non-ascii-redirect':
            new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
            self._reply(301, b'', ('Location', new_url))
        elif path == '/content-encoding':
            # Apply, in order, each encoding named in the "ytdl-encoding" request header
            requested = self.headers.get('ytdl-encoding', '')
            body = video_page
            for token in requested.split(','):
                name = token.strip()
                if not name:
                    continue
                if name == 'br' and brotli:
                    body = brotli.compress(body)
                elif name == 'gzip':
                    body = self._gzip(body)
                elif name == 'deflate':
                    body = zlib.compress(body)
                elif name == 'unsupported':
                    body = b'raw'
                    break
                else:
                    self._status(415)
                    return
            self._reply(200, body, ('Content-Encoding', requested))
        elif path.startswith('/gen_'):
            # Respond with the status code embedded in the path (/gen_<code>)
            self._reply(int(path[len('/gen_'):]), b'<html></html>', html_type)
        elif path.startswith('/incompleteread'):
            # Announce far more data than is actually written
            self.send_response(200)
            self.send_header(*html_type)
            self.send_header('Content-Length', '234234')
            self.end_headers()
            self.wfile.write(b'<html></html>')
            self.finish()
        elif path.startswith('/timeout_'):
            # Sleep for the number of seconds embedded in the path before answering
            time.sleep(int(path[len('/timeout_'):]))
            self._headers()
        elif path == '/source_address':
            # Tell the client which address it connected from
            self._reply(200, str(self.client_address[0]).encode(), html_type)
            self.finish()
        elif path == '/get_cookie':
            self.send_response(200)
            self.send_header('Set-Cookie', 'test=ytdlp; path=/')
            self.end_headers()
            self.finish()
        else:
            self._status(404)

    def send_header(self, keyword, value):
        """
        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
        This is against what is defined in RFC 3986, however we need to test we support this
        since some sites incorrectly do this.
        """
        # Connection headers are left to the base implementation
        if keyword.lower() == 'connection':
            return super().send_header(keyword, value)

        line = f'{keyword}: {value}\r\n'.encode()
        if hasattr(self, '_headers_buffer'):
            self._headers_buffer.append(line)
        else:
            self._headers_buffer = [line]
class TestRequestHandlerBase:
    """Base for handler test classes: starts one HTTP and one HTTPS test server per class."""

    @classmethod
    def setup_class(cls):
        """Start both servers on ephemeral loopback ports and record the ports on ``cls``."""
        loopback = ('127.0.0.1', 0)

        # HTTP server
        cls.http_httpd = http.server.ThreadingHTTPServer(loopback, HTTPTestRequestHandler)
        cls.http_port = http_server_port(cls.http_httpd)
        # FIXME: we should probably stop the http server thread after each test
        # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
        cls.http_server_thread = threading.Thread(
            target=cls.http_httpd.serve_forever, daemon=True)
        cls.http_server_thread.start()

        # HTTPS server, using the test certificate that sits next to this file
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        cls.https_httpd = http.server.ThreadingHTTPServer(loopback, HTTPTestRequestHandler)
        cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
        cls.https_port = http_server_port(cls.https_httpd)
        cls.https_server_thread = threading.Thread(
            target=cls.https_httpd.serve_forever, daemon=True)
        cls.https_server_thread.start()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestHTTPRequestHandler(TestRequestHandlerBase):
    """HTTP behaviour tests, run once per request handler via the ``handler`` fixture."""

    def _url(self, path):
        """Return the absolute URL of ``path`` on the class-wide plain HTTP test server."""
        return f'http://127.0.0.1:{self.http_port}{path}'

    @staticmethod
    def _serve_https(sslctx):
        """Start a throwaway HTTPS test server that uses ``sslctx``; return its port."""
        # XXX: is there a better way to test this than to create a new server?
        httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        httpd.socket = sslctx.wrap_socket(httpd.socket, server_side=True)
        port = http_server_port(httpd)
        threading.Thread(target=httpd.serve_forever, daemon=True).start()
        return port

    @staticmethod
    def _legacy_cipher_context():
        """Build a server context capped at TLS 1.2 and restricted to old ciphers."""
        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.maximum_version = ssl.TLSVersion.TLSv1_2
        sslctx.set_ciphers('SHA1:AESCCM:aDSS:eNULL:aNULL')
        sslctx.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        return sslctx

    def test_verify_cert(self, handler):
        """The test certificate is rejected unless verification is turned off."""
        url = f'https://127.0.0.1:{self.https_port}/headers'
        with handler() as rh, pytest.raises(CertificateVerifyError):
            validate_and_send(rh, Request(url))

        with handler(verify=False) as rh:
            r = validate_and_send(rh, Request(url))
            assert r.status == 200
            r.close()

    def test_ssl_error(self, handler):
        """A failed handshake surfaces as SSLError, not CertificateVerifyError."""
        # No certificate chain is loaded on this server's context
        https_port = self._serve_https(ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER))

        with handler(verify=False) as rh:
            with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
                validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
            assert not issubclass(exc_info.type, CertificateVerifyError)

    @pytest.mark.skip_handler('CurlCFFI', 'legacy_ssl ignored by CurlCFFI')
    def test_legacy_ssl_extension(self, handler):
        """The per-request ``legacy_ssl`` extension allows talking to an old-cipher server."""
        # HTTPS server with old ciphers
        url = f'https://127.0.0.1:{self._serve_https(self._legacy_cipher_context())}/headers'

        with handler(verify=False) as rh:
            res = validate_and_send(rh, Request(url, extensions={'legacy_ssl': True}))
            assert res.status == 200
            res.close()

            # Ensure only applies to request extension
            with pytest.raises(SSLError):
                validate_and_send(rh, Request(url))

    @pytest.mark.skip_handler('CurlCFFI', 'legacy_ssl ignored by CurlCFFI')
    def test_legacy_ssl_support(self, handler):
        """The handler-wide ``legacy_ssl_support`` option allows talking to an old-cipher server."""
        # HTTPS server with old ciphers
        url = f'https://127.0.0.1:{self._serve_https(self._legacy_cipher_context())}/headers'

        with handler(verify=False, legacy_ssl_support=True) as rh:
            res = validate_and_send(rh, Request(url))
            assert res.status == 200
            res.close()

    def test_percent_encode(self, handler):
        with handler() as rh:
            # Unicode characters should be encoded with uppercase percent-encoding
            res = validate_and_send(rh, Request(self._url('/中文.html')))
            assert res.status == 200
            res.close()
            # don't normalize existing percent encodings
            res = validate_and_send(rh, Request(self._url('/%c7%9f')))
            assert res.status == 200
            res.close()

    @pytest.mark.parametrize('path', [
        '/a/b/./../../headers',
        '/redirect_dotsegments',
        # https://github.com/yt-dlp/yt-dlp/issues/9020
        '/redirect_dotsegments_absolute',
    ])
    def test_remove_dot_segments(self, handler, path):
        with handler(verbose=True) as rh:
            # This isn't a comprehensive test,
            # but it should be enough to check whether the handler is removing dot segments in required scenarios
            res = validate_and_send(rh, Request(self._url(path)))
            assert res.status == 200
            assert res.url == self._url('/headers')
            res.close()

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi (non-standard)')
    def test_unicode_path_redirection(self, handler):
        """A non-ASCII redirect target ends up percent-encoded in the response URL."""
        with handler() as rh:
            r = validate_and_send(rh, Request(self._url('/302-non-ascii-redirect')))
            assert r.url == self._url('/%E4%B8%AD%E6%96%87.html')
            r.close()

    def test_raise_http_error(self, handler):
        """Error statuses, and a 302 as served by /gen_302, raise HTTPError; 200 does not."""
        with handler() as rh:
            for bad_status in (400, 500, 599, 302):
                with pytest.raises(HTTPError):
                    validate_and_send(rh, Request(self._url(f'/gen_{bad_status}')))

            # Should not raise an error
            validate_and_send(rh, Request(self._url('/gen_200'))).close()

    def test_response_url(self, handler):
        with handler() as rh:
            # Response url should be that of the last url in redirect chain
            res = validate_and_send(rh, Request(self._url('/redirect_301')))
            assert res.url == self._url('/method')
            res.close()

            direct_url = self._url('/gen_200')
            res2 = validate_and_send(rh, Request(direct_url))
            assert res2.url == direct_url
            res2.close()

    # Covers some basic cases we expect some level of consistency between request handlers for
    @pytest.mark.parametrize('redirect_status,method,expected', [
        # A 303 must either use GET or HEAD for subsequent request
        (303, 'POST', ('', 'GET', False)),
        (303, 'HEAD', ('', 'HEAD', False)),

        # 301 and 302 turn POST only into a GET
        (301, 'POST', ('', 'GET', False)),
        (301, 'HEAD', ('', 'HEAD', False)),
        (302, 'POST', ('', 'GET', False)),
        (302, 'HEAD', ('', 'HEAD', False)),

        # 307 and 308 should not change method
        (307, 'POST', ('testdata', 'POST', True)),
        (308, 'POST', ('testdata', 'POST', True)),
        (307, 'HEAD', ('', 'HEAD', False)),
        (308, 'HEAD', ('', 'HEAD', False)),
    ])
    def test_redirect(self, handler, redirect_status, method, expected):
        """Check the method, body and Content-Length that reach /method after a redirect.

        ``expected`` is ``(echoed body, method seen by the server, whether the
        echoed request headers contain content-length)``.
        """
        want_body, want_method, want_content_length = expected
        sends_body = method == 'POST'
        body = b'testdata' if sends_body else None
        request_headers = {'Content-Type': 'application/test'} if sends_body else {}

        with handler() as rh:
            res = validate_and_send(rh, Request(
                self._url(f'/redirect_{redirect_status}'),
                method=method, data=body, headers=request_headers))

            # /method echoes the request body followed by the request headers.
            # If the leading bytes are not our body, they belong to the headers.
            echoed_body = b''
            echoed_headers = b''
            if body is not None:
                leading = res.read(len(body))
                if leading == body:
                    echoed_body = leading
                else:
                    echoed_headers = leading
            echoed_headers += res.read()

            assert want_body == echoed_body.decode()
            assert want_method == res.headers.get('method')
            assert want_content_length == ('content-length' in echoed_headers.decode().lower())

    def test_request_cookie_header(self, handler):
        # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
        with handler() as rh:
            # Specified Cookie header should be used
            request = Request(self._url('/headers'), headers={'Cookie': 'test=test'})
            res = validate_and_send(rh, request).read().decode()
            assert 'cookie: test=test' in res.lower()

            # Specified Cookie header should be removed on any redirect
            request = Request(self._url('/308-to-headers'), headers={'Cookie': 'test=test2'})
            res = validate_and_send(rh, request).read().decode()
            assert 'cookie: test=test2' not in res.lower()

        # Specified Cookie header should override global cookiejar for that request
        # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))

        with handler(cookiejar=cookiejar) as rh:
            request = Request(self._url('/headers'), headers={'cookie': 'test=test3'})
            data = validate_and_send(rh, request).read()
            assert b'cookie: test=ytdlp' not in data.lower()
            assert b'cookie: test=test3' in data.lower()

    def test_redirect_loop(self, handler):
        """An endpoint that redirects to itself raises HTTPError mentioning a redirect loop."""
        with handler() as rh, pytest.raises(HTTPError, match='redirect loop'):
            validate_and_send(rh, Request(self._url('/redirect_loop')))

    def test_incompleteread(self, handler):
        """A body shorter than the announced Content-Length raises IncompleteRead."""
        with handler(timeout=2) as rh, pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
            validate_and_send(rh, Request(self._url('/incompleteread'))).read()

    def test_cookies(self, handler):
        """Cookies are sent from a handler-wide cookiejar and from a per-request one."""
        cookiejar = YoutubeDLCookieJar()
        cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name='test', value='ytdlp', port=None, port_specified=False,
            domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/headers',
            path_specified=True, secure=False, expires=None, discard=False, comment=None,
            comment_url=None, rest={}))
        url = self._url('/headers')

        with handler(cookiejar=cookiejar) as rh:
            data = validate_and_send(rh, Request(url)).read()
            assert b'cookie: test=ytdlp' in data.lower()

        # Per request
        with handler() as rh:
            data = validate_and_send(rh, Request(url, extensions={'cookiejar': cookiejar})).read()
            assert b'cookie: test=ytdlp' in data.lower()

    def test_cookie_sync_only_cookiejar(self, handler):
        # Ensure that cookies are ONLY being handled by the cookiejar
        with handler() as rh:
            validate_and_send(rh, Request(
                self._url('/get_cookie'), extensions={'cookiejar': YoutubeDLCookieJar()}))
            # A fresh jar must not see the cookie set through the previous jar
            data = validate_and_send(rh, Request(
                self._url('/headers'), extensions={'cookiejar': YoutubeDLCookieJar()})).read()
            assert b'cookie: test=ytdlp' not in data.lower()

    def test_cookie_sync_delete_cookie(self, handler):
        # Ensure that cookies are ONLY being handled by the cookiejar
        cookiejar = YoutubeDLCookieJar()
        headers_url = self._url('/headers')
        with handler(cookiejar=cookiejar) as rh:
            validate_and_send(rh, Request(self._url('/get_cookie')))
            data = validate_and_send(rh, Request(headers_url)).read()
            assert b'cookie: test=ytdlp' in data.lower()
            cookiejar.clear_session_cookies()
            data = validate_and_send(rh, Request(headers_url)).read()
            assert b'cookie: test=ytdlp' not in data.lower()

    def test_headers(self, handler):
        """Handler-wide headers are sent, and per-request headers are merged over them."""
        url = self._url('/headers')
        with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
            # Global Headers
            data = validate_and_send(rh, Request(url)).read().lower()
            assert b'test1: test' in data

            # Per request headers, merged with global
            data = validate_and_send(
                rh, Request(url, headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
            assert b'test1: test' in data
            assert b'test2: changed' in data
            assert b'test2: test2' not in data
            assert b'test3: test3' in data

    def test_read_timeout(self, handler):
        with handler() as rh:
            # Default timeout is 20 seconds, so this should go through
            validate_and_send(rh, Request(self._url('/timeout_1')))

        with handler(timeout=0.1) as rh:
            with pytest.raises(TransportError):
                validate_and_send(rh, Request(self._url('/timeout_5')))

            # Per request timeout, should override handler timeout
            validate_and_send(rh, Request(self._url('/timeout_1'), extensions={'timeout': 4}))

    def test_connect_timeout(self, handler):
        # nothing should be listening on this port
        connect_timeout_url = 'http://10.255.255.255'
        with handler(timeout=0.01) as rh, pytest.raises(TransportError):
            started = time.time()
            validate_and_send(rh, Request(connect_timeout_url))
        assert time.time() - started < DEFAULT_TIMEOUT

        # Per request timeout, should override handler timeout
        request = Request(connect_timeout_url, extensions={'timeout': 0.01})
        with handler() as rh, pytest.raises(TransportError):
            started = time.time()
            validate_and_send(rh, request)
        assert time.time() - started < DEFAULT_TIMEOUT

    def test_source_address(self, handler):
        """The server sees the connection coming from the configured source address."""
        source_address = f'127.0.0.{random.randint(5, 255)}'
        # on some systems these loopback addresses we need for testing may not be available
        # see: https://github.com/yt-dlp/yt-dlp/issues/8890
        verify_address_availability(source_address)
        with handler(source_address=source_address) as rh:
            response = validate_and_send(rh, Request(self._url('/source_address')))
            data = response.read().decode()
            assert source_address == data

    # Not supported by CurlCFFI
    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
    def test_gzip_trailing_garbage(self, handler):
        """Bytes following a valid gzip stream are dropped from the decoded body."""
        with handler() as rh:
            response = validate_and_send(rh, Request(f'http://localhost:{self.http_port}/trailing_garbage'))
            data = response.read().decode()
            assert data == '<html><video src="/vid.mp4" /></html>'

    @pytest.mark.skip_handler('CurlCFFI', 'not applicable to curl-cffi')
    @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
    def test_brotli(self, handler):
        """A brotli-encoded body is decoded transparently."""
        with handler() as rh:
            request = Request(self._url('/content-encoding'), headers={'ytdl-encoding': 'br'})
            res = validate_and_send(rh, request)
            assert res.headers.get('Content-Encoding') == 'br'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_deflate(self, handler):
        """A deflate-encoded body is decoded transparently."""
        with handler() as rh:
            request = Request(self._url('/content-encoding'), headers={'ytdl-encoding': 'deflate'})
            res = validate_and_send(rh, request)
            assert res.headers.get('Content-Encoding') == 'deflate'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_gzip(self, handler):
        """A gzip-encoded body is decoded transparently."""
        with handler() as rh:
            request = Request(self._url('/content-encoding'), headers={'ytdl-encoding': 'gzip'})
            res = validate_and_send(rh, request)
            assert res.headers.get('Content-Encoding') == 'gzip'
            assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    def test_multiple_encodings(self, handler):
        """Stacked content encodings are all undone."""
        url = self._url('/content-encoding')
        with handler() as rh:
            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
                res = validate_and_send(rh, Request(url, headers={'ytdl-encoding': pair}))
                assert res.headers.get('Content-Encoding') == pair
                assert res.read() == b'<html><video src="/vid.mp4" /></html>'

    @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
    def test_unsupported_encoding(self, handler):
        """A body with an unknown content encoding is returned as received."""
        with handler() as rh:
            request = Request(
                self._url('/content-encoding'),
                headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'})
            res = validate_and_send(rh, request)
            assert res.headers.get('Content-Encoding') == 'unsupported'
            assert res.read() == b'raw'

    def test_read(self, handler):
        """Partial reads, a full read and a read at EOF all behave as expected."""
        with handler() as rh:
            res = validate_and_send(rh, Request(self._url('/headers')))
            assert res.readable()
            assert res.read(1) == b'H'
            assert res.read(3) == b'ost'
            assert res.read().decode().endswith('\n\n')
            assert res.read() == b''

    def test_request_disable_proxy(self, handler):
        url = self._url('/headers')
        for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
            # Given the handler is configured with a proxy
            with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
                # When a proxy is explicitly set to None for the request
                res = validate_and_send(rh, Request(url, proxies={'http': None}))
                # Then no proxy should be used
                res.close()
                assert res.status == 200

    @pytest.mark.skip_handlers_if(
        lambda _, handler: Features.NO_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support NO_PROXY')
    def test_noproxy(self, handler):
        url = self._url('/headers')
        for proxy_proto in handler._SUPPORTED_PROXY_SCHEMES or ['http']:
            # Given the handler is configured with a proxy
            with handler(proxies={'http': f'{proxy_proto}://10.255.255.255'}, timeout=5) as rh:
                for no_proxy in (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost'):
                    # When request no proxy includes the request url host
                    nop_response = validate_and_send(rh, Request(url, proxies={'no': no_proxy}))
                    # Then the proxy should not be used
                    assert nop_response.status == 200
                    nop_response.close()

    @pytest.mark.skip_handlers_if(
        lambda _, handler: Features.ALL_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support ALL_PROXY')
    def test_allproxy(self, handler):
        # This is a bit of a hacky test, but it should be enough to check whether the handler is using the proxy.
        # 0.1s might not be enough of a timeout if proxy is not used in all cases, but should still get failures.
        url = self._url('/headers')
        all_proxy = {'all': 'http://10.255.255.255'}

        # Proxy configured on the handler
        with handler(proxies=all_proxy, timeout=0.1) as rh, pytest.raises(TransportError):
            validate_and_send(rh, Request(url)).close()

        # Proxy configured on the request
        with handler(timeout=0.1) as rh, pytest.raises(TransportError):
            validate_and_send(rh, Request(url, proxies=all_proxy)).close()
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestClientCertificate:
    """Client certificate tests against an HTTPS server that requires one."""

    @classmethod
    def setup_class(cls):
        """Start an HTTPS server that demands a client certificate issued by the test CA."""
        cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')

        sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslctx.verify_mode = ssl.CERT_REQUIRED
        sslctx.load_verify_locations(cafile=os.path.join(cls.certdir, 'ca.crt'))
        sslctx.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)

        cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
        cls.httpd.socket = sslctx.wrap_socket(cls.httpd.socket, server_side=True)
        cls.port = http_server_port(cls.httpd)
        cls.server_thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
        cls.server_thread.start()

    def _cert_path(self, filename):
        """Return the path of ``filename`` inside the certificate test data directory."""
        return os.path.join(self.certdir, filename)

    def _run_test(self, handler, **handler_kwargs):
        """Fetch /video.html with a handler built from ``handler_kwargs``."""
        # Disable client-side validation of unacceptable self-signed testcert.pem
        # The test is of a check on the server side, so unaffected
        with handler(verify=False, **handler_kwargs) as rh:
            response = validate_and_send(rh, Request(f'https://127.0.0.1:{self.port}/video.html'))
            response.read().decode()

    def test_certificate_combined_nopass(self, handler):
        """Certificate and unencrypted key in one file."""
        client_cert = {
            'client_certificate': self._cert_path('clientwithkey.crt'),
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_nocombined_nopass(self, handler):
        """Certificate and unencrypted key in separate files."""
        client_cert = {
            'client_certificate': self._cert_path('client.crt'),
            'client_certificate_key': self._cert_path('client.key'),
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_combined_pass(self, handler):
        """Certificate and password-protected key in one file."""
        client_cert = {
            'client_certificate': self._cert_path('clientwithencryptedkey.crt'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)

    def test_certificate_nocombined_pass(self, handler):
        """Certificate and password-protected key in separate files."""
        client_cert = {
            'client_certificate': self._cert_path('client.crt'),
            'client_certificate_key': self._cert_path('clientencrypted.key'),
            'client_certificate_password': 'foobar',
        }
        self._run_test(handler, client_cert=client_cert)
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
    """Tests for the ``impersonate`` request extension on impersonation-capable handlers."""

    def test_supported_impersonate_targets(self, handler):
        """Every supported target yields a 200 without the standard User-Agent being sent."""
        with handler(headers=std_headers) as rh:
            # note: this assumes the impersonate request handler supports the impersonate extension
            for target in rh.supported_targets:
                res = validate_and_send(rh, Request(
                    f'http://127.0.0.1:{self.http_port}/headers', extensions={'impersonate': target}))
                assert res.status == 200
                # /headers echoes the request headers back in the body
                assert std_headers['user-agent'].lower() not in res.read().decode().lower()

    def test_response_extensions(self, handler):
        """A successful response records the resolved impersonate target in its extensions."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_200', extensions={'impersonate': target})
                res = validate_and_send(rh, request)
                assert res.extensions['impersonate'] == rh._get_request_target(request)

    def test_http_error_response_extensions(self, handler):
        """The response attached to an HTTPError records the resolved impersonate target too."""
        with handler() as rh:
            for target in rh.supported_targets:
                request = Request(
                    f'http://127.0.0.1:{self.http_port}/gen_404', extensions={'impersonate': target})
                # Require the error explicitly: with a plain try/except, a request that
                # unexpectedly succeeds would leave the response unchecked, or checked
                # against the response of a previous target.
                with pytest.raises(HTTPError) as exc_info:
                    validate_and_send(rh, request)
                res = exc_info.value.response
                assert res.extensions['impersonate'] == rh._get_request_target(request)
class TestRequestHandlerMisc:
    """Misc generic tests for request handlers, not related to request or validation testing"""

    @pytest.mark.parametrize('handler,logger_name', [
        ('Requests', 'urllib3'),
        ('Websockets', 'websockets.client'),
        ('Websockets', 'websockets.server'),
    ], indirect=['handler'])
    def test_remove_logging_handler(self, handler, logger_name):
        """Creating a handler adds one logging handler to ``logger_name``; closing it removes it again."""
        # Ensure any logging handlers, which may contain a YoutubeDL instance,
        # are removed when we close the request handler
        # See: https://github.com/yt-dlp/yt-dlp/issues/8922
        attached = logging.getLogger(logger_name).handlers  # live list, re-measured below
        baseline = len(attached)

        rh = handler()
        assert len(attached) == baseline + 1

        rh.close()
        assert len(attached) == baseline

    def test_wrap_request_errors(self):
        """Errors raised from ``_validate``/``_send`` carry the raising handler in ``.handler``."""

        class TestRequestHandler(RequestHandler):
            def _validate(self, request):
                if request.headers.get('x-fail'):
                    raise UnsupportedRequest('test error')

            def _send(self, request: Request):
                raise RequestError('test error')

        failing_request = Request('http://example.com', headers={'x-fail': '1'})
        plain_request = Request('http://example.com')

        with TestRequestHandler(logger=FakeLogger()) as rh:
            with pytest.raises(UnsupportedRequest, match='test error') as validate_info:
                rh.validate(failing_request)
            assert validate_info.value.handler is rh

            with pytest.raises(RequestError, match='test error') as send_info:
                rh.send(plain_request)
            assert send_info.value.handler is rh
@pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
class TestUrllibRequestHandler(TestRequestHandlerBase):
    """Tests specific to the urllib-based request handler."""

    def test_file_urls(self, handler):
        """file:// URLs are rejected by default and readable with ``enable_file_urls=True``."""
        # See https://github.com/ytdl-org/youtube-dl/issues/8227
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            tf.write(b'foobar')

        # The file was created with delete=False, so remove it ourselves even
        # when one of the assertions below fails.
        try:
            req = Request(pathlib.Path(tf.name).as_uri())
            with handler() as rh:
                with pytest.raises(UnsupportedRequest):
                    rh.validate(req)

                # Test that urllib never loaded FileHandler
                with pytest.raises(TransportError):
                    rh.send(req)

            with handler(enable_file_urls=True) as rh:
                res = validate_and_send(rh, req)
                try:
                    assert res.read() == b'foobar'
                finally:
                    # Close before unlinking: an open handle blocks deletion on some platforms
                    res.close()
        finally:
            os.unlink(tf.name)

    def test_http_error_returns_content(self, handler):
        """The response of an ``HTTPError`` stays readable after the exception object is gone."""
        # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
        def get_response():
            with handler() as rh:
                # headers url
                try:
                    validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                except HTTPError as e:
                    return e.response

        assert get_response().read() == b'<html></html>'

    def test_verify_cert_error_text(self, handler):
        """A certificate verification failure surfaces the expected SSL error text."""
        # Check the output of the error message
        with handler() as rh:
            with pytest.raises(
                CertificateVerifyError,
                match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate',
            ):
                validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

    @pytest.mark.parametrize('req,match,version_check', [
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
        # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
        (
            Request('http://127.0.0.1', method='GET\n'),
            'method can\'t contain control characters',
            lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5),
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
        # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
        (
            Request('http://127.0.0. 1', method='GET'),
            'URL can\'t contain control characters',
            lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3),
        ),
        # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
        (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
    ])
    def test_httplib_validation_errors(self, handler, req, match, version_check):
        """http.client validation failures are reported as ``RequestError``, not ``TransportError``.

        ``version_check`` is an optional predicate over ``sys.version_info``; when it
        returns true the running interpreter lacks the validation and the case is skipped.
        """
        if version_check and version_check(sys.version_info):
            pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

        with handler() as rh:
            with pytest.raises(RequestError, match=match) as exc_info:
                validate_and_send(rh, req)
            assert not isinstance(exc_info.value, TransportError)
@pytest.mark.parametrize('handler', ['Requests'], indirect=True)
class TestRequestsRequestHandler(TestRequestHandlerBase):
    """Error-mapping and cleanup tests for the Requests-based handler."""

    @pytest.mark.parametrize('raised,expected', [
        (lambda: requests.exceptions.ConnectTimeout(), TransportError),
        (lambda: requests.exceptions.ReadTimeout(), TransportError),
        (lambda: requests.exceptions.Timeout(), TransportError),
        (lambda: requests.exceptions.ConnectionError(), TransportError),
        (lambda: requests.exceptions.ProxyError(), ProxyError),
        (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
        (lambda: requests.exceptions.SSLError(), SSLError),
        (lambda: requests.exceptions.InvalidURL(), RequestError),
        (lambda: requests.exceptions.InvalidHeader(), RequestError),
        # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
        (lambda: urllib3.exceptions.HTTPError(), TransportError),
        (lambda: requests.exceptions.RequestException(), RequestError),
        # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
        """An exception raised by the session's ``request`` surfaces as exactly ``expected``."""

        class MockSession:
            # Stand-in session whose request() always fails with the parametrized error
            def request(self, *args, **kwargs):
                raise raised()

        def mock_get_instance(*args, **kwargs):
            return MockSession()

        with handler() as rh:
            monkeypatch.setattr(rh, '_get_instance', mock_get_instance)

            with pytest.raises(expected) as caught:
                rh.send(Request('http://fake'))

            # Exact type, not merely a subclass of it
            assert caught.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: urllib3.exceptions.SSLError(), SSLError, None),
        (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
        (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
        (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
        (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
        (lambda: urllib3.exceptions.HTTPError(), TransportError, None),  # catch-all
        (
            lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
            IncompleteRead,
            '3 bytes read, 4 more expected',
        ),
        (
            lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
            IncompleteRead,
            '3 bytes read, 5 more expected',
        ),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """An exception raised while reading the adapted response surfaces as exactly ``expected``."""
        from requests.models import Response as RequestsResponse
        from urllib3.response import HTTPResponse as Urllib3Response

        from yt_dlp.networking._requests import RequestsResponseAdapter

        wrapped = RequestsResponse()
        wrapped.raw = Urllib3Response(body=b'', status=200)
        adapter = RequestsResponseAdapter(wrapped)

        def failing_read(*args, **kwargs):
            raise raised()

        monkeypatch.setattr(adapter.fp, 'read', failing_read)

        with pytest.raises(expected, match=match) as caught:
            adapter.read()

        assert caught.type is expected

    def test_close(self, handler, monkeypatch):
        """Closing the handler closes the session it created."""
        rh = handler()
        session = rh._get_instance(cookiejar=rh.cookiejar)
        real_close = session.close
        close_calls = []

        def recording_close(*args, **kwargs):
            # Note the call, then delegate so the session is really closed
            close_calls.append(True)
            return real_close(*args, **kwargs)

        monkeypatch.setattr(session, 'close', recording_close)
        rh.close()
        assert close_calls
@pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
    """Tests specific to the curl_cffi-based request handler."""

    @pytest.mark.parametrize('params,extensions', [
        ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
        ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
    ])
    def test_impersonate(self, handler, params, extensions):
        """Chrome 110 headers are sent whether the target comes from handler params or request extensions."""
        with handler(headers=std_headers, **params) as rh:
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions=extensions)).read().decode()
            assert 'sec-ch-ua: "Chromium";v="110"' in res
            # Check that user agent is added over ours
            assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res

    def test_headers(self, handler):
        """Impersonation drops matching std_headers but keeps custom ones; without it std_headers are sent."""
        with handler(headers=std_headers) as rh:
            # Ensure curl-impersonate overrides our standard headers
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
                    'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()

            assert std_headers['user-agent'].lower() not in res
            assert std_headers['accept-language'].lower() not in res
            assert std_headers['sec-fetch-mode'].lower() not in res
            # other than UA, custom headers that differ from std_headers should be kept
            assert 'sec-fetch-mode: custom' in res
            assert 'x-custom: test' in res
            # but when not impersonating don't remove std_headers
            res = validate_and_send(
                rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
            # std_headers should be present
            for k, v in std_headers.items():
                assert f'{k}: {v}'.lower() in res

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
    ])
    def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """A curl error raised while reading the adapted response surfaces as exactly ``expected``."""
        import curl_cffi.requests

        from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
        curl_res = curl_cffi.requests.Response()
        res = CurlCFFIResponseAdapter(curl_res)

        def mock_read(*args, **kwargs):
            # Attach the response to the error before re-raising it
            try:
                raise raised()
            except Exception as e:
                e.response = curl_res
                raise
        monkeypatch.setattr(res.fp, 'read', mock_read)

        with pytest.raises(expected, match=match) as exc_info:
            res.read()

        assert exc_info.type is expected

    @pytest.mark.parametrize('raised,expected,match', [
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
        (lambda: curl_cffi.requests.errors.RequestsError(
            '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
    ])
    def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
        """A curl error raised by the session's ``request`` surfaces as exactly ``expected``."""
        import curl_cffi.requests
        curl_res = curl_cffi.requests.Response()
        curl_res.status_code = 301

        with handler() as rh:
            original_get_instance = rh._get_instance

            def mock_get_instance(*args, **kwargs):
                instance = original_get_instance(*args, **kwargs)

                def request(*_, **__):
                    # Attach the response to the error before re-raising it
                    try:
                        raise raised()
                    except Exception as e:
                        e.response = curl_res
                        raise
                monkeypatch.setattr(instance, 'request', request)
                return instance

            monkeypatch.setattr(rh, '_get_instance', mock_get_instance)

            # Honour the parametrized message pattern, as test_response_error_mapping does
            with pytest.raises(expected, match=match) as exc_info:
                rh.send(Request('http://fake'))

            assert exc_info.type is expected

    def test_response_reader(self, handler):
        """Exercise ``CurlCFFIResponseReader`` buffering, byte counting and closing behaviour."""
        class FakeResponse:
            def __init__(self, raise_error=False):
                self.raise_error = raise_error
                self.closed = False

            def iter_content(self):
                yield b'foo'
                yield b'bar'
                yield b'z'
                if self.raise_error:
                    raise Exception('test')

            def close(self):
                self.closed = True

        from yt_dlp.networking._curlcffi import CurlCFFIResponseReader

        res = CurlCFFIResponseReader(FakeResponse())
        # Call the method: asserting the bound method itself is always truthy
        assert res.readable()
        assert res.bytes_read == 0
        assert res.read(1) == b'f'
        assert res.bytes_read == 3
        assert res._buffer == b'oo'

        assert res.read(2) == b'oo'
        assert res.bytes_read == 3
        assert res._buffer == b''

        assert res.read(2) == b'ba'
        assert res.bytes_read == 6
        assert res._buffer == b'r'

        assert res.read(3) == b'rz'
        assert res.bytes_read == 7
        assert res._buffer == b''
        assert res.closed
        assert res._response.closed

        # should handle no size param
        res2 = CurlCFFIResponseReader(FakeResponse())
        assert res2.read() == b'foobarz'
        assert res2.bytes_read == 7
        assert res2._buffer == b''
        assert res2.closed

        # should close on an exception
        res3 = CurlCFFIResponseReader(FakeResponse(raise_error=True))
        with pytest.raises(Exception, match='test'):
            res3.read()
        assert res3._buffer == b''
        assert res3.bytes_read == 7
        assert res3.closed

        # buffer should be cleared on close
        res4 = CurlCFFIResponseReader(FakeResponse())
        res4.read(2)
        assert res4._buffer == b'o'
        res4.close()
        assert res4.closed
        assert res4._buffer == b''
def run_validation(handler, error, req, **handler_kwargs):
    """Validate ``req`` with a handler built from ``handler(**handler_kwargs)``.

    When ``error`` is truthy it is the exception type that ``validate`` must
    raise; otherwise validation is expected to complete without raising.
    """
    with handler(**handler_kwargs) as rh:
        if not error:
            rh.validate(req)
            return
        with pytest.raises(error):
            rh.validate(req)
class TestRequestHandlerValidation:
    """Table-driven tests of ``RequestHandler.validate`` for the built-in and stub handlers.

    In every table the "fail" value is either ``False`` (validation must succeed)
    or the exception type that validation must raise; see ``run_validation``.
    """

    class ValidationRH(RequestHandler):
        # Stub handler using the base class validation settings
        def _send(self, request):
            raise RequestError('test')

    class NoCheckRH(ValidationRH):
        # Every _SUPPORTED_* attribute is None and extensions are cleared rather than checked
        _SUPPORTED_FEATURES = None
        _SUPPORTED_PROXY_SCHEMES = None
        _SUPPORTED_URL_SCHEMES = None

        def _check_extensions(self, extensions):
            extensions.clear()

    class HTTPSupportedRH(ValidationRH):
        # Only declares the http URL scheme
        _SUPPORTED_URL_SCHEMES = ('http',)

    URL_SCHEME_TESTS = [
        # handler, [(scheme, expected to fail, handler kwargs)]
        ('Urllib', [
            ('http', False, {}),
            ('https', False, {}),
            ('data', False, {}),
            ('ftp', False, {}),
            ('file', UnsupportedRequest, {}),
            ('file', False, {'enable_file_urls': True}),
        ]),
        ('Requests', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        ('Websockets', [
            ('ws', False, {}),
            ('wss', False, {}),
        ]),
        ('CurlCFFI', [
            ('http', False, {}),
            ('https', False, {}),
        ]),
        (NoCheckRH, [('http', False, {})]),
        (ValidationRH, [('http', UnsupportedRequest, {})]),
    ]

    PROXY_SCHEME_TESTS = [
        # handler, request scheme, [(proxy scheme, expected to fail)]
        ('Urllib', 'http', [
            ('http', False),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
            ('socks', UnsupportedRequest),
        ]),
        ('Requests', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('CurlCFFI', 'http', [
            ('http', False),
            ('https', False),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        ('Websockets', 'ws', [
            ('http', UnsupportedRequest),
            ('https', UnsupportedRequest),
            ('socks4', False),
            ('socks4a', False),
            ('socks5', False),
            ('socks5h', False),
        ]),
        # Each stub handler is listed once; repeating an entry only re-runs identical cases
        (NoCheckRH, 'http', [('http', False)]),
        (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
    ]

    PROXY_KEY_TESTS = [
        # handler, request scheme, [(proxy key, proxy scheme, expected to fail)]
        ('Urllib', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('Requests', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('CurlCFFI', 'http', [
            ('all', 'http', False),
            ('unrelated', 'http', False),
        ]),
        ('Websockets', 'ws', [
            ('all', 'socks5', False),
            ('unrelated', 'socks5', False),
        ]),
        (NoCheckRH, 'http', [('all', 'http', False)]),
        (HTTPSupportedRH, 'http', [('all', 'http', UnsupportedRequest)]),
        (HTTPSupportedRH, 'http', [('no', 'http', UnsupportedRequest)]),
    ]

    EXTENSION_TESTS = [
        # handler, request scheme, [(extensions, expected to fail)]
        ('Urllib', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'cookiejar': CookieJar()}, AssertionError),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'legacy_ssl': False}, False),
            ({'legacy_ssl': True}, False),
            ({'legacy_ssl': 'notabool'}, AssertionError),
        ]),
        ('Requests', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'legacy_ssl': False}, False),
            ({'legacy_ssl': True}, False),
            ({'legacy_ssl': 'notabool'}, AssertionError),
        ]),
        ('CurlCFFI', 'http', [
            ({'cookiejar': 'notacookiejar'}, AssertionError),
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 1}, False),
            ({'timeout': 'notatimeout'}, AssertionError),
            ({'unsupported': 'value'}, UnsupportedRequest),
            ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
            ({'impersonate': 123}, AssertionError),
            ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
            ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
            ({'impersonate': ImpersonateTarget()}, False),
            ({'impersonate': 'chrome'}, AssertionError),
            ({'legacy_ssl': False}, False),
            ({'legacy_ssl': True}, False),
            ({'legacy_ssl': 'notabool'}, AssertionError),
        ]),
        (NoCheckRH, 'http', [
            ({'cookiejar': 'notacookiejar'}, False),
            ({'somerandom': 'test'}, False),  # but any extension is allowed through
        ]),
        ('Websockets', 'ws', [
            ({'cookiejar': YoutubeDLCookieJar()}, False),
            ({'timeout': 2}, False),
            ({'legacy_ssl': False}, False),
            ({'legacy_ssl': True}, False),
            ({'legacy_ssl': 'notabool'}, AssertionError),
        ]),
    ]

    @pytest.mark.parametrize('handler,fail,scheme', [
        ('Urllib', False, 'http'),
        ('Requests', False, 'http'),
        ('CurlCFFI', False, 'http'),
        ('Websockets', False, 'ws'),
    ], indirect=['handler'])
    def test_no_proxy(self, handler, fail, scheme):
        """A ``no`` proxy entry validates whether set on the request or on the handler."""
        run_validation(handler, fail, Request(f'{scheme}://', proxies={'no': '127.0.0.1,github.com'}))
        run_validation(handler, fail, Request(f'{scheme}://'), proxies={'no': '127.0.0.1,github.com'})

    @pytest.mark.parametrize('handler,scheme', [
        ('Urllib', 'http'),
        (HTTPSupportedRH, 'http'),
        ('Requests', 'http'),
        ('CurlCFFI', 'http'),
        ('Websockets', 'ws'),
    ], indirect=['handler'])
    def test_empty_proxy(self, handler, scheme):
        """A proxy value of ``None`` validates whether set on the request or on the handler."""
        run_validation(handler, False, Request(f'{scheme}://', proxies={scheme: None}))
        run_validation(handler, False, Request(f'{scheme}://'), proxies={scheme: None})

    @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
    @pytest.mark.parametrize('handler,scheme', [
        ('Urllib', 'http'),
        (HTTPSupportedRH, 'http'),
        ('Requests', 'http'),
        ('CurlCFFI', 'http'),
        ('Websockets', 'ws'),
    ], indirect=['handler'])
    def test_invalid_proxy_url(self, handler, scheme, proxy_url):
        """Proxy URLs without a scheme are rejected with ``UnsupportedRequest``."""
        run_validation(handler, UnsupportedRequest, Request(f'{scheme}://', proxies={scheme: proxy_url}))

    @pytest.mark.parametrize('handler,scheme,fail,handler_kwargs', [
        (handler_tests[0], scheme, fail, handler_kwargs)
        for handler_tests in URL_SCHEME_TESTS
        for scheme, fail, handler_kwargs in handler_tests[1]
    ], indirect=['handler'])
    def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
        """Check each ``URL_SCHEME_TESTS`` row against a handler built with its kwargs."""
        run_validation(handler, fail, Request(f'{scheme}://'), **(handler_kwargs or {}))

    @pytest.mark.parametrize('handler,scheme,proxy_key,proxy_scheme,fail', [
        (handler_tests[0], handler_tests[1], proxy_key, proxy_scheme, fail)
        for handler_tests in PROXY_KEY_TESTS
        for proxy_key, proxy_scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_key(self, handler, scheme, proxy_key, proxy_scheme, fail):
        """Check each ``PROXY_KEY_TESTS`` row with the proxy set on the request, then on the handler."""
        run_validation(handler, fail, Request(f'{scheme}://', proxies={proxy_key: f'{proxy_scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{scheme}://'), proxies={proxy_key: f'{proxy_scheme}://example.com'})

    @pytest.mark.parametrize('handler,req_scheme,scheme,fail', [
        (handler_tests[0], handler_tests[1], scheme, fail)
        for handler_tests in PROXY_SCHEME_TESTS
        for scheme, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
        """Check each ``PROXY_SCHEME_TESTS`` row with the proxy set on the request, then on the handler."""
        run_validation(handler, fail, Request(f'{req_scheme}://', proxies={req_scheme: f'{scheme}://example.com'}))
        run_validation(handler, fail, Request(f'{req_scheme}://'), proxies={req_scheme: f'{scheme}://example.com'})

    @pytest.mark.parametrize('handler,scheme,extensions,fail', [
        (handler_tests[0], handler_tests[1], extensions, fail)
        for handler_tests in EXTENSION_TESTS
        for extensions, fail in handler_tests[2]
    ], indirect=['handler'])
    def test_extension(self, handler, scheme, extensions, fail):
        """Check each ``EXTENSION_TESTS`` row with the extensions set on the request."""
        run_validation(
            handler, fail, Request(f'{scheme}://', extensions=extensions))

    def test_invalid_request_type(self):
        """Both ``validate`` and ``send`` raise ``TypeError`` for a non-``Request`` argument."""
        rh = self.ValidationRH(logger=FakeLogger())
        for method in (rh.validate, rh.send):
            with pytest.raises(TypeError, match='Expected an instance of Request'):
                method('not a request')
class FakeResponse(Response):
    """Empty-bodied response that remembers the request it was created for."""

    def __init__(self, request):
        # XXX: we could make request part of standard response interface
        self.request = request
        empty_body = io.BytesIO(b'')
        super().__init__(fp=empty_body, headers={}, url=request.url)
class FakeRH(RequestHandler):
    """Request handler stub: validates everything and answers with a ``FakeResponse``.

    URLs starting with ``ssl://`` raise ``SSLError`` with the remainder of the URL as message.
    """

    def __init__(self, *args, **params):
        # Keep the keyword arguments we were constructed with in ``self.params``
        self.params = params
        super().__init__(*args, **params)

    def _validate(self, request):
        """Accept every request."""
        return

    def _send(self, request: Request):
        ssl_prefix = 'ssl://'
        if not request.url.startswith(ssl_prefix):
            return FakeResponse(request)
        raise SSLError(request.url[len(ssl_prefix):])
class FakeRHYDL(FakeYDL):
    """``FakeYDL`` whose request director is built from ``FakeRH`` only."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        director = self.build_request_director([FakeRH])
        self._request_director = director
class AllUnsupportedRHYDL(FakeYDL):
    """``FakeYDL`` whose only handler declares empty feature, proxy-scheme and URL-scheme sets."""

    def __init__(self, *args, **kwargs):

        class UnsupportedRH(RequestHandler):
            _SUPPORTED_FEATURES = ()
            _SUPPORTED_PROXY_SCHEMES = ()
            _SUPPORTED_URL_SCHEMES = ()

            def _send(self, request: Request):
                pass

        super().__init__(*args, **kwargs)
        director = self.build_request_director([UnsupportedRH])
        self._request_director = director
class TestRequestDirector:
    """Tests for ``RequestDirector``: handler registry, dispatch, preferences and closing."""

    def test_handler_operations(self):
        """Handlers are stored by ``RH_KEY``; adding the same key replaces the previous handler."""
        director = RequestDirector(logger=FakeLogger())
        handler = FakeRH(logger=FakeLogger())
        director.add_handler(handler)
        assert director.handlers.get(FakeRH.RH_KEY) is handler

        # Handler should overwrite
        handler2 = FakeRH(logger=FakeLogger())
        director.add_handler(handler2)
        assert director.handlers.get(FakeRH.RH_KEY) is not handler
        assert director.handlers.get(FakeRH.RH_KEY) is handler2
        assert len(director.handlers) == 1

        class AnotherFakeRH(FakeRH):
            pass
        director.add_handler(AnotherFakeRH(logger=FakeLogger()))
        assert len(director.handlers) == 2
        assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

        director.handlers.pop(FakeRH.RH_KEY, None)
        assert director.handlers.get(FakeRH.RH_KEY) is None
        assert len(director.handlers) == 1

        # RequestErrors should passthrough
        with pytest.raises(SSLError):
            director.send(Request('ssl://something'))

    def test_send(self):
        """Sending without handlers raises ``RequestError``; with ``FakeRH`` it returns a ``FakeResponse``."""
        director = RequestDirector(logger=FakeLogger())
        with pytest.raises(RequestError):
            director.send(Request('any://'))
        director.add_handler(FakeRH(logger=FakeLogger()))
        assert isinstance(director.send(Request('http://')), FakeResponse)

    def test_unsupported_handlers(self):
        """Handlers that do not support a request are skipped; none left raises ``NoSupportingHandlers``."""
        class SupportedRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        director = RequestDirector(logger=FakeLogger())
        director.add_handler(SupportedRH(logger=FakeLogger()))
        director.add_handler(FakeRH(logger=FakeLogger()))

        # First should take preference
        assert director.send(Request('http://')).read() == b'supported'
        assert director.send(Request('any://')).read() == b''

        director.handlers.pop(FakeRH.RH_KEY)
        with pytest.raises(NoSupportingHandlers):
            director.send(Request('any://'))

    def test_unexpected_error(self):
        """A non-``RequestError`` from a handler is counted but does not stop other handlers being tried."""
        director = RequestDirector(logger=FakeLogger())

        class UnexpectedRH(FakeRH):
            def _send(self, request: Request):
                raise TypeError('something')

        # Pass a logger instance, like every other handler in this module
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
            director.send(Request('any://'))

        director.handlers.clear()
        assert len(director.handlers) == 0

        # Should not be fatal
        director.add_handler(FakeRH(logger=FakeLogger()))
        director.add_handler(UnexpectedRH(logger=FakeLogger()))
        assert director.send(Request('any://'))

    def test_preference(self):
        """A preference function decides which supporting handler serves the request."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))

        class SomeRH(RequestHandler):
            _SUPPORTED_URL_SCHEMES = ['http']

            def _send(self, request: Request):
                return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

        def some_preference(rh, request):
            # SomeRH scores 100 with a 'prefer' header and -1 without; any other handler scores 0
            return (0 if not isinstance(rh, SomeRH)
                    else 100 if 'prefer' in request.headers
                    else -1)

        director.add_handler(SomeRH(logger=FakeLogger()))
        director.preferences.add(some_preference)

        assert director.send(Request('http://')).read() == b''
        assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'

    def test_close(self, monkeypatch):
        """Closing the director closes its handlers."""
        director = RequestDirector(logger=FakeLogger())
        director.add_handler(FakeRH(logger=FakeLogger()))
        called = False

        def mock_close(*args, **kwargs):
            nonlocal called
            called = True

        monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
        director.close()
        assert called
1559 # XXX: do we want to move this to test_YoutubeDL.py?
1560 class TestYoutubeDLNetworking:
@staticmethod
def build_handler(ydl, handler: RequestHandler = FakeRH):
    """Build a director from ``handler`` alone via ``ydl`` and return the handler it registered."""
    director = ydl.build_request_director([handler])
    return director.handlers.get(handler.RH_KEY)
def test_compat_opener(self):
    """``ydl._opener`` is a urllib ``OpenerDirector`` (its DeprecationWarning is silenced)."""
    with FakeYDL() as ydl, warnings.catch_warnings():
        warnings.simplefilter('ignore', category=DeprecationWarning)
        opener = ydl._opener
        assert isinstance(opener, urllib.request.OpenerDirector)
@pytest.mark.parametrize('proxy,expected', [
    ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
    ('', {'all': '__noproxy__'}),
    (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}),  # env, set https
])
def test_proxy(self, proxy, expected, monkeypatch):
    """``ydl.proxies`` reflects the ``proxy`` param, using ``HTTP_PROXY`` when the param is ``None``."""
    monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
    ydl_params = {'proxy': proxy}
    with FakeYDL(ydl_params) as ydl:
        resolved = ydl.proxies
        assert resolved == expected
def test_compat_request(self):
    """``urlopen`` accepts a URL string and a ``urllib.request.Request``, but not ``None``."""
    with FakeRHYDL() as ydl:
        assert ydl.urlopen('test://')

        legacy_req = urllib.request.Request(
            'http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
        legacy_req.add_unredirected_header('Cookie', 'bob=bob')
        legacy_req.timeout = 2

        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=DeprecationWarning)
            converted = ydl.urlopen(legacy_req).request
            # URL, body, method, both header kinds and the timeout must carry over
            assert converted.url == legacy_req.get_full_url()
            assert converted.data == legacy_req.data
            assert converted.method == legacy_req.get_method()
            assert 'X-Test' in converted.headers
            assert 'Cookie' in converted.headers
            assert converted.extensions.get('timeout') == 2

        with pytest.raises(AssertionError):
            ydl.urlopen(None)
def test_extract_basic_auth(self):
    """Credentials embedded in the URL end up in a Basic ``Authorization`` header."""
    with FakeRHYDL() as ydl:
        sent_request = ydl.urlopen(Request('http://user:pass@foo.bar')).request
        assert sent_request.headers['Authorization'] == 'Basic dXNlcjpwYXNz'
def test_sanitize_url(self):
    """The ``httpss://`` scheme typo is corrected to ``https://`` before sending."""
    with FakeRHYDL() as ydl:
        sent_request = ydl.urlopen(Request('httpss://foo.bar')).request
        assert sent_request.url == 'https://foo.bar'
def test_file_urls_error(self):
    """Opening a ``file://`` URL with default settings raises the "disabled by default" error."""
    # use urllib handler
    expected_message = r'file:// URLs are disabled by default'
    with FakeYDL() as ydl, pytest.raises(RequestError, match=expected_message):
        ydl.urlopen('file://')
@pytest.mark.parametrize('scheme', (['ws', 'wss']))
def test_websocket_unavailable_error(self, scheme):
    """With no handler supporting them, ``ws``/``wss`` URLs raise the WebSocket-support error."""
    expected_message = r'This request requires WebSocket support'
    with AllUnsupportedRHYDL() as ydl, pytest.raises(RequestError, match=expected_message):
        ydl.urlopen(f'{scheme}://')
def test_legacy_server_connect_error(self):
    """Two specific SSL error texts get the ``--legacy-server-connect`` hint; others pass through."""
    hinted_errors = ('UNSAFE_LEGACY_RENEGOTIATION_DISABLED', 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    with FakeRHYDL() as ydl:
        for ssl_message in hinted_errors:
            with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
                ydl.urlopen(f'ssl://{ssl_message}')

        # Any other SSL error is raised with its own message
        with pytest.raises(SSLError, match='testerror'):
            ydl.urlopen('ssl://testerror')
def test_unsupported_impersonate_target(self):
    """Requesting impersonation from a plain ``RequestHandler`` raises "target ... is not available"."""

    class FakeImpersonationRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            class HTTPRH(RequestHandler):
                _SUPPORTED_URL_SCHEMES = ('http',)
                _SUPPORTED_PROXY_SCHEMES = None

                def _send(self, request: Request):
                    pass

            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([HTTPRH])

    with FakeImpersonationRHYDL() as ydl:
        with pytest.raises(
            RequestError,
            match=r'Impersonate target "test" is not available',
        ):
            impersonating_request = Request(
                'http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)})
            ydl.urlopen(impersonating_request)
def test_unsupported_impersonate_extension(self):
    """A target missing from the handler's target map raises "target ... is not available"."""

    class FakeHTTPRHYDL(FakeYDL):
        def __init__(self, *args, **kwargs):
            class IRH(ImpersonateRequestHandler):
                _SUPPORTED_URL_SCHEMES = ('http',)
                # Only 'abc' is mapped; the request below asks for 'test'
                _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}
                _SUPPORTED_PROXY_SCHEMES = None

                def _send(self, request: Request):
                    pass

            super().__init__(*args, **kwargs)
            self._request_director = self.build_request_director([IRH])

    with FakeHTTPRHYDL() as ydl:
        with pytest.raises(
            RequestError,
            match=r'Impersonate target "test" is not available',
        ):
            impersonating_request = Request(
                'http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)})
            ydl.urlopen(impersonating_request)
def test_raise_impersonate_error(self):
    """Constructing FakeYDL with an unavailable 'impersonate' param must raise YoutubeDLError."""
    with pytest.raises(
        YoutubeDLError,
        match=r'Impersonate target "test" is not available',
    ):
        FakeYDL({'impersonate': ImpersonateTarget('test', None, None, None)})
def test_pass_impersonate_param(self, monkeypatch):
    """The 'impersonate' param given to FakeYDL must end up on the built handler."""

    class IRH(ImpersonateRequestHandler):
        def _send(self, request: Request):
            pass

        _SUPPORTED_URL_SCHEMES = ('http',)
        _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}

    # Bypass the check on initialize
    # (the patched builder ignores the handlers it is given and always registers IRH).
    brh = FakeYDL.build_request_director
    monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH]))

    with FakeYDL({
        'impersonate': ImpersonateTarget('abc', None, None, None),
    }) as ydl:
        rh = self.build_handler(ydl, IRH)
        assert rh.impersonate == ImpersonateTarget('abc', None, None, None)
def test_get_impersonate_targets(self):
    """Available impersonate targets must be collected from every registered handler."""
    handlers = []
    # Build one handler class per client; each supports exactly one target and
    # uses the client name as its RH_KEY / RH_NAME.
    for target_client in ('abc', 'xyz', 'asd'):
        class TestRH(ImpersonateRequestHandler):
            def _send(self, request: Request):
                pass
            _SUPPORTED_URL_SCHEMES = ('http',)
            _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client): 'test'}
            RH_KEY = target_client
            RH_NAME = target_client
        handlers.append(TestRH)

    with FakeYDL() as ydl:
        ydl._request_director = ydl.build_request_director(handlers)
        # Compared as a set, so the order in which targets are reported is not checked.
        assert set(ydl._get_available_impersonate_targets()) == {
            (ImpersonateTarget('xyz'), 'xyz'),
            (ImpersonateTarget('abc'), 'abc'),
            (ImpersonateTarget('asd'), 'asd'),
        }
        assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
        # An empty target is expected to count as available.
        assert ydl._impersonate_target_available(ImpersonateTarget())
        assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
    ('http', '__noproxy__', None),
    ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
    ('https', 'example.com', 'http://example.com'),
    ('https', '//example.com', 'http://example.com'),
    ('https', 'socks5://example.com', 'socks5h://example.com'),
    ('http', 'socks://example.com', 'socks4://example.com'),
    ('http', 'socks4://example.com', 'socks4://example.com'),
    ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
])
def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
    """Proxy values must be normalised both per request and when a handler is built.

    Each case maps a proxy key and raw proxy value to the cleaned value expected
    in the resulting ``proxies`` mapping.
    """
    # proxies should be cleaned in urlopen()
    with FakeRHYDL() as ydl:
        req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
        assert req.proxies[proxy_key] == expected

    # and should also be cleaned when building the handler
    # (here the proxy is supplied through the <KEY>_PROXY environment variable)
    monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
    with FakeYDL() as ydl:
        rh = self.build_handler(ydl)
        assert rh.proxies[proxy_key] == expected
def test_clean_proxy_header(self):
    """The 'ytdl-request-proxy' header must be removed and turned into an 'all' proxy."""
    # Header given on an individual request.
    with FakeRHYDL() as ydl:
        req = ydl.urlopen(Request('test://', headers={'ytdl-request-proxy': '//foo.bar'})).request
        assert 'ytdl-request-proxy' not in req.headers
        assert req.proxies == {'all': 'http://foo.bar'}

    # Header given through the 'http_headers' param.
    with FakeYDL({'http_headers': {'ytdl-request-proxy': '//foo.bar'}}) as ydl:
        rh = self.build_handler(ydl)
        assert 'ytdl-request-proxy' not in rh.headers
        assert rh.proxies == {'all': 'http://foo.bar'}
def test_clean_header(self):
    """Internal control headers must be stripped from requests and from handler headers."""
    # 'Youtubedl-no-compression' on a request becomes 'Accept-Encoding: identity'.
    with FakeRHYDL() as ydl:
        res = ydl.urlopen(Request('test://', headers={'Youtubedl-no-compression': True}))
        assert 'Youtubedl-no-compression' not in res.request.headers
        assert res.request.headers.get('Accept-Encoding') == 'identity'

    # The same translation applies when the header comes from 'http_headers'.
    with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
        rh = self.build_handler(ydl)
        assert 'Youtubedl-no-compression' not in rh.headers
        assert rh.headers.get('Accept-Encoding') == 'identity'

    # 'Ytdl-socks-proxy' must not reach the handler headers either.
    with FakeYDL({'http_headers': {'Ytdl-socks-proxy': 'socks://localhost:1080'}}) as ydl:
        rh = self.build_handler(ydl)
        assert 'Ytdl-socks-proxy' not in rh.headers
def test_build_handler_params(self):
    """YoutubeDL params must be mapped onto the matching request handler attributes."""
    with FakeYDL({
        'http_headers': {'test': 'testtest'},
        'socket_timeout': 2,
        'proxy': 'http://127.0.0.1:8080',
        'source_address': '127.0.0.45',
        'debug_printtraffic': True,
        'compat_opts': ['no-certifi'],
        'nocheckcertificate': True,
        'legacyserverconnect': True,
    }) as ydl:
        rh = self.build_handler(ydl)
        assert rh.headers.get('test') == 'testtest'
        assert 'Accept' in rh.headers  # ensure std_headers are still there
        assert rh.timeout == 2
        assert rh.proxies.get('all') == 'http://127.0.0.1:8080'
        assert rh.source_address == '127.0.0.45'
        assert rh.verbose is True
        assert rh.prefer_system_certs is True
        assert rh.verify is False
        assert rh.legacy_ssl_support is True
@pytest.mark.parametrize('ydl_params', [
    {'client_certificate': 'fakecert.crt'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
    {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
    {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
])
def test_client_certificate(self, ydl_params):
    """Client certificate params must be handed to the handler unchanged."""
    with FakeYDL(ydl_params) as ydl:
        rh = self.build_handler(ydl)
        assert rh._client_cert == ydl_params  # XXX: Too bound to implementation
def test_urllib_file_urls(self):
    """The 'enable_file_urls' param must be reflected on the urllib handler."""
    with FakeYDL({'enable_file_urls': False}) as ydl:
        rh = self.build_handler(ydl, UrllibRH)
        assert rh.enable_file_urls is False

    with FakeYDL({'enable_file_urls': True}) as ydl:
        rh = self.build_handler(ydl, UrllibRH)
        assert rh.enable_file_urls is True
def test_compat_opt_prefer_urllib(self):
    """The 'prefer-legacy-http-handler' compat opt must register one preference favouring UrllibRH."""
    # This assumes urllib only has a preference when this compat opt is given
    with FakeYDL({'compat_opts': ['prefer-legacy-http-handler']}) as ydl:
        director = ydl.build_request_director([UrllibRH])
        assert len(director.preferences) == 1
        # The single preference callable must return a truthy value for UrllibRH.
        assert director.preferences.pop()(UrllibRH, None)
class TestRequest:
    """Tests for ``Request``: construction, attribute setters, ``update()`` and ``copy()``."""

    def test_query(self):
        """Query parameters are merged into the URL on construction and on update()."""
        req = Request('http://example.com?q=something', query={'v': 'xyz'})
        assert req.url == 'http://example.com?q=something&v=xyz'

        req.update(query={'v': '123'})
        assert req.url == 'http://example.com?q=something&v=123'
        req.update(url='http://example.com', query={'v': 'xyz'})
        assert req.url == 'http://example.com?v=xyz'

    def test_method(self):
        """The method follows the presence of data unless it has been set explicitly."""
        req = Request('http://example.com')
        assert req.method == 'GET'
        req.data = b'test'
        assert req.method == 'POST'
        req.data = None
        assert req.method == 'GET'
        req.data = b'test2'
        req.method = 'PUT'
        assert req.method == 'PUT'
        # An explicitly chosen method must survive the data being cleared.
        req.data = None
        assert req.method == 'PUT'
        with pytest.raises(TypeError):
            req.method = 1

    def test_request_helpers(self):
        """HEADRequest and PUTRequest preset their respective methods."""
        assert HEADRequest('http://example.com').method == 'HEAD'
        assert PUTRequest('http://example.com').method == 'PUT'

    def test_headers(self):
        """Headers are case-insensitive, mergeable via update(), and type-checked on assignment."""
        req = Request('http://example.com', headers={'tesT': 'test'})
        assert req.headers == HTTPHeaderDict({'test': 'test'})
        req.update(headers={'teSt2': 'test2'})
        assert req.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

        # An HTTPHeaderDict is stored as-is, not copied.
        req.headers = new_headers = HTTPHeaderDict({'test': 'test'})
        assert req.headers == HTTPHeaderDict({'test': 'test'})
        assert req.headers is new_headers

        # test converts dict to case insensitive dict
        req.headers = new_headers = {'test2': 'test2'}
        assert isinstance(req.headers, HTTPHeaderDict)
        assert req.headers is not new_headers

        with pytest.raises(TypeError):
            req.headers = None

    def test_data_type(self):
        """Only None, bytes, iterables of bytes and file-like objects are accepted as data."""
        req = Request('http://example.com')
        assert req.data is None
        # test bytes is allowed
        req.data = b'test'
        assert req.data == b'test'
        # test iterable of bytes is allowed
        i = [b'test', b'test2']
        req.data = i
        assert req.data == i

        # test file-like object is allowed
        f = io.BytesIO(b'test')
        req.data = f
        assert req.data == f

        # common mistake: test str not allowed
        with pytest.raises(TypeError):
            req.data = 'test'
        # The rejected value must not have been stored.
        assert req.data != 'test'

        # common mistake: test dict is not allowed
        with pytest.raises(TypeError):
            req.data = {'test': 'test'}
        assert req.data != {'test': 'test'}

    def test_content_length_header(self):
        """A Content-Length header is dropped when data changes or when there is no data."""
        req = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
        assert req.headers.get('Content-Length') == '0'

        req.data = b'test'
        assert 'Content-Length' not in req.headers

        req = Request('http://example.com', headers={'Content-Length': '10'})
        assert 'Content-Length' not in req.headers

    def test_content_type_header(self):
        """Content-Type is kept while data is present, removed with it, then defaulted."""
        req = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
        assert req.headers.get('Content-Type') == 'test'
        req.data = b'test2'
        assert req.headers.get('Content-Type') == 'test'
        req.data = None
        assert 'Content-Type' not in req.headers
        req.data = b'test3'
        assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_update_req(self):
        """update(data=b'') must turn the request into a POST with a default Content-Type."""
        req = Request('http://example.com')
        assert req.data is None
        assert req.method == 'GET'
        assert 'Content-Type' not in req.headers
        # Test that zero-byte payloads will be sent
        req.update(data=b'')
        assert req.data == b''
        assert req.method == 'POST'
        assert req.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

    def test_proxies(self):
        """The proxies mapping passed to the constructor is exposed unchanged."""
        req = Request(url='http://example.com', proxies={'http': 'http://127.0.0.1:8080'})
        assert req.proxies == {'http': 'http://127.0.0.1:8080'}

    def test_extensions(self):
        """The extensions mapping passed to the constructor is exposed unchanged."""
        req = Request(url='http://example.com', extensions={'timeout': 2})
        assert req.extensions == {'timeout': 2}

    def test_copy(self):
        """copy() duplicates the containers but shares data and extension values."""
        req = Request(
            url='http://example.com',
            extensions={'cookiejar': CookieJar()},
            headers={'Accept-Encoding': 'br'},
            proxies={'http': 'http://127.0.0.1'},
            data=[b'123'],
        )
        req_copy = req.copy()
        assert req_copy is not req
        assert req_copy.url == req.url
        assert req_copy.headers == req.headers
        assert req_copy.headers is not req.headers
        assert req_copy.proxies == req.proxies
        assert req_copy.proxies is not req.proxies

        # Data is not able to be copied
        assert req_copy.data == req.data
        assert req_copy.data is req.data

        # Shallow copy extensions
        assert req_copy.extensions is not req.extensions
        assert req_copy.extensions['cookiejar'] == req.extensions['cookiejar']

        # Subclasses are copied by default
        class AnotherRequest(Request):
            pass

        req = AnotherRequest(url='http://127.0.0.1')
        assert isinstance(req.copy(), AnotherRequest)

    def test_url(self):
        """URLs are normalised: IDNA host, percent-escaped path/query, default scheme."""
        req = Request(url='https://фtest.example.com/ some spaceв?ä=c')
        assert req.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

        # A scheme-relative URL is given the http scheme.
        assert Request(url='//example.com').url == 'http://example.com'

        with pytest.raises(TypeError):
            Request(url='https://').url = None
class TestResponse:
    """Tests for ``Response``: reason fallback, header access and the deprecated compat API."""

    @pytest.mark.parametrize('reason,status,expected', [
        ('custom', 200, 'custom'),
        (None, 404, 'Not Found'),  # fallback status
        ('', 403, 'Forbidden'),
        (None, 999, None),
    ])
    def test_reason(self, reason, status, expected):
        """A missing or empty reason falls back to the standard phrase for the status, if any."""
        res = Response(io.BytesIO(b''), url='test://', headers={}, status=status, reason=reason)
        assert res.reason == expected

    def test_headers(self):
        """Repeated headers are all retained and lookups are case-insensitive."""
        headers = Message()
        headers.add_header('Test', 'test')
        headers.add_header('Test', 'test2')
        headers.add_header('content-encoding', 'br')
        res = Response(io.BytesIO(b''), headers=headers, url='test://')
        assert res.headers.get_all('test') == ['test', 'test2']
        assert 'Content-Encoding' in res.headers

    def test_get_header(self):
        """get_header() joins repeated values, except Set-Cookie which yields the first one."""
        headers = Message()
        headers.add_header('Set-Cookie', 'cookie1')
        headers.add_header('Set-cookie', 'cookie2')
        headers.add_header('Test', 'test')
        headers.add_header('Test', 'test2')
        res = Response(io.BytesIO(b''), headers=headers, url='test://')
        assert res.get_header('test') == 'test, test2'
        assert res.get_header('set-Cookie') == 'cookie1'
        assert res.get_header('notexist', 'default') == 'default'

    def test_compat(self):
        """The legacy accessors must agree with their modern counterparts."""
        res = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
        with warnings.catch_warnings():
            # Silence DeprecationWarning while the legacy accessors are exercised.
            warnings.simplefilter('ignore', category=DeprecationWarning)
            assert res.code == res.getcode() == res.status
            assert res.geturl() == res.url
            assert res.info() is res.headers
            assert res.getheader('test') == res.get_header('test')
class TestImpersonateTarget:
    """Tests for ``ImpersonateTarget``: parsing, formatting, validation and comparison."""

    @pytest.mark.parametrize('target_str,expected', [
        ('abc', ImpersonateTarget('abc', None, None, None)),
        ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
        ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
        ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
        ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
        ('abc:', ImpersonateTarget('abc', None, None, None)),
        ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
        (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
        (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
        (':', ImpersonateTarget(None, None, None, None)),
        ('', ImpersonateTarget(None, None, None, None)),
    ])
    def test_target_from_str(self, target_str, expected):
        """from_str() must parse each valid target string into the expected target."""
        assert ImpersonateTarget.from_str(target_str) == expected

    @pytest.mark.parametrize('target_str', [
        '-120', ':-12.0', '-12:-12', '-:-',
        '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:',
    ])
    def test_target_from_invalid_str(self, target_str):
        """from_str() must raise ValueError for malformed target strings."""
        with pytest.raises(ValueError):
            ImpersonateTarget.from_str(target_str)

    @pytest.mark.parametrize('target,expected', [
        (ImpersonateTarget('abc', None, None, None), 'abc'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
        (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
        (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
        (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
        (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
        (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
        (ImpersonateTarget('abc'), 'abc'),
        (ImpersonateTarget(None, None, None, None), ''),
    ])
    def test_str(self, target, expected):
        """str() must render a target in the form the parametrized cases expect."""
        assert str(target) == expected

    @pytest.mark.parametrize('args', [
        ('abc', None, None, '5'),
        ('abc', '120', None, '5'),
        (None, '120', None, None),
        (None, '120', None, '5'),
        (None, None, None, '5'),
        (None, '120', 'xyz', '5'),
    ])
    def test_invalid_impersonate_target(self, args):
        """Each parametrized argument combination must be rejected with ValueError."""
        with pytest.raises(ValueError):
            ImpersonateTarget(*args)

    @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
        (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
        (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
        (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
        (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
        (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
        (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
        (ImpersonateTarget(), ImpersonateTarget(), True, True),
    ])
    def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
        """Check the ``in`` and ``==`` results for each pair of targets."""
        assert (target1 in target2) is is_in
        assert (target1 == target2) is is_eq