vuls: init at 0.27.0
[NixPkgs.git] / nixos / lib / test-driver / test_driver / logger.py
blob484829254b812c1541475e7e8b86503dd0a3b0d8
1 import atexit
2 import codecs
3 import os
4 import sys
5 import time
6 import unicodedata
7 from abc import ABC, abstractmethod
8 from contextlib import ExitStack, contextmanager
9 from pathlib import Path
10 from queue import Empty, Queue
11 from typing import Any, Dict, Iterator, List
12 from xml.sax.saxutils import XMLGenerator
13 from xml.sax.xmlreader import AttributesImpl
15 from colorama import Fore, Style
16 from junit_xml import TestCase, TestSuite
19 class AbstractLogger(ABC):
20 @abstractmethod
21 def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
22 pass
24 @abstractmethod
25 @contextmanager
26 def subtest(self, name: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
27 pass
29 @abstractmethod
30 @contextmanager
31 def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
32 pass
34 @abstractmethod
35 def info(self, *args, **kwargs) -> None: # type: ignore
36 pass
38 @abstractmethod
39 def warning(self, *args, **kwargs) -> None: # type: ignore
40 pass
42 @abstractmethod
43 def error(self, *args, **kwargs) -> None: # type: ignore
44 pass
46 @abstractmethod
47 def log_serial(self, message: str, machine: str) -> None:
48 pass
50 @abstractmethod
51 def print_serial_logs(self, enable: bool) -> None:
52 pass
55 class JunitXMLLogger(AbstractLogger):
56 class TestCaseState:
57 def __init__(self) -> None:
58 self.stdout = ""
59 self.stderr = ""
60 self.failure = False
62 def __init__(self, outfile: Path) -> None:
63 self.tests: dict[str, JunitXMLLogger.TestCaseState] = {
64 "main": self.TestCaseState()
66 self.currentSubtest = "main"
67 self.outfile: Path = outfile
68 self._print_serial_logs = True
69 atexit.register(self.close)
71 def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
72 self.tests[self.currentSubtest].stdout += message + os.linesep
74 @contextmanager
75 def subtest(self, name: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
76 old_test = self.currentSubtest
77 self.tests.setdefault(name, self.TestCaseState())
78 self.currentSubtest = name
80 yield
82 self.currentSubtest = old_test
84 @contextmanager
85 def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
86 self.log(message)
87 yield
89 def info(self, *args, **kwargs) -> None: # type: ignore
90 self.tests[self.currentSubtest].stdout += args[0] + os.linesep
92 def warning(self, *args, **kwargs) -> None: # type: ignore
93 self.tests[self.currentSubtest].stdout += args[0] + os.linesep
95 def error(self, *args, **kwargs) -> None: # type: ignore
96 self.tests[self.currentSubtest].stderr += args[0] + os.linesep
97 self.tests[self.currentSubtest].failure = True
99 def log_serial(self, message: str, machine: str) -> None:
100 if not self._print_serial_logs:
101 return
103 self.log(f"{machine} # {message}")
105 def print_serial_logs(self, enable: bool) -> None:
106 self._print_serial_logs = enable
108 def close(self) -> None:
109 with open(self.outfile, "w") as f:
110 test_cases = []
111 for name, test_case_state in self.tests.items():
112 tc = TestCase(
113 name,
114 stdout=test_case_state.stdout,
115 stderr=test_case_state.stderr,
117 if test_case_state.failure:
118 tc.add_failure_info("test case failed")
120 test_cases.append(tc)
121 ts = TestSuite("NixOS integration test", test_cases)
122 f.write(TestSuite.to_xml_string([ts]))
125 class CompositeLogger(AbstractLogger):
126 def __init__(self, logger_list: List[AbstractLogger]) -> None:
127 self.logger_list = logger_list
129 def add_logger(self, logger: AbstractLogger) -> None:
130 self.logger_list.append(logger)
132 def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
133 for logger in self.logger_list:
134 logger.log(message, attributes)
136 @contextmanager
137 def subtest(self, name: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
138 with ExitStack() as stack:
139 for logger in self.logger_list:
140 stack.enter_context(logger.subtest(name, attributes))
141 yield
143 @contextmanager
144 def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
145 with ExitStack() as stack:
146 for logger in self.logger_list:
147 stack.enter_context(logger.nested(message, attributes))
148 yield
150 def info(self, *args, **kwargs) -> None: # type: ignore
151 for logger in self.logger_list:
152 logger.info(*args, **kwargs)
154 def warning(self, *args, **kwargs) -> None: # type: ignore
155 for logger in self.logger_list:
156 logger.warning(*args, **kwargs)
158 def error(self, *args, **kwargs) -> None: # type: ignore
159 for logger in self.logger_list:
160 logger.error(*args, **kwargs)
161 sys.exit(1)
163 def print_serial_logs(self, enable: bool) -> None:
164 for logger in self.logger_list:
165 logger.print_serial_logs(enable)
167 def log_serial(self, message: str, machine: str) -> None:
168 for logger in self.logger_list:
169 logger.log_serial(message, machine)
172 class TerminalLogger(AbstractLogger):
173 def __init__(self) -> None:
174 self._print_serial_logs = True
176 def maybe_prefix(self, message: str, attributes: Dict[str, str]) -> str:
177 if "machine" in attributes:
178 return f"{attributes['machine']}: {message}"
179 return message
181 @staticmethod
182 def _eprint(*args: object, **kwargs: Any) -> None:
183 print(*args, file=sys.stderr, **kwargs)
185 def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
186 self._eprint(self.maybe_prefix(message, attributes))
188 @contextmanager
189 def subtest(self, name: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
190 with self.nested("subtest: " + name, attributes):
191 yield
193 @contextmanager
194 def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
195 self._eprint(
196 self.maybe_prefix(
197 Style.BRIGHT + Fore.GREEN + message + Style.RESET_ALL, attributes
201 tic = time.time()
202 yield
203 toc = time.time()
204 self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
206 def info(self, *args, **kwargs) -> None: # type: ignore
207 self.log(*args, **kwargs)
209 def warning(self, *args, **kwargs) -> None: # type: ignore
210 self.log(*args, **kwargs)
212 def error(self, *args, **kwargs) -> None: # type: ignore
213 self.log(*args, **kwargs)
215 def print_serial_logs(self, enable: bool) -> None:
216 self._print_serial_logs = enable
218 def log_serial(self, message: str, machine: str) -> None:
219 if not self._print_serial_logs:
220 return
222 self._eprint(Style.DIM + f"{machine} # {message}" + Style.RESET_ALL)
225 class XMLLogger(AbstractLogger):
226 def __init__(self, outfile: str) -> None:
227 self.logfile_handle = codecs.open(outfile, "wb")
228 self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
229 self.queue: Queue[dict[str, str]] = Queue()
231 self._print_serial_logs = True
233 self.xml.startDocument()
234 self.xml.startElement("logfile", attrs=AttributesImpl({}))
236 def close(self) -> None:
237 self.xml.endElement("logfile")
238 self.xml.endDocument()
239 self.logfile_handle.close()
241 def sanitise(self, message: str) -> str:
242 return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")
244 def maybe_prefix(self, message: str, attributes: Dict[str, str]) -> str:
245 if "machine" in attributes:
246 return f"{attributes['machine']}: {message}"
247 return message
249 def log_line(self, message: str, attributes: Dict[str, str]) -> None:
250 self.xml.startElement("line", attrs=AttributesImpl(attributes))
251 self.xml.characters(message)
252 self.xml.endElement("line")
254 def info(self, *args, **kwargs) -> None: # type: ignore
255 self.log(*args, **kwargs)
257 def warning(self, *args, **kwargs) -> None: # type: ignore
258 self.log(*args, **kwargs)
260 def error(self, *args, **kwargs) -> None: # type: ignore
261 self.log(*args, **kwargs)
263 def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
264 self.drain_log_queue()
265 self.log_line(message, attributes)
267 def print_serial_logs(self, enable: bool) -> None:
268 self._print_serial_logs = enable
270 def log_serial(self, message: str, machine: str) -> None:
271 if not self._print_serial_logs:
272 return
274 self.enqueue({"msg": message, "machine": machine, "type": "serial"})
276 def enqueue(self, item: Dict[str, str]) -> None:
277 self.queue.put(item)
279 def drain_log_queue(self) -> None:
280 try:
281 while True:
282 item = self.queue.get_nowait()
283 msg = self.sanitise(item["msg"])
284 del item["msg"]
285 self.log_line(msg, item)
286 except Empty:
287 pass
289 @contextmanager
290 def subtest(self, name: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
291 with self.nested("subtest: " + name, attributes):
292 yield
294 @contextmanager
295 def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
296 self.xml.startElement("nest", attrs=AttributesImpl({}))
297 self.xml.startElement("head", attrs=AttributesImpl(attributes))
298 self.xml.characters(message)
299 self.xml.endElement("head")
301 tic = time.time()
302 self.drain_log_queue()
303 yield
304 self.drain_log_queue()
305 toc = time.time()
306 self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
308 self.xml.endElement("nest")