7 from abc
import ABC
, abstractmethod
8 from contextlib
import ExitStack
, contextmanager
9 from pathlib
import Path
10 from queue
import Empty
, Queue
11 from typing
import Any
, Dict
, Iterator
, List
12 from xml
.sax
.saxutils
import XMLGenerator
13 from xml
.sax
.xmlreader
import AttributesImpl
15 from colorama
import Fore
, Style
16 from junit_xml
import TestCase
, TestSuite
19 class AbstractLogger(ABC
):
21 def log(self
, message
: str, attributes
: Dict
[str, str] = {}) -> None:
26 def subtest(self
, name
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
31 def nested(self
, message
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
35 def info(self
, *args
, **kwargs
) -> None: # type: ignore
39 def warning(self
, *args
, **kwargs
) -> None: # type: ignore
43 def error(self
, *args
, **kwargs
) -> None: # type: ignore
47 def log_serial(self
, message
: str, machine
: str) -> None:
51 def print_serial_logs(self
, enable
: bool) -> None:
55 class JunitXMLLogger(AbstractLogger
):
57 def __init__(self
) -> None:
62 def __init__(self
, outfile
: Path
) -> None:
63 self
.tests
: dict[str, JunitXMLLogger
.TestCaseState
] = {
64 "main": self
.TestCaseState()
66 self
.currentSubtest
= "main"
67 self
.outfile
: Path
= outfile
68 self
._print
_serial
_logs
= True
69 atexit
.register(self
.close
)
71 def log(self
, message
: str, attributes
: Dict
[str, str] = {}) -> None:
72 self
.tests
[self
.currentSubtest
].stdout
+= message
+ os
.linesep
75 def subtest(self
, name
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
76 old_test
= self
.currentSubtest
77 self
.tests
.setdefault(name
, self
.TestCaseState())
78 self
.currentSubtest
= name
82 self
.currentSubtest
= old_test
85 def nested(self
, message
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
89 def info(self
, *args
, **kwargs
) -> None: # type: ignore
90 self
.tests
[self
.currentSubtest
].stdout
+= args
[0] + os
.linesep
92 def warning(self
, *args
, **kwargs
) -> None: # type: ignore
93 self
.tests
[self
.currentSubtest
].stdout
+= args
[0] + os
.linesep
95 def error(self
, *args
, **kwargs
) -> None: # type: ignore
96 self
.tests
[self
.currentSubtest
].stderr
+= args
[0] + os
.linesep
97 self
.tests
[self
.currentSubtest
].failure
= True
99 def log_serial(self
, message
: str, machine
: str) -> None:
100 if not self
._print
_serial
_logs
:
103 self
.log(f
"{machine} # {message}")
105 def print_serial_logs(self
, enable
: bool) -> None:
106 self
._print
_serial
_logs
= enable
108 def close(self
) -> None:
109 with
open(self
.outfile
, "w") as f
:
111 for name
, test_case_state
in self
.tests
.items():
114 stdout
=test_case_state
.stdout
,
115 stderr
=test_case_state
.stderr
,
117 if test_case_state
.failure
:
118 tc
.add_failure_info("test case failed")
120 test_cases
.append(tc
)
121 ts
= TestSuite("NixOS integration test", test_cases
)
122 f
.write(TestSuite
.to_xml_string([ts
]))
125 class CompositeLogger(AbstractLogger
):
126 def __init__(self
, logger_list
: List
[AbstractLogger
]) -> None:
127 self
.logger_list
= logger_list
129 def add_logger(self
, logger
: AbstractLogger
) -> None:
130 self
.logger_list
.append(logger
)
132 def log(self
, message
: str, attributes
: Dict
[str, str] = {}) -> None:
133 for logger
in self
.logger_list
:
134 logger
.log(message
, attributes
)
137 def subtest(self
, name
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
138 with
ExitStack() as stack
:
139 for logger
in self
.logger_list
:
140 stack
.enter_context(logger
.subtest(name
, attributes
))
144 def nested(self
, message
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
145 with
ExitStack() as stack
:
146 for logger
in self
.logger_list
:
147 stack
.enter_context(logger
.nested(message
, attributes
))
150 def info(self
, *args
, **kwargs
) -> None: # type: ignore
151 for logger
in self
.logger_list
:
152 logger
.info(*args
, **kwargs
)
154 def warning(self
, *args
, **kwargs
) -> None: # type: ignore
155 for logger
in self
.logger_list
:
156 logger
.warning(*args
, **kwargs
)
158 def error(self
, *args
, **kwargs
) -> None: # type: ignore
159 for logger
in self
.logger_list
:
160 logger
.error(*args
, **kwargs
)
163 def print_serial_logs(self
, enable
: bool) -> None:
164 for logger
in self
.logger_list
:
165 logger
.print_serial_logs(enable
)
167 def log_serial(self
, message
: str, machine
: str) -> None:
168 for logger
in self
.logger_list
:
169 logger
.log_serial(message
, machine
)
172 class TerminalLogger(AbstractLogger
):
173 def __init__(self
) -> None:
174 self
._print
_serial
_logs
= True
176 def maybe_prefix(self
, message
: str, attributes
: Dict
[str, str]) -> str:
177 if "machine" in attributes
:
178 return f
"{attributes['machine']}: {message}"
182 def _eprint(*args
: object, **kwargs
: Any
) -> None:
183 print(*args
, file=sys
.stderr
, **kwargs
)
185 def log(self
, message
: str, attributes
: Dict
[str, str] = {}) -> None:
186 self
._eprint
(self
.maybe_prefix(message
, attributes
))
189 def subtest(self
, name
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
190 with self
.nested("subtest: " + name
, attributes
):
194 def nested(self
, message
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
197 Style
.BRIGHT
+ Fore
.GREEN
+ message
+ Style
.RESET_ALL
, attributes
204 self
.log(f
"(finished: {message}, in {toc - tic:.2f} seconds)")
206 def info(self
, *args
, **kwargs
) -> None: # type: ignore
207 self
.log(*args
, **kwargs
)
209 def warning(self
, *args
, **kwargs
) -> None: # type: ignore
210 self
.log(*args
, **kwargs
)
212 def error(self
, *args
, **kwargs
) -> None: # type: ignore
213 self
.log(*args
, **kwargs
)
215 def print_serial_logs(self
, enable
: bool) -> None:
216 self
._print
_serial
_logs
= enable
218 def log_serial(self
, message
: str, machine
: str) -> None:
219 if not self
._print
_serial
_logs
:
222 self
._eprint
(Style
.DIM
+ f
"{machine} # {message}" + Style
.RESET_ALL
)
225 class XMLLogger(AbstractLogger
):
226 def __init__(self
, outfile
: str) -> None:
227 self
.logfile_handle
= codecs
.open(outfile
, "wb")
228 self
.xml
= XMLGenerator(self
.logfile_handle
, encoding
="utf-8")
229 self
.queue
: Queue
[dict[str, str]] = Queue()
231 self
._print
_serial
_logs
= True
233 self
.xml
.startDocument()
234 self
.xml
.startElement("logfile", attrs
=AttributesImpl({}))
236 def close(self
) -> None:
237 self
.xml
.endElement("logfile")
238 self
.xml
.endDocument()
239 self
.logfile_handle
.close()
241 def sanitise(self
, message
: str) -> str:
242 return "".join(ch
for ch
in message
if unicodedata
.category(ch
)[0] != "C")
244 def maybe_prefix(self
, message
: str, attributes
: Dict
[str, str]) -> str:
245 if "machine" in attributes
:
246 return f
"{attributes['machine']}: {message}"
249 def log_line(self
, message
: str, attributes
: Dict
[str, str]) -> None:
250 self
.xml
.startElement("line", attrs
=AttributesImpl(attributes
))
251 self
.xml
.characters(message
)
252 self
.xml
.endElement("line")
254 def info(self
, *args
, **kwargs
) -> None: # type: ignore
255 self
.log(*args
, **kwargs
)
257 def warning(self
, *args
, **kwargs
) -> None: # type: ignore
258 self
.log(*args
, **kwargs
)
260 def error(self
, *args
, **kwargs
) -> None: # type: ignore
261 self
.log(*args
, **kwargs
)
263 def log(self
, message
: str, attributes
: Dict
[str, str] = {}) -> None:
264 self
.drain_log_queue()
265 self
.log_line(message
, attributes
)
267 def print_serial_logs(self
, enable
: bool) -> None:
268 self
._print
_serial
_logs
= enable
270 def log_serial(self
, message
: str, machine
: str) -> None:
271 if not self
._print
_serial
_logs
:
274 self
.enqueue({"msg": message
, "machine": machine
, "type": "serial"})
276 def enqueue(self
, item
: Dict
[str, str]) -> None:
279 def drain_log_queue(self
) -> None:
282 item
= self
.queue
.get_nowait()
283 msg
= self
.sanitise(item
["msg"])
285 self
.log_line(msg
, item
)
290 def subtest(self
, name
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
291 with self
.nested("subtest: " + name
, attributes
):
295 def nested(self
, message
: str, attributes
: Dict
[str, str] = {}) -> Iterator
[None]:
296 self
.xml
.startElement("nest", attrs
=AttributesImpl({}))
297 self
.xml
.startElement("head", attrs
=AttributesImpl(attributes
))
298 self
.xml
.characters(message
)
299 self
.xml
.endElement("head")
302 self
.drain_log_queue()
304 self
.drain_log_queue()
306 self
.log(f
"(finished: {message}, in {toc - tic:.2f} seconds)")
308 self
.xml
.endElement("nest")