[ie/tv5monde] Support browser impersonation (#10417)
[yt-dlp3.git] / yt_dlp / update.py
blob8c6790d6105b2cba273d5097ad05855a70a15d84
1 from __future__ import annotations
3 import atexit
4 import contextlib
5 import hashlib
6 import json
7 import os
8 import platform
9 import re
10 import subprocess
11 import sys
12 from dataclasses import dataclass
13 from zipimport import zipimporter
15 from .compat import functools # isort: split
16 from .compat import compat_realpath
17 from .networking import Request
18 from .networking.exceptions import HTTPError, network_exceptions
19 from .utils import (
20 NO_DEFAULT,
21 Popen,
22 deprecation_warning,
23 format_field,
24 remove_end,
25 shell_quote,
26 system_identifier,
27 version_tuple,
29 from .version import (
30 CHANNEL,
31 ORIGIN,
32 RELEASE_GIT_HEAD,
33 UPDATE_HINT,
34 VARIANT,
35 __version__,
# Official release channels, mapped to the GitHub repository that publishes them
UPDATE_SOURCES = {
    'stable': 'yt-dlp/yt-dlp',
    'nightly': 'yt-dlp/yt-dlp-nightly-builds',
    'master': 'yt-dlp/yt-dlp-master-builds',
}
REPOSITORY = UPDATE_SOURCES['stable']
# Reverse lookup of UPDATE_SOURCES: repository -> channel name
_INVERSE_UPDATE_SOURCES = {repo: channel for channel, repo in UPDATE_SOURCES.items()}

# Dot-separated groups of digits, e.g. a numeric version
_VERSION_RE = re.compile(r'(\d+\.)*\d+')
# 40 lowercase hexadecimal characters
_HASH_PATTERN = r'[\da-f]{40}'
# Captures the commit hash out of a "Generated from: <commit URL>" line
_COMMIT_RE = re.compile(
    rf'Generated from: https://(?:[^/?#]+/){{3}}commit/(?P<hash>{_HASH_PATTERN})')

API_BASE_URL = 'https://api.github.com/repos'

# Backwards compatibility variables for the current channel
API_URL = f'{API_BASE_URL}/{REPOSITORY}/releases'
@functools.cache
def _get_variant_and_executable_path():
    """@returns (variant, executable_path)"""
    if not getattr(sys, 'frozen', False):
        # Not a frozen executable: zip bundle, git checkout, or anything else
        module_dir = os.path.dirname(__file__)
        if isinstance(__loader__, zipimporter):
            return 'zip', os.path.join(module_dir, '..')
        launched_as_main = os.path.basename(sys.argv[0]) in ('__main__.py', '-m')
        if launched_as_main and os.path.exists(os.path.join(module_dir, '../.git/HEAD')):
            return 'source', module_dir
        return 'unknown', module_dir

    exe_path = sys.executable
    if not hasattr(sys, '_MEIPASS'):
        return 'py2exe', exe_path
    if sys._MEIPASS == os.path.dirname(exe_path):
        # The bundle directory is the executable's own directory
        return f'{sys.platform}_dir', exe_path

    if sys.platform == 'darwin':
        is_legacy = version_tuple(platform.mac_ver()[0]) < (10, 15)
        machine = '_legacy' if is_legacy else ''
    else:
        machine = f'_{platform.machine().lower()}'
        # Ref: https://en.wikipedia.org/wiki/Uname#Examples
        if machine[1:] in ('x86', 'x86_64', 'amd64', 'i386', 'i686'):
            is_32bit = platform.architecture()[0][:2] == '32'
            machine = '_x86' if is_32bit else ''
        # sys.executable returns a /tmp/ path for staticx builds (linux_static)
        # Ref: https://staticx.readthedocs.io/en/latest/usage.html#run-time-information
        if static_exe_path := os.getenv('STATICX_PROG_PATH'):
            exe_path = static_exe_path

    platform_name = remove_end(sys.platform, '32')
    return f'{platform_name}{machine}_exe', exe_path
def detect_variant():
    """Return the build variant: the VARIANT constant when set, else the detected one"""
    if VARIANT:
        return VARIANT
    detected, _ = _get_variant_and_executable_path()
    return detected
@functools.cache
def current_git_head():
    """Return the short git HEAD hash when running from source, else None

    Any failure while invoking or parsing `git` is swallowed and yields None.
    """
    if detect_variant() != 'source':
        return None

    try:
        output, _, _ = Popen.run(
            ['git', 'rev-parse', '--short', 'HEAD'],
            text=True, cwd=os.path.dirname(os.path.abspath(__file__)),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Only accept output that is purely a lowercase hex string
        if re.fullmatch('[0-9a-f]+', output.strip()):
            return output.strip()
    except Exception:
        pass
    return None
# Build variant -> suffix appended to 'yt-dlp' to form the release binary's filename
_FILE_SUFFIXES = {
    'zip': '',
    'py2exe': '_min.exe',
    'win_exe': '.exe',
    'win_x86_exe': '_x86.exe',
    'darwin_exe': '_macos',
    'darwin_legacy_exe': '_macos_legacy',
    'linux_exe': '_linux',
    'linux_aarch64_exe': '_linux_aarch64',
    'linux_armv7l_exe': '_linux_armv7l',
}

_UNPACKAGED_PLATFORMS = {'win32_dir': 'Windows', 'darwin_dir': 'MacOS', 'linux_dir': 'Linux'}

# Build variant -> reason it cannot self-update (None means it can)
_NON_UPDATEABLE_REASONS = {
    **dict.fromkeys(_FILE_SUFFIXES),  # Updatable
    **{
        variant: f'Auto-update is not supported for unpackaged {name} executable; Re-download the latest release'
        for variant, name in _UNPACKAGED_PLATFORMS.items()
    },
    'source': 'You cannot update when running from source code; Use git to pull the latest changes',
    'unknown': 'You installed yt-dlp from a manual build or with a package manager; Use that to update',
    'other': 'You are using an unofficial build of yt-dlp; Build the executable again',
}
def is_non_updateable():
    """Return the reason this build cannot self-update, or a falsy value if it can

    UPDATE_HINT, when set, takes precedence over the per-variant reasons.
    """
    if UPDATE_HINT:
        return UPDATE_HINT

    variant = detect_variant()
    # Variants missing from the table fall back to 'unknown' if VARIANT is set, else 'other'
    fallback = _NON_UPDATEABLE_REASONS['unknown' if VARIANT else 'other']
    return _NON_UPDATEABLE_REASONS.get(variant, fallback)
def _get_binary_name():
    """Build the release binary's filename ('yt-dlp' + suffix) for the current variant

    NOTE(review): presumably yields None for variants missing from _FILE_SUFFIXES;
    this depends on `format_field` semantics - confirm against its definition.
    """
    variant = detect_variant()
    return format_field(_FILE_SUFFIXES, variant, template='yt-dlp%s', ignore=None, default=None)
def _get_system_deprecation():
    """Return a deprecation notice for the running Python version, or None if it is recent enough"""
    MIN_SUPPORTED, MIN_RECOMMENDED = (3, 8), (3, 8)

    running = sys.version_info
    if running > MIN_RECOMMENDED:
        return None

    cur_major, cur_minor = running[:2]
    if running < MIN_SUPPORTED:
        notice = f'Python version {cur_major}.{cur_minor} is no longer supported'
    else:
        notice = (f'Support for Python version {cur_major}.{cur_minor} has been deprecated. '
                  '\nYou may stop receiving updates on this version at any time')

    rec_major, rec_minor = MIN_RECOMMENDED
    return f'{notice}! Please update to Python {rec_major}.{rec_minor} or above'
def _sha256_file(path):
    """Return the hex SHA-256 digest of the file at `path` (symlinks resolved)

    The file is read unbuffered into one reusable 128 KiB buffer.
    """
    digest = hashlib.sha256()
    buffer = memoryview(bytearray(128 * 1024))
    with open(os.path.realpath(path), 'rb', buffering=0) as stream:
        # readinto() returning 0 marks the end of the file
        while (count := stream.readinto(buffer)) != 0:
            digest.update(buffer[:count])
    return digest.hexdigest()
def _make_label(origin, tag, version=None):
    """Build a display label of the form 'channel@tag[ build version][ from origin]'

    @param origin   Channel name or repository; a repository (contains '/') is
                    translated to its channel name when it is a known update source
    @param tag      Release tag
    @param version  Version, appended only when set and different from `tag`
    """
    channel = _INVERSE_UPDATE_SOURCES.get(origin, origin) if '/' in origin else origin

    pieces = [f'{channel}@{tag}']
    if version and version != tag:
        pieces.append(f' build {version}')
    if channel != origin:
        # origin was a known repository; mention it next to its channel name
        pieces.append(f' from {origin}')
    return ''.join(pieces)
@dataclass
class UpdateInfo:
    """
    Describes the release an update will install

    Instances come from `query_update()` or can be built by hand.

    Attributes:
        tag                 Release tag to update to; the only required value.
                            When produced by query_update, it is the tag after
                            API resolution and update spec processing.
        version             Numeric version (if available) of the binary to update to,
                            after API resolution and update spec processing. (default: None)
        requested_version   Numeric version (if available) of the binary that was requested,
                            after API resolution only. (default: None)
        commit              Commit hash (if available) of the binary to update to,
                            after API resolution and update spec processing. (default: None)
                            This value will only match the RELEASE_GIT_HEAD of prerelease builds.
        binary_name         Filename of the binary to update to. (default: current binary name)
        checksum            Expected checksum (if available) of the binary to
                            update to. (default: None)
    """
    tag: str
    version: str | None = None
    requested_version: str | None = None
    commit: str | None = None

    binary_name: str | None = _get_binary_name()  # noqa: RUF009: Always returns the same value
    checksum: str | None = None

    # Not annotated, so this is a plain class attribute rather than a dataclass field
    _has_update = True
class Updater:
    """Resolves, downloads and installs yt-dlp releases from GitHub

    Errors are reported through `ydl.report_error` and flagged by setting
    `ydl._download_retcode` to 100 (see `_report_error`).
    """
    # XXX: use class variables to simplify testing
    _channel = CHANNEL
    _origin = ORIGIN
    _update_sources = UPDATE_SOURCES

    def __init__(self, ydl, target: str | None = None):
        """
        @param ydl      Object used for output and network access (`report_warning`,
                        `report_error`, `to_screen`, `write_debug`, `urlopen`, ...)
        @param target   Update target such as 'CHANNEL', 'CHANNEL@TAG', 'OWNER/REPO@TAG'
                        or a bare tag. Falls back to the current channel when empty
        """
        self.ydl = ydl
        # For backwards compat, target needs to be treated as if it could be None
        self.requested_channel, sep, self.requested_tag = (target or self._channel).rpartition('@')
        # Check if requested_tag is actually the requested repo/channel
        if not sep and ('/' in self.requested_tag or self.requested_tag in self._update_sources):
            self.requested_channel = self.requested_tag
            self.requested_tag: str = None  # type: ignore (we set it later)
        elif not self.requested_channel:
            # User did not specify a channel, so we are requesting the default channel
            self.requested_channel = self._channel.partition('@')[0]

        # --update should not be treated as an exact tag request even if CHANNEL has a @tag
        self._exact = bool(target) and target != self._channel
        if not self.requested_tag:
            # User did not specify a tag, so we request 'latest' and track that no exact tag was passed
            self.requested_tag = 'latest'
            self._exact = False

        if '/' in self.requested_channel:
            # requested_channel is actually a repository
            self.requested_repo = self.requested_channel
            if not self.requested_repo.startswith('yt-dlp/') and self.requested_repo != self._origin:
                self.ydl.report_warning(
                    f'You are switching to an {self.ydl._format_err("unofficial", "red")} executable '
                    f'from {self.ydl._format_err(self.requested_repo, self.ydl.Styles.EMPHASIS)}. '
                    f'Run {self.ydl._format_err("at your own risk", "light red")}')
                self._block_restart('Automatically restarting into custom builds is disabled for security reasons')
        else:
            # Check if requested_channel resolves to a known repository or else raise
            self.requested_repo = self._update_sources.get(self.requested_channel)
            if not self.requested_repo:
                self._report_error(
                    f'Invalid update channel {self.requested_channel!r} requested. '
                    f'Valid channels are {", ".join(self._update_sources)}', True)

        # Matched against the patterns of the update spec in _process_update_spec()
        self._identifier = f'{detect_variant()} {system_identifier()}'

    @property
    def current_version(self):
        """Current version"""
        return __version__

    @property
    def current_commit(self):
        """Current commit hash"""
        return RELEASE_GIT_HEAD

    def _download_asset(self, name, tag=None):
        """Download the release asset `name` and return its raw bytes

        @param tag  Release tag to download from (default: the requested tag)
        """
        if not tag:
            tag = self.requested_tag

        path = 'latest/download' if tag == 'latest' else f'download/{tag}'
        url = f'https://github.com/{self.requested_repo}/releases/{path}/{name}'
        self.ydl.write_debug(f'Downloading {name} from {url}')
        return self.ydl.urlopen(url).read()

    def _call_api(self, tag):
        """Fetch and JSON-decode the GitHub API release info for `tag`"""
        tag = f'tags/{tag}' if tag != 'latest' else tag
        url = f'{API_BASE_URL}/{self.requested_repo}/releases/{tag}'
        self.ydl.write_debug(f'Fetching release info: {url}')
        return json.loads(self.ydl.urlopen(Request(url, headers={
            'Accept': 'application/vnd.github+json',
            'User-Agent': 'yt-dlp',
            'X-GitHub-Api-Version': '2022-11-28',
        })).read().decode())

    def _get_version_info(self, tag: str) -> tuple[str | None, str | None]:
        """Resolve `tag` into a `(requested_version, target_commitish)` pair

        A purely numeric tag is returned as the version without any API call.
        Otherwise the release info is fetched; an error is reported when
        neither a version nor a commit hash can be determined from it.
        """
        if _VERSION_RE.fullmatch(tag):
            return tag, None

        api_info = self._call_api(tag)

        # `or ''` rather than a .get() default: the API may send an explicit null
        # for these fields, and the regex functions raise TypeError on None
        if tag == 'latest':
            requested_version = api_info['tag_name']
        else:
            match = re.search(rf'\s+(?P<version>{_VERSION_RE.pattern})$', api_info.get('name') or '')
            requested_version = match.group('version') if match else None

        if re.fullmatch(_HASH_PATTERN, api_info.get('target_commitish') or ''):
            target_commitish = api_info['target_commitish']
        else:
            match = _COMMIT_RE.match(api_info.get('body') or '')
            target_commitish = match.group('hash') if match else None

        if not (requested_version or target_commitish):
            self._report_error('One of either version or commit hash must be available on the release', expected=True)

        return requested_version, target_commitish

    def _download_update_spec(self, source_tags):
        """Return the decoded `_update_spec` asset from the first tag that has one

        @param source_tags  Tags to try in order; a falsy tag means the requested tag
        @returns            The spec as `str`, or None after reporting an error
        """
        for tag in source_tags:
            try:
                return self._download_asset('_update_spec', tag=tag).decode()
            except network_exceptions as error:
                if isinstance(error, HTTPError) and error.status == 404:
                    continue
                self._report_network_error(f'fetch update spec: {error}')
                # Stop here: falling through would try further tags and could also
                # report the misleading "tag does not exist" error below
                return None

        self._report_error(
            f'The requested tag {self.requested_tag} does not exist for {self.requested_repo}', True)
        return None

    def _process_update_spec(self, lockfile: str, resolved_tag: str):
        """Apply the update spec's lock rules to `resolved_tag`

        Lines have the form 'lock TAG PATTERN' or 'lockV2 REPO TAG PATTERN';
        when any 'lockV2' line exists, only 'lockV2' lines for the requested
        repository are considered. PATTERN is matched against `self._identifier`.

        @returns    The tag that should actually be updated to, or None
                    (after reporting an error) when the update is not possible
        """
        lines = lockfile.splitlines()
        is_version2 = any(line.startswith('lockV2 ') for line in lines)

        for line in lines:
            if is_version2:
                if not line.startswith(f'lockV2 {self.requested_repo} '):
                    continue
                _, _, tag, pattern = line.split(' ', 3)
            else:
                if not line.startswith('lock '):
                    continue
                _, tag, pattern = line.split(' ', 2)

            if re.match(pattern, self._identifier):
                if _VERSION_RE.fullmatch(tag):
                    if not self._exact:
                        return tag
                    elif self._version_compare(tag, resolved_tag):
                        return resolved_tag
                elif tag != resolved_tag:
                    continue

                self._report_error(
                    f'yt-dlp cannot be updated to {resolved_tag} since you are on an older Python version', True)
                return None

        return resolved_tag

    def _version_compare(self, a: str, b: str):
        """
        Compare two version strings

        Returns whether `a >= b` when both are numeric versions,
        and whether `a == b` otherwise.

        This function SHOULD NOT be called if self._exact == True
        """
        if _VERSION_RE.fullmatch(f'{a}.{b}'):
            return version_tuple(a) >= version_tuple(b)
        return a == b

    def query_update(self, *, _output=False) -> UpdateInfo | None:
        """Fetches info about the available update
        @returns   An `UpdateInfo` if there is an update available, else None
        """
        if not self.requested_repo:
            self._report_error('No target repository could be determined from input')
            return None

        try:
            requested_version, target_commitish = self._get_version_info(self.requested_tag)
        except network_exceptions as e:
            self._report_network_error(f'obtain version info ({e})', delim='; Please try again later or')
            return None

        if self._exact and self._origin != self.requested_repo:
            # An exact tag from a different repository always counts as an update
            has_update = True
        elif requested_version:
            if self._exact:
                has_update = self.current_version != requested_version
            else:
                has_update = not self._version_compare(self.current_version, requested_version)
        elif target_commitish:
            has_update = target_commitish != self.current_commit
        else:
            has_update = False

        resolved_tag = requested_version if self.requested_tag == 'latest' else self.requested_tag
        current_label = _make_label(self._origin, self._channel.partition('@')[2] or self.current_version, self.current_version)
        requested_label = _make_label(self.requested_repo, resolved_tag, requested_version)
        latest_or_requested = f'{"Latest" if self.requested_tag == "latest" else "Requested"} version: {requested_label}'
        if not has_update:
            if _output:
                self.ydl.to_screen(f'{latest_or_requested}\nyt-dlp is up to date ({current_label})')
            return None

        update_spec = self._download_update_spec(('latest', None) if requested_version else (None,))
        if not update_spec:
            return None
        # `result_` prefixed vars == post-_process_update_spec() values
        result_tag = self._process_update_spec(update_spec, resolved_tag)
        if not result_tag or result_tag == self.current_version:
            return None
        elif result_tag == resolved_tag:
            result_version = requested_version
        elif _VERSION_RE.fullmatch(result_tag):
            result_version = result_tag
        else:  # actual version being updated to is unknown
            result_version = None

        checksum = None
        # Non-updateable variants can get update_info but need to skip checksum
        if not is_non_updateable():
            try:
                hashes = self._download_asset('SHA2-256SUMS', result_tag)
            except network_exceptions as error:
                if not isinstance(error, HTTPError) or error.status != 404:
                    self._report_network_error(f'fetch checksums: {error}')
                    return None
                self.ydl.report_warning('No hash information found for the release, skipping verification')
            else:
                for ln in hashes.decode().splitlines():
                    if ln.endswith(_get_binary_name()):
                        checksum = ln.split()[0]
                        break
                if not checksum:
                    self.ydl.report_warning('The hash could not be found in the checksum file, skipping verification')

        if _output:
            update_label = _make_label(self.requested_repo, result_tag, result_version)
            self.ydl.to_screen(
                f'Current version: {current_label}\n{latest_or_requested}'
                + (f'\nUpgradable to: {update_label}' if update_label != requested_label else ''))

        return UpdateInfo(
            tag=result_tag,
            version=result_version,
            requested_version=requested_version,
            commit=target_commitish if result_tag == resolved_tag else None,
            checksum=checksum)

    def update(self, update_info=NO_DEFAULT):
        """Update yt-dlp executable to the latest version
        @param update_info  `UpdateInfo | None` as returned by query_update()
        @returns            True on success; a falsy value otherwise
        """
        if update_info is NO_DEFAULT:
            update_info = self.query_update(_output=True)
        if not update_info:
            return False

        err = is_non_updateable()
        if err:
            self._report_error(err, True)
            return False

        self.ydl.to_screen(f'Current Build Hash: {_sha256_file(self.filename)}')

        update_label = _make_label(self.requested_repo, update_info.tag, update_info.version)
        self.ydl.to_screen(f'Updating to {update_label} ...')

        directory = os.path.dirname(self.filename)
        if not os.access(self.filename, os.W_OK):
            return self._report_permission_error(self.filename)
        elif not os.access(directory, os.W_OK):
            return self._report_permission_error(directory)

        new_filename, old_filename = f'{self.filename}.new', f'{self.filename}.old'
        if detect_variant() == 'zip':  # Can be replaced in-place
            new_filename, old_filename = self.filename, None

        try:
            if os.path.exists(old_filename or ''):
                os.remove(old_filename)
        except OSError:
            return self._report_error('Unable to remove the old version')

        try:
            newcontent = self._download_asset(update_info.binary_name, update_info.tag)
        except network_exceptions as e:
            if isinstance(e, HTTPError) and e.status == 404:
                return self._report_error(
                    f'The requested tag {self.requested_repo}@{update_info.tag} does not exist', True)
            return self._report_network_error(f'fetch updates: {e}', tag=update_info.tag)

        if not update_info.checksum:
            self._block_restart('Automatically restarting into unverified builds is disabled for security reasons')
        elif hashlib.sha256(newcontent).hexdigest() != update_info.checksum:
            return self._report_network_error('verify the new executable', tag=update_info.tag)

        try:
            with open(new_filename, 'wb') as outf:
                outf.write(newcontent)
        except OSError:
            return self._report_permission_error(new_filename)

        if old_filename:
            # Remember the current permissions so they can be restored on the new file
            mask = os.stat(self.filename).st_mode
            try:
                os.rename(self.filename, old_filename)
            except OSError:
                return self._report_error('Unable to move current version')

            try:
                os.rename(new_filename, self.filename)
            except OSError:
                self._report_error('Unable to overwrite current version')
                # Roll back: put the previous executable back in place
                return os.rename(old_filename, self.filename)

        variant = detect_variant()
        if variant.startswith('win') or variant == 'py2exe':
            # Deletion of the old file is deferred to a shell command launched at exit
            atexit.register(Popen, f'ping 127.0.0.1 -n 5 -w 1000 & del /F "{old_filename}"',
                            shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        elif old_filename:
            try:
                os.remove(old_filename)
            except OSError:
                self._report_error('Unable to remove the old version')

            try:
                os.chmod(self.filename, mask)
            except OSError:
                return self._report_error(
                    f'Unable to set permissions. Run: sudo chmod a+rx {shell_quote(self.filename)}')

        self.ydl.to_screen(f'Updated yt-dlp to {update_label}')
        return True

    @functools.cached_property
    def filename(self):
        """Filename of the executable"""
        return compat_realpath(_get_variant_and_executable_path()[1])

    @functools.cached_property
    def cmd(self):
        """The command-line to run the executable, if known"""
        # There is no sys.orig_argv in py < 3.10. Also, it can be [] when frozen
        if getattr(sys, 'orig_argv', None):
            return sys.orig_argv
        elif getattr(sys, 'frozen', False):
            return sys.argv

    def restart(self):
        """Restart the executable"""
        assert self.cmd, 'Must be frozen or Py >= 3.10'
        self.ydl.write_debug(f'Restarting: {shell_quote(self.cmd)}')
        _, _, returncode = Popen.run(self.cmd)
        return returncode

    def _block_restart(self, msg):
        """Replace `restart` on this instance with a stub that only reports `msg`"""
        def wrapper():
            self._report_error(f'{msg}. Restart yt-dlp to use the updated version', expected=True)
            return self.ydl._download_retcode
        self.restart = wrapper

    def _report_error(self, msg, expected=False):
        """Report `msg` via ydl and set the download return code to 100"""
        self.ydl.report_error(msg, tb=False if expected else None)
        self.ydl._download_retcode = 100

    def _report_permission_error(self, file):
        """Report that `file` could not be written to"""
        self._report_error(f'Unable to write to {file}; try running as administrator', True)

    def _report_network_error(self, action, delim=';', tag=None):
        """Report a failed network `action` together with the release page URL

        @param action   What failed; inserted after 'Unable to '
        @param delim    Text placed between the action and the 'visit <URL>' hint
        @param tag      Release tag to link to (default: the requested tag)
        """
        if not tag:
            tag = self.requested_tag
        # Build the path first: a conditional expression binds weaker than `+`,
        # so inlining it without parentheses would discard the message for real tags
        path = tag if tag == 'latest' else f'tag/{tag}'
        self._report_error(
            f'Unable to {action}{delim} visit https://github.com/{self.requested_repo}/releases/{path}', True)

    # XXX: Everything below this line in this class is deprecated / for compat only
    @property
    def _target_tag(self):
        """Deprecated; requested tag with 'tags/' prepended when necessary for API calls"""
        return f'tags/{self.requested_tag}' if self.requested_tag != 'latest' else self.requested_tag

    def _check_update(self):
        """Deprecated; report whether there is an update available"""
        return bool(self.query_update(_output=True))

    def __getattr__(self, attribute: str):
        """Compat getter function for deprecated attributes"""
        deprecated_props_map = {
            'check_update': '_check_update',
            'target_tag': '_target_tag',
            'target_channel': 'requested_channel',
        }
        update_info_props_map = {
            'has_update': '_has_update',
            'new_version': 'version',
            'latest_version': 'requested_version',
            'release_name': 'binary_name',
            'release_hash': 'checksum',
        }

        if attribute not in deprecated_props_map and attribute not in update_info_props_map:
            raise AttributeError(f'{type(self).__name__!r} object has no attribute {attribute!r}')

        msg = f'{type(self).__name__}.{attribute} is deprecated and will be removed in a future version'
        if attribute in deprecated_props_map:
            source_name = deprecated_props_map[attribute]
            if not source_name.startswith('_'):
                msg += f'. Please use {source_name!r} instead'
            source = self
            mapping = deprecated_props_map

        else:  # attribute in update_info_props_map
            msg += '. Please call query_update() instead'
            source = self.query_update()
            if source is None:
                source = UpdateInfo('', None, None, None)
                source._has_update = False
            mapping = update_info_props_map

        deprecation_warning(msg)
        # Store every attribute of the group on the instance, so later
        # lookups find them directly and no longer reach __getattr__
        for target_name, source_name in mapping.items():
            value = getattr(source, source_name)
            setattr(self, target_name, value)

        return getattr(self, attribute)
def run_update(ydl):
    """Update the program file with the latest version from the repository
    @returns    Whether there was a successful update (No update = False)
    """
    updater = Updater(ydl)
    return updater.update()
# Names exported by a star-import of this module
__all__ = ['Updater']