1 # SPDX-FileCopyrightText: 2024 Blender Foundation
3 # SPDX-License-Identifier: GPL-2.0-or-later
5 # Ref: https://peps.python.org/pep-0491/
6 # Deferred but seems to include valid info for existing wheels.
9 This module takes wheels and applies them to a "managed" destination directory.
30 # Key - doesn't matter what this is... it's just a handle.
32 # A list of absolute wheel file-paths.
37 def _read_records_csv(filepath
: str) -> List
[List
[str]]:
39 with
open(filepath
, encoding
="utf8", errors
="surrogateescape") as fh
:
40 return list(csv
.reader(fh
.read().splitlines()))
def _wheels_from_dir(dirpath: str) -> Tuple[
        # The key is the name of a wheel's "*-info" directory,
        # the values are the top level paths listed in its RECORD.
        Dict[str, List[str]],
        # Names found in `dirpath` that no wheel accounts for.
        List[str],
]:
    """
    Scan ``dirpath`` for installed wheels.

    A wheel is recognized by a directory ending with ``-info`` that contains a ``RECORD`` file.

    :arg dirpath: The directory to scan, a missing or non-directory path gives empty results.
    :return: The wheels mapped to their sorted top-level paths and
       the sorted names in ``dirpath`` which don't belong to any of these wheels.
    """
    result: Dict[str, List[str]] = {}
    paths_unused: Set[str] = set()

    # Check for a directory (not only existence) so a stray file doesn't make `os.scandir` raise.
    if not os.path.isdir(dirpath):
        return result, []

    # Use the context manager so the directory handle is released promptly.
    with os.scandir(dirpath) as entries:
        for entry in entries:
            name = entry.name
            paths_unused.add(name)
            if not entry.is_dir():
                continue
            # TODO: is this part of the spec?
            if not name.endswith("-info"):
                continue
            filepath_record = os.path.join(entry.path, "RECORD")
            if not os.path.exists(filepath_record):
                continue

            # Build top-level paths.
            toplevel_paths_set: Set[str] = set()
            for row in _read_records_csv(filepath_record):
                if not row:
                    continue
                # Ensure paths separator is compatible.
                path_text = row[0].replace("\\", "/")
                # Ensure double slashes don't cause issues
                # or "/./" doesn't complicate checking the head of the path.
                path_split = [
                    elem for elem in path_text.split("/")
                    if elem not in {"", "."}
                ]
                if not path_split:
                    continue
                # These won't have been extracted.
                if path_split[0] in {"..", name}:
                    continue

                toplevel_paths_set.add(path_split[0])

            result[name] = sorted(toplevel_paths_set)

    # Anything a wheel accounts for is not "unused".
    for wheel_name, toplevel_paths in result.items():
        paths_unused.discard(wheel_name)
        paths_unused.difference_update(toplevel_paths)

    return result, sorted(paths_unused)
107 def _wheel_info_dir_from_zip(filepath_wheel
: str) -> Optional
[Tuple
[str, List
[str]]]:
110 - The "*-info" directory name which contains meta-data.
111 - The top-level path list (excluding "..").
114 toplevel_paths
: Set
[str] = set()
116 with zipfile
.ZipFile(filepath_wheel
, mode
="r") as zip_fh
:
117 # This file will always exist.
118 for filepath_rel
in zip_fh
.namelist():
120 elem
for elem
in filepath_rel
.split("/")
121 if elem
not in {"", "."}
125 if path_split
[0] == "..":
128 if len(path_split
) == 2:
129 if path_split
[1].upper() == "RECORD":
130 if path_split
[0].endswith("-info"):
131 dir_info
= path_split
[0]
133 toplevel_paths
.add(path_split
[0])
137 toplevel_paths
.discard(dir_info
)
138 toplevel_paths_list
= list(sorted(toplevel_paths
))
139 return dir_info
, toplevel_paths_list
142 def _rmtree_safe(dir_remove
: str, expected_root
: str) -> None:
143 if not dir_remove
.startswith(expected_root
):
144 raise Exception("Expected prefix not found")
145 shutil
.rmtree(dir_remove
)
148 def _zipfile_extractall_safe(
149 zip_fh
: zipfile
.ZipFile
,
154 A version of ``ZipFile.extractall`` that wont write to paths outside ``path_restrict``.
157 ``zip_fh.extractall(zip_fh, path)``
160 path_restrict
= path_restrict
.rstrip(sep
)
162 path_restrict
= path_restrict
.rstrip("/")
163 path_restrict_with_slash
= path_restrict
+ sep
165 # Strip is probably not needed (only if multiple slashes exist).
166 path_prefix
= path
[len(path_restrict_with_slash
):].lstrip(sep
)
167 # Switch slashes forward.
169 path_prefix
= path_prefix
.replace("\\", "/").rstrip("/") + "/"
171 path_prefix
= path_prefix
+ "/"
173 path_restrict_with_slash
= path_restrict
+ sep
174 assert len(path
) >= len(path_restrict_with_slash
)
175 if not path
.startswith(path_restrict_with_slash
):
176 raise Exception("Expected the restricted directory to start with ")
178 for member
in zip_fh
.infolist():
179 filename_orig
= member
.filename
180 member
.filename
= path_prefix
+ filename_orig
181 # This isn't likely to happen so accept a noisy print here.
182 # If this ends up happening more often, it could be suppressed.
183 # (although this hints at bigger problems because we might be excluding necessary files).
184 if os
.path
.normpath(member
.filename
).startswith(".." + sep
):
185 print("Skipping path:", member
.filename
, "that escapes:", path_restrict
)
187 zip_fh
.extract(member
, path_restrict
)
188 member
.filename
= filename_orig
# Up to three dot separated numbers at the start of a version string.
# Every group is optional so "1" & "1.2" are read as the major & minor numbers
# (a mandatory third group causes "1.2" to be read as major=1, patch=2).
WHEEL_VERSION_RE = re.compile(r"(\d+)?(?:\.(\d+))?(?:\.(\d+))?")


def wheel_version_from_filename_for_cmp(
        filename: str,
) -> Tuple[int, int, int, str]:
    """
    Extract the version number for comparison.
    Note that this only handles the first 3 numbers,
    the trailing text is compared as a string which is not technically correct
    however this is not a priority to support since scripts should only be including stable releases,
    so comparing the first 3 numbers is sufficient. The trailing string is just a tie breaker in the
    unlikely event it differs.

    If supporting the full spec, comparing: "1.1.dev6" with "1.1.6rc6" for e.g.
    we could support this doesn't seem especially important as extensions should use major releases.

    :arg filename: The wheel's file-name, the version is the text between the first and second "-".
    :return: The first three version numbers (zero when missing) followed by any remaining version text.
       When the file-name contains no "-" the result is ``(0, 0, 0, "")``.
    """
    filename_split = filename.split("-")
    if len(filename_split) >= 2:
        version = filename_split[1]
        if (version_match := WHEEL_VERSION_RE.match(version)) is not None:
            major, minor, patch = (
                int(group) if group is not None else 0
                for group in version_match.groups()
            )
            return major, minor, patch, version[version_match.end():]
    return (0, 0, 0, "")
def wheel_list_deduplicate_as_skip_set(
        wheel_list: List[WheelSource],
) -> Set[str]:
    """
    Return all wheel paths to skip.

    Wheels are grouped by the text before the first "-" of their file-name,
    a single wheel is kept for each group and all others are returned.

    :arg wheel_list: Pairs of a key (unused here) and a list of wheel file-paths.
    :return: The file-paths of wheels which should not be used.
    """
    wheels_to_skip: Set[str] = set()
    all_wheels: Set[str] = {
        filepath
        for _, wheels in wheel_list
        for filepath in wheels
    }

    # NOTE: this is not optimized.
    # Probably speed is never an issue here, but this could be sped up.

    # Keep a map from the base name to the "best" wheel,
    # the other wheels get added to `wheels_to_skip` to be ignored.
    all_wheels_by_base: Dict[str, str] = {}

    for wheel in all_wheels:
        wheel_filename = os.path.basename(wheel)
        wheel_base = wheel_filename.partition("-")[0]

        wheel_exists = all_wheels_by_base.get(wheel_base)
        if wheel_exists is None:
            all_wheels_by_base[wheel_base] = wheel
            continue

        wheel_exists_filename = os.path.basename(wheel_exists)
        if wheel_exists_filename == wheel_filename:
            # Should never happen because they are converted into a set before looping.
            assert wheel_exists != wheel
            # The same wheel is used in two different locations, use a tie breaker for predictability
            # although the result should be the same.
            # Compare the full paths: the file-names are known to be equal here so comparing them
            # is always false, leaving the result dependent on the set's iteration order.
            use_wheel = wheel_exists < wheel
        else:
            wheel_version = wheel_version_from_filename_for_cmp(wheel_filename)
            wheel_exists_version = wheel_version_from_filename_for_cmp(wheel_exists_filename)
            use_wheel = (
                (wheel_exists_version < wheel_version) or
                # Tie breaker for predictability.
                ((wheel_exists_version == wheel_version) and (wheel_exists_filename < wheel_filename))
            )

        if use_wheel:
            all_wheels_by_base[wheel_base] = wheel
            wheels_to_skip.add(wheel_exists)
        else:
            wheels_to_skip.add(wheel)

    return wheels_to_skip
282 local_dir_site_packages
: str,
283 wheel_list
: List
[WheelSource
],
287 The location wheels are stored.
288 Typically: ``~/.config/blender/4.2/extensions/.local``.
290 WARNING: files under this directory may be removed.
291 :arg local_dir_site_packages:
292 The path which wheels are extracted into.
293 Typically: ``~/.config/blender/4.2/extensions/.local/lib/python3.11/site-packages``.
297 # NOTE: we could avoid scanning the wheel directories however:
298 # Recursively removing all paths on the user's system can be considered relatively risky
299 # even if this is located in a known location under the user's home directory - better avoid.
300 # So build a list of wheel paths and only remove the unused paths from this list.
301 wheels_installed
, paths_unknown
= _wheels_from_dir(local_dir_site_packages
)
303 # Wheels and their top level directories (which would be installed).
304 wheels_packages
: Dict
[str, List
[str]] = {}
306 # Map the wheel ID to path.
307 wheels_dir_info_to_filepath_map
: Dict
[str, str] = {}
309 # NOTE(@ideasman42): the wheels skip-set only de-duplicates at the level of the base-name of the wheels filename.
310 # So the wheel file-paths:
311 # - `pip-24.0-py3-none-any.whl`
312 # - `pip-22.1-py2-none-any.whl`
313 # Will both extract the *base* name `pip`, de-duplicating by skipping the wheels with an older version number.
314 # This is not fool-proof, because it is possible files inside the `.whl` conflict upon extraction.
315 # In practice I consider this fairly unlikely because:
316 # - Practically all wheels extract to their top-level module names.
317 # - Modules are mainly downloaded from the Python package index.
319 # Having two modules conflict is possible but this is an issue outside of Blender,
320 # as it's most likely quite rare and generally avoided with unique module names,
321 # this is not considered a problem to "solve" at the moment.
323 # The one exception to this assumption is any extensions that bundle `.whl` files that aren't
324 # available on the Python package index. In this case naming collisions are more likely.
325 # This probably needs to be handled on a policy level - if the `.whl` author also maintains
326 # the extension they can in all likelihood make the module a sub-module of the extension
327 # without the need to use `.whl` files.
328 wheels_to_skip
= wheel_list_deduplicate_as_skip_set(wheel_list
)
330 for key
, wheels
in wheel_list
:
332 if wheel
in wheels_to_skip
:
334 if (wheel_info
:= _wheel_info_dir_from_zip(wheel
)) is None:
336 dir_info
, toplevel_paths_list
= wheel_info
337 wheels_packages
[dir_info
] = toplevel_paths_list
339 wheels_dir_info_to_filepath_map
[dir_info
] = wheel
341 # Now there are two sets of packages, the ones we need and the ones we have.
346 # First remove installed packages no longer needed:
347 for dir_info
, toplevel_paths_list
in wheels_installed
.items():
348 if dir_info
in wheels_packages
:
351 # Remove installed packages which aren't needed any longer.
352 for filepath_rel
in (dir_info
, *toplevel_paths_list
):
353 filepath_abs
= os
.path
.join(local_dir_site_packages
, filepath_rel
)
354 if not os
.path
.exists(filepath_abs
):
358 print("removing wheel:", filepath_rel
)
360 if os
.path
.isdir(filepath_abs
):
361 _rmtree_safe(filepath_abs
, local_dir
)
363 os
.remove(filepath_abs
)
368 # Install packages that need to be installed:
369 for dir_info
, toplevel_paths_list
in wheels_packages
.items():
370 if dir_info
in wheels_installed
:
374 for filepath_rel
in toplevel_paths_list
:
375 print("adding wheel:", filepath_rel
)
376 filepath
= wheels_dir_info_to_filepath_map
[dir_info
]
377 # `ZipFile.extractall` is needed because some wheels contain paths that point to parent directories.
378 # Handle this *safely* by allowing extracting to parent directories but limit this to the `local_dir`.
379 with zipfile
.ZipFile(filepath
, mode
="r") as zip_fh
:
380 _zipfile_extractall_safe(zip_fh
, local_dir_site_packages
, local_dir
)