k3s: format with nixfmt-rfc-style
[NixPkgs.git] / maintainers / scripts / sha-to-sri.py
blob1af7ff215ad33525b5ab25e6d5e6d836a2cba1e3
1 #!/usr/bin/env nix-shell
2 #! nix-shell -i "python3 -I" -p "python3.withPackages(p: with p; [ rich structlog ])"
4 from abc import ABC, abstractclassmethod, abstractmethod
5 from contextlib import contextmanager
6 from pathlib import Path
7 from structlog.contextvars import bound_contextvars as log_context
8 from typing import ClassVar, List, Tuple
10 import hashlib, re, structlog
# Module-wide structlog logger; the per-file and per-line context seen in its
# output is bound by the callers through `log_context`.
logger = structlog.getLogger("sha-to-SRI")
class Encoding(ABC):
    """Textual encoding of a hash digest that can be converted to SRI form.

    Each concrete subclass describes one way a digest can be spelled in text.
    An instance is bound to one hash algorithm (its digest size and name) and
    provides a regex recognising that spelling, plus a decoder back to bytes.
    """

    # Body of a regex character class (the text between `[` and `]`) matching
    # a single character of the encoded digest.
    alphabet: ClassVar[str]

    # Lower-cased class name, readable on both the class and its instances.
    # Stored as a plain class attribute because stacking @classmethod on
    # @property was deprecated in Python 3.11 and removed in 3.13.
    name: ClassVar[str] = "encoding"

    def __init_subclass__(cls, **kwargs):
        """Give every subclass a `name` derived from its class name."""
        super().__init_subclass__(**kwargs)
        # Respect an explicit `name` defined in the subclass body.
        if "name" not in cls.__dict__:
            cls.name = cls.__name__.lower()

    def __init__(self, h):
        """Bind the encoding to the hash object `h` (hashlib-style).

        Only `h.digest_size` and `h.name` are read.
        """
        self.n = h.digest_size
        self.hashName = h.name

    @classmethod
    def all(cls, h) -> 'List[Encoding]':
        """Instantiate every direct subclass of `cls` for the hash `h`."""
        return [c(h) for c in cls.__subclasses__()]

    def toSRI(self, s: str) -> str:
        """Convert the encoded digest `s` to an SRI string `<hash>-<base64>`.

        Raises:
            ValueError: if `s` does not decode to exactly `self.n` bytes
                (or if `decode` itself rejects the input).
        """
        digest = self.decode(s)
        # Raise instead of assert: the check must survive `python -O`, and
        # ValueError matches what `decode` implementations raise on bad input.
        if len(digest) != self.n:
            raise ValueError(
                f"Decoded digest has {len(digest)} bytes, expected {self.n}"
            )

        from base64 import b64encode
        return f"{self.hashName}-{b64encode(digest).decode()}"

    @property
    @abstractmethod
    def length(self) -> int:
        """Number of characters in an encoded digest."""
        ...

    @property
    def regex(self) -> str:
        """Regex source matching exactly one encoded digest."""
        return f"[{self.alphabet}]{{{self.length}}}"

    @abstractmethod
    def decode(self, s: str) -> bytes:
        """Decode the encoded digest `s` into raw bytes."""
        ...
class Nix32(Encoding):
    """Base-32 digests written with the 32-character alphabet below.

    Characters carry 5 bits each and are consumed from the end of the string
    towards its start, filling the digest from its first byte upwards.
    """

    alphabet = "0123456789abcdfghijklmnpqrsvwxyz"
    # Reverse lookup: character -> 5-bit value.
    inverted = {c: i for i, c in enumerate(alphabet)}

    @property
    def length(self):
        """Number of characters needed for `self.n` bytes: ceil(8n / 5)."""
        # `1 + 8n // 5` over-counts by one whenever 8n is a multiple of 5
        # (e.g. 20-byte digests); the ceiling form is right for every size.
        return (8 * self.n + 4) // 5

    def decode(self, s: str):
        """Decode the base-32 string `s` into `self.n` raw bytes.

        Raises:
            ValueError: if `s` has the wrong length, contains a character
                outside the alphabet, or sets bits beyond the digest size.
        """
        if len(s) != self.length:
            raise ValueError(f"Invalid nix32 hash: '{s}'")

        out = bytearray(self.n)

        for n, c in enumerate(reversed(s)):
            try:
                digit = self.inverted[c]
            except KeyError:
                raise ValueError(f"Invalid nix32 hash: '{s}'") from None

            # Bit offset 5*n lands in byte i, starting at bit j of that byte.
            i, j = divmod(5 * n, 8)
            out[i] |= (digit << j) & 0xff

            # Bits of the digit that spill over into the next byte.
            rem = digit >> (8 - j)
            if rem == 0:
                continue

            # `i` is always < n here, so the guard must test `i + 1`:
            # spill-over past the last byte means the input has excess bits.
            if i + 1 < self.n:
                out[i + 1] |= rem
            else:
                raise ValueError(f"Invalid nix32 hash: '{s}'")

        return bytes(out)
class Hex(Encoding):
    """Hexadecimal digests, accepting both upper- and lower-case digits."""

    alphabet = "0-9A-Fa-f"

    @property
    def length(self):
        """Number of characters: two hex digits per digest byte."""
        return self.n * 2

    def decode(self, s: str):
        """Decode the hexadecimal string `s` into raw bytes."""
        import binascii
        return binascii.unhexlify(s)
class Base64(Encoding):
    """Standard base-64 digests with `=` padding."""

    alphabet = "A-Za-z0-9+/"

    @property
    def format(self) -> Tuple[int, int]:
        """Number of characters in data and padding."""
        groups, leftover = divmod(self.n, 3)
        # Every full 3-byte group takes 4 characters; a partial group of
        # `leftover` bytes takes `leftover + 1` characters.
        data = 4 * groups
        if leftover != 0:
            data += leftover + 1
        padding = (3 - leftover) % 3
        return data, padding

    @property
    def length(self):
        """Total number of characters, padding included."""
        data, padding = self.format
        return data + padding

    @property
    def regex(self):
        """Regex source: the data characters followed by the exact padding."""
        data, padding = self.format
        return f"[{self.alphabet}]{{{data}}}={{{padding}}}"

    def decode(self, s):
        """Strictly decode the base-64 string `s` into raw bytes."""
        import base64
        return base64.b64decode(s, validate=True)
# One hash object per supported algorithm. Only `.name` and `.digest_size`
# are read from them. Kept as a tuple rather than a generator so the value is
# still usable after ENCODINGS has been built.
_HASHES = tuple(hashlib.new(n) for n in ('SHA-256', 'SHA-512'))

# Hash name -> one instance of every known encoding, bound to that hash.
ENCODINGS = {
    h.name: Encoding.all(h)
    for h in _HASHES
}

# Hash name -> regex alternation over all encodings of that hash. Each
# alternative is captured in a group named `<hash>_<encoding>`; the base64
# alternative additionally tolerates an already-present `<hash>-` SRI prefix.
RE = {
    h: "|".join(
        (f"({h}-)?" if e.name == 'base64' else '') +
        f"(?P<{h}_{e.name}>{e.regex})"
        for e in encodings
    ) for h, encodings in ENCODINGS.items()
}

# Matches a whole `<hash> = "<digest>";` definition (either quote style, same
# quote on both sides); the group named after the hash tells which one matched.
# The loop variable is named `pattern` so it does not shadow the `re` module.
_DEF_RE = re.compile("|".join(
    f"(?P<{h}>{h} = (?P<{h}_quote>['\"])({pattern})(?P={h}_quote);)"
    for h, pattern in RE.items()
))
def defToSRI(s: str) -> str:
    """Rewrite every hash definition found in `s` as `hash = "<SRI>";`.

    Definitions that cannot be converted are logged and left unchanged.
    """
    def _rewrite(match: re.Match[str]) -> str:
        """Return the replacement text for a single `_DEF_RE` match."""
        try:
            # Which hash's named group took part in the match?
            hashName = next(
                (h for h in ENCODINGS if match.group(h) is not None),
                None,
            )
            if hashName is None:
                raise ValueError("Match with no hash")

            # Which encoding of that hash captured the digest?
            for enc in ENCODINGS[hashName]:
                encoded = match.group(f"{hashName}_{enc.name}")
                if encoded is not None:
                    return f'hash = "{enc.toSRI(encoded)}";'

            raise ValueError(f"Match with '{hashName}' but no subgroup")

        except ValueError as exn:
            logger.error(
                "Skipping",
                exc_info=exn,
            )
            # Leave the original text untouched.
            return match.group()

    return _DEF_RE.sub(_rewrite, s)
@contextmanager
def atomicFileUpdate(target: Path):
    '''Atomically replace the contents of a file.

    Guarantees that no temporary files are left behind, and `target` is either
    left untouched, or overwritten with new content if no exception was raised.

    Yields a pair `(original, new)` of open files.
    `original` is the pre-existing file at `target`, open for reading;
    `new` is an empty, temporary file in the same folder, open for writing.

    Upon exiting the context, the files are closed; if no exception was
    raised, `new` (atomically) replaces the `target`, otherwise it is deleted.
    The permission bits of `target` are carried over to the replacement.
    '''
    # That's mostly copied from noto-emoji.py, should DRY it out
    import shutil
    from tempfile import mkstemp

    fd, _p = mkstemp(
        dir=target.parent,
        prefix=target.name,
    )
    tmpPath = Path(_p)

    try:
        # Wrap the descriptor returned by mkstemp instead of opening the path
        # a second time: the file object owns `fd` and closes it on exit, so
        # the descriptor is not leaked.
        with open(fd, 'w') as new:
            with target.open() as original:
                yield (original, new)

        # mkstemp creates the file with mode 0600; keep the target's mode.
        shutil.copymode(target, tmpPath)
        tmpPath.replace(target)

    except BaseException:
        # BaseException, not Exception: the temporary file must also be
        # removed on KeyboardInterrupt and similar.
        tmpPath.unlink(missing_ok=True)
        raise
def fileToSRI(p: Path):
    """Rewrite the file at `p` in place, converting hash definitions to SRI.

    The file is processed line by line through `defToSRI` and replaced
    atomically via `atomicFileUpdate`.
    """
    with atomicFileUpdate(p) as (og, new):
        # Number lines from 1 so the `line` shown in log output matches what
        # editors and other tools display.
        for i, line in enumerate(og, start=1):
            with log_context(line=i):
                new.write(defToSRI(line))
# Markers that identify a file as machine-generated when they appear in its
# first non-blank line.
_SKIP_RE = re.compile(
    "(generated by)|(do not edit)",
    re.IGNORECASE
)


if __name__ == "__main__":
    from sys import argv, stderr
    logger.info("Starting!")

    # Every command-line argument is a file to convert in place.
    for arg in argv[1:]:
        p = Path(arg)
        with log_context(path=str(p)):
            try:
                if p.name == "yarn.nix" or "generated" in p.name:
                    logger.warning("File looks autogenerated, skipping!")
                    continue

                # First non-blank line, or "" for an empty or all-blank file.
                # A bare `for ... break` loop would leave the variable unbound
                # (or stale from the previous file) when the file is empty.
                with p.open() as f:
                    firstLine = next((l for l in f if l.strip()), "")

                if _SKIP_RE.search(firstLine):
                    logger.warning("File looks autogenerated, skipping!")
                    continue

                fileToSRI(p)
            except Exception as exn:
                # Top-level boundary: report and move on to the next file.
                logger.error(
                    "Unhandled exception, skipping file!",
                    exc_info=exn,
                )
            else:
                logger.info("Finished processing file")