Extensions: lock the repositories before overwriting their manifests
[blender-addons-contrib.git] / bl_pkg / bl_extension_utils.py
blob fbf9f3d992e742dcae20078155ec3f16fbeafc2d

# SPDX-FileCopyrightText: 2023 Blender Foundation
#
# SPDX-License-Identifier: GPL-2.0-or-later

"""
Non-blocking access to package management.

- No ``bpy`` module use.
"""

__all__ = (
    # Public Repository Actions.
    "repo_sync",
    "repo_upgrade",
    "repo_listing",

    # Public Package Actions.
    "pkg_install_files",
    "pkg_install",
    "pkg_uninstall",

    "pkg_make_obsolete_for_testing",

    "dummy_progress",

    # Public Stand-Alone Utilities.
    "pkg_theme_file_list",
    "file_mtime_or_none",

    # Public API.
    "json_from_filepath",
    "toml_from_filepath",
    "json_to_filepath",

    "pkg_manifest_dict_is_valid_or_error",
    "pkg_manifest_dict_from_file_or_error",
    "pkg_manifest_archive_url_abs_from_remote_url",

    "CommandBatch",
    "RepoCacheStore",

    # Directory Lock.
    "RepoLock",
    "RepoLockContext",
)

import json
import os
import sys
import signal
import stat
import subprocess
import time
import tomllib

from typing import (
    Any,
    Callable,
    Generator,
    IO,
    List,
    Optional,
    Dict,
    NamedTuple,
    Sequence,
    Set,
    Tuple,
    Union,
)

BASE_DIR = os.path.abspath(os.path.dirname(__file__))

BLENDER_EXT_CMD = (
    # When run from within Blender, it will point to Blender's local Python binary.
    sys.executable,
    os.path.normpath(os.path.join(BASE_DIR, "cli", "blender_ext.py")),
)

# This directory is in the local repository.
REPO_LOCAL_PRIVATE_DIR = ".blender_ext"
# Located inside `REPO_LOCAL_PRIVATE_DIR`.
REPO_LOCAL_PRIVATE_LOCK = "bl_ext_repo.lock"

PKG_REPO_LIST_FILENAME = "bl_ext_repo.json"
PKG_MANIFEST_FILENAME_TOML = "blender_manifest.toml"
PKG_EXT = ".zip"

# Join this onto a repository directory to get the local JSON file path.
REPO_LOCAL_JSON = os.path.join(REPO_LOCAL_PRIVATE_DIR, PKG_REPO_LIST_FILENAME)

# An item we communicate back to Blender.
InfoItem = Tuple[str, Any]
InfoItemSeq = Sequence[InfoItem]

COMPLETE_ITEM = ('DONE', "")

# Time to wait when there is no output, avoid 0 as it causes high CPU usage.
IDLE_WAIT_ON_READ = 0.05
# IDLE_WAIT_ON_READ = 0.2


# -----------------------------------------------------------------------------
# Internal Functions.
#

if sys.platform == "win32":
    # See: https://stackoverflow.com/a/35052424/432509
    def file_handle_make_non_blocking(file_handle: IO[bytes]) -> None:
        # The constants could be defined globally but avoid polluting the name-space,
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt
        from ctypes import (
            POINTER,
            WinError,
            byref,
            windll,
            wintypes,
        )
        from ctypes.wintypes import (
            BOOL,
            DWORD,
            HANDLE,
        )

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        # Set non-blocking.
        SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
        SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
        SetNamedPipeHandleState.restype = BOOL
        os_handle = msvcrt.get_osfhandle(file_handle.fileno())
        res = windll.kernel32.SetNamedPipeHandleState(os_handle, byref(PIPE_NOWAIT), None, None)
        if res == 0:
            print(WinError())

    def file_handle_non_blocking_is_error_blocking(ex: BaseException) -> bool:
        if not isinstance(ex, OSError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232
        # This is sometimes zero, `ex.args == (22, "Invalid argument")`.
        # This could be checked but for now ignore all zero errors.
        return (GetLastError() in {0, ERROR_NO_DATA})

else:
    def file_handle_make_non_blocking(file_handle: IO[bytes]) -> None:
        import fcntl
        # Get current `file_handle` flags.
        flags = fcntl.fcntl(file_handle.fileno(), fcntl.F_GETFL)
        fcntl.fcntl(file_handle, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def file_handle_non_blocking_is_error_blocking(ex: BaseException) -> bool:
        if not isinstance(ex, BlockingIOError):
            return False
        return True


def file_mtime_or_none(filepath: str) -> Optional[int]:
    try:
        # For some reason `mypy` thinks this is a float.
        return int(os.stat(filepath)[stat.ST_MTIME])
    except FileNotFoundError:
        return None


def scandir_with_demoted_errors(path: str) -> Generator[os.DirEntry[str], None, None]:
    try:
        for entry in os.scandir(path):
            yield entry
    except BaseException as ex:
        print("Error: scandir", ex)


# -----------------------------------------------------------------------------
# Call JSON.
#

def non_blocking_call(cmd: Sequence[str]) -> subprocess.Popen[bytes]:
    # pylint: disable-next=consider-using-with
    ps = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    stdout = ps.stdout
    assert stdout is not None
    # Needed so whatever is available can be read (without waiting).
    file_handle_make_non_blocking(stdout)
    return ps


def command_output_from_json_0(
        args: Sequence[str],
        use_idle: bool,
) -> Generator[InfoItemSeq, bool, None]:
    cmd = [*BLENDER_EXT_CMD, *args, "--output-type=JSON_0"]
    ps = non_blocking_call(cmd)
    stdout = ps.stdout
    assert stdout is not None
    chunk_list = []
    request_exit_signal_sent = False

    while True:
        # It's possible this is multiple chunks.
        try:
            chunk = stdout.read()
        except BaseException as ex:
            if not file_handle_non_blocking_is_error_blocking(ex):
                raise ex
            chunk = b''

        json_messages = []

        if not chunk:
            if ps.poll() is not None:
                break
            if use_idle:
                time.sleep(IDLE_WAIT_ON_READ)
        elif (chunk_zero_index := chunk.find(b'\0')) == -1:
            chunk_list.append(chunk)
        else:
            # Extract contiguous data from `chunk_list`.
            chunk_list.append(chunk[:chunk_zero_index])

            json_bytes_list = [b''.join(chunk_list)]
            chunk_list.clear()

            # There may be data afterwards, even whole chunks.
            if chunk_zero_index + 1 != len(chunk):
                chunk = chunk[chunk_zero_index + 1:]
                # Add whole chunks.
                while (chunk_zero_index := chunk.find(b'\0')) != -1:
                    json_bytes_list.append(chunk[:chunk_zero_index])
                    chunk = chunk[chunk_zero_index + 1:]
                if chunk:
                    chunk_list.append(chunk)

            request_exit = False

            for json_bytes in json_bytes_list:
                json_data = json.loads(json_bytes.decode("utf-8"))

                assert len(json_data) == 2
                assert isinstance(json_data[0], str)

                json_messages.append((json_data[0], json_data[1]))

        # Yield even when `json_messages` is empty, otherwise this generator can block.
        # Blocking would also mean a request to exit might not be responded to soon enough.
        request_exit = yield json_messages
        if request_exit and not request_exit_signal_sent:
            ps.send_signal(signal.SIGINT)
            request_exit_signal_sent = True


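# Illustrative sketch (not part of the original module): how a caller typically
# drives `command_output_from_json_0`. The generator yields message lists and a
# boolean may be sent back to request the sub-process exits. The arguments below
# mirror `dummy_progress` and are only example values.
#
#   gen = command_output_from_json_0(["dummy-progress", "--time-duration=1.0"], use_idle=True)
#   request_exit = None  # The first `send()` must pass None to prime the generator.
#   try:
#       while True:
#           for ty, msg in gen.send(request_exit):
#               print(ty, msg)
#           request_exit = False  # Set to True to ask the sub-process to exit.
#   except StopIteration:
#       pass

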
# -----------------------------------------------------------------------------
# Internal Functions.
#

def repositories_validate_or_errors(repos: Sequence[str]) -> Optional[InfoItemSeq]:
    return None


# -----------------------------------------------------------------------------
# Public Stand-Alone Utilities
#

def pkg_theme_file_list(directory: str, pkg_idname: str) -> Tuple[str, List[str]]:
    theme_dir = os.path.join(directory, pkg_idname)
    theme_files = [
        filename for entry in os.scandir(theme_dir)
        if ((not entry.is_dir()) and
            (not (filename := entry.name).startswith(".")) and
            filename.lower().endswith(".xml"))
    ]
    theme_files.sort()
    return theme_dir, theme_files


# -----------------------------------------------------------------------------
# Public Repository Actions
#

def repo_sync(
        *,
        directory: str,
        remote_url: str,
        online_user_agent: str,
        use_idle: bool,
        force_exit_ok: bool = False,
        extension_override: str = "",
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.repo_sync(directory)``.
    """
    yield from command_output_from_json_0([
        "sync",
        "--local-dir", directory,
        "--remote-url", remote_url,
        "--online-user-agent", online_user_agent,
        *(("--force-exit-ok",) if force_exit_ok else ()),
        *(("--extension-override", extension_override) if extension_override else ()),
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


def repo_upgrade(
        *,
        directory: str,
        remote_url: str,
        online_user_agent: str,
        use_idle: bool,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.repo_upgrade(directory)``.
    """
    yield from command_output_from_json_0([
        "upgrade",
        "--local-dir", directory,
        "--remote-url", remote_url,
        "--online-user-agent", online_user_agent,
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


def repo_listing(
        *,
        repos: Sequence[str],
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.repo_listing(directory)``.
    """
    if result := repositories_validate_or_errors(repos):
        yield result
        return

    yield [COMPLETE_ITEM]


# -----------------------------------------------------------------------------
# Public Package Actions
#

def pkg_install_files(
        *,
        directory: str,
        files: Sequence[str],
        use_idle: bool,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.pkg_install_files(directory, files)``.
    """
    yield from command_output_from_json_0([
        "install-files", *files,
        "--local-dir", directory,
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


def pkg_install(
        *,
        directory: str,
        remote_url: str,
        pkg_id_sequence: Sequence[str],
        online_user_agent: str,
        use_cache: bool,
        use_idle: bool,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.pkg_install(directory, pkg_id)``.
    """
    yield from command_output_from_json_0([
        "install", ",".join(pkg_id_sequence),
        "--local-dir", directory,
        "--remote-url", remote_url,
        "--online-user-agent", online_user_agent,
        "--local-cache", str(int(use_cache)),
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


def pkg_uninstall(
        *,
        directory: str,
        pkg_id_sequence: Sequence[str],
        use_idle: bool,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.pkg_uninstall(directory, pkg_id)``.
    """
    yield from command_output_from_json_0([
        "uninstall", ",".join(pkg_id_sequence),
        "--local-dir", directory,
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


# -----------------------------------------------------------------------------
# Public Demo Actions
#

def dummy_progress(
        *,
        use_idle: bool,
) -> Generator[InfoItemSeq, bool, None]:
    """
    Implementation:
    ``bpy.ops.ext.dummy_progress()``.
    """
    yield from command_output_from_json_0(["dummy-progress", "--time-duration=1.0"], use_idle=use_idle)
    yield [COMPLETE_ITEM]


# -----------------------------------------------------------------------------
# Public (non-command-line-wrapping) functions
#

def json_from_filepath(filepath_json: str) -> Optional[Dict[str, Any]]:
    if os.path.exists(filepath_json):
        with open(filepath_json, "r", encoding="utf-8") as fh:
            result = json.loads(fh.read())
            assert isinstance(result, dict)
            return result
    return None


def toml_from_filepath(filepath: str) -> Optional[Dict[str, Any]]:
    if os.path.exists(filepath):
        with open(filepath, "r", encoding="utf-8") as fh:
            return tomllib.loads(fh.read())
    return None


def json_to_filepath(filepath_json: str, data: Any) -> None:
    with open(filepath_json, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(data))


def pkg_make_obsolete_for_testing(local_dir: str, pkg_id: str) -> None:
    import re
    filepath = os.path.join(local_dir, pkg_id, PKG_MANIFEST_FILENAME_TOML)
    # Weak! use basic matching to replace the version, not nice but OK as a debugging option.
    with open(filepath, "r", encoding="utf-8") as fh:
        data = fh.read()

    def key_replace(match: re.Match[str]) -> str:
        return "version = \"0.0.0\""

    data = re.sub(r"^\s*version\s*=\s*\"[^\"]+\"", key_replace, data, flags=re.MULTILINE)
    with open(filepath, "w", encoding="utf-8") as fh:
        fh.write(data)


def pkg_manifest_dict_is_valid_or_error(
        data: Dict[str, Any],
        from_repo: bool,
        strict: bool,
) -> Optional[str]:
    # Exception! In general `cli` shouldn't be considered a Python module,
    # however its validation function is handy to reuse.
    from .cli.blender_ext import pkg_manifest_from_dict_and_validate
    assert "id" in data
    result = pkg_manifest_from_dict_and_validate(data, from_repo=from_repo, strict=strict)
    if isinstance(result, str):
        return result
    return None


def pkg_manifest_dict_from_file_or_error(
        filepath: str,
) -> Union[Dict[str, Any], str]:
    from .cli.blender_ext import pkg_manifest_from_archive_and_validate
    result = pkg_manifest_from_archive_and_validate(filepath)
    if isinstance(result, str):
        return result
    # Else convert the named-tuple into a dictionary.
    result_dict = result._asdict()
    assert isinstance(result_dict, dict)
    return result_dict


def pkg_manifest_archive_url_abs_from_remote_url(remote_url: str, archive_url: str) -> str:
    if archive_url.startswith("./"):
        if (
                len(remote_url) > len(PKG_REPO_LIST_FILENAME) and
                remote_url.endswith(PKG_REPO_LIST_FILENAME) and
                (remote_url[-(len(PKG_REPO_LIST_FILENAME) + 1)] in {"\\", "/"})
        ):
            # The URL contains the JSON name, strip this off before adding the package name.
            archive_url = remote_url[:-len(PKG_REPO_LIST_FILENAME)] + archive_url[2:]
        elif remote_url.startswith(("http://", "https://", "file://")):
            # Simply add to the URL.
            archive_url = remote_url.rstrip("/") + archive_url[1:]
        else:
            # Handle as a regular path.
            archive_url = os.path.join(remote_url, archive_url[2:])
    return archive_url


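# Illustrative examples (not part of the original module), showing how the URL
# resolution above behaves; the remote URLs are assumed example values.
#
#   pkg_manifest_archive_url_abs_from_remote_url(
#       "https://example.org/repo/bl_ext_repo.json", "./my_pkg.zip")
#   # -> "https://example.org/repo/my_pkg.zip" (JSON name stripped first).
#
#   pkg_manifest_archive_url_abs_from_remote_url(
#       "https://example.org/repo", "./my_pkg.zip")
#   # -> "https://example.org/repo/my_pkg.zip" (appended to the URL).

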
def pkg_repo_cache_clear(local_dir: str) -> None:
    local_cache_dir = os.path.join(local_dir, ".blender_ext", "cache")
    if not os.path.isdir(local_cache_dir):
        return

    for entry in scandir_with_demoted_errors(local_cache_dir):
        if entry.is_dir(follow_symlinks=False):
            continue
        if not entry.name.endswith(PKG_EXT):
            continue

        # Should never fail unless the file-system has permissions issues or corruption.
        try:
            os.unlink(entry.path)
        except BaseException as ex:
            print("Error: unlink", ex)


# -----------------------------------------------------------------------------
# Public Command Pool (non-command-line wrapper)
#

InfoItemCallable = Callable[[], Generator[InfoItemSeq, bool, None]]


class CommandBatchItem:
    __slots__ = (
        "fn_with_args",
        "fn_iter",
        "status",
        "has_error",
        "has_warning",
        "msg_log",
        "msg_log_len_last",

        "msg_type",
        "msg_info",
    )

    STATUS_NOT_YET_STARTED = 0
    STATUS_RUNNING = 1
    STATUS_COMPLETE = 2

    def __init__(self, fn_with_args: InfoItemCallable):
        self.fn_with_args = fn_with_args
        self.fn_iter: Optional[Generator[InfoItemSeq, bool, None]] = None
        self.status = CommandBatchItem.STATUS_NOT_YET_STARTED
        self.has_error = False
        self.has_warning = False
        self.msg_log: List[Tuple[str, Any]] = []
        self.msg_log_len_last = 0
        self.msg_type = ""
        self.msg_info = ""

    def invoke(self) -> Generator[InfoItemSeq, bool, None]:
        return self.fn_with_args()


class CommandBatch_ExecNonBlockingResult(NamedTuple):
    # A message list for each command, aligned to `CommandBatch._batch`.
    messages: Tuple[List[Tuple[str, str]], ...]
    # When true, the status of all commands is `CommandBatchItem.STATUS_COMPLETE`.
    all_complete: bool
    # When true, `calc_status_data` will return a different result.
    status_data_changed: bool


class CommandBatch_StatusFlag(NamedTuple):
    flag: int
    failure_count: int
    count: int


class CommandBatch:
    __slots__ = (
        "title",

        "_batch",
        "_request_exit",
        "_log_added_since_accessed",
    )

    def __init__(
            self,
            *,
            title: str,
            batch: Sequence[InfoItemCallable],
    ):
        self.title = title
        self._batch = [CommandBatchItem(fn_with_args) for fn_with_args in batch]
        self._request_exit = False
        self._log_added_since_accessed = True

    def _exec_blocking_single(
            self,
            report_fn: Callable[[str, str], None],
            request_exit_fn: Callable[[], bool],
    ) -> bool:
        for cmd in self._batch:
            assert cmd.fn_iter is None
            cmd.fn_iter = cmd.invoke()
            request_exit: Optional[bool] = None
            while True:
                try:
                    # `request_exit` starts off as None, then it's a boolean.
                    json_messages = cmd.fn_iter.send(request_exit)  # type: ignore
                except StopIteration:
                    break

                for ty, msg in json_messages:
                    report_fn(ty, msg)

                if request_exit is None:
                    request_exit = False

            if request_exit is True:
                break

        if request_exit is None:
            return True
        return request_exit

    def _exec_blocking_multi(
            self,
            report_fn: Callable[[str, str], None],
            request_exit_fn: Callable[[], bool],
    ) -> bool:
        # TODO, concurrent execution.
        return self._exec_blocking_single(report_fn, request_exit_fn)

    def exec_blocking(
            self,
            report_fn: Callable[[str, str], None],
            request_exit_fn: Callable[[], bool],
            concurrent: bool,
    ) -> bool:
        # Blocking execution & finish.
        if concurrent:
            return self._exec_blocking_multi(
                report_fn=report_fn,
                request_exit_fn=request_exit_fn,
            )
        return self._exec_blocking_single(report_fn, request_exit_fn)

    def exec_non_blocking(
            self,
            *,
            request_exit: bool,
    ) -> CommandBatch_ExecNonBlockingResult:
        """
        Return the result of running multiple commands.
        """
        command_output: Tuple[List[Tuple[str, str]], ...] = tuple([] for _ in range(len(self._batch)))

        if request_exit:
            self._request_exit = True

        status_data_changed = False

        complete_count = 0
        for cmd_index in reversed(range(len(self._batch))):
            cmd = self._batch[cmd_index]
            if cmd.status == CommandBatchItem.STATUS_COMPLETE:
                complete_count += 1
                continue

            send_arg: Optional[bool] = self._request_exit

            # First time initialization.
            if cmd.fn_iter is None:
                cmd.fn_iter = cmd.invoke()
                cmd.status = CommandBatchItem.STATUS_RUNNING
                status_data_changed = True
                send_arg = None

            try:
                json_messages = cmd.fn_iter.send(send_arg)  # type: ignore
            except StopIteration:
                # FIXME: This should not happen, we should get a "DONE" instead.
                cmd.status = CommandBatchItem.STATUS_COMPLETE
                complete_count += 1
                status_data_changed = True
                continue

            if json_messages:
                for ty, msg in json_messages:
                    self._log_added_since_accessed = True

                    cmd.msg_type = ty
                    cmd.msg_info = msg
                    if ty == 'DONE':
                        assert msg == ""
                        cmd.status = CommandBatchItem.STATUS_COMPLETE
                        complete_count += 1
                        status_data_changed = True
                        break

                    command_output[cmd_index].append((ty, msg))
                    if ty != 'PROGRESS':
                        if ty == 'ERROR':
                            if not cmd.has_error:
                                cmd.has_error = True
                                status_data_changed = True
                        elif ty == 'WARNING':
                            if not cmd.has_warning:
                                cmd.has_warning = True
                                status_data_changed = True
                        cmd.msg_log.append((ty, msg))

        # Check if all are complete.
        assert complete_count == len([cmd for cmd in self._batch if cmd.status == CommandBatchItem.STATUS_COMPLETE])
        all_complete = (complete_count == len(self._batch))
        return CommandBatch_ExecNonBlockingResult(
            messages=command_output,
            all_complete=all_complete,
            status_data_changed=status_data_changed,
        )

    def calc_status_string(self) -> List[str]:
        return [
            "{:s}: {:s}".format(cmd.msg_type, cmd.msg_info)
            for cmd in self._batch if (cmd.msg_type or cmd.msg_info)
        ]

    def calc_status_data(self) -> CommandBatch_StatusFlag:
        """
        Return aggregate status data for all commands.
        """
        status_flag = 0
        failure_count = 0
        for cmd in self._batch:
            status_flag |= 1 << cmd.status
            if cmd.has_error or cmd.has_warning:
                failure_count += 1
        return CommandBatch_StatusFlag(
            flag=status_flag,
            failure_count=failure_count,
            count=len(self._batch),
        )

    @staticmethod
    def calc_status_text_icon_from_data(status_data: CommandBatch_StatusFlag, update_count: int) -> Tuple[str, str]:
        # Generate a nice UI string for a status-bar & splash screen (must be short).
        #
        # NOTE: this is (arguably) UI logic, it's just nice to have it here
        # as it avoids using low-level flags externally.
        #
        # FIXME: this text assumes a "sync" operation.
        if status_data.failure_count == 0:
            fail_text = ""
        elif status_data.failure_count == status_data.count:
            fail_text = ", failed"
        else:
            fail_text = ", some actions failed"

        if status_data.flag == 1 << CommandBatchItem.STATUS_NOT_YET_STARTED:
            return "Starting Extension Updates{:s}".format(fail_text), 'SORTTIME'
        if status_data.flag == 1 << CommandBatchItem.STATUS_COMPLETE:
            if update_count > 0:
                # NOTE: the UI design in #120612 has the number of extensions available in the icon.
                # Include it in the text as this is not yet supported.
                return "Extensions Updates Available ({:d}){:s}".format(update_count, fail_text), 'INTERNET'
            return "All Extensions Up-to-date{:s}".format(fail_text), 'CHECKMARK'
        if status_data.flag & (1 << CommandBatchItem.STATUS_RUNNING):
            return "Checking for Extension Updates{:s}".format(fail_text), 'SORTTIME'

        # Should never reach this line!
        return "Internal error, unknown state!{:s}".format(fail_text), 'ERROR'

    def calc_status_log_or_none(self) -> Optional[List[Tuple[str, str]]]:
        """
        Return the log or None if there were no changes since the last call.
        """
        if self._log_added_since_accessed is False:
            return None
        self._log_added_since_accessed = False

        return [
            (ty, msg)
            for cmd in self._batch
            for ty, msg in (cmd.msg_log + ([(cmd.msg_type, cmd.msg_info)] if cmd.msg_type == 'PROGRESS' else []))
        ]

    def calc_status_log_since_last_request_or_none(self) -> Optional[List[List[Tuple[str, str]]]]:
        """
        Return a list of new log messages per command or None when none are found.
        """
        result: List[List[Tuple[str, str]]] = [[] for _ in range(len(self._batch))]
        found = False
        for cmd_index, cmd in enumerate(self._batch):
            msg_log_len = len(cmd.msg_log)
            if cmd.msg_log_len_last == msg_log_len:
                continue
            assert cmd.msg_log_len_last < msg_log_len
            result[cmd_index] = cmd.msg_log[cmd.msg_log_len_last:]
            cmd.msg_log_len_last = len(cmd.msg_log)
            found = True

        return result if found else None


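# Illustrative sketch (not part of the original module): driving a `CommandBatch`
# from a timer or modal loop. The directory and URL are assumed example values.
#
#   batch = CommandBatch(
#       title="Sync",
#       batch=[
#           lambda: repo_sync(
#               directory="/path/to/repo",
#               remote_url="https://example.org/repo/bl_ext_repo.json",
#               online_user_agent="example-agent",
#               use_idle=False,
#           ),
#       ],
#   )
#   while True:
#       result = batch.exec_non_blocking(request_exit=False)
#       # ... report `result.messages`, then sleep or wait for the next timer tick.
#       if result.all_complete:
#           break

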
# -----------------------------------------------------------------------------
# Public Repo Cache (non-command-line wrapper)
#

class _RepoCacheEntry:
    __slots__ = (
        "directory",
        "remote_url",

        "_pkg_manifest_local",
        "_pkg_manifest_remote",
        "_pkg_manifest_remote_mtime",
        "_pkg_manifest_remote_has_warning",
    )

    def __init__(self, directory: str, remote_url: str) -> None:
        assert directory != ""
        self.directory = directory
        self.remote_url = remote_url
        # Manifest data per package, loaded from the repository's local JSON.
        self._pkg_manifest_local: Optional[Dict[str, Dict[str, Any]]] = None
        self._pkg_manifest_remote: Optional[Dict[str, Dict[str, Any]]] = None
        self._pkg_manifest_remote_mtime = 0
        # Avoid many noisy prints.
        self._pkg_manifest_remote_has_warning = False

    def _json_data_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            check_files: bool = False,
            ignore_missing: bool = False,
    ) -> Any:
        if self._pkg_manifest_remote is not None:
            if check_files:
                self._json_data_refresh(error_fn=error_fn)
            return self._pkg_manifest_remote

        filepath_json = os.path.join(self.directory, REPO_LOCAL_JSON)

        try:
            self._pkg_manifest_remote = json_from_filepath(filepath_json)
        except BaseException as ex:
            self._pkg_manifest_remote = None
            error_fn(ex)

        self._pkg_manifest_local = None
        if self._pkg_manifest_remote is not None:
            json_mtime = file_mtime_or_none(filepath_json)
            assert json_mtime is not None
            self._pkg_manifest_remote_mtime = json_mtime
            self._pkg_manifest_local = None
            self._pkg_manifest_remote_has_warning = False
        else:
            if not ignore_missing:
                # NOTE: this warning will occur when setting up a new repository.
                # It could be removed but it's also useful to know when the JSON is missing.
                if self.remote_url:
                    if not self._pkg_manifest_remote_has_warning:
                        print("Repository file:", filepath_json, "not found, sync required!")
                        self._pkg_manifest_remote_has_warning = True

        return self._pkg_manifest_remote

    def _json_data_refresh_from_toml(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            force: bool = False,
    ) -> None:
        assert self.remote_url == ""
        # Since there is no remote repo the ID name is defined by the directory name only.
        local_json_data = self.pkg_manifest_from_local_ensure(error_fn=error_fn)
        if local_json_data is None:
            return

        filepath_json = os.path.join(self.directory, REPO_LOCAL_JSON)

        # We might want to adjust where this happens, create the directory here
        # because this could be a fresh repo that might not have been initialized until now.
        directory = os.path.dirname(filepath_json)
        try:
            # A symbolic-link that's followed (good), if it exists and is a file an error is raised here and returned.
            if not os.path.isdir(directory):
                os.makedirs(directory, exist_ok=True)
        except BaseException as ex:
            error_fn(ex)
            return
        del directory

        with open(filepath_json, "w", encoding="utf-8") as fh:
            # Indent because it can be useful to check this file if there are any issues.

            # Begin: transform to list with ID's in item.
            # TODO: this transform can probably be removed and the internal format can change
            # to use the same structure as the actual JSON.
            local_json_data_compat = {
                "version": "v1",
                "blocklist": [],
                "data": [
                    {"id": pkg_idname, **value}
                    for pkg_idname, value in local_json_data.items()
                ],
            }
            # End: compatibility change.

            fh.write(json.dumps(local_json_data_compat, indent=2))

    def _json_data_refresh(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            force: bool = False,
    ) -> None:
        if force or (self._pkg_manifest_remote is None) or (self._pkg_manifest_remote_mtime == 0):
            self._pkg_manifest_remote = None
            self._pkg_manifest_remote_mtime = 0
            self._pkg_manifest_local = None

        # Detect a local-only repository, there is no server to sync with
        # so generate the JSON from the TOML files.
        # While redundant this avoids having to support multiple code-paths for local-only/remote repos.
        if self.remote_url == "":
            self._json_data_refresh_from_toml(error_fn=error_fn, force=force)

        filepath_json = os.path.join(self.directory, REPO_LOCAL_JSON)
        mtime_test = file_mtime_or_none(filepath_json)
        if self._pkg_manifest_remote is not None:
            # TODO: check the time of every installed package.
            if mtime_test == self._pkg_manifest_remote_mtime:
                return

        try:
            self._pkg_manifest_remote = json_from_filepath(filepath_json)
        except BaseException as ex:
            self._pkg_manifest_remote = None
            error_fn(ex)

        self._pkg_manifest_local = None
        if self._pkg_manifest_remote is not None:
            json_mtime = file_mtime_or_none(filepath_json)
            assert json_mtime is not None
            self._pkg_manifest_remote_mtime = json_mtime

    def pkg_manifest_from_local_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            ignore_missing: bool = False,
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        # Important for local-only repositories (where the directory name defines the ID).
        has_remote = self.remote_url != ""

        if self._pkg_manifest_local is None:
            self._json_data_ensure(
                ignore_missing=ignore_missing,
                error_fn=error_fn,
            )
            pkg_manifest_local = {}
            try:
                dir_entries = os.scandir(self.directory)
            except BaseException as ex:
                dir_entries = None
                error_fn(ex)

            for entry in (dir_entries if dir_entries is not None else ()):
                # Only check directories.
                if not entry.is_dir(follow_symlinks=True):
                    continue

                filename = entry.name

                # Simply ignore these paths without any warnings (accounts for `.git`, `__pycache__`, etc.).
                if filename.startswith((".", "_")):
                    continue

                # Report any paths that cannot be used.
                if not filename.isidentifier():
                    error_fn(Exception("\"{:s}\" is not a supported module name, skipping".format(
                        os.path.join(self.directory, filename)
                    )))
                    continue

                filepath_toml = os.path.join(self.directory, filename, PKG_MANIFEST_FILENAME_TOML)
                try:
                    item_local = toml_from_filepath(filepath_toml)
                except BaseException as ex:
                    item_local = None
                    error_fn(ex)

                if item_local is None:
                    continue

                pkg_idname = item_local["id"]
                if has_remote:
                    # This should never happen, the user may have manually renamed a directory.
                    if pkg_idname != filename:
                        print("Skipping package with inconsistent name: \"{:s}\" mismatch \"{:s}\"".format(
                            filename,
                            pkg_idname,
                        ))
                        continue
                else:
                    pkg_idname = filename

                # Validate so local-only packages with invalid manifests aren't used.
                if (error_str := pkg_manifest_dict_is_valid_or_error(item_local, from_repo=False, strict=False)):
                    error_fn(Exception(error_str))
                    continue

                pkg_manifest_local[pkg_idname] = item_local
            self._pkg_manifest_local = pkg_manifest_local
        return self._pkg_manifest_local

    def pkg_manifest_from_remote_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            ignore_missing: bool = False,
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        if self._pkg_manifest_remote is None:
            self._json_data_ensure(
                ignore_missing=ignore_missing,
                error_fn=error_fn,
            )
        return self._pkg_manifest_remote

    def force_local_refresh(self) -> None:
        self._pkg_manifest_local = None


class RepoCacheStore:
    __slots__ = (
        "_repos",
        "_is_init",
    )

    def __init__(self) -> None:
        self._repos: List[_RepoCacheEntry] = []
        self._is_init = False

    def is_init(self) -> bool:
        return self._is_init

    def refresh_from_repos(
            self, *,
            repos: List[Tuple[str, str]],
            force: bool = False,
    ) -> None:
        """
        Initialize or update repositories.
        """
        repos_prev = {}
        if not force:
            for repo_entry in self._repos:
                repos_prev[repo_entry.directory, repo_entry.remote_url] = repo_entry
        self._repos.clear()

        for directory, remote_url in repos:
            repo_entry_test = repos_prev.get((directory, remote_url))
            if repo_entry_test is None:
                repo_entry_test = _RepoCacheEntry(directory, remote_url)
            self._repos.append(repo_entry_test)
        self._is_init = True

    def refresh_remote_from_directory(
            self,
            directory: str,
            *,
            error_fn: Callable[[BaseException], None],
            force: bool = False,
    ) -> None:
        for repo_entry in self._repos:
            if directory == repo_entry.directory:
                repo_entry._json_data_refresh(force=force, error_fn=error_fn)
                return
        raise ValueError("Directory {:s} not a known repo".format(directory))

    def refresh_local_from_directory(
            self,
            directory: str,
            *,
            error_fn: Callable[[BaseException], None],
            ignore_missing: bool = False,
            directory_subset: Optional[Set[str]] = None,
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        for repo_entry in self._repos:
            if directory == repo_entry.directory:
                # Force refresh.
                repo_entry.force_local_refresh()
                return repo_entry.pkg_manifest_from_local_ensure(
                    ignore_missing=ignore_missing,
                    error_fn=error_fn,
                )
        raise ValueError("Directory {:s} not a known repo".format(directory))

    def pkg_manifest_from_remote_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            check_files: bool = False,
            ignore_missing: bool = False,
            directory_subset: Optional[Set[str]] = None,
    ) -> Generator[Optional[Dict[str, Dict[str, Any]]], None, None]:
        for repo_entry in self._repos:
            if directory_subset is not None:
                if repo_entry.directory not in directory_subset:
                    continue

            json_data = repo_entry._json_data_ensure(
                check_files=check_files,
                ignore_missing=ignore_missing,
                error_fn=error_fn,
            )
            if json_data is None:
                # The repository may be fresh, not yet initialized.
                yield None
            else:
                pkg_manifest_remote = {}
                # "data" should always exist, it's not the purpose of this function to fully validate though.
                json_items = json_data.get("data")
                if json_items is None:
                    error_fn(ValueError("JSON was missing \"data\" key"))
                    yield None
                else:
                    for item_remote in json_items:
                        # TODO(@ideasman42): we may want to include the "id", as part of moving to a new format
                        # the "id" used not to be part of each item so users of this API assume it's not.
                        # The `item_remote` could be used in-place however that needs further testing.
                        item_remote_copy = item_remote.copy()
                        pkg_idname = item_remote_copy.pop("id")
                        pkg_manifest_remote[pkg_idname] = item_remote_copy
                    yield pkg_manifest_remote

    def pkg_manifest_from_local_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            check_files: bool = False,
            directory_subset: Optional[Set[str]] = None,
    ) -> Generator[Optional[Dict[str, Dict[str, Any]]], None, None]:
        for repo_entry in self._repos:
            if directory_subset is not None:
                if repo_entry.directory not in directory_subset:
                    continue
            if check_files:
                repo_entry.force_local_refresh()
            yield repo_entry.pkg_manifest_from_local_ensure(error_fn=error_fn)

    def clear(self) -> None:
        self._repos.clear()
        self._is_init = False


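# Illustrative sketch (not part of the original module): reading package manifests
# through a `RepoCacheStore`. The directory and URL are assumed example values.
#
#   store = RepoCacheStore()
#   store.refresh_from_repos(repos=[
#       ("/path/to/repo", "https://example.org/repo/bl_ext_repo.json"),
#   ])
#   for pkg_manifest in store.pkg_manifest_from_remote_ensure(error_fn=print):
#       if pkg_manifest is not None:
#           for pkg_idname, item in pkg_manifest.items():
#               print(pkg_idname, item.get("version"))

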
# -----------------------------------------------------------------------------
# Public Repo Lock
#


class RepoLock:
    """
    Lock multiple repositories, one or all may fail,
    it's up to the caller to check.

    Access via the ``RepoLockContext`` where possible to avoid the lock being left held.
    """
    __slots__ = (
        "_repo_directories",
        "_repo_lock_files",
        "_cookie",
        "_held",
    )

    def __init__(self, *, repo_directories: Sequence[str], cookie: str):
        """
        :arg repo_directories:
            Directories to attempt to lock.
        :arg cookie:
            A path which is used as a reference.
            It must point to a path that exists.
            When a lock exists, check if the cookie path exists, if it doesn't, allow acquiring the lock.
        """
        self._repo_directories = tuple(repo_directories)
        self._repo_lock_files: List[Tuple[str, str]] = []
        self._held = False
        self._cookie = cookie

    def __del__(self) -> None:
        if not self._held:
            return
        sys.stderr.write("{:s}: freed without releasing lock!".format(type(self).__name__))

    @staticmethod
    def _is_locked_with_stale_cookie_removal(local_lock_file: str, cookie: str) -> Optional[str]:
        if os.path.exists(local_lock_file):
            try:
                with open(local_lock_file, "r", encoding="utf8") as fh:
                    data = fh.read()
            except BaseException as ex:
                return "lock file could not be read: {:s}".format(str(ex))

            # The lock is held.
            if os.path.exists(data):
                if data == cookie:
                    return "lock is already held by this session"
                return "lock is held by other session: {:s}".format(data)

            # The lock is held (but stale), remove it.
            try:
                os.remove(local_lock_file)
            except BaseException as ex:
                return "lock file could not be removed: {:s}".format(str(ex))
        return None

    def acquire(self) -> Dict[str, Optional[str]]:
        """
        Return directories and the lock status,
        with None if locking succeeded.
        """
        if self._held:
            raise Exception("acquire(): called with an existing lock!")
        if not os.path.exists(self._cookie):
            raise Exception("acquire(): cookie doesn't exist! (when it should)")

        # Assume all succeed.
        result: Dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
        for directory in self._repo_directories:
            local_private_dir = os.path.join(directory, REPO_LOCAL_PRIVATE_DIR)

            # This most likely exists, create if it doesn't.
            if not os.path.isdir(local_private_dir):
                os.makedirs(local_private_dir)

            local_lock_file = os.path.join(local_private_dir, REPO_LOCAL_PRIVATE_LOCK)
            # Attempt to get the lock, kick out stale locks.
            if (lock_msg := self._is_locked_with_stale_cookie_removal(local_lock_file, self._cookie)) is not None:
                result[directory] = "Lock exists: {:s}".format(lock_msg)
                continue
            try:
                with open(local_lock_file, "w", encoding="utf8") as fh:
                    fh.write(self._cookie)
            except BaseException as ex:
                result[directory] = "Lock could not be created: {:s}".format(str(ex))
                # Remove if it was created (but failed to write)... disk-full?
                try:
                    os.remove(local_lock_file)
                except BaseException:
                    pass
                continue

            # Success, the file is locked.
            self._repo_lock_files.append((directory, local_lock_file))
            self._held = True
        return result

    def release(self) -> Dict[str, Optional[str]]:
        # NOTE: lots of error checks here, mostly to give insights in the very unlikely case this fails.
        if not self._held:
            raise Exception("release(): called without a lock!")

        result: Dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
        for directory, local_lock_file in self._repo_lock_files:
            if not os.path.exists(local_lock_file):
                result[directory] = "release(): lock missing when expected, continuing."
                continue
            try:
                with open(local_lock_file, "r", encoding="utf8") as fh:
                    data = fh.read()
            except BaseException as ex:
                result[directory] = "release(): lock file could not be read: {:s}".format(str(ex))
                continue
            # Owned by another application, this shouldn't happen.
            if data != self._cookie:
                result[directory] = "release(): lock was unexpectedly stolen by another program: {:s}".format(data)
                continue

            # This is our lock file, we're allowed to remove it!
            try:
                os.remove(local_lock_file)
            except BaseException as ex:
                result[directory] = "release(): failed to remove file {!r}".format(ex)

        self._held = False
        return result


class RepoLockContext:
    __slots__ = (
        "_repo_lock",
    )

    def __init__(self, *, repo_directories: Sequence[str], cookie: str):
        self._repo_lock = RepoLock(repo_directories=repo_directories, cookie=cookie)

    def __enter__(self) -> Dict[str, Optional[str]]:
        return self._repo_lock.acquire()

    def __exit__(self, _ty: Any, _value: Any, _traceback: Any) -> None:
        self._repo_lock.release()
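

# Illustrative sketch (not part of the original module): locking repositories
# before overwriting their manifests, which is the purpose of this change.
# The paths are assumed example values; the cookie must reference an existing file.
#
#   with RepoLockContext(
#       repo_directories=["/path/to/repo"],
#       cookie="/path/to/blender-session-cookie",
#   ) as lock_result:
#       for directory, error in lock_result.items():
#           if error is not None:
#               print("Lock not acquired:", directory, error)
#       # ... write manifests only for directories whose value is None.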