import atexit
import codecs
import os
import sys
import time
import unicodedata
from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import ExitStack, contextmanager
from pathlib import Path
from queue import Empty, Queue
from typing import Any
from xml.sax.saxutils import XMLGenerator
from xml.sax.xmlreader import AttributesImpl

from colorama import Fore, Style
from junit_xml import TestCase, TestSuite
class AbstractLogger(ABC):
    """Interface implemented by every log sink in this module.

    All methods are abstract; concrete loggers decide where messages go.
    The ``attributes`` parameters default to a shared ``{}`` (kept for
    signature compatibility with existing implementations and callers), so
    implementations must treat the mapping as read-only.
    """

    @abstractmethod
    def log(self, message: str, attributes: dict[str, str] = {}) -> None:
        """Record ``message``, optionally tagged with ``attributes``."""

    @abstractmethod
    @contextmanager
    def subtest(self, name: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Context manager wrapping the execution of the subtest ``name``."""

    @abstractmethod
    @contextmanager
    def nested(self, message: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Context manager wrapping a nested step introduced by ``message``."""

    @abstractmethod
    def info(self, *args, **kwargs) -> None:  # type: ignore
        """Record an informational message."""

    @abstractmethod
    def warning(self, *args, **kwargs) -> None:  # type: ignore
        """Record a warning message."""

    @abstractmethod
    def error(self, *args, **kwargs) -> None:  # type: ignore
        """Record an error message."""

    @abstractmethod
    def log_serial(self, message: str, machine: str) -> None:
        """Record one line of serial output ``message`` coming from ``machine``."""

    @abstractmethod
    def print_serial_logs(self, enable: bool) -> None:
        """Turn recording of serial output on (``True``) or off (``False``)."""
class JunitXMLLogger(AbstractLogger):
    """Logger that collects output per test case and writes a JUnit XML report.

    Output is accumulated in memory, keyed by subtest name (``"main"`` for
    everything outside a subtest), and written to ``outfile`` by ``close``,
    which is registered with ``atexit`` on construction.
    """

    class TestCaseState:
        """Accumulated output and verdict of a single test case."""

        def __init__(self) -> None:
            self.stdout = ""
            self.stderr = ""
            self.failure = False

    def __init__(self, outfile: Path) -> None:
        """Create the logger; the report is written to ``outfile`` at exit."""
        self.tests: dict[str, JunitXMLLogger.TestCaseState] = {
            "main": self.TestCaseState()
        }
        self.currentSubtest = "main"
        self.outfile: Path = outfile
        self._print_serial_logs = True
        atexit.register(self.close)

    def log(self, message: str, attributes: dict[str, str] = {}) -> None:
        """Append ``message`` to the stdout of the current test case.

        ``attributes`` is accepted for interface compatibility and ignored.
        """
        self.tests[self.currentSubtest].stdout += message + os.linesep

    @contextmanager
    def subtest(self, name: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Route output to the test case ``name`` for the duration of the block.

        A state entry for ``name`` is created on first use and reused
        afterwards. The previously active test case is restored on exit.
        """
        old_test = self.currentSubtest
        self.tests.setdefault(name, self.TestCaseState())
        self.currentSubtest = name
        try:
            yield
        finally:
            # Restore even when the subtest body raises, so later output is
            # not attributed to the wrong test case.
            self.currentSubtest = old_test

    @contextmanager
    def nested(self, message: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Record ``message`` as an ordinary log line, then run the block."""
        self.log(message)
        yield

    def info(self, *args, **kwargs) -> None:  # type: ignore
        """Append the first positional argument to the current stdout."""
        self.tests[self.currentSubtest].stdout += args[0] + os.linesep

    def warning(self, *args, **kwargs) -> None:  # type: ignore
        """Append the first positional argument to the current stdout."""
        self.tests[self.currentSubtest].stdout += args[0] + os.linesep

    def error(self, *args, **kwargs) -> None:  # type: ignore
        """Append the first positional argument to stderr and mark a failure."""
        self.tests[self.currentSubtest].stderr += args[0] + os.linesep
        self.tests[self.currentSubtest].failure = True

    def log_serial(self, message: str, machine: str) -> None:
        """Record a serial line as ``"<machine> # <message>"`` if enabled."""
        if not self._print_serial_logs:
            return

        self.log(f"{machine} # {message}")

    def print_serial_logs(self, enable: bool) -> None:
        """Enable or disable recording of serial output."""
        self._print_serial_logs = enable

    def close(self) -> None:
        """Write all collected test cases to ``outfile`` as one test suite."""
        # The XML is written as UTF-8 explicitly rather than in the locale's
        # default encoding.
        with open(self.outfile, "w", encoding="utf-8") as f:
            test_cases = []
            for name, test_case_state in self.tests.items():
                tc = TestCase(
                    name,
                    stdout=test_case_state.stdout,
                    stderr=test_case_state.stderr,
                )
                if test_case_state.failure:
                    tc.add_failure_info("test case failed")

                test_cases.append(tc)
            ts = TestSuite("NixOS integration test", test_cases)
            f.write(TestSuite.to_xml_string([ts]))
class CompositeLogger(AbstractLogger):
    """Logger that forwards every call to each logger in ``logger_list``.

    Loggers are invoked in list order. The list passed to the constructor is
    stored by reference, not copied.
    """

    def __init__(self, logger_list: list[AbstractLogger]) -> None:
        """Create a composite over ``logger_list``."""
        self.logger_list = logger_list

    def add_logger(self, logger: AbstractLogger) -> None:
        """Append ``logger`` to the set of forwarding targets."""
        self.logger_list.append(logger)

    def log(self, message: str, attributes: dict[str, str] = {}) -> None:
        """Forward ``log`` to every logger."""
        for logger in self.logger_list:
            logger.log(message, attributes)

    @contextmanager
    def subtest(self, name: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Enter ``subtest`` on every logger for the duration of the block."""
        # ExitStack unwinds the already-entered contexts in reverse order,
        # including when entering a later logger fails.
        with ExitStack() as stack:
            for logger in self.logger_list:
                stack.enter_context(logger.subtest(name, attributes))
            yield

    @contextmanager
    def nested(self, message: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Enter ``nested`` on every logger for the duration of the block."""
        with ExitStack() as stack:
            for logger in self.logger_list:
                stack.enter_context(logger.nested(message, attributes))
            yield

    def info(self, *args, **kwargs) -> None:  # type: ignore
        """Forward ``info`` to every logger."""
        for logger in self.logger_list:
            logger.info(*args, **kwargs)

    def warning(self, *args, **kwargs) -> None:  # type: ignore
        """Forward ``warning`` to every logger."""
        for logger in self.logger_list:
            logger.warning(*args, **kwargs)

    def error(self, *args, **kwargs) -> None:  # type: ignore
        """Forward ``error`` to every logger."""
        for logger in self.logger_list:
            logger.error(*args, **kwargs)

    def print_serial_logs(self, enable: bool) -> None:
        """Forward the serial-output switch to every logger."""
        for logger in self.logger_list:
            logger.print_serial_logs(enable)

    def log_serial(self, message: str, machine: str) -> None:
        """Forward a serial line to every logger."""
        for logger in self.logger_list:
            logger.log_serial(message, machine)
class TerminalLogger(AbstractLogger):
    """Logger that prints messages to standard error."""

    def __init__(self) -> None:
        """Create the logger with serial output enabled."""
        self._print_serial_logs = True

    def maybe_prefix(self, message: str, attributes: dict[str, str]) -> str:
        """Return ``message`` prefixed with ``"<machine>: "`` when available.

        The prefix is added only if ``attributes`` has a ``"machine"`` key;
        otherwise ``message`` is returned unchanged.
        """
        if "machine" in attributes:
            return f"{attributes['machine']}: {message}"
        return message

    @staticmethod
    def _eprint(*args: object, **kwargs: Any) -> None:
        """``print`` to ``sys.stderr``; extra keyword arguments pass through."""
        print(*args, file=sys.stderr, **kwargs)

    def log(self, message: str, attributes: dict[str, str] = {}) -> None:
        """Print ``message`` (machine-prefixed if applicable) to stderr."""
        self._eprint(self.maybe_prefix(message, attributes))

    @contextmanager
    def subtest(self, name: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Run the block as a nested step titled ``"subtest: <name>"``."""
        with self.nested("subtest: " + name, attributes):
            yield

    @contextmanager
    def nested(self, message: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Print a highlighted heading, run the block, then print its duration.

        The "finished" line is only printed when the block completes without
        raising.
        """
        self._eprint(
            self.maybe_prefix(
                Style.BRIGHT + Fore.GREEN + message + Style.RESET_ALL, attributes
            )
        )

        # Monotonic clock: the measured duration is unaffected by wall-clock
        # adjustments made while the block runs.
        tic = time.monotonic()
        yield
        toc = time.monotonic()
        self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")

    def info(self, *args, **kwargs) -> None:  # type: ignore
        """Alias for ``log``."""
        self.log(*args, **kwargs)

    def warning(self, *args, **kwargs) -> None:  # type: ignore
        """Alias for ``log``."""
        self.log(*args, **kwargs)

    def error(self, *args, **kwargs) -> None:  # type: ignore
        """Alias for ``log``."""
        self.log(*args, **kwargs)

    def print_serial_logs(self, enable: bool) -> None:
        """Enable or disable printing of serial output."""
        self._print_serial_logs = enable

    def log_serial(self, message: str, machine: str) -> None:
        """Print a dimmed ``"<machine> # <message>"`` line if enabled."""
        if not self._print_serial_logs:
            return

        self._eprint(Style.DIM + f"{machine} # {message}" + Style.RESET_ALL)
class XMLLogger(AbstractLogger):
    """Logger that streams messages into an XML log file.

    The document consists of a ``<logfile>`` root holding ``<line>`` elements
    and ``<nest>`` groups. Serial output is buffered in a queue and written
    out the next time ``log`` or ``nested`` drains it.
    """

    def __init__(self, outfile: str) -> None:
        """Open ``outfile`` and write the document prologue and root element."""
        self.logfile_handle = codecs.open(outfile, "wb")
        self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
        self.queue: Queue[dict[str, str]] = Queue()

        self._print_serial_logs = True

        self.xml.startDocument()
        self.xml.startElement("logfile", attrs=AttributesImpl({}))

    def close(self) -> None:
        """Close the root element, finish the document and close the file."""
        self.xml.endElement("logfile")
        self.xml.endDocument()
        self.logfile_handle.close()

    def sanitise(self, message: str) -> str:
        """Return ``message`` without characters of Unicode category ``C*``.

        That covers control, format, surrogate, private-use and unassigned
        code points.
        """
        return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")

    def maybe_prefix(self, message: str, attributes: dict[str, str]) -> str:
        """Return ``message`` prefixed with ``"<machine>: "`` when available."""
        if "machine" in attributes:
            return f"{attributes['machine']}: {message}"
        return message

    def log_line(self, message: str, attributes: dict[str, str]) -> None:
        """Write one ``<line>`` element carrying ``attributes`` and ``message``."""
        self.xml.startElement("line", attrs=AttributesImpl(attributes))
        self.xml.characters(message)
        self.xml.endElement("line")

    def info(self, *args, **kwargs) -> None:  # type: ignore
        """Alias for ``log``."""
        self.log(*args, **kwargs)

    def warning(self, *args, **kwargs) -> None:  # type: ignore
        """Alias for ``log``."""
        self.log(*args, **kwargs)

    def error(self, *args, **kwargs) -> None:  # type: ignore
        """Alias for ``log``."""
        self.log(*args, **kwargs)

    def log(self, message: str, attributes: dict[str, str] = {}) -> None:
        """Flush queued serial lines, then write ``message`` as a ``<line>``."""
        self.drain_log_queue()
        self.log_line(message, attributes)

    def print_serial_logs(self, enable: bool) -> None:
        """Enable or disable recording of serial output."""
        self._print_serial_logs = enable

    def log_serial(self, message: str, machine: str) -> None:
        """Queue one serial line from ``machine`` if serial output is enabled."""
        if not self._print_serial_logs:
            return

        self.enqueue({"msg": message, "machine": machine, "type": "serial"})

    def enqueue(self, item: dict[str, str]) -> None:
        """Add ``item`` to the queue of pending lines."""
        self.queue.put(item)

    def drain_log_queue(self) -> None:
        """Write every currently queued item as a ``<line>`` element."""
        try:
            while True:
                item = self.queue.get_nowait()
                msg = self.sanitise(item["msg"])
                # The text becomes the element body; drop it from the mapping
                # so it is not also emitted as an attribute.
                del item["msg"]
                self.log_line(msg, item)
        except Empty:
            # Queue exhausted: nothing more to write for now.
            pass

    @contextmanager
    def subtest(self, name: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Run the block as a nested step titled ``"subtest: <name>"``."""
        with self.nested("subtest: " + name, attributes):
            yield

    @contextmanager
    def nested(self, message: str, attributes: dict[str, str] = {}) -> Iterator[None]:
        """Wrap the block's output in a ``<nest>`` element headed by ``message``.

        Queued serial lines are flushed before and after the block, and a
        "finished" line with the elapsed time is logged before the element is
        closed. The closing steps run only when the block does not raise.
        """
        self.xml.startElement("nest", attrs=AttributesImpl({}))
        self.xml.startElement("head", attrs=AttributesImpl(attributes))
        self.xml.characters(message)
        self.xml.endElement("head")

        # Monotonic clock: the measured duration is unaffected by wall-clock
        # adjustments made while the block runs.
        tic = time.monotonic()
        self.drain_log_queue()
        yield
        self.drain_log_queue()
        toc = time.monotonic()
        self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")

        self.xml.endElement("nest")