Extensions: lock the repositories before overwriting their manifests
[blender-addons-contrib.git] / bl_pkg / wheel_manager.py (blob fe509364a6747c3de416531cddf740bb78dfee81)
# SPDX-FileCopyrightText: 2024 Blender Foundation
#
# SPDX-License-Identifier: GPL-2.0-or-later

# Ref: https://peps.python.org/pep-0491/
# Deferred but seems to include valid info for existing wheels.

"""
This module takes wheels and applies them to a "managed" destination directory.
"""

__all__ = (
    "apply_action",
)

import os
import re
import shutil
import zipfile

from typing import (
    Dict,
    List,
    Optional,
    Set,
    Tuple,
)

WheelSource = Tuple[
    # Key - doesn't matter what this is... it's just a handle.
    str,
    # A list of absolute wheel file-paths.
    List[str],
]


def _read_records_csv(filepath: str) -> List[List[str]]:
    import csv
    with open(filepath, encoding="utf8", errors="surrogateescape") as fh:
        return list(csv.reader(fh.read().splitlines()))


def _wheels_from_dir(dirpath: str) -> Tuple[
        # The key is:
        #     wheel_id
        # The values are:
        #     Top level directories.
        Dict[str, List[str]],
        # Unknown paths.
        List[str],
]:
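    # For example (an illustrative sketch): a directory containing `example_pkg-1.0.dist-info/`
    # (whose RECORD lists paths under `example_pkg/`) and the extracted `example_pkg/` package
    # would yield: ({"example_pkg-1.0.dist-info": ["example_pkg"]}, []).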
    result: Dict[str, List[str]] = {}
    paths_unused: Set[str] = set()

    if not os.path.exists(dirpath):
        return result, list(paths_unused)

    for entry in os.scandir(dirpath):
        name = entry.name
        paths_unused.add(name)
        if not entry.is_dir():
            continue
        # TODO: is this part of the spec?
        name = entry.name
        if not name.endswith("-info"):
            continue
        filepath_record = os.path.join(entry.path, "RECORD")
        if not os.path.exists(filepath_record):
            continue

        record_rows = _read_records_csv(filepath_record)

        # Build top-level paths.
        toplevel_paths_set: Set[str] = set()
        for row in record_rows:
            if not row:
                continue
            path_text = row[0]
            # Ensure the path separator is compatible.
            path_text = path_text.replace("\\", "/")
            # Ensure double slashes don't cause issues and "/./" doesn't complicate checking the head of the path.
            path_split = [
                elem for elem in path_text.split("/")
                if elem not in {"", "."}
            ]
            if not path_split:
                continue
            # These won't have been extracted.
            if path_split[0] in {"..", name}:
                continue

            toplevel_paths_set.add(path_split[0])

        result[name] = list(sorted(toplevel_paths_set))
        del toplevel_paths_set

    for wheel_name, toplevel_paths in result.items():
        paths_unused.discard(wheel_name)
        for name in toplevel_paths:
            paths_unused.discard(name)

    paths_unused_list = list(sorted(paths_unused))

    return result, paths_unused_list


def _wheel_info_dir_from_zip(filepath_wheel: str) -> Optional[Tuple[str, List[str]]]:
    """
    Return:
    - The "*-info" directory name which contains meta-data.
    - The top-level path list (excluding "..").
    """
    dir_info = ""
    toplevel_paths: Set[str] = set()

    with zipfile.ZipFile(filepath_wheel, mode="r") as zip_fh:
        # This file will always exist.
        for filepath_rel in zip_fh.namelist():
            path_split = [
                elem for elem in filepath_rel.split("/")
                if elem not in {"", "."}
            ]
            if not path_split:
                continue
            if path_split[0] == "..":
                continue

            if len(path_split) == 2:
                if path_split[1].upper() == "RECORD":
                    if path_split[0].endswith("-info"):
                        dir_info = path_split[0]

            toplevel_paths.add(path_split[0])

    if dir_info == "":
        return None
    toplevel_paths.discard(dir_info)
    toplevel_paths_list = list(sorted(toplevel_paths))
    return dir_info, toplevel_paths_list


def _rmtree_safe(dir_remove: str, expected_root: str) -> None:
    if not dir_remove.startswith(expected_root):
        raise Exception("Expected prefix not found")
    shutil.rmtree(dir_remove)


def _zipfile_extractall_safe(
        zip_fh: zipfile.ZipFile,
        path: str,
        path_restrict: str,
) -> None:
    """
    A version of ``ZipFile.extractall`` that won't write to paths outside ``path_restrict``.

    Use this instead of calling ``zip_fh.extractall(path)`` directly.
    """
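    # For example (illustrative paths): extract a wheel into the site-packages directory while
    # restricting all writes to the `.local` directory that contains it:
    #     _zipfile_extractall_safe(zip_fh, ".../.local/lib/python3.11/site-packages", ".../.local")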
    sep = os.sep
    path_restrict = path_restrict.rstrip(sep)
    if sep == "\\":
        path_restrict = path_restrict.rstrip("/")
    path_restrict_with_slash = path_restrict + sep

    # Strip is probably not needed (only if multiple slashes exist).
    path_prefix = path[len(path_restrict_with_slash):].lstrip(sep)
    # Switch slashes forward.
    if sep == "\\":
        path_prefix = path_prefix.replace("\\", "/").rstrip("/") + "/"
    else:
        path_prefix = path_prefix + "/"

    path_restrict_with_slash = path_restrict + sep
    assert len(path) >= len(path_restrict_with_slash)
    if not path.startswith(path_restrict_with_slash):
        raise Exception("Expected the destination path to start with the restricted directory")

    for member in zip_fh.infolist():
        filename_orig = member.filename
        member.filename = path_prefix + filename_orig
        # This isn't likely to happen so accept a noisy print here.
        # If this ends up happening more often, it could be suppressed
        # (although this hints at bigger problems because we might be excluding necessary files).
        if os.path.normpath(member.filename).startswith(".." + sep):
            print("Skipping path:", member.filename, "that escapes:", path_restrict)
            continue
        zip_fh.extract(member, path_restrict)
        member.filename = filename_orig


WHEEL_VERSION_RE = re.compile(r"(\d+)(?:\.(\d+))?(?:\.(\d+))?")


def wheel_version_from_filename_for_cmp(
        filename: str,
) -> Tuple[int, int, int, str]:
    """
    Extract the version number for comparison.
    Note that this only handles the first 3 numbers;
    the trailing text is compared as a string, which is not technically correct,
    however this is not a priority to support since extensions should only include stable releases,
    so comparing the first 3 numbers is sufficient. The trailing string is just a tie breaker in the
    unlikely event it differs.

    Supporting the full spec (comparing e.g. "1.1.dev6" with "1.1.6rc6") doesn't seem
    especially important as extensions should use stable releases.
    """
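    # For example (illustrative filenames):
    #     "pip-24.0-py3-none-any.whl"         -> (24, 0, 0, "")
    #     "example-1.2.3rc1-py3-none-any.whl" -> (1, 2, 3, "rc1")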
    filename_split = filename.split("-")
    if len(filename_split) >= 2:
        version = filename.split("-")[1]
        if (version_match := WHEEL_VERSION_RE.match(version)) is not None:
            groups = version_match.groups()
            # print(groups)
            return (
                int(groups[0]) if groups[0] is not None else 0,
                int(groups[1]) if groups[1] is not None else 0,
                int(groups[2]) if groups[2] is not None else 0,
                version[version_match.end():],
            )
    return (0, 0, 0, "")


def wheel_list_deduplicate_as_skip_set(
        wheel_list: List[WheelSource],
) -> Set[str]:
    """
    Return all wheel paths to skip.
    """
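    # For example (illustrative): given wheel paths ending in `pip-24.0-py3-none-any.whl`
    # and `pip-22.1-py2-none-any.whl`, the path of the older `22.1` wheel is returned
    # in the skip set.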
    wheels_to_skip: Set[str] = set()
    all_wheels: Set[str] = {
        filepath
        for _, wheels in wheel_list
        for filepath in wheels
    }

    # NOTE: this is not optimized.
    # Probably speed is never an issue here, but this could be sped up.

    # Keep a map from the base name to the "best" wheel,
    # the other wheels get added to `wheels_to_skip` to be ignored.
    all_wheels_by_base: Dict[str, str] = {}

    for wheel in all_wheels:
        wheel_filename = os.path.basename(wheel)
        wheel_base = wheel_filename.partition("-")[0]

        wheel_exists = all_wheels_by_base.get(wheel_base)
        if wheel_exists is None:
            all_wheels_by_base[wheel_base] = wheel
            continue

        wheel_exists_filename = os.path.basename(wheel_exists)
        if wheel_exists_filename == wheel_filename:
            # Should never happen because they are converted into a set before looping.
            assert wheel_exists != wheel
            # The same wheel is used in two different locations, use the full path
            # as a tie breaker for predictability although the result should be the same.
            if wheel_exists < wheel:
                all_wheels_by_base[wheel_base] = wheel
                wheels_to_skip.add(wheel_exists)
            else:
                wheels_to_skip.add(wheel)
        else:
            wheel_version = wheel_version_from_filename_for_cmp(wheel_filename)
            wheel_exists_version = wheel_version_from_filename_for_cmp(wheel_exists_filename)
            if (
                    (wheel_exists_version < wheel_version) or
                    # Tie breaker for predictability.
                    ((wheel_exists_version == wheel_version) and (wheel_exists_filename < wheel_filename))
            ):
                all_wheels_by_base[wheel_base] = wheel
                wheels_to_skip.add(wheel_exists)
            else:
                wheels_to_skip.add(wheel)

    return wheels_to_skip


def apply_action(
        *,
        local_dir: str,
        local_dir_site_packages: str,
        wheel_list: List[WheelSource],
) -> None:
    """
    :arg local_dir:
        The location wheels are stored.
        Typically: ``~/.config/blender/4.2/extensions/.local``.

        WARNING: files under this directory may be removed.
    :arg local_dir_site_packages:
        The path which wheels are extracted into.
        Typically: ``~/.config/blender/4.2/extensions/.local/lib/python3.11/site-packages``.
    """
    debug = False

    # NOTE: we could avoid scanning the wheel directories, however:
    # recursively removing all paths on the user's system can be considered relatively risky
    # even if this is located in a known location under the user's home directory - better avoid.
    # So build a list of wheel paths and only remove the unused paths from this list.
    wheels_installed, paths_unknown = _wheels_from_dir(local_dir_site_packages)

    # Wheels and their top level directories (which would be installed).
    wheels_packages: Dict[str, List[str]] = {}

    # Map the wheel ID to path.
    wheels_dir_info_to_filepath_map: Dict[str, str] = {}

    # NOTE(@ideasman42): the wheels skip-set only de-duplicates at the level of the base-name of the wheel's filename.
    # So the wheel file-paths:
    # - `pip-24.0-py3-none-any.whl`
    # - `pip-22.1-py2-none-any.whl`
    # will both extract the *base* name `pip`, de-duplicating by skipping the wheels with an older version number.
    # This is not fool-proof, because it is possible files inside the `.whl` conflict upon extraction.
    # In practice I consider this fairly unlikely because:
    # - Practically all wheels extract to their top-level module names.
    # - Modules are mainly downloaded from the Python package index.
    #
    # Having two modules conflict is possible but this is an issue outside of Blender;
    # it's most likely quite rare and generally avoided with unique module names,
    # so this is not considered a problem to "solve" at the moment.
    #
    # The one exception to this assumption is any extensions that bundle `.whl` files that aren't
    # available on the Python package index. In this case naming collisions are more likely.
    # This probably needs to be handled on a policy level - if the `.whl` author also maintains
    # the extension they can in all likelihood make the module a sub-module of the extension
    # without the need to use `.whl` files.
    wheels_to_skip = wheel_list_deduplicate_as_skip_set(wheel_list)

    for key, wheels in wheel_list:
        for wheel in wheels:
            if wheel in wheels_to_skip:
                continue
            if (wheel_info := _wheel_info_dir_from_zip(wheel)) is None:
                continue
            dir_info, toplevel_paths_list = wheel_info
            wheels_packages[dir_info] = toplevel_paths_list

            wheels_dir_info_to_filepath_map[dir_info] = wheel

    # Now there are two sets of packages: the ones we need and the ones we have.

    # -----
    # Clear

    # First remove installed packages no longer needed:
    for dir_info, toplevel_paths_list in wheels_installed.items():
        if dir_info in wheels_packages:
            continue

        # Remove installed packages which aren't needed any longer.
        for filepath_rel in (dir_info, *toplevel_paths_list):
            filepath_abs = os.path.join(local_dir_site_packages, filepath_rel)
            if not os.path.exists(filepath_abs):
                continue

            if debug:
                print("removing wheel:", filepath_rel)

            if os.path.isdir(filepath_abs):
                _rmtree_safe(filepath_abs, local_dir)
            else:
                os.remove(filepath_abs)

    # -----
    # Setup

    # Install packages that need to be installed:
    for dir_info, toplevel_paths_list in wheels_packages.items():
        if dir_info in wheels_installed:
            continue

        if debug:
            for filepath_rel in toplevel_paths_list:
                print("adding wheel:", filepath_rel)
        filepath = wheels_dir_info_to_filepath_map[dir_info]
        # A safe version of `ZipFile.extractall` is needed because some wheels contain paths
        # that point to parent directories.
        # Handle this *safely* by allowing extracting to parent directories but limit this to the `local_dir`.
        with zipfile.ZipFile(filepath, mode="r") as zip_fh:
            _zipfile_extractall_safe(zip_fh, local_dir_site_packages, local_dir)