Merge remote-tracking branch 'origin/stable' into devel
[tails.git] / bin / needed-package-updates
blob016c189386badeaa78f447ea357c85251407d02d
1 #!/usr/bin/env python3
2 """
3 Generate a list of packages that should be updated from Debian.
4 It uses the the Ultimate Debian Database (UDD) to get a
5 list of version in the Debian repository.
7 By default it will get the last successful build manifest file from testing or stable from here:
8 https://nightly.tails.boum.org/build_Tails_ISO_testing/lastSuccessful/archive/build-artifacts/
9 https://nightly.tails.boum.org/build_Tails_ISO_stable/lastSuccessful/archive/build-artifacts/
10 """
12 import argparse
13 import collections
14 import itertools
15 import logging
16 import operator
17 import re
18 import sys
19 import urllib
20 from collections.abc import Iterator, Sequence
21 from dataclasses import dataclass
23 import debian.debian_support
24 import distro_info  # type: ignore[import]
25 import psycopg2
26 import requests
27 import yaml
29 logger = logging.getLogger()
32 class Version(debian.debian_support.Version):
33     def _compare(self, other):
34         assert isinstance(other, type(self))
35         return super()._compare(other)
38 def partition(pred, iterable):
39     "Use a predicate to partition entries into false entries and true entries"
40     # partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9
42     match = []
43     non_match = []
45     for i in iterable:
46         if pred(i)():
47             match.append(i)
48         else:
49             non_match.append(i)
51     return non_match, match
54 class NoBuildManifest(Exception):
55     pass
58 def add_metadata(yml, name):
59     yml["file_name"] = name
60     match = re.match(r".*@([0-9a-f]+)-([0-9T]+Z).build-manifest", name)
61     if match:
62         yml["git_hash"] = match.group(1)
63         yml["timestamp"] = match.group(2)
66 def get_build_manifest(suite: str) -> dict:
67     base_url = f"https://nightly.tails.boum.org/build_Tails_ISO_{suite}/lastSuccessful/archive/build-artifacts/"
69     shasum_response = requests.get(
70         urllib.parse.urljoin(base_url, "tails-build-artifacts.shasum"),
71         timeout=10,
72     )
74     try:
75         shasum_response.raise_for_status()
76     except requests.HTTPError as e:
77         raise NoBuildManifest(f"build-manifest file for {suite} not found!") from e
79     for i in shasum_response.text.splitlines():
80         _, name = i.split()
82         if name.endswith(".build-manifest"):
83             url = urllib.parse.urljoin(base_url, name)
84             bm_response = requests.get(url, timeout=60)
85             ret = yaml.safe_load(bm_response.text)  # type: dict
86             ret["url"] = url
87             add_metadata(ret, name)
88             return ret
90     raise NoBuildManifest(f"build-manifest file for {suite} not found!")
93 class NotFoundError(Exception):
94     pass
97 @dataclass
98 class UDDPackageRow:
99     package: str
100     version: Version
101     release: str
102     component: str
103     distribution: str
104     source: str
105     source_version: Version
108 class UDD:
109     """to manage output of the Ultimate Debian Database (UDD).
110     Normally you give a list of packages you want to check and get the versions on different suites.
111     """
113     def __init__(self, packages: Sequence[str], suites: Sequence[str]):
114         self.suites = suites
115         self.packages = self._request(packages)
117     def _request(self, packages: Sequence[str]) -> dict[str, dict[str, UDDPackageRow]]:
118         ret: dict[str, dict[str, UDDPackageRow]] = collections.defaultdict(dict)
119         with psycopg2.connect(
120             "postgresql://udd-mirror:udd-mirror@udd-mirror.debian.net/udd",
121         ) as conn, conn.cursor() as curs:
122             archs = ("all", "amd64")
123             components = ("main", "contrib", "non-free", "non-free-firmware")
124             pkgs = tuple(packages)
125             fields = ", ".join(UDDPackageRow.__dataclass_fields__.keys())
127             curs.execute(
128                 f"SELECT {fields}"  # noqa: S608 (derived from data hard-coded in this script)
129                 " FROM packages"
130                 " WHERE distribution = 'debian'"
131                 " and release in %s"
132                 " and architecture in %s"
133                 " and component in %s"
134                 " and package in %s;",
135                 (self.suites, archs, components, pkgs),
136             )
137             for r in curs:
138                 row = UDDPackageRow(*r)
139                 row.version = Version(row.version)
140                 row.source_version = Version(row.source_version)
141                 if row.release in ret[row.package]:
142                     if ret[row.package][row.release].version > row.version:
143                         continue
144                 ret[row.package][row.release] = row
145         return ret
147     def package(self, name: str) -> dict[str, UDDPackageRow]:
148         return self.packages[name]
150     def source(self, name: str, suite: str) -> str:
151         package = self.package(name)
153         if suite not in package:
154             raise NotFoundError(f"{name} not found in {suite}")
156         return package[suite].source
158     def source_version(self, name: str, suite: str) -> Version:
159         package = self.package(name)
161         if suite not in package:
162             raise NotFoundError(f"{name} not found in {suite}")
164         return package[suite].source_version
166     def version(self, name: str, suite: str) -> Version:
167         package = self.package(name)
169         if suite not in package:
170             raise NotFoundError(f"{name} not found in {suite}")
172         return package[suite].version
174     def get_debian_version(self, name: str, version: Version) -> tuple:
175         for suite in self.suites:
176             try:
177                 suite_version = self.version(name, suite)
178             except NotFoundError:
179                 continue
180             if version <= suite_version:
181                 return (suite, suite_version)
182         try:
183             error_msg = f"{name}: the package version({version}) is higher than the version on {suite} ({suite_version})"
184         # Raised if suite_version is not defined, meaning that the package was not found in any of the suites
185         except NameError:
186             error_msg = f"{name}: the package was not found in any of the Debian suites"
188         raise NotFoundError(error_msg)
190     def packages_by_source(self, source: str, suite: str) -> set[str]:
191         ret = set()
192         for name, pkg in self.packages.items():
193             if suite not in pkg:
194                 continue
195             p = pkg[suite]
196             if p.source == source:
197                 ret.add(name)
198         return ret
201 def strip_tails_version(version: str) -> Version:
202     """if we have a Tails own fork get the Debian version."""
203     match = re.match(r"^(.*)(\.0tails[0-9]+)$", version)
204     if match:
205         return Version(match[1])
206     else:
207         return Version(version)
210 @dataclass
211 class NewVersionIssue:
212     name: str
213     source: str
214     version: Version
215     suite: str
216     suite_version: Version
217     suite_source_version: Version
219     def __str__(self):
220         binaries = getattr(self, "log_binaries", None)
221         if binaries:
222             binaries = ", ".join(binaries)
223             return f"{self.source}[{binaries}] ({self.version}) to Debian {self.suite} ({self.suite_source_version})"
224         return f"{self.source} ({self.version}) to Debian {self.suite} ({self.suite_source_version})"
226     def tails_fork(self):
227         return re.search(".0tails[0-9]+$", str(self.version)) is not None
230 def get_udd(package_dict: dict[str, Version], suites: tuple[str]) -> UDD:
231     return UDD(package_dict.keys(), suites)  # type: ignore[arg-type]
234 def get_issues(udd: UDD, package_dict: dict[str, Version]) -> Iterator[NewVersionIssue]:
235     """Get a issue list of updateable packages."""
236     for package, version in package_dict.items():
237         striped_version = strip_tails_version(str(version))
238         try:
239             suite, suite_version = udd.get_debian_version(package, striped_version)
240         except NotFoundError as e:
241             logger.error(e)
242             continue
244         if striped_version < suite_version:
245             issue = NewVersionIssue(
246                 package,
247                 udd.source(package, suite),
248                 version,
249                 suite,
250                 suite_version,
251                 udd.source_version(package, suite),
252             )
253             if issue.tails_fork() and striped_version >= issue.suite_source_version:
254                 continue
255             yield issue
258 def check_build_manifest(
259     build_manifest: dict,
260     config: dict,
261     suites: tuple[str],
262     verbose: bool,
263 ) -> bool:
264     ignore = config.get("ignore", {})
265     general_ignore = ignore.get("general", [])
266     tmp_ignore = ignore.get("temporary", {})
268     pkg_dict: dict[str, Version] = {}
269     for pkg in build_manifest["packages"]["binary"]:
270         p = pkg["package"]
271         v = Version(pkg["version"])
272         if pkg_dict.get(p):
273             logger.debug(
274                 "%(p)s: multiple entries, so only considering max(%(v)s, %(seen)s)",
275                 {
276                     "p": p,
277                     "v": v,
278                     "seen": pkg_dict[p],
279                 },
280             )
281             if pkg_dict.get(p) > v:
282                 logger.debug(
283                     "%(p)s: %(seen)s > %(v)s, so ignoring %(v)s",
284                     {
285                         "p": p,
286                         "seen": pkg_dict.get(p),
287                         "v": v,
288                     },
289                 )
290                 continue
291         pkg_dict[p] = v
293     udd = get_udd(pkg_dict, suites)
295     issues = list(get_issues(udd, pkg_dict))
297     def _is_ignored(issue):
298         if issue.source in general_ignore:
299             return True
300         if (
301             str(issue.suite_source_version)
302             == tmp_ignore.get(issue.source, {"version": None})["version"]
303         ):
304             return True
305         return False
307     if not verbose:
308         issues = list(itertools.filterfalse(_is_ignored, issues))
310     non_forked, forked = partition(
311         operator.attrgetter("tails_fork"),
312         issues,
313     )
315     def _log_issue(issue):
316         if _is_ignored(issue):
317             if issue.source in general_ignore:
318                 return f"(always ignored) {issue}"
319             return f"(known) {issue}"
320         else:
321             return str(issue)
323     def log_group(source, issues):
324         issue = issues[0]
325         suite = issue.suite
326         names = {i.name for i in issues}
327         if names != udd.packages_by_source(source, suite):
328             issue.log_binaries = names
330         return _log_issue(issue)
332     def _log(issues):
333         for source, i in itertools.groupby(
334             issues,
335             key=operator.attrgetter("source"),
336         ):
337             yield log_group(source, list(i))
339     if forked:
340         line = "\n  - ".join(sorted(_log(forked)))
341         logger.info(f"Need to upgrade our own forked package:\n  - {line}")
343     if non_forked:
344         line = "\n  - ".join(sorted(_log(non_forked)))
345         logger.info(f"Need to upgrade to a new APT snapshot:\n  - {line}")
347     # Check if we have at least one non ignored issue
348     try:
349         next(itertools.filterfalse(_is_ignored, issues))
350         return True
351     except StopIteration:
352         pass
354     return False
357 def get_suites(min_codename: str) -> list:
358     suites = []
359     ddi = distro_info.DebianDistroInfo()
360     ddi_s = ddi.supported()
361     codename_pos = ddi_s.index(min_codename)
362     testing = ddi.testing()
363     for suite in ddi_s[codename_pos:]:
364         if suite in (testing, "sid", "experimental"):
365             suites.append(suite)
366         else:
367             # We always want to make sure that we have the stable-security
368             # version installed, if available.
369             # The rest of the list follows the Debian package flow.
370             suites.extend(
371                 [
372                     f"{suite}-security",
373                     suite,
374                     f"{suite}-updates",
375                     f"{suite}-proposed-updates",
376                     f"{suite}-backports",
377                 ],
378             )
380     return suites
383 def main():
384     logging.basicConfig(
385         level=logging.DEBUG,
386         format="%(levelname)s: %(message)s",
387     )
388     logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
390     parser = argparse.ArgumentParser(description="list all packages that ")
391     parser.add_argument("-v", "--verbose", action="store_true", help="Give more infos")
392     parser.add_argument("--debug", action="store_true", help="Show all debug messages")
393     parser.add_argument(
394         "-c",
395         "--config",
396         type=argparse.FileType("r"),
397         default="config/ci/needed-package-updates.yml",
398         help="Config file",
399     )
400     group = parser.add_mutually_exclusive_group()
401     group.add_argument("--suite", help="build manifest suite name.")
402     group.add_argument("--file", type=argparse.FileType("r"), help="local file name.")
404     args = parser.parse_args()
406     logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
408     if args.debug:
409         args.verbose = True
411     config = yaml.safe_load(args.config)
413     suites = tuple(get_suites(config.get("distribution")))
415     if args.file:
416         build_manifest = yaml.safe_load(args.file)
417         add_metadata(build_manifest, args.file.name)
418         logger.info("Check local file %s", build_manifest["file_name"])
419     elif args.suite:
420         build_manifest = get_build_manifest(args.suite)
421         logger.info("Check %s", build_manifest["file_name"])
422     else:
423         err = None
424         for suite in ("testing", "stable"):
425             try:
426                 build_manifest = get_build_manifest(suite)
427                 logger.info("Check %s", build_manifest["file_name"])
428                 break
429             except NoBuildManifest as e:
430                 logger.debug("No build manifest found for %s.", suite)
431                 err = e
432         else:
433             raise err
435     propose_update = check_build_manifest(build_manifest, config, suites, args.verbose)
437     if propose_update:
438         sys.exit(1)
439     else:
440         logger.debug("Nothing to do.")
443 if __name__ == "__main__":
444     main()