[ie/mlbtv] Fix extractor (#10515)
[yt-dlp.git] / yt_dlp / networking / _curlcffi.py
blob e8a67b7347d4dd7b68ee1839bbbf15457112559e
1 from __future__ import annotations
3 import io
4 import math
5 import re
6 import urllib.parse
8 from ._helper import InstanceStoreMixin, select_proxy
9 from .common import (
10 Features,
11 Request,
12 Response,
13 register_preference,
14 register_rh,
16 from .exceptions import (
17 CertificateVerifyError,
18 HTTPError,
19 IncompleteRead,
20 ProxyError,
21 SSLError,
22 TransportError,
24 from .impersonate import ImpersonateRequestHandler, ImpersonateTarget
25 from ..dependencies import curl_cffi, certifi
26 from ..utils import int_or_none
# curl_cffi may be None at this point; this module is unusable without it,
# so fail the import with an explicit message.
if curl_cffi is None:
    raise ImportError('curl_cffi is not installed')


# Up to three leading numeric components of the version string,
# e.g. '0.7.1' -> (0, 7, 1). Any run of non-digits acts as a separator.
curl_cffi_version = tuple(
    int(component)
    for component in re.split(r'[^\d]+', curl_cffi.__version__)[:3])

# Accept exactly 0.5.10, or anything in the 0.7 series.
if not (curl_cffi_version == (0, 5, 10) or (0, 7, 0) <= curl_cffi_version < (0, 8, 0)):
    # Tag the module with a marked-up version label before bailing out.
    # NOTE(review): presumably read by version reporting elsewhere - confirm.
    curl_cffi._yt_dlp__version = f'{curl_cffi.__version__} (unsupported)'
    raise ImportError('Only curl_cffi versions 0.5.10, 0.7.X are supported')
38 import curl_cffi.requests
39 from curl_cffi.const import CurlECode, CurlOpt
class CurlCFFIResponseReader(io.IOBase):
    """Read-only, file-like wrapper around a streamed curl_cffi response body.

    Chunks are pulled lazily from ``response.iter_content()`` and buffered so
    that ``read(size)`` can hand back exactly the number of bytes requested.
    ``bytes_read`` counts every byte pulled from the stream so far, including
    bytes that are still buffered and have not been returned to the caller.
    """

    def __init__(self, response: curl_cffi.requests.Response):
        self._response = response
        self._iterator = response.iter_content()
        self._buffer = b''
        self.bytes_read = 0

    def readable(self):
        return True

    def read(self, size=None):
        """Return up to ``size`` bytes from the response body.

        ``size=None`` or any negative ``size`` reads until the stream is
        exhausted. Fewer than ``size`` bytes are returned only when the stream
        ends first; ``b''`` is returned once everything has been consumed.

        The underlying response is closed as soon as the body has been fully
        read, and also when pulling a chunk raises (the exception propagates).
        """
        # Follow the io convention that a negative size means "read everything".
        # Slicing the buffer with a negative size would silently drop data.
        if size is not None and size < 0:
            size = None

        exception_raised = True
        try:
            # Collect chunks and join once: repeated ``bytes +=`` copies the
            # whole buffer for every chunk, which is quadratic on large bodies.
            pending = [self._buffer]
            buffered = len(self._buffer)
            while self._iterator and (size is None or buffered < size):
                chunk = next(self._iterator, None)
                if chunk is None:
                    self._iterator = None
                    break
                pending.append(chunk)
                buffered += len(chunk)
                self.bytes_read += len(chunk)
            self._buffer = b''.join(pending)

            if size is None:
                size = len(self._buffer)
            data = self._buffer[:size]
            self._buffer = self._buffer[size:]

            # "free" the curl instance if the response is fully read.
            # curl_cffi doesn't do this automatically and only allows one open response per thread
            if not self._iterator and not self._buffer:
                self.close()
            exception_raised = False
            return data
        finally:
            if exception_raised:
                self.close()

    def close(self):
        """Close the underlying response (once) and drop any buffered data."""
        if not self.closed:
            self._response.close()
            self._buffer = b''
        super().close()
class CurlCFFIResponseAdapter(Response):
    """Response built from a curl_cffi response.

    The body is exposed through a CurlCFFIResponseReader, and curl_cffi read
    errors are translated into this package's exception types.
    """
    fp: CurlCFFIResponseReader

    def __init__(self, response: curl_cffi.requests.Response):
        reader = CurlCFFIResponseReader(response)
        super().__init__(
            fp=reader,
            headers=response.headers,
            url=response.url,
            status=response.status_code)

    def read(self, amt=None):
        """Read from the body reader, mapping curl_cffi errors.

        Raises:
            IncompleteRead: curl reported a partial transfer. ``expected`` is
                the Content-Length minus the bytes read so far, or None when
                no usable Content-Length header is present.
            TransportError: any other curl_cffi request error.
        """
        try:
            return self.fp.read(amt)
        except curl_cffi.requests.errors.RequestsError as e:
            # Anything other than a truncated body is a generic transport failure.
            if e.code != CurlECode.PARTIAL_FILE:
                raise TransportError(cause=e) from e

            content_length = int_or_none(e.response.headers.get('Content-Length'))
            received = self.fp.bytes_read
            if content_length is None:
                outstanding = None
            else:
                outstanding = content_length - self.fp.bytes_read
            raise IncompleteRead(partial=received, expected=outstanding, cause=e) from e
@register_rh
class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
    """Request handler that sends HTTP(S) requests through curl_cffi sessions.

    Impersonation targets are mapped onto curl_cffi ``BrowserType`` members;
    targets that only exist in curl_cffi >= 0.7.0 are added conditionally.
    """
    RH_NAME = 'curl_cffi'
    _SUPPORTED_URL_SCHEMES = ('http', 'https')
    _SUPPORTED_FEATURES = (Features.NO_PROXY, Features.ALL_PROXY)
    _SUPPORTED_PROXY_SCHEMES = ('http', 'https', 'socks4', 'socks4a', 'socks5', 'socks5h')
    # The conditional sub-dicts are only evaluated when the version check passes,
    # so BrowserType members missing from older curl_cffi are never touched.
    _SUPPORTED_IMPERSONATE_TARGET_MAP = {
        **({
            ImpersonateTarget('chrome', '124', 'macos', '14'): curl_cffi.requests.BrowserType.chrome124,
            ImpersonateTarget('chrome', '123', 'macos', '14'): curl_cffi.requests.BrowserType.chrome123,
            ImpersonateTarget('chrome', '120', 'macos', '14'): curl_cffi.requests.BrowserType.chrome120,
            ImpersonateTarget('chrome', '119', 'macos', '14'): curl_cffi.requests.BrowserType.chrome119,
            ImpersonateTarget('chrome', '116', 'windows', '10'): curl_cffi.requests.BrowserType.chrome116,
        } if curl_cffi_version >= (0, 7, 0) else {}),
        ImpersonateTarget('chrome', '110', 'windows', '10'): curl_cffi.requests.BrowserType.chrome110,
        ImpersonateTarget('chrome', '107', 'windows', '10'): curl_cffi.requests.BrowserType.chrome107,
        ImpersonateTarget('chrome', '104', 'windows', '10'): curl_cffi.requests.BrowserType.chrome104,
        ImpersonateTarget('chrome', '101', 'windows', '10'): curl_cffi.requests.BrowserType.chrome101,
        ImpersonateTarget('chrome', '100', 'windows', '10'): curl_cffi.requests.BrowserType.chrome100,
        ImpersonateTarget('chrome', '99', 'windows', '10'): curl_cffi.requests.BrowserType.chrome99,
        ImpersonateTarget('edge', '101', 'windows', '10'): curl_cffi.requests.BrowserType.edge101,
        ImpersonateTarget('edge', '99', 'windows', '10'): curl_cffi.requests.BrowserType.edge99,
        **({
            ImpersonateTarget('safari', '17.0', 'macos', '14'): curl_cffi.requests.BrowserType.safari17_0,
        } if curl_cffi_version >= (0, 7, 0) else {}),
        ImpersonateTarget('safari', '15.5', 'macos', '12'): curl_cffi.requests.BrowserType.safari15_5,
        ImpersonateTarget('safari', '15.3', 'macos', '11'): curl_cffi.requests.BrowserType.safari15_3,
        ImpersonateTarget('chrome', '99', 'android', '12'): curl_cffi.requests.BrowserType.chrome99_android,
        **({
            ImpersonateTarget('safari', '17.2', 'ios', '17.2'): curl_cffi.requests.BrowserType.safari17_2_ios,
        } if curl_cffi_version >= (0, 7, 0) else {}),
    }

    def _create_instance(self, cookiejar=None):
        """Build a new curl_cffi session bound to the given cookiejar."""
        return curl_cffi.requests.Session(cookies=cookiejar)

    def _check_extensions(self, extensions):
        """Run the parent check, then drop the extensions this handler accepts."""
        super()._check_extensions(extensions)
        # CurlCFFIRH ignores legacy ssl options currently.
        # Impersonation generally uses a looser SSL configuration than urllib/requests.
        for accepted in ('impersonate', 'cookiejar', 'timeout', 'legacy_ssl'):
            extensions.pop(accepted, None)

    def send(self, request: Request) -> Response:
        """Send the request and record the impersonation target on the response.

        The target is stored under ``extensions['impersonate']`` on the
        returned response, and on the response attached to a raised HTTPError.
        """
        target = self._get_request_target(request)
        try:
            response = super().send(request)
        except HTTPError as e:
            e.response.extensions['impersonate'] = target
            raise
        else:
            response.extensions['impersonate'] = target
            return response

    def _send(self, request: Request):
        """Configure a curl_cffi session for the request and perform it.

        Returns a CurlCFFIResponseAdapter for 2xx statuses.

        Raises:
            HTTPError: for any non-2xx status (``redirect_loop`` is set when
                curl hit the redirect limit).
            CertificateVerifyError, SSLError, ProxyError, TransportError:
                mapped from the curl error code of a failed request.
        """
        redirect_limit_hit = False
        # Only attach a cookiejar when the caller did not supply an explicit cookie header
        wants_cookiejar = 'cookie' not in request.headers
        session: curl_cffi.requests.Session = self._get_instance(
            cookiejar=self._get_cookiejar(request) if wants_cookiejar else None)

        if self.verbose:
            session.curl.setopt(CurlOpt.VERBOSE, 1)

        proxies = self._get_proxies(request)
        if 'no' in proxies:
            session.curl.setopt(CurlOpt.NOPROXY, proxies['no'])
            del proxies['no']

        # curl doesn't support per protocol proxies, so we select the one that matches the request protocol
        proxy = select_proxy(request.url, proxies=proxies)
        if proxy:
            session.curl.setopt(CurlOpt.PROXY, proxy)
            if urllib.parse.urlparse(request.url).scheme.lower() != 'http':
                # Enable HTTP CONNECT for HTTPS urls.
                # Don't use CONNECT for http for compatibility with urllib behaviour.
                # See: https://curl.se/libcurl/c/CURLOPT_HTTPPROXYTUNNEL.html
                session.curl.setopt(CurlOpt.HTTPPROXYTUNNEL, 1)

            # curl_cffi does not currently set these for proxies
            session.curl.setopt(CurlOpt.PROXY_CAINFO, certifi.where())

            if not self.verify:
                for tls_check in (CurlOpt.PROXY_SSL_VERIFYPEER, CurlOpt.PROXY_SSL_VERIFYHOST):
                    session.curl.setopt(tls_check, 0)

        headers = self._get_impersonate_headers(request)

        if self._client_cert:
            session.curl.setopt(CurlOpt.SSLCERT, self._client_cert['client_certificate'])
            cert_key = self._client_cert.get('client_certificate_key')
            cert_password = self._client_cert.get('client_certificate_password')
            if cert_key:
                session.curl.setopt(CurlOpt.SSLKEY, cert_key)
            if cert_password:
                session.curl.setopt(CurlOpt.KEYPASSWD, cert_password)

        timeout = self._calculate_timeout(request)

        # set CURLOPT_LOW_SPEED_LIMIT and CURLOPT_LOW_SPEED_TIME to act as a read timeout. [1]
        # This is required only for 0.5.10 [2]
        # Note: CURLOPT_LOW_SPEED_TIME is in seconds, so we need to round up to the nearest second. [3]
        # [1] https://unix.stackexchange.com/a/305311
        # [2] https://github.com/yifeikong/curl_cffi/issues/156
        # [3] https://curl.se/libcurl/c/CURLOPT_LOW_SPEED_TIME.html
        session.curl.setopt(CurlOpt.LOW_SPEED_LIMIT, 1)  # 1 byte per second
        session.curl.setopt(CurlOpt.LOW_SPEED_TIME, math.ceil(timeout))

        try:
            curl_response = session.request(
                method=request.method,
                url=request.url,
                headers=headers,
                data=request.data,
                verify=self.verify,
                max_redirects=5,
                timeout=(timeout, timeout),
                impersonate=self._SUPPORTED_IMPERSONATE_TARGET_MAP.get(
                    self._get_request_target(request)),
                interface=self.source_address,
                stream=True,
            )
        except curl_cffi.requests.errors.RequestsError as e:
            if e.code == CurlECode.TOO_MANY_REDIRECTS:
                # Not fatal here: keep the last response and report it as an HTTPError below
                redirect_limit_hit = True
                curl_response = e.response
            elif e.code == CurlECode.PEER_FAILED_VERIFICATION:
                raise CertificateVerifyError(cause=e) from e
            elif e.code == CurlECode.SSL_CONNECT_ERROR:
                raise SSLError(cause=e) from e
            elif e.code == CurlECode.PROXY or (
                    e.code == CurlECode.RECV_ERROR and 'CONNECT' in str(e)):
                raise ProxyError(cause=e) from e
            else:
                raise TransportError(cause=e) from e

        response = CurlCFFIResponseAdapter(curl_response)

        if 200 <= response.status < 300:
            return response
        raise HTTPError(response, redirect_loop=redirect_limit_hit)
@register_preference(CurlCFFIRH)
def curl_cffi_preference(rh, request):
    """Return the fixed preference score registered for CurlCFFIRH.

    Both arguments are ignored; the score is always -100.
    NOTE(review): presumably a negative score ranks this handler below
    others - confirm against register_preference.
    """
    return -100