vuls: init at 0.27.0
[NixPkgs.git] / nixos / lib / test-driver / test_driver / machine.py
blob3a1d5bc1be764368bb6c1f0ba77face3fb7680be
1 import base64
2 import io
3 import os
4 import queue
5 import re
6 import select
7 import shlex
8 import shutil
9 import socket
10 import subprocess
11 import sys
12 import tempfile
13 import threading
14 import time
15 from contextlib import _GeneratorContextManager, nullcontext
16 from pathlib import Path
17 from queue import Queue
18 from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
20 from test_driver.logger import AbstractLogger
22 from .qmp import QMPSession
# Translation table from a typed character to the key name handed to the
# QEMU monitor's `sendkey` command. Characters that are not listed here are
# passed through unchanged by `send_key`.
CHAR_TO_KEY = {
    # Upper-case letters are the matching lower-case key with shift held.
    **{
        letter: f"shift-{letter.lower()}"
        for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    },
    # Punctuation keys: plain scancode, then the shifted variant.
    "-": "0x0C",
    "_": "shift-0x0C",
    "=": "0x0D",
    "+": "shift-0x0D",
    "[": "0x1A",
    "{": "shift-0x1A",
    "]": "0x1B",
    "}": "shift-0x1B",
    ";": "0x27",
    ":": "shift-0x27",
    "'": "0x28",
    '"': "shift-0x28",
    "`": "0x29",
    "~": "shift-0x29",
    "\\": "0x2B",
    "|": "shift-0x2B",
    ",": "0x33",
    "<": "shift-0x33",
    ".": "0x34",
    ">": "shift-0x34",
    "/": "0x35",
    "?": "shift-0x35",
    # Whitespace.
    " ": "spc",
    "\n": "ret",
    # Shifted digit row: scancodes 0x02 through 0x0B.
    **{
        symbol: f"shift-0x{scancode:02X}"
        for scancode, symbol in enumerate("!@#$%^&*()", start=0x02)
    },
}
def make_command(args: list) -> str:
    """Render `args` as a single shell command line.

    Every element is converted with `str()` and shell-quoted, then the
    pieces are joined with single spaces.
    """
    quoted_words = [shlex.quote(str(arg)) for arg in args]
    return " ".join(quoted_words)
def _perform_ocr_on_screenshot(
    screenshot_path: str, model_ids: Iterable[int]
) -> List[str]:
    """Run tesseract over a screenshot, once per requested engine mode.

    The screenshot is first converted with ImageMagick's `convert` into
    `<screenshot_path>.tiff`, then `tesseract` is invoked on that file with
    `--oem <model_id>` for every entry of `model_ids`.

    Returns the decoded stdout of each tesseract run, in `model_ids` order.

    Raises `Exception` if `tesseract` is not on PATH, or if either the
    conversion or any OCR run exits with a non-zero status.
    """
    if shutil.which("tesseract") is None:
        raise Exception("OCR requested but enableOCR is false")

    tiff_path = f"{screenshot_path}.tiff"

    # Commands are passed as argument lists (no shell), so a path containing
    # quotes or other shell metacharacters cannot break the invocation.
    convert_cmd = [
        "convert",
        "-filter", "Catrom",
        "-density", "72",
        "-resample", "300",
        "-contrast",
        "-normalize",
        "-despeckle",
        "-type", "grayscale",
        "-sharpen", "1",
        "-posterize", "3",
        "-negate",
        "-gamma", "100",
        "-blur", "1x65535",
        screenshot_path,
        f"tiff:{tiff_path}",
    ]
    ret = subprocess.run(convert_cmd, capture_output=True)
    if ret.returncode != 0:
        raise Exception(f"TIFF conversion failed with exit code {ret.returncode}")

    model_results = []
    for model_id in model_ids:
        tesseract_cmd = [
            "tesseract",
            tiff_path,
            "-",
            "-c", "debug_file=/dev/null",
            "--psm", "11",
            "--oem", str(model_id),
        ]
        ret = subprocess.run(tesseract_cmd, capture_output=True)
        if ret.returncode != 0:
            raise Exception(f"OCR failed with exit code {ret.returncode}")
        model_results.append(ret.stdout.decode("utf-8"))

    return model_results
def retry(fn: Callable, timeout: int = 900) -> None:
    """Poll `fn` once per second until it reports success.

    `fn` is called with `False` up to `timeout` times, sleeping one second
    after each unsuccessful call. If none of those calls returned a truthy
    value, `fn` is called one final time with `True` (so it can log
    diagnostics); if that also fails an `Exception` is raised.
    """
    for _attempt in range(timeout):
        succeeded = fn(False)
        if succeeded:
            return
        time.sleep(1)

    # Last chance: tell the callback this is the final attempt.
    if fn(True):
        return
    raise Exception(f"action timed out after {timeout} seconds")
class StartCommand:
    """Base start command for a VM.

    Holds the command line in `_cmd` and knows how to extend it with the
    qemu options this driver needs at runtime (control sockets, serial
    console, display handling).
    """

    _cmd: str

    def cmd(
        self,
        monitor_socket_path: Path,
        qmp_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool = False,
    ) -> str:
        """Return the full shell command line used to launch the VM."""
        # qemu options
        option_parts = [
            " -device virtio-serial",
            # Note: virtconsole will map to /dev/hvc0 in Linux guests
            " -device virtconsole,chardev=shell",
            " -device virtio-rng-pci",
            " -serial stdio",
        ]
        if not allow_reboot:
            option_parts.append(" -no-reboot")
        qemu_opts = "".join(option_parts)

        # Without an X11 or Wayland display in the environment, run headless.
        has_display = "DISPLAY" in os.environ or "WAYLAND_DISPLAY" in os.environ
        display_opts = "" if has_display else " -nographic"

        return (
            f"{self._cmd}"
            f" -qmp unix:{qmp_socket_path},server=on,wait=off"
            f" -monitor unix:{monitor_socket_path}"
            f" -chardev socket,id=shell,path={shell_socket_path}"
            f"{qemu_opts}"
            f"{display_opts}"
        )

    @staticmethod
    def build_environment(
        state_dir: Path,
        shared_dir: Path,
    ) -> dict:
        """Return a copy of the process environment with the VM variables set."""
        # Build a new dict so the current environment is not modified.
        return {
            **os.environ,
            "TMPDIR": str(state_dir),
            "SHARED_DIR": str(shared_dir),
            "USE_TMPDIR": "1",
        }

    def run(
        self,
        state_dir: Path,
        shared_dir: Path,
        monitor_socket_path: Path,
        qmp_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool,
    ) -> subprocess.Popen:
        """Spawn the VM process via the shell, with piped stdin and stdout."""
        command_line = self.cmd(
            monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot
        )
        environment = self.build_environment(state_dir, shared_dir)
        return subprocess.Popen(
            command_line,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            shell=True,
            cwd=state_dir,
            env=environment,
        )
class NixStartScript(StartCommand):
    """Start command backed by a Nix-generated VM start script.

    The machine name is recovered from the script path by a regex match on
    its `run-<name>-vm` suffix; this is an implicit contract with the
    script's naming scheme.
    """

    def __init__(self, script: str):
        self._cmd = script

    @property
    def machine_name(self) -> str:
        """Name embedded in the script path, or "machine" if none is found."""
        found = re.search("run-(.+)-vm$", self._cmd)
        return found.group(1) if found else "machine"
235 class Machine:
236 """A handle to the machine with this name, that also knows how to manage
237 the machine lifecycle with the help of a start script / command."""
239 name: str
240 out_dir: Path
241 tmp_dir: Path
242 shared_dir: Path
243 state_dir: Path
244 monitor_path: Path
245 qmp_path: Path
246 shell_path: Path
248 start_command: StartCommand
249 keep_vm_state: bool
251 process: Optional[subprocess.Popen]
252 pid: Optional[int]
253 monitor: Optional[socket.socket]
254 qmp_client: Optional[QMPSession]
255 shell: Optional[socket.socket]
256 serial_thread: Optional[threading.Thread]
258 booted: bool
259 connected: bool
260 # Store last serial console lines for use
261 # of wait_for_console_text
262 last_lines: Queue = Queue()
263 callbacks: List[Callable]
265 def __repr__(self) -> str:
266 return f"<Machine '{self.name}'>"
268 def __init__(
269 self,
270 out_dir: Path,
271 tmp_dir: Path,
272 start_command: StartCommand,
273 logger: AbstractLogger,
274 name: str = "machine",
275 keep_vm_state: bool = False,
276 callbacks: Optional[List[Callable]] = None,
277 ) -> None:
278 self.out_dir = out_dir
279 self.tmp_dir = tmp_dir
280 self.keep_vm_state = keep_vm_state
281 self.name = name
282 self.start_command = start_command
283 self.callbacks = callbacks if callbacks is not None else []
284 self.logger = logger
286 # set up directories
287 self.shared_dir = self.tmp_dir / "shared-xchg"
288 self.shared_dir.mkdir(mode=0o700, exist_ok=True)
290 self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
291 self.monitor_path = self.state_dir / "monitor"
292 self.qmp_path = self.state_dir / "qmp"
293 self.shell_path = self.state_dir / "shell"
294 if (not self.keep_vm_state) and self.state_dir.exists():
295 self.cleanup_statedir()
296 self.state_dir.mkdir(mode=0o700, exist_ok=True)
298 self.process = None
299 self.pid = None
300 self.monitor = None
301 self.qmp_client = None
302 self.shell = None
303 self.serial_thread = None
305 self.booted = False
306 self.connected = False
308 def is_up(self) -> bool:
309 return self.booted and self.connected
311 def log(self, msg: str) -> None:
312 self.logger.log(msg, {"machine": self.name})
314 def log_serial(self, msg: str) -> None:
315 self.logger.log_serial(msg, self.name)
317 def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager:
318 my_attrs = {"machine": self.name}
319 my_attrs.update(attrs)
320 return self.logger.nested(msg, my_attrs)
322 def wait_for_monitor_prompt(self) -> str:
323 assert self.monitor is not None
324 answer = ""
325 while True:
326 undecoded_answer = self.monitor.recv(1024)
327 if not undecoded_answer:
328 break
329 answer += undecoded_answer.decode()
330 if answer.endswith("(qemu) "):
331 break
332 return answer
334 def send_monitor_command(self, command: str) -> str:
336 Send a command to the QEMU monitor. This allows attaching
337 virtual USB disks to a running machine, among other things.
339 self.run_callbacks()
340 message = f"{command}\n".encode()
341 assert self.monitor is not None
342 self.monitor.send(message)
343 return self.wait_for_monitor_prompt()
def wait_for_unit(
    self, unit: str, user: Optional[str] = None, timeout: int = 900
) -> None:
    """
    Wait for a systemd unit to reach the "active" state.

    Raises an exception if the unit is "failed", if it is "inactive"
    with no pending jobs, or when the timeout expires.
    """

    def check_active(_: Any) -> bool:
        state = self.get_unit_property(unit, "ActiveState", user)
        if state == "active":
            return True
        if state == "failed":
            raise Exception(f'unit "{unit}" reached state "{state}"')
        if state != "inactive":
            return False

        # Inactive: only give up when no job is queued and a second look
        # at the unit still reports it as inactive.
        _status, jobs = self.systemctl("list-jobs --full 2>&1", user)
        if "No jobs" in jobs:
            unit_info = self.get_unit_info(unit, user)
            if unit_info["ActiveState"] == state:
                raise Exception(
                    f'unit "{unit}" is inactive and there are no pending jobs'
                )
        return False

    description = f"waiting for unit {unit}"
    if user is not None:
        description += f" with user {user}"
    with self.nested(description):
        retry(check_active, timeout)
376 def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
377 status, lines = self.systemctl(f'--no-pager show "{unit}"', user)
378 if status != 0:
379 raise Exception(
380 f'retrieving systemctl info for unit "{unit}"'
381 + ("" if user is None else f' under user "{user}"')
382 + f" failed with exit code {status}"
385 line_pattern = re.compile(r"^([^=]+)=(.*)$")
387 def tuple_from_line(line: str) -> Tuple[str, str]:
388 match = line_pattern.match(line)
389 assert match is not None
390 return match[1], match[2]
392 return dict(
393 tuple_from_line(line)
394 for line in lines.split("\n")
395 if line_pattern.match(line)
398 def get_unit_property(
399 self,
400 unit: str,
401 property: str,
402 user: Optional[str] = None,
403 ) -> str:
404 status, lines = self.systemctl(
405 f'--no-pager show "{unit}" --property="{property}"',
406 user,
408 if status != 0:
409 raise Exception(
410 f'retrieving systemctl property "{property}" for unit "{unit}"'
411 + ("" if user is None else f' under user "{user}"')
412 + f" failed with exit code {status}"
415 invalid_output_message = (
416 f'systemctl show --property "{property}" "{unit}"'
417 f"produced invalid output: {lines}"
420 line_pattern = re.compile(r"^([^=]+)=(.*)$")
421 match = line_pattern.match(lines)
422 assert match is not None, invalid_output_message
424 assert match[1] == property, invalid_output_message
425 return match[2]
427 def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]:
429 Runs `systemctl` commands with optional support for
430 `systemctl --user`
432 ```py
433 # run `systemctl list-jobs --no-pager`
434 machine.systemctl("list-jobs --no-pager")
436 # spawn a shell for `any-user` and run
437 # `systemctl --user list-jobs --no-pager`
438 machine.systemctl("list-jobs --no-pager", "any-user")
441 if user is not None:
442 q = q.replace("'", "\\'")
443 return self.execute(
444 f"su -l {user} --shell /bin/sh -c "
445 "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
446 f"systemctl --user {q}'"
448 return self.execute(f"systemctl {q}")
450 def require_unit_state(self, unit: str, require_state: str = "active") -> None:
451 with self.nested(
452 f"checking if unit '{unit}' has reached state '{require_state}'"
454 info = self.get_unit_info(unit)
455 state = info["ActiveState"]
456 if state != require_state:
457 raise Exception(
458 f"Expected unit '{unit}' to to be in state "
459 f"'{require_state}' but it is in state '{state}'"
462 def _next_newline_closed_block_from_shell(self) -> str:
463 assert self.shell
464 output_buffer = []
465 while True:
466 # This receives up to 4096 bytes from the socket
467 chunk = self.shell.recv(4096)
468 if not chunk:
469 # Probably a broken pipe, return the output we have
470 break
472 decoded = chunk.decode()
473 output_buffer += [decoded]
474 if decoded[-1] == "\n":
475 break
476 return "".join(output_buffer)
478 def execute(
479 self,
480 command: str,
481 check_return: bool = True,
482 check_output: bool = True,
483 timeout: Optional[int] = 900,
484 ) -> Tuple[int, str]:
486 Execute a shell command, returning a list `(status, stdout)`.
488 Commands are run with `set -euo pipefail` set:
490 - If several commands are separated by `;` and one fails, the
491 command as a whole will fail.
493 - For pipelines, the last non-zero exit status will be returned
494 (if there is one; otherwise zero will be returned).
496 - Dereferencing unset variables fails the command.
498 - It will wait for stdout to be closed.
500 If the command detaches, it must close stdout, as `execute` will wait
501 for this to consume all output reliably. This can be achieved by
502 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or
503 a file. Examples of detaching commands are `sleep 365d &`, where the
504 shell forks a new process that can write to stdout and `xclip -i`, where
505 the `xclip` command itself forks without closing stdout.
507 Takes an optional parameter `check_return` that defaults to `True`.
508 Setting this parameter to `False` will not check for the return code
509 and return -1 instead. This can be used for commands that shut down
510 the VM and would therefore break the pipe that would be used for
511 retrieving the return code.
513 A timeout for the command can be specified (in seconds) using the optional
514 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or
515 `execute(cmd, timeout=None)`. The default is 900 seconds.
517 self.run_callbacks()
518 self.connect()
520 # Always run command with shell opts
521 command = f"set -euo pipefail; {command}"
523 timeout_str = ""
524 if timeout is not None:
525 timeout_str = f"timeout {timeout}"
527 # While sh is bash on NixOS, this is not the case for every distro.
528 # We explicitly call bash here to allow for the driver to boot other distros as well.
529 out_command = (
530 f"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n"
533 assert self.shell
534 self.shell.send(out_command.encode())
536 if not check_output:
537 return (-2, "")
539 # Get the output
540 output = base64.b64decode(self._next_newline_closed_block_from_shell())
542 if not check_return:
543 return (-1, output.decode())
545 # Get the return code
546 self.shell.send(b"echo ${PIPESTATUS[0]}\n")
547 rc = int(self._next_newline_closed_block_from_shell().strip())
549 return (rc, output.decode(errors="replace"))
def shell_interact(self, address: Optional[str] = None) -> None:
    """
    Interact directly with the guest shell through `socat`.

    This should only be used during test development, not in production
    tests. Killing the interactive session with `Ctrl-d` or `Ctrl-c` also
    ends the guest session.
    """
    self.connect()

    if address is None:
        address = "READLINE,prompt=$ "
        self.log("Terminal is ready (there is no initial prompt):")

    assert self.shell
    shell_fd = self.shell.fileno()
    socat_command = ["socat", address, f"FD:{shell_fd}"]
    try:
        subprocess.run(socat_command, pass_fds=[self.shell.fileno()])
    except KeyboardInterrupt:
        # allow users to cancel this command without breaking the test
        pass
574 def console_interact(self) -> None:
576 Allows you to directly interact with QEMU's stdin, by forwarding
577 terminal input to the QEMU process.
578 This is for use with the interactive test driver, not for production
579 tests, which run unattended.
580 Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and
581 `Ctrl-d` closes console and returns to the test runner.
583 self.log("Terminal is ready (there is no prompt):")
585 assert self.process
586 assert self.process.stdin
588 while True:
589 try:
590 char = sys.stdin.buffer.read(1)
591 except KeyboardInterrupt:
592 break
593 if char == b"": # ctrl+d
594 self.log("Closing connection to the console")
595 break
596 self.send_console(char.decode())
598 def succeed(self, *commands: str, timeout: Optional[int] = None) -> str:
600 Execute a shell command, raising an exception if the exit status is
601 not zero, otherwise returning the standard output. Similar to `execute`,
602 except that the timeout is `None` by default. See `execute` for details on
603 command execution.
605 output = ""
606 for command in commands:
607 with self.nested(f"must succeed: {command}"):
608 (status, out) = self.execute(command, timeout=timeout)
609 if status != 0:
610 self.log(f"output: {out}")
611 raise Exception(f"command `{command}` failed (exit code {status})")
612 output += out
613 return output
615 def fail(self, *commands: str, timeout: Optional[int] = None) -> str:
617 Like `succeed`, but raising an exception if the command returns a zero
618 status.
620 output = ""
621 for command in commands:
622 with self.nested(f"must fail: {command}"):
623 (status, out) = self.execute(command, timeout=timeout)
624 if status == 0:
625 raise Exception(f"command `{command}` unexpectedly succeeded")
626 output += out
627 return output
def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
    """
    Re-run a shell command at 1-second intervals until it exits with zero.

    The default timeout is 900 seconds and can be changed, e.g.
    `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on
    command execution. Returns the output of the last run and throws an
    exception on timeout.
    """
    last_output = ""

    def check_success(_: Any) -> bool:
        nonlocal last_output
        status, last_output = self.execute(command, timeout=timeout)
        return not status != 0

    with self.nested(f"waiting for success: {command}"):
        retry(check_success, timeout)
        return last_output
def wait_until_fails(self, command: str, timeout: int = 900) -> str:
    """
    Mirror of `wait_until_succeeds`: re-run the command until it exits
    with a non-zero status, returning the output of the last run.
    """
    last_output = ""

    def check_failure(_: Any) -> bool:
        nonlocal last_output
        status, last_output = self.execute(command, timeout=timeout)
        return not status == 0

    with self.nested(f"waiting for failure: {command}"):
        retry(check_failure, timeout)
        return last_output
663 def wait_for_shutdown(self) -> None:
664 if not self.booted:
665 return
667 with self.nested("waiting for the VM to power off"):
668 sys.stdout.flush()
669 assert self.process
670 self.process.wait()
672 self.pid = None
673 self.booted = False
674 self.connected = False
676 def wait_for_qmp_event(
677 self, event_filter: Callable[[dict[str, Any]], bool], timeout: int = 60 * 10
678 ) -> dict[str, Any]:
680 Wait for a QMP event which you can filter with the `event_filter` function.
681 The function takes as an input a dictionary of the event and if it returns True, we return that event,
682 if it does not, we wait for the next event and retry.
684 It will skip all events received in the meantime, if you want to keep them,
685 you have to do the bookkeeping yourself and store them somewhere.
687 By default, it will wait up to 10 minutes, `timeout` is in seconds.
689 if self.qmp_client is None:
690 raise RuntimeError("QMP API is not ready yet, is the VM ready?")
692 start = time.time()
693 while True:
694 evt = self.qmp_client.wait_for_event(timeout=timeout)
695 if event_filter(evt):
696 return evt
698 elapsed = time.time() - start
699 if elapsed >= timeout:
700 raise TimeoutError
702 def get_tty_text(self, tty: str) -> str:
703 status, output = self.execute(
704 f"fold -w$(stty -F /dev/tty{tty} size | "
705 f"awk '{{print $2}}') /dev/vcs{tty}"
707 return output
def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None:
    """Poll the chosen TTY until its visible text matches `regexp`.

    Throws an exception on timeout. On the final attempt the current TTY
    contents are logged.
    """
    pattern = re.compile(regexp)

    def tty_matches(last: bool) -> bool:
        screen_text = self.get_tty_text(tty)
        if last:
            self.log(
                f"Last chance to match /{regexp}/ on TTY{tty}, "
                f"which currently contains: {screen_text}"
            )
        hits = pattern.findall(screen_text)
        return len(hits) > 0

    with self.nested(f"waiting for {regexp} to appear on tty {tty}"):
        retry(tty_matches, timeout)
727 def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
729 Simulate typing a sequence of characters on the virtual keyboard,
730 e.g., `send_chars("foobar\n")` will type the string `foobar`
731 followed by the Enter key.
733 with self.nested(f"sending keys {repr(chars)}"):
734 for char in chars:
735 self.send_key(char, delay, log=False)
def wait_for_file(self, filename: str, timeout: int = 900) -> None:
    """
    Poll until `test -e <filename>` succeeds inside the machine, i.e.
    until the file exists in the machine's file system.
    """
    probe = f"test -e {filename}"

    def check_file(_: Any) -> bool:
        exit_status = self.execute(probe)[0]
        return exit_status == 0

    with self.nested(f"waiting for file '{filename}'"):
        retry(check_file, timeout)
def wait_for_open_port(
    self, port: int, addr: str = "localhost", timeout: int = 900
) -> None:
    """
    Poll with `nc -z` until something is listening on the given TCP port
    and IP address (default `localhost`).
    """
    probe = f"nc -z {addr} {port}"

    def port_is_open(_: Any) -> bool:
        exit_status = self.execute(probe)[0]
        return exit_status == 0

    with self.nested(f"waiting for TCP port {port} on {addr}"):
        retry(port_is_open, timeout)
def wait_for_open_unix_socket(
    self, addr: str, is_datagram: bool = False, timeout: int = 900
) -> None:
    """
    Poll with `nc -z` until something is listening on the given
    UNIX-domain socket (a stream socket unless `is_datagram` is set).
    """
    socket_flag = "-uU" if is_datagram else "-U"
    socket_kind = "datagram" if is_datagram else "stream"
    probe = f"nc -z {socket_flag} {addr}"

    def socket_is_open(_: Any) -> bool:
        exit_status = self.execute(probe)[0]
        return exit_status == 0

    with self.nested(f"waiting for UNIX-domain {socket_kind} on '{addr}'"):
        retry(socket_is_open, timeout)
def wait_for_closed_port(
    self, port: int, addr: str = "localhost", timeout: int = 900
) -> None:
    """
    Poll with `nc -z` until nobody is listening on the given TCP port and
    IP address (default `localhost`).
    """
    probe = f"nc -z {addr} {port}"

    def port_is_closed(_: Any) -> bool:
        exit_status = self.execute(probe)[0]
        return exit_status != 0

    with self.nested(f"waiting for TCP port {port} on {addr} to be closed"):
        retry(port_is_closed, timeout)
801 def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
802 return self.systemctl(f"start {jobname}", user)
804 def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
805 return self.systemctl(f"stop {jobname}", user)
807 def wait_for_job(self, jobname: str) -> None:
808 self.wait_for_unit(jobname)
810 def connect(self) -> None:
811 def shell_ready(timeout_secs: int) -> bool:
812 """We sent some data from the backdoor service running on the guest
813 to indicate that the backdoor shell is ready.
814 As soon as we read some data from the socket here, we assume that
815 our root shell is operational.
817 (ready, _, _) = select.select([self.shell], [], [], timeout_secs)
818 return bool(ready)
820 if self.connected:
821 return
823 with self.nested("waiting for the VM to finish booting"):
824 self.start()
826 assert self.shell
828 tic = time.time()
829 # TODO: do we want to bail after a set number of attempts?
830 while not shell_ready(timeout_secs=30):
831 self.log("Guest root shell did not produce any data yet...")
832 self.log(
833 " To debug, enter the VM and run 'systemctl status backdoor.service'."
836 while True:
837 chunk = self.shell.recv(1024)
838 # No need to print empty strings, it means we are waiting.
839 if len(chunk) == 0:
840 continue
841 self.log(f"Guest shell says: {chunk!r}")
842 # NOTE: for this to work, nothing must be printed after this line!
843 if b"Spawning backdoor root shell..." in chunk:
844 break
846 toc = time.time()
848 self.log("connected to guest root shell")
849 self.log(f"(connecting took {toc - tic:.2f} seconds)")
850 self.connected = True
def screenshot(self, filename: str) -> None:
    """
    Take a picture of the virtual machine's display, in PNG format.

    A name without a dot gets `.png` appended, and a name without a slash
    is placed in `out_dir`. The monitor dumps the screen to
    `<filename>.ppm`, which is converted with `pnmtopng` and then removed.
    """
    has_extension = "." in filename
    if not has_extension:
        filename += ".png"
    is_bare_name = "/" not in filename
    if is_bare_name:
        filename = os.path.join(self.out_dir, filename)
    ppm_path = f"{filename}.ppm"

    log_attrs = {"image": os.path.basename(filename)}
    with self.nested(f"making screenshot {filename}", log_attrs):
        self.send_monitor_command(f"screendump {ppm_path}")
        conversion = subprocess.run(
            f"pnmtopng '{ppm_path}' > '{filename}'", shell=True
        )
        # The intermediate dump is removed whether or not conversion worked.
        os.unlink(ppm_path)
        if conversion.returncode != 0:
            raise Exception("Cannot convert screenshot")
873 def copy_from_host_via_shell(self, source: str, target: str) -> None:
874 """Copy a file from the host into the guest by piping it over the
875 shell into the destination file. Works without host-guest shared folder.
876 Prefer copy_from_host for whenever possible.
878 with open(source, "rb") as fh:
879 content_b64 = base64.b64encode(fh.read()).decode()
880 self.succeed(
881 f"mkdir -p $(dirname {target})",
882 f"echo -n {content_b64} | base64 -d > {target}",
def copy_from_host(self, source: str, target: str) -> None:
    """
    Copy a file or directory from the host to the machine, e.g.,
    `copy_from_host("myfile", "/etc/my/important/file")`.

    `source` is the path on the host, i.e. in the environment the test
    driver runs in (typically the Nix build sandbox). `target` is the
    path on the machine that will be written to.

    The data is staged in a temporary directory inside `shared_dir`,
    which the machine sees under `/tmp/shared`, and is then copied to
    `target` with `cp -r` inside the machine.
    """
    source_path = Path(source)
    target_path = Path(target)
    with tempfile.TemporaryDirectory(dir=self.shared_dir) as staging_name:
        staging_dir = Path(staging_name)
        # The same staging directory, as seen from inside the machine.
        guest_staging_dir = Path("/tmp/shared") / staging_dir.name
        staged_on_host = staging_dir / source_path.name
        staged_in_guest = guest_staging_dir / source_path.name

        self.succeed(make_command(["mkdir", "-p", guest_staging_dir]))
        copy_to_staging = shutil.copytree if source_path.is_dir() else shutil.copy
        copy_to_staging(source_path, staged_on_host)
        self.succeed(make_command(["mkdir", "-p", target_path.parent]))
        self.succeed(make_command(["cp", "-r", staged_in_guest, target_path]))
def copy_from_vm(self, source: str, target_dir: str = "") -> None:
    """Copy a file or directory out of the VM into `out_dir / target_dir`.

    `source` is a path inside the VM. The data is first copied with
    `cp -r` into a temporary directory inside `shared_dir` (seen by the
    VM under `/tmp/shared`) and then copied on the host to its final
    place, keeping the source's base name.
    """
    source_path = Path(source)
    with tempfile.TemporaryDirectory(dir=self.shared_dir) as staging_name:
        staging_dir = Path(staging_name)
        # The same staging directory, as seen from inside the VM.
        guest_staging_dir = Path("/tmp/shared") / staging_dir.name
        staged_in_guest = guest_staging_dir / source_path.name
        staged_on_host = staging_dir / source_path.name

        # Inside the VM: copy the source into the shared staging directory.
        self.succeed(make_command(["mkdir", "-p", guest_staging_dir]))
        self.succeed(make_command(["cp", "-r", source_path, staged_in_guest]))

        # On the host: move the staged copy to its destination.
        destination = self.out_dir / target_dir / source_path.name
        destination.parent.mkdir(exist_ok=True, parents=True)
        copy_out = shutil.copytree if staged_on_host.is_dir() else shutil.copy
        copy_out(staged_on_host, destination)
941 def dump_tty_contents(self, tty: str) -> None:
942 """Debugging: Dump the contents of the TTY<n>"""
943 self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat")
def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]:
    """Dump the screen to a temporary file and OCR it per model id."""
    with tempfile.TemporaryDirectory() as scratch_dir:
        dump_path = os.path.join(scratch_dir, "ppm")
        dump_command = f"screendump {dump_path}"
        self.send_monitor_command(dump_command)
        variants = _perform_ocr_on_screenshot(dump_path, model_ids)
        return variants
951 def get_screen_text_variants(self) -> List[str]:
953 Return a list of different interpretations of what is currently
954 visible on the machine's screen using optical character
955 recognition. The number and order of the interpretations is not
956 specified and is subject to change, but if no exception is raised at
957 least one will be returned.
959 ::: {.note}
960 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
963 return self._get_screen_text_variants([0, 1, 2])
965 def get_screen_text(self) -> str:
967 Return a textual representation of what is currently visible on the
968 machine's screen using optical character recognition.
970 ::: {.note}
971 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
974 return self._get_screen_text_variants([2])[0]
def wait_for_text(self, regex: str, timeout: int = 900) -> None:
    """
    Poll the screen with optical character recognition until one of the
    OCR readings matches the regular expression (see `get_screen_text`
    and `get_screen_text_variants`).

    ::: {.note}
    This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
    :::
    """

    def screen_matches(last: bool) -> bool:
        readings = self.get_screen_text_variants()
        if any(re.search(regex, reading) is not None for reading in readings):
            return True

        if last:
            self.log(f"Last OCR attempt failed. Text was: {readings}")

        return False

    with self.nested(f"waiting for {regex} to appear on screen"):
        retry(screen_matches, timeout)
1001 def wait_for_console_text(self, regex: str, timeout: int | None = None) -> None:
1003 Wait until the supplied regular expressions match a line of the
1004 serial console output.
1005 This method is useful when OCR is not possible or inaccurate.
1007 # Buffer the console output, this is needed
1008 # to match multiline regexes.
1009 console = io.StringIO()
1011 def console_matches(_: Any) -> bool:
1012 nonlocal console
1013 try:
1014 # This will return as soon as possible and
1015 # sleep 1 second.
1016 console.write(self.last_lines.get(block=False))
1017 except queue.Empty:
1018 pass
1019 console.seek(0)
1020 matches = re.search(regex, console.read())
1021 return matches is not None
1023 with self.nested(f"waiting for {regex} to appear on console"):
1024 if timeout is not None:
1025 retry(console_matches, timeout)
1026 else:
1027 while not console_matches(False):
1028 pass
def send_key(
    self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True
) -> None:
    """
    Press a key (combination) on the virtual keyboard, e.g.,
    `send_key("ctrl-alt-delete")`.

    Single characters listed in `CHAR_TO_KEY` are translated first. If
    `delay` is not `None`, sleep that many seconds after sending.

    Please also refer to the QEMU documentation for more information on the
    input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys
    """
    key = CHAR_TO_KEY.get(key, key)
    if log:
        context = self.nested(f"sending key {repr(key)}")
    else:
        context = nullcontext()
    with context:
        monitor_command = f"sendkey {key}"
        self.send_monitor_command(monitor_command)
        if delay is not None:
            time.sleep(delay)
1047 def send_console(self, chars: str) -> None:
1048 r"""
1049 Send keys to the kernel console. This allows interaction with the systemd
1050 emergency mode, for example. Takes a string that is sent, e.g.,
1051 `send_console("\n\nsystemctl default\n")`.
1053 assert self.process
1054 assert self.process.stdin
1055 self.process.stdin.write(chars.encode())
1056 self.process.stdin.flush()
def start(self, allow_reboot: bool = False) -> None:
    """
    Start the virtual machine. This method is asynchronous --- it does
    not wait for the machine to finish booting.

    Does nothing if the machine is already booted.
    """
    if self.booted:
        return

    self.log("starting vm")

    def listen_on(path: Path) -> socket.socket:
        """Remove any stale socket file and listen on a fresh UNIX socket."""
        if path.exists():
            path.unlink()
        listener = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
        listener.bind(str(path))
        listener.listen(1)
        return listener

    # The listening sockets must exist before the VM process is launched.
    monitor_socket = listen_on(self.monitor_path)
    shell_socket = listen_on(self.shell_path)
    self.process = self.start_command.run(
        self.state_dir,
        self.shared_dir,
        self.monitor_path,
        self.qmp_path,
        self.shell_path,
        allow_reboot,
    )
    self.monitor = monitor_socket.accept()[0]
    self.shell = shell_socket.accept()[0]
    self.qmp_client = QMPSession.from_path(self.qmp_path)

    # Store last serial console lines for use
    # of wait_for_console_text
    self.last_lines: Queue = Queue()

    def process_serial_output() -> None:
        assert self.process
        assert self.process.stdout
        for raw_line in self.process.stdout:
            # Ignore undecodable bytes that may occur in boot menus
            text = raw_line.decode(errors="ignore")
            text = text.replace("\r", "").rstrip()
            self.last_lines.put(text)
            self.log_serial(text)

    self.serial_thread = threading.Thread(target=process_serial_output)
    self.serial_thread.start()

    self.wait_for_monitor_prompt()

    self.pid = self.process.pid
    self.booted = True

    self.log(f"QEMU running (pid {self.pid})")
def cleanup_statedir(self) -> None:
    """
    Recursively delete the VM state directory `self.state_dir`.

    The log messages are emitted before the deletion so that they are
    present even when `shutil.rmtree` raises (which is propagated to
    the caller).
    """
    self.logger.log(f"deleting VM state directory {self.state_dir}")
    self.logger.log("if you want to keep the VM state, pass --keep-vm-state")
    shutil.rmtree(self.state_dir)
def shutdown(self) -> None:
    """
    Shut down the machine, waiting for the VM to exit.

    Does nothing when the machine is not booted. Otherwise the line
    `poweroff` is written to the shell socket and the method blocks in
    `wait_for_shutdown()`.
    """
    if not self.booted:
        return

    assert self.shell
    # sendall() keeps writing until the whole command is transmitted;
    # a bare send() may transmit only part of the buffer.
    self.shell.sendall(b"poweroff\n")
    self.wait_for_shutdown()
def crash(self) -> None:
    """
    Simulate a sudden power failure, by telling the VM to exit immediately.

    Nothing happens unless the machine is booted. The monitor command
    `quit` is sent and the method then blocks in `wait_for_shutdown()`.
    """
    if self.booted:
        self.log("forced crash")
        self.send_monitor_command("quit")
        self.wait_for_shutdown()
def reboot(self) -> None:
    """Press Ctrl+Alt+Delete in the guest.

    Afterwards `self.connected` is reset to False, which prepares the
    machine to be reconnected. This is useful if the machine was
    started with `allow_reboot = True`.
    """
    reboot_combo = "ctrl-alt-delete"
    self.send_key(reboot_combo)
    self.connected = False
def wait_for_x(self, timeout: int = 900) -> None:
    """
    Wait until it is possible to connect to the X server.

    Each poll first greps the systemd journal of the current boot for
    "Reached target Current graphical" and, only if that succeeds,
    tests that `/tmp/.X11-unix/X0` exists. Polling is delegated to
    `retry` with the given `timeout`.
    """
    journal_query = (
        "journalctl -b SYSLOG_IDENTIFIER=systemd | "
        'grep "Reached target Current graphical"'
    )

    def x_is_up(_last_try: Any) -> bool:
        target_status, _ = self.execute(journal_query)
        if target_status == 0:
            socket_status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
            return socket_status == 0
        return False

    with self.nested("waiting for the X11 server"):
        retry(x_is_up, timeout)
def get_window_names(self) -> List[str]:
    """
    Return the output lines of an `xwininfo -root -tree` call run via
    `succeed`, filtered by `sed` down to the double-quoted name that
    follows each `0x...` window id.
    """
    listing_command = (
        r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'"
    )
    output = self.succeed(listing_command)
    return output.splitlines()
def wait_for_window(self, regexp: str, timeout: int = 900) -> None:
    """
    Wait until an X11 window has appeared whose name matches the given
    regular expression, e.g., `wait_for_window("Terminal")`.

    Window names come from `get_window_names()` and are tested with
    `re.search`. Polling is delegated to `retry` with the given
    `timeout`; when the callback is invoked with a true `last_try`
    argument, the current window list is logged.
    """
    matcher = re.compile(regexp)

    def window_is_visible(last_try: bool) -> bool:
        titles = self.get_window_names()
        if last_try:
            listing = ", ".join(titles)
            self.log(
                f"Last chance to match {regexp} on the window list,"
                " which currently contains: "
                f"{listing}"
            )
        for title in titles:
            if matcher.search(title):
                return True
        return False

    with self.nested("waiting for a window to appear"):
        retry(window_is_visible, timeout)
def sleep(self, secs: int) -> None:
    """
    Sleep for `secs` seconds by running `sleep` through `succeed`.
    """
    # We want to sleep in *guest* time, not *host* time.
    guest_command = f"sleep {secs}"
    self.succeed(guest_command)
def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
    """
    Forward a TCP port on the host to a TCP port on the guest.
    Useful during interactive testing.

    Implemented by sending a `hostfwd_add` command to the monitor.
    """
    rule = f"tcp::{host_port}-:{guest_port}"
    self.send_monitor_command(f"hostfwd_add {rule}")
def block(self) -> None:
    """
    Simulate unplugging the Ethernet cable that connects the machine to
    the other machines.
    This happens by shutting down eth1 (the multicast interface used to talk
    to the other VMs). eth0 is kept online to still enable the test driver
    to communicate with the machine.

    Implemented by sending `set_link virtio-net-pci.1 off` to the monitor.
    """
    link_command = "set_link virtio-net-pci.1 off"
    self.send_monitor_command(link_command)
def unblock(self) -> None:
    """
    Undo the effect of `block`.

    Implemented by sending `set_link virtio-net-pci.1 on` to the monitor.
    """
    link_command = "set_link virtio-net-pci.1 on"
    self.send_monitor_command(link_command)
def release(self) -> None:
    """
    Terminate the machine's process and release its connections.

    Does nothing when `self.pid` is None. Otherwise the process is
    terminated, the shell and monitor sockets are closed and the
    serial thread is joined, in that order.
    """
    if self.pid is None:
        return
    self.logger.info(f"kill machine (pid {self.pid})")

    # Check all handles up front so nothing is torn down half-way
    # because of a missing one.
    process = self.process
    assert process
    shell = self.shell
    assert shell
    monitor = self.monitor
    assert monitor
    serial_thread = self.serial_thread
    assert serial_thread

    process.terminate()
    shell.close()
    monitor.close()
    serial_thread.join()
def run_callbacks(self) -> None:
    """
    Call every entry of `self.callbacks`, in order, without arguments.
    """
    for hook in self.callbacks:
        hook()
def switch_root(self) -> None:
    """
    Transition from stage 1 to stage 2. This requires the
    machine to be configured with `testing.initrdBackdoor = true`
    and `boot.initrd.systemd.enable = true`.

    Waits for `initrd.target`, asks systemd to isolate
    `initrd-switch-root.target` without checking the command's return
    code or output, then marks the machine as disconnected and calls
    `connect()` again.
    """
    self.wait_for_unit("initrd.target")

    isolate_command = (
        "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null"
    )
    self.execute(isolate_command, check_return=False, check_output=False)

    self.connected = False
    self.connect()