Generate a list of packages that should be updated from Debian.
It uses the Ultimate Debian Database (UDD) to get a
list of versions in the Debian repository.
7 By default it will get the last successful build manifest file from testing or stable from here:
8 https://nightly.tails.boum.org/build_Tails_ISO_testing/lastSuccessful/archive/build-artifacts/
9 https://nightly.tails.boum.org/build_Tails_ISO_stable/lastSuccessful/archive/build-artifacts/
20 from collections.abc import Iterator, Sequence
21 from dataclasses import dataclass
23 import debian.debian_support
24 import distro_info # type: ignore[import]
29 logger = logging.getLogger()
class Version(debian.debian_support.Version):
    """Debian version whose comparisons are limited to objects of its own type."""

    def _compare(self, other):
        """Compare with ``other`` using the parent class implementation.

        Before delegating, assert that ``other`` is an instance of this
        object's own type.
        """
        expected_type = type(self)
        assert isinstance(other, expected_type)
        return super()._compare(other)
38 def partition(pred, iterable):
39 "Use a predicate to partition entries into false entries and true entries"
40 # partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
51 return non_match, match
54 class NoBuildManifest(Exception):
def add_metadata(yml, name):
    """Record on the manifest mapping which file it was loaded from.

    ``yml["file_name"]`` is always set to ``name``. When ``name`` has the form
    ``<anything>@<hex git hash>-<timestamp>Z.build-manifest``, the hash and the
    timestamp are additionally stored as ``yml["git_hash"]`` and
    ``yml["timestamp"]``; otherwise those two keys are left untouched.

    The mapping is modified in place and nothing is returned.
    """
    yml["file_name"] = name
    # The dot before "build-manifest" is escaped so that only a literal "."
    # is accepted there.
    match = re.match(r".*@([0-9a-f]+)-([0-9T]+Z)\.build-manifest", name)
    if match is None:
        # E.g. an arbitrary local file name: there is nothing more to extract.
        return
    yml["git_hash"] = match.group(1)
    yml["timestamp"] = match.group(2)
66 def get_build_manifest(suite: str) -> dict:
67 base_url = f"https://nightly.tails.boum.org/build_Tails_ISO_{suite}/lastSuccessful/archive/build-artifacts/"
69 shasum_response = requests.get(
70 urllib.parse.urljoin(base_url, "tails-build-artifacts.shasum"),
75 shasum_response.raise_for_status()
76 except requests.HTTPError as e:
77 raise NoBuildManifest(f"build-manifest file for {suite} not found!") from e
79 for i in shasum_response.text.splitlines():
82 if name.endswith(".build-manifest"):
83 url = urllib.parse.urljoin(base_url, name)
84 bm_response = requests.get(url, timeout=60)
85 ret = yaml.safe_load(bm_response.text) # type: dict
87 add_metadata(ret, name)
90 raise NoBuildManifest(f"build-manifest file for {suite} not found!")
93 class NotFoundError(Exception):
105 source_version: Version
109 """to manage output of the Ultimate Debian Database (UDD).
110 Normally you give a list of packages you want to check and get the versions on different suites.
113 def __init__(self, packages: Sequence[str], suites: Sequence[str]):
115 self.packages = self._request(packages)
117 def _request(self, packages: Sequence[str]) -> dict[str, dict[str, UDDPackageRow]]:
118 ret: dict[str, dict[str, UDDPackageRow]] = collections.defaultdict(dict)
119 with psycopg2.connect(
120 "postgresql://udd-mirror:udd-mirror@udd-mirror.debian.net/udd",
121 ) as conn, conn.cursor() as curs:
122 archs = ("all", "amd64")
123 components = ("main", "contrib", "non-free", "non-free-firmware")
124 pkgs = tuple(packages)
125 fields = ", ".join(UDDPackageRow.__dataclass_fields__.keys())
128 f"SELECT {fields}" # noqa: S608 (derived from data hard-coded in this script)
130 " WHERE distribution = 'debian'"
132 " and architecture in %s"
133 " and component in %s"
134 " and package in %s;",
135 (self.suites, archs, components, pkgs),
138 row = UDDPackageRow(*r)
139 row.version = Version(row.version)
140 row.source_version = Version(row.source_version)
141 if row.release in ret[row.package]:
142 if ret[row.package][row.release].version > row.version:
144 ret[row.package][row.release] = row
def package(self, name: str) -> dict[str, UDDPackageRow]:
    """Return the mapping stored in ``self.packages`` under ``name``."""
    rows_for_name = self.packages[name]
    return rows_for_name
def source(self, name: str, suite: str) -> str:
    """Return the ``source`` attribute of the row for ``name`` in ``suite``.

    Raises:
        NotFoundError: if ``suite`` is not a key of the mapping returned by
            ``self.package(name)``.
    """
    rows_by_suite = self.package(name)
    if suite in rows_by_suite:
        return rows_by_suite[suite].source
    raise NotFoundError(f"{name} not found in {suite}")
def source_version(self, name: str, suite: str) -> Version:
    """Return the ``source_version`` attribute of the row for ``name`` in ``suite``.

    Raises:
        NotFoundError: if ``suite`` is not a key of the mapping returned by
            ``self.package(name)``.
    """
    rows_by_suite = self.package(name)
    if suite in rows_by_suite:
        return rows_by_suite[suite].source_version
    raise NotFoundError(f"{name} not found in {suite}")
def version(self, name: str, suite: str) -> Version:
    """Return the ``version`` attribute of the row for ``name`` in ``suite``.

    Raises:
        NotFoundError: if ``suite`` is not a key of the mapping returned by
            ``self.package(name)``.
    """
    rows_by_suite = self.package(name)
    if suite in rows_by_suite:
        return rows_by_suite[suite].version
    raise NotFoundError(f"{name} not found in {suite}")
174 def get_debian_version(self, name: str, version: Version) -> tuple:
175 for suite in self.suites:
177 suite_version = self.version(name, suite)
178 except NotFoundError:
180 if version <= suite_version:
181 return (suite, suite_version)
183 error_msg = f"{name}: the package version({version}) is higher than the version on {suite} ({suite_version})"
184 # Raised if suite_version is not defined, meaning that the package was not found in any of the suites
186 error_msg = f"{name}: the package was not found in any of the Debian suites"
188 raise NotFoundError(error_msg)
190 def packages_by_source(self, source: str, suite: str) -> set[str]:
192 for name, pkg in self.packages.items():
196 if p.source == source:
def strip_tails_version(version: str) -> Version:
    """Return ``version`` as a Version, without a trailing ``.0tails<N>`` suffix.

    A version string that does not end in such a suffix is converted unchanged.
    """
    forked = re.match(r"^(.*)(\.0tails[0-9]+)$", version)
    # Group 1 is everything in front of the Tails-specific suffix.
    debian_part = forked[1] if forked else version
    return Version(debian_part)
211 class NewVersionIssue:
216 suite_version: Version
217 suite_source_version: Version
220 binaries = getattr(self, "log_binaries", None)
222 binaries = ", ".join(binaries)
223 return f"{self.source}[{binaries}] ({self.version}) to Debian {self.suite} ({self.suite_source_version})"
224 return f"{self.source} ({self.version}) to Debian {self.suite} ({self.suite_source_version})"
def tails_fork(self):
    """Tell whether ``self.version`` ends with a Tails fork suffix.

    Returns:
        True if the string form of ``self.version`` ends in ``.0tails<N>``
        (``N`` being one or more digits), False otherwise.
    """
    # The leading dot is escaped so that exactly the suffix removed by
    # strip_tails_version (r"\.0tails[0-9]+") is recognised here; an
    # unescaped "." would also accept e.g. "10tails3".
    return re.search(r"\.0tails[0-9]+$", str(self.version)) is not None
def get_udd(package_dict: dict[str, Version], suites: tuple[str, ...]) -> UDD:
    """Create a UDD instance for the package names in ``package_dict``.

    Args:
        package_dict: mapping whose keys are the package names to look up.
        suites: the suite names handed on to ``UDD``; any number of entries.

    Returns:
        The ``UDD`` object constructed from those names and suites.
    """
    # UDD is annotated as taking a Sequence of names; a dict keys view is not
    # one, so hand over a real list instead of silencing the type checker.
    return UDD(list(package_dict), suites)
234 def get_issues(udd: UDD, package_dict: dict[str, Version]) -> Iterator[NewVersionIssue]:
    """Get an issue list of updateable packages."""
236 for package, version in package_dict.items():
237 striped_version = strip_tails_version(str(version))
239 suite, suite_version = udd.get_debian_version(package, striped_version)
240 except NotFoundError as e:
244 if striped_version < suite_version:
245 issue = NewVersionIssue(
247 udd.source(package, suite),
251 udd.source_version(package, suite),
253 if issue.tails_fork() and striped_version >= issue.suite_source_version:
258 def check_build_manifest(
259 build_manifest: dict,
264 ignore = config.get("ignore", {})
265 general_ignore = ignore.get("general", [])
266 tmp_ignore = ignore.get("temporary", {})
268 pkg_dict: dict[str, Version] = {}
269 for pkg in build_manifest["packages"]["binary"]:
271 v = Version(pkg["version"])
274 "%(p)s: multiple entries, so only considering max(%(v)s, %(seen)s)",
281 if pkg_dict.get(p) > v:
283 "%(p)s: %(seen)s > %(v)s, so ignoring %(v)s",
286 "seen": pkg_dict.get(p),
293 udd = get_udd(pkg_dict, suites)
295 issues = list(get_issues(udd, pkg_dict))
297 def _is_ignored(issue):
298 if issue.source in general_ignore:
301 str(issue.suite_source_version)
302 == tmp_ignore.get(issue.source, {"version": None})["version"]
308 issues = list(itertools.filterfalse(_is_ignored, issues))
310 non_forked, forked = partition(
311 operator.attrgetter("tails_fork"),
315 def _log_issue(issue):
316 if _is_ignored(issue):
317 if issue.source in general_ignore:
318 return f"(always ignored) {issue}"
319 return f"(known) {issue}"
323 def log_group(source, issues):
326 names = {i.name for i in issues}
327 if names != udd.packages_by_source(source, suite):
328 issue.log_binaries = names
330 return _log_issue(issue)
333 for source, i in itertools.groupby(
335 key=operator.attrgetter("source"),
337 yield log_group(source, list(i))
340 line = "\n - ".join(sorted(_log(forked)))
341 logger.info(f"Need to upgrade our own forked package:\n - {line}")
344 line = "\n - ".join(sorted(_log(non_forked)))
345 logger.info(f"Need to upgrade to a new APT snapshot:\n - {line}")
347 # Check if we have at least one non ignored issue
349 next(itertools.filterfalse(_is_ignored, issues))
351 except StopIteration:
357 def get_suites(min_codename: str) -> list:
359 ddi = distro_info.DebianDistroInfo()
360 ddi_s = ddi.supported()
361 codename_pos = ddi_s.index(min_codename)
362 testing = ddi.testing()
363 for suite in ddi_s[codename_pos:]:
364 if suite in (testing, "sid", "experimental"):
367 # We always want to make sure that we have the stable-security
368 # version installed, if available.
369 # The rest of the list follows the Debian package flow.
375 f"{suite}-proposed-updates",
376 f"{suite}-backports",
386 format="%(levelname)s: %(message)s",
388 logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
390 parser = argparse.ArgumentParser(description="list all packages that ")
391 parser.add_argument("-v", "--verbose", action="store_true", help="Give more infos")
392 parser.add_argument("--debug", action="store_true", help="Show all debug messages")
396 type=argparse.FileType("r"),
397 default="config/ci/needed-package-updates.yml",
400 group = parser.add_mutually_exclusive_group()
401 group.add_argument("--suite", help="build manifest suite name.")
402 group.add_argument("--file", type=argparse.FileType("r"), help="local file name.")
404 args = parser.parse_args()
406 logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
411 config = yaml.safe_load(args.config)
413 suites = tuple(get_suites(config.get("distribution")))
416 build_manifest = yaml.safe_load(args.file)
417 add_metadata(build_manifest, args.file.name)
418 logger.info("Check local file %s", build_manifest["file_name"])
420 build_manifest = get_build_manifest(args.suite)
421 logger.info("Check %s", build_manifest["file_name"])
424 for suite in ("testing", "stable"):
426 build_manifest = get_build_manifest(suite)
427 logger.info("Check %s", build_manifest["file_name"])
429 except NoBuildManifest as e:
430 logger.debug("No build manifest found for %s.", suite)
435 propose_update = check_build_manifest(build_manifest, config, suites, args.verbose)
440 logger.debug("Nothing to do.")
443 if __name__ == "__main__":