normcap: fix on GNOME wayland when used via keybind or alt-f2 (#351763)
[NixPkgs.git] / nixos / lib / test-driver / test_driver / machine.py
blobc423ad8a3fc07d246a7915364731bed22851d2a4
1 import base64
2 import io
3 import os
4 import queue
5 import re
6 import select
7 import shlex
8 import shutil
9 import socket
10 import subprocess
11 import sys
12 import tempfile
13 import threading
14 import time
15 from collections.abc import Callable, Iterable
16 from contextlib import _GeneratorContextManager, nullcontext
17 from pathlib import Path
18 from queue import Queue
19 from typing import Any
21 from test_driver.logger import AbstractLogger
23 from .qmp import QMPSession
# Lookup table translating single characters into the key names handed to
# the QEMU monitor `sendkey` command (see `Machine.send_key`). Characters
# that are absent from this table are passed to `sendkey` unchanged.
CHAR_TO_KEY = {
    # Upper-case letters: shift combined with the lower-case letter key.
    **{
        letter: f"shift-{letter.lower()}"
        for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    },
    # Whitespace.
    " ": "spc",
    "\n": "ret",
    # Punctuation keys, addressed by hexadecimal key code; the second entry
    # of each pair is the shifted variant of the same key.
    "-": "0x0C",
    "_": "shift-0x0C",
    "=": "0x0D",
    "+": "shift-0x0D",
    "[": "0x1A",
    "{": "shift-0x1A",
    "]": "0x1B",
    "}": "shift-0x1B",
    ";": "0x27",
    ":": "shift-0x27",
    "'": "0x28",
    '"': "shift-0x28",
    "`": "0x29",
    "~": "shift-0x29",
    "\\": "0x2B",
    "|": "shift-0x2B",
    ",": "0x33",
    "<": "shift-0x33",
    ".": "0x34",
    ">": "shift-0x34",
    "/": "0x35",
    "?": "shift-0x35",
    # Symbols produced with shift on key codes 0x02 through 0x0B.
    "!": "shift-0x02",
    "@": "shift-0x03",
    "#": "shift-0x04",
    "$": "shift-0x05",
    "%": "shift-0x06",
    "^": "shift-0x07",
    "&": "shift-0x08",
    "*": "shift-0x09",
    "(": "shift-0x0A",
    ")": "shift-0x0B",
}
def make_command(args: list) -> str:
    """Render `args` as one shell command line.

    Every argument is converted with `str()` and quoted with `shlex.quote`,
    then the quoted words are joined with single spaces.
    """
    quoted_words = [shlex.quote(str(word)) for word in args]
    return " ".join(quoted_words)
def _perform_ocr_on_screenshot(
    screenshot_path: str, model_ids: Iterable[int]
) -> list[str]:
    """Run OCR on a screenshot, once per requested tesseract engine mode.

    The screenshot is first preprocessed with ImageMagick `convert` into
    `<screenshot_path>.tiff`, then `tesseract` is run on that TIFF once for
    every id in `model_ids` (passed as `--oem`).

    Returns the decoded standard output of each tesseract run, in the order
    of `model_ids`.

    Raises an `Exception` if `tesseract` is not on `PATH`, or if the
    conversion or any OCR run exits with a non-zero status.
    """
    if shutil.which("tesseract") is None:
        raise Exception("OCR requested but enableOCR is false")

    tiff_path = f"{screenshot_path}.tiff"

    # The commands are passed as argument vectors instead of shell strings,
    # so a path containing quotes or other shell metacharacters cannot break
    # (or inject into) the command line.
    convert_cmd = [
        "convert",
        "-filter", "Catrom", "-density", "72", "-resample", "300",
        "-contrast", "-normalize", "-despeckle", "-type", "grayscale",
        "-sharpen", "1", "-posterize", "3", "-negate", "-gamma", "100",
        "-blur", "1x65535",
        screenshot_path,
        f"tiff:{tiff_path}",
    ]
    ret = subprocess.run(convert_cmd, capture_output=True)
    if ret.returncode != 0:
        raise Exception(f"TIFF conversion failed with exit code {ret.returncode}")

    model_results = []
    for model_id in model_ids:
        tesseract_cmd = [
            "tesseract",
            tiff_path,
            "-",  # write the recognised text to stdout
            "-c", "debug_file=/dev/null",
            "--psm", "11",
            "--oem", str(model_id),
        ]
        ret = subprocess.run(tesseract_cmd, capture_output=True)
        if ret.returncode != 0:
            raise Exception(f"OCR failed with exit code {ret.returncode}")
        model_results.append(ret.stdout.decode("utf-8"))

    return model_results
def retry(fn: Callable, timeout: int = 900) -> None:
    """Poll `fn` once per second until it reports success.

    `fn` is called with `False` up to `timeout` times, sleeping one second
    after every unsuccessful call. If none of those calls returned a truthy
    value, `fn` is called one final time with `True` (the "last chance"
    flag); if that also fails an `Exception` is raised.
    """
    attempts_left = timeout
    while attempts_left > 0:
        if fn(False):
            return
        time.sleep(1)
        attempts_left -= 1

    if fn(True):
        return
    raise Exception(f"action timed out after {timeout} seconds")
class StartCommand:
    """Base class for commands that launch a QEMU virtual machine.

    It appends the runtime QEMU options required by a test driver run
    (control sockets, serial console, display settings) to the command
    stored in `_cmd`, which therefore has to accept additional QEMU
    arguments.
    """

    _cmd: str

    def cmd(
        self,
        monitor_socket_path: Path,
        qmp_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool = False,
    ) -> str:
        """Return the full shell command line used to start the VM."""
        fragments = [
            f"{self._cmd}",
            f" -qmp unix:{qmp_socket_path},server=on,wait=off",
            f" -monitor unix:{monitor_socket_path}",
            f" -chardev socket,id=shell,path={shell_socket_path}",
            " -device virtio-serial",
            # Note: virtconsole will map to /dev/hvc0 in Linux guests
            " -device virtconsole,chardev=shell",
            " -device virtio-rng-pci",
            " -serial stdio",
        ]
        if not allow_reboot:
            fragments.append(" -no-reboot")

        # Without an X11 or Wayland display on the host, run QEMU headless.
        has_display = "DISPLAY" in os.environ or "WAYLAND_DISPLAY" in os.environ
        if not has_display:
            fragments.append(" -nographic")

        return "".join(fragments)

    @staticmethod
    def build_environment(
        state_dir: Path,
        shared_dir: Path,
    ) -> dict:
        """Return a copy of the current environment extended with the
        `TMPDIR`, `SHARED_DIR` and `USE_TMPDIR` variables for the VM process.
        """
        # Building a new dict leaves os.environ itself untouched.
        return {
            **os.environ,
            "TMPDIR": str(state_dir),
            "SHARED_DIR": str(shared_dir),
            "USE_TMPDIR": "1",
        }

    def run(
        self,
        state_dir: Path,
        shared_dir: Path,
        monitor_socket_path: Path,
        qmp_socket_path: Path,
        shell_socket_path: Path,
        allow_reboot: bool,
    ) -> subprocess.Popen:
        """Spawn the VM process through the shell, with piped stdin and
        stdout, `state_dir` as working directory and the environment from
        `build_environment`.
        """
        command_line = self.cmd(
            monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot
        )
        vm_environment = self.build_environment(state_dir, shared_dir)
        return subprocess.Popen(
            command_line,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            shell=True,
            cwd=state_dir,
            env=vm_environment,
        )
class NixStartScript(StartCommand):
    """Start command backed by a start script generated by
    nixos/modules/virtualisation/qemu-vm.nix.

    Such scripts follow an (admittedly very implicit) naming contract that
    allows the machine name to be recovered from the script path with a
    regular expression; see `machine_name`.
    """

    def __init__(self, script: str):
        self._cmd = script

    @property
    def machine_name(self) -> str:
        """Name captured from a command ending in `run-<name>-vm`, or
        `"machine"` when the command does not match that pattern."""
        found = re.search("run-(.+)-vm$", self._cmd)
        return found.group(1) if found else "machine"
class Machine:
    """A handle to the machine with this name, that also knows how to manage
    the machine lifecycle with the help of a start script / command."""

    # Machine name; used in log attributes and in the state directory name.
    name: str
    # Directory in which screenshots and files copied from the VM are placed.
    out_dir: Path
    # Parent directory of the shared and per-machine state directories.
    tmp_dir: Path
    # `tmp_dir / "shared-xchg"`, used to exchange files with the VM.
    shared_dir: Path
    # `tmp_dir / "vm-state-<name>"`; holds the control sockets below.
    state_dir: Path
    monitor_path: Path
    qmp_path: Path
    shell_path: Path

    start_command: StartCommand
    # When False, an existing state directory is deleted on construction.
    keep_vm_state: bool

    # Runtime handles; all are None until `start()` has been called.
    process: subprocess.Popen | None
    pid: int | None
    monitor: socket.socket | None
    qmp_client: QMPSession | None
    shell: socket.socket | None
    serial_thread: threading.Thread | None

    # `booted` is set once QEMU is running, `connected` once the guest root
    # shell has announced itself.
    booted: bool
    connected: bool
    # Store last serial console lines for use
    # of wait_for_console_text
    # NOTE(review): this default is a class-level Queue shared by every
    # instance until `start()` rebinds `self.last_lines` per instance.
    last_lines: Queue = Queue()
    callbacks: list[Callable]
def __repr__(self) -> str:
    """Return a short debugging representation containing the machine name."""
    machine_name = self.name
    return f"<Machine '{machine_name}'>"
def __init__(
    self,
    out_dir: Path,
    tmp_dir: Path,
    start_command: StartCommand,
    logger: AbstractLogger,
    name: str = "machine",
    keep_vm_state: bool = False,
    callbacks: list[Callable] | None = None,
) -> None:
    """Record the configuration and prepare the on-disk directories.

    Creates the shared exchange directory and the per-machine state
    directory below `tmp_dir`. An already existing state directory is
    removed first unless `keep_vm_state` is set. The VM itself is not
    started here.
    """
    self.out_dir = out_dir
    self.tmp_dir = tmp_dir
    self.keep_vm_state = keep_vm_state
    self.name = name
    self.start_command = start_command
    self.callbacks = [] if callbacks is None else callbacks
    self.logger = logger

    # Directory shared with the VM for exchanging files.
    self.shared_dir = self.tmp_dir / "shared-xchg"
    self.shared_dir.mkdir(mode=0o700, exist_ok=True)

    # Per-machine state directory and the control sockets living inside it.
    self.state_dir = self.tmp_dir / f"vm-state-{self.name}"
    self.monitor_path = self.state_dir / "monitor"
    self.qmp_path = self.state_dir / "qmp"
    self.shell_path = self.state_dir / "shell"
    if not self.keep_vm_state:
        if self.state_dir.exists():
            self.cleanup_statedir()
    self.state_dir.mkdir(mode=0o700, exist_ok=True)

    # Nothing is running yet: no process, no sockets, no reader thread.
    self.process = None
    self.pid = None
    self.monitor = None
    self.qmp_client = None
    self.shell = None
    self.serial_thread = None

    self.booted = False
    self.connected = False
def is_up(self) -> bool:
    """Return whether the VM has been started and the guest shell is connected."""
    if not self.booted:
        return self.booted
    return self.connected
def log(self, msg: str) -> None:
    """Log `msg` through the configured logger, tagged with this machine's name."""
    machine_attrs = {"machine": self.name}
    self.logger.log(msg, machine_attrs)
def log_serial(self, msg: str) -> None:
    """Forward one line of serial console output to the logger, together
    with this machine's name."""
    machine_name = self.name
    self.logger.log_serial(msg, machine_name)
318 def nested(self, msg: str, attrs: dict[str, str] = {}) -> _GeneratorContextManager:
319 my_attrs = {"machine": self.name}
320 my_attrs.update(attrs)
321 return self.logger.nested(msg, my_attrs)
def wait_for_monitor_prompt(self) -> str:
    """Read from the QEMU monitor socket until its prompt appears.

    Returns everything received, decoded as text, up to and including the
    trailing `(qemu) ` prompt. If the socket is closed before a prompt is
    seen, whatever has been received so far is returned.
    """
    assert self.monitor is not None
    prompt = b"(qemu) "
    received = bytearray()
    while True:
        chunk = self.monitor.recv(1024)
        if not chunk:
            # Peer closed the connection; return what we have.
            break
        received += chunk
        if received.endswith(prompt):
            break
    # Decode once at the end: decoding every chunk on its own fails when a
    # multi-byte character happens to be split across two recv() calls.
    return received.decode()
def send_monitor_command(self, command: str) -> str:
    """
    Send a command to the QEMU monitor and return the monitor's reply up to
    the next prompt. This allows attaching virtual USB disks to a running
    machine, among other things.
    """
    self.run_callbacks()
    payload = (command + "\n").encode()
    assert self.monitor is not None
    self.monitor.send(payload)
    reply = self.wait_for_monitor_prompt()
    return reply
def wait_for_unit(
    self, unit: str, user: str | None = None, timeout: int = 900
) -> None:
    """
    Wait for a systemd unit to get into "active" state.
    Throws exceptions on "failed" and "inactive" states as well as after
    timing out. An "inactive" unit only counts as an error when systemd
    reports no pending jobs and the unit is still inactive afterwards.
    """

    def check_active(_: Any) -> bool:
        state = self.get_unit_property(unit, "ActiveState", user)
        if state == "failed":
            raise Exception(f'unit "{unit}" reached state "{state}"')

        if state != "inactive":
            return state == "active"

        # Inactive: check whether anything is still queued that could
        # activate the unit before giving up on it.
        _status, jobs = self.systemctl("list-jobs --full 2>&1", user)
        if "No jobs" in jobs:
            info = self.get_unit_info(unit, user)
            if info["ActiveState"] == state:
                raise Exception(
                    f'unit "{unit}" is inactive and there are no pending jobs'
                )
        return False

    description = f"waiting for unit {unit}"
    if user is not None:
        description += f" with user {user}"
    with self.nested(description):
        retry(check_active, timeout)
377 def get_unit_info(self, unit: str, user: str | None = None) -> dict[str, str]:
378 status, lines = self.systemctl(f'--no-pager show "{unit}"', user)
379 if status != 0:
380 raise Exception(
381 f'retrieving systemctl info for unit "{unit}"'
382 + ("" if user is None else f' under user "{user}"')
383 + f" failed with exit code {status}"
386 line_pattern = re.compile(r"^([^=]+)=(.*)$")
388 def tuple_from_line(line: str) -> tuple[str, str]:
389 match = line_pattern.match(line)
390 assert match is not None
391 return match[1], match[2]
393 return dict(
394 tuple_from_line(line)
395 for line in lines.split("\n")
396 if line_pattern.match(line)
399 def get_unit_property(
400 self,
401 unit: str,
402 property: str,
403 user: str | None = None,
404 ) -> str:
405 status, lines = self.systemctl(
406 f'--no-pager show "{unit}" --property="{property}"',
407 user,
409 if status != 0:
410 raise Exception(
411 f'retrieving systemctl property "{property}" for unit "{unit}"'
412 + ("" if user is None else f' under user "{user}"')
413 + f" failed with exit code {status}"
416 invalid_output_message = (
417 f'systemctl show --property "{property}" "{unit}"'
418 f"produced invalid output: {lines}"
421 line_pattern = re.compile(r"^([^=]+)=(.*)$")
422 match = line_pattern.match(lines)
423 assert match is not None, invalid_output_message
425 assert match[1] == property, invalid_output_message
426 return match[2]
428 def systemctl(self, q: str, user: str | None = None) -> tuple[int, str]:
430 Runs `systemctl` commands with optional support for
431 `systemctl --user`
433 ```py
434 # run `systemctl list-jobs --no-pager`
435 machine.systemctl("list-jobs --no-pager")
437 # spawn a shell for `any-user` and run
438 # `systemctl --user list-jobs --no-pager`
439 machine.systemctl("list-jobs --no-pager", "any-user")
442 if user is not None:
443 q = q.replace("'", "\\'")
444 return self.execute(
445 f"su -l {user} --shell /bin/sh -c "
446 "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
447 f"systemctl --user {q}'"
449 return self.execute(f"systemctl {q}")
def require_unit_state(self, unit: str, require_state: str = "active") -> None:
    """Check that `unit` currently is in the `ActiveState` `require_state`.

    Raises an `Exception` naming both the expected and the actual state if
    they differ.
    """
    with self.nested(
        f"checking if unit '{unit}' has reached state '{require_state}'"
    ):
        info = self.get_unit_info(unit)
        state = info["ActiveState"]
        if state != require_state:
            # Fix: the message used to contain a duplicated word ("to to be").
            raise Exception(
                f"Expected unit '{unit}' to be in state "
                f"'{require_state}' but it is in state '{state}'"
            )
def _next_newline_closed_block_from_shell(self) -> str:
    """Read from the guest shell socket until a chunk ends with a newline.

    Returns the concatenation of all decoded chunks. If the socket yields no
    more data (probably a broken pipe) the output collected so far is
    returned instead.
    """
    assert self.shell
    pieces: list[str] = []
    while True:
        # Each recv() call delivers at most 4096 bytes.
        data = self.shell.recv(4096)
        if not data:
            break

        text = data.decode()
        pieces.append(text)
        if text.endswith("\n"):
            break
    return "".join(pieces)
479 def execute(
480 self,
481 command: str,
482 check_return: bool = True,
483 check_output: bool = True,
484 timeout: int | None = 900,
485 ) -> tuple[int, str]:
487 Execute a shell command, returning a list `(status, stdout)`.
489 Commands are run with `set -euo pipefail` set:
491 - If several commands are separated by `;` and one fails, the
492 command as a whole will fail.
494 - For pipelines, the last non-zero exit status will be returned
495 (if there is one; otherwise zero will be returned).
497 - Dereferencing unset variables fails the command.
499 - It will wait for stdout to be closed.
501 If the command detaches, it must close stdout, as `execute` will wait
502 for this to consume all output reliably. This can be achieved by
503 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or
504 a file. Examples of detaching commands are `sleep 365d &`, where the
505 shell forks a new process that can write to stdout and `xclip -i`, where
506 the `xclip` command itself forks without closing stdout.
508 Takes an optional parameter `check_return` that defaults to `True`.
509 Setting this parameter to `False` will not check for the return code
510 and return -1 instead. This can be used for commands that shut down
511 the VM and would therefore break the pipe that would be used for
512 retrieving the return code.
514 A timeout for the command can be specified (in seconds) using the optional
515 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or
516 `execute(cmd, timeout=None)`. The default is 900 seconds.
518 self.run_callbacks()
519 self.connect()
521 # Always run command with shell opts
522 command = f"set -euo pipefail; {command}"
524 timeout_str = ""
525 if timeout is not None:
526 timeout_str = f"timeout {timeout}"
528 # While sh is bash on NixOS, this is not the case for every distro.
529 # We explicitly call bash here to allow for the driver to boot other distros as well.
530 out_command = (
531 f"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n"
534 assert self.shell
535 self.shell.send(out_command.encode())
537 if not check_output:
538 return (-2, "")
540 # Get the output
541 output = base64.b64decode(self._next_newline_closed_block_from_shell())
543 if not check_return:
544 return (-1, output.decode())
546 # Get the return code
547 self.shell.send(b"echo ${PIPESTATUS[0]}\n")
548 rc = int(self._next_newline_closed_block_from_shell().strip())
550 return (rc, output.decode(errors="replace"))
def shell_interact(self, address: str | None = None) -> None:
    """
    Allows you to directly interact with the guest shell. This should
    only be used during test development, not in production tests.
    Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends
    the guest session.

    `address` is the socat address connected to the guest shell; when
    omitted, a readline prompt on the current terminal is used.
    """
    self.connect()

    if address is None:
        address = "READLINE,prompt=$ "
        self.log("Terminal is ready (there is no initial prompt):")

    assert self.shell
    try:
        shell_fd = self.shell.fileno()
        socat_command = ["socat", address, f"FD:{shell_fd}"]
        # The shell socket has to be inherited by socat to be usable there.
        subprocess.run(socat_command, pass_fds=[self.shell.fileno()])
    except KeyboardInterrupt:
        # allow users to cancel this command without breaking the test
        pass
def console_interact(self) -> None:
    """
    Allows you to directly interact with QEMU's stdin, by forwarding
    terminal input to the QEMU process.
    This is for use with the interactive test driver, not for production
    tests, which run unattended.
    Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and
    `Ctrl-d` closes console and returns to the test runner.
    """
    import codecs

    self.log("Terminal is ready (there is no prompt):")

    assert self.process
    assert self.process.stdin

    # Input is read one byte at a time, but a character may span several
    # bytes. Decoding each byte on its own raised UnicodeDecodeError for any
    # non-ASCII input, so bytes are fed to an incremental decoder which only
    # yields text once a character is complete.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    while True:
        try:
            char = sys.stdin.buffer.read(1)
        except KeyboardInterrupt:
            break
        if char == b"":  # ctrl+d
            self.log("Closing connection to the console")
            break
        text = decoder.decode(char)
        if text:
            self.send_console(text)
599 def succeed(self, *commands: str, timeout: int | None = None) -> str:
601 Execute a shell command, raising an exception if the exit status is
602 not zero, otherwise returning the standard output. Similar to `execute`,
603 except that the timeout is `None` by default. See `execute` for details on
604 command execution.
606 output = ""
607 for command in commands:
608 with self.nested(f"must succeed: {command}"):
609 (status, out) = self.execute(command, timeout=timeout)
610 if status != 0:
611 self.log(f"output: {out}")
612 raise Exception(f"command `{command}` failed (exit code {status})")
613 output += out
614 return output
616 def fail(self, *commands: str, timeout: int | None = None) -> str:
618 Like `succeed`, but raising an exception if the command returns a zero
619 status.
621 output = ""
622 for command in commands:
623 with self.nested(f"must fail: {command}"):
624 (status, out) = self.execute(command, timeout=timeout)
625 if status == 0:
626 raise Exception(f"command `{command}` unexpectedly succeeded")
627 output += out
628 return output
def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
    """
    Repeat a shell command with 1-second intervals until it succeeds.
    Has a default timeout of 900 seconds which can be modified, e.g.
    `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on
    command execution.
    Throws an exception on timeout. Returns the output of the last run.
    """
    # Holds the output of the most recent attempt.
    captured = {"output": ""}

    def attempt(_: Any) -> bool:
        status, captured["output"] = self.execute(command, timeout=timeout)
        return status == 0

    with self.nested(f"waiting for success: {command}"):
        retry(attempt, timeout)
        return captured["output"]
def wait_until_fails(self, command: str, timeout: int = 900) -> str:
    """
    Like `wait_until_succeeds`, but repeating the command until it fails.
    Returns the output of the last run.
    """
    # Holds the output of the most recent attempt.
    captured = {"output": ""}

    def attempt(_: Any) -> bool:
        status, captured["output"] = self.execute(command, timeout=timeout)
        return status != 0

    with self.nested(f"waiting for failure: {command}"):
        retry(attempt, timeout)
        return captured["output"]
def wait_for_shutdown(self) -> None:
    """Block until the VM process has exited, then reset the run state.

    Does nothing when the machine has not been booted.
    """
    if self.booted:
        with self.nested("waiting for the VM to power off"):
            sys.stdout.flush()
            assert self.process
            self.process.wait()

            # The VM is gone: forget its pid and mark it as down.
            self.pid = None
            self.booted = False
            self.connected = False
def wait_for_qmp_event(
    self, event_filter: Callable[[dict[str, Any]], bool], timeout: int = 60 * 10
) -> dict[str, Any]:
    """
    Wait for a QMP event which you can filter with the `event_filter` function.
    The function takes as an input a dictionary of the event and if it returns True, we return that event,
    if it does not, we wait for the next event and retry.

    It will skip all events received in the meantime, if you want to keep them,
    you have to do the bookkeeping yourself and store them somewhere.

    By default, it will wait up to 10 minutes, `timeout` is in seconds.

    Raises `RuntimeError` if the QMP client has not been set up yet and
    `TimeoutError` once `timeout` seconds have passed without a matching
    event.
    """
    if self.qmp_client is None:
        raise RuntimeError("QMP API is not ready yet, is the VM ready?")

    deadline = time.monotonic() + timeout
    remaining: float = timeout
    while True:
        # Only wait for the time that is left. Previously every wait was
        # given the full `timeout`, so a non-matching event shortly before
        # the deadline allowed the total wait to be almost twice as long.
        evt = self.qmp_client.wait_for_event(timeout=remaining)
        if event_filter(evt):
            return evt

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError
def get_tty_text(self, tty: str) -> str:
    """Return the contents of `/dev/vcs<tty>`, wrapped with `fold` at the
    column count reported by `stty` for `/dev/tty<tty>`."""
    width_probe = f"stty -F /dev/tty{tty} size | awk '{{print $2}}'"
    _status, text = self.execute(f"fold -w$({width_probe}) /dev/vcs{tty}")
    return text
def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None:
    """Wait until the visible output on the chosen TTY matches regular
    expression. Throws an exception on timeout.
    """
    pattern = re.compile(regexp)

    def tty_matches(last: bool) -> bool:
        screen = self.get_tty_text(tty)
        if last:
            # Final attempt: record what was on the TTY to ease debugging.
            self.log(
                f"Last chance to match /{regexp}/ on TTY{tty}, "
                f"which currently contains: {screen}"
            )
        return pattern.search(screen) is not None

    with self.nested(f"waiting for {regexp} to appear on tty {tty}"):
        retry(tty_matches, timeout)
728 def send_chars(self, chars: str, delay: float | None = 0.01) -> None:
730 Simulate typing a sequence of characters on the virtual keyboard,
731 e.g., `send_chars("foobar\n")` will type the string `foobar`
732 followed by the Enter key.
734 with self.nested(f"sending keys {repr(chars)}"):
735 for char in chars:
736 self.send_key(char, delay, log=False)
def wait_for_file(self, filename: str, timeout: int = 900) -> None:
    """
    Waits until the file exists in the machine's file system.
    Throws an exception on timeout.
    """

    def check_file(_: Any) -> bool:
        exit_code, _output = self.execute(f"test -e {filename}")
        return exit_code == 0

    with self.nested(f"waiting for file '{filename}'"):
        retry(check_file, timeout)
def wait_for_open_port(
    self, port: int, addr: str = "localhost", timeout: int = 900
) -> None:
    """
    Wait until a process is listening on the given TCP port and IP address
    (default `localhost`). Throws an exception on timeout.
    """

    def port_is_open(_: Any) -> bool:
        exit_code, _output = self.execute(f"nc -z {addr} {port}")
        return exit_code == 0

    with self.nested(f"waiting for TCP port {port} on {addr}"):
        retry(port_is_open, timeout)
def wait_for_open_unix_socket(
    self, addr: str, is_datagram: bool = False, timeout: int = 900
) -> None:
    """
    Wait until a process is listening on the given UNIX-domain socket
    (default to a UNIX-domain stream socket). Throws an exception on timeout.
    """
    socket_kind = "datagram" if is_datagram else "stream"
    socket_flag = "-uU" if is_datagram else "-U"
    probe_command = f"nc -z {socket_flag} {addr}"

    def socket_is_open(_: Any) -> bool:
        exit_code, _output = self.execute(probe_command)
        return exit_code == 0

    with self.nested(f"waiting for UNIX-domain {socket_kind} on '{addr}'"):
        retry(socket_is_open, timeout)
def wait_for_closed_port(
    self, port: int, addr: str = "localhost", timeout: int = 900
) -> None:
    """
    Wait until nobody is listening on the given TCP port and IP address
    (default `localhost`). Throws an exception on timeout.
    """

    def port_is_closed(_: Any) -> bool:
        exit_code, _output = self.execute(f"nc -z {addr} {port}")
        return exit_code != 0

    with self.nested(f"waiting for TCP port {port} on {addr} to be closed"):
        retry(port_is_closed, timeout)
802 def start_job(self, jobname: str, user: str | None = None) -> tuple[int, str]:
803 return self.systemctl(f"start {jobname}", user)
805 def stop_job(self, jobname: str, user: str | None = None) -> tuple[int, str]:
806 return self.systemctl(f"stop {jobname}", user)
def wait_for_job(self, jobname: str) -> None:
    """Wait for `jobname` by delegating to `wait_for_unit` with its default
    user and timeout."""
    unit = jobname
    self.wait_for_unit(unit)
def connect(self) -> None:
    """Start the VM if necessary and wait for the guest root shell.

    Returns immediately when already connected. Otherwise the shell socket
    is polled until the guest sends data, and the received data is read and
    logged until the backdoor's announcement line is seen.
    """
    if self.connected:
        return

    def shell_ready(timeout_secs: int) -> bool:
        """We sent some data from the backdoor service running on the guest
        to indicate that the backdoor shell is ready.
        As soon as we read some data from the socket here, we assume that
        our root shell is operational.
        """
        readable, _, _ = select.select([self.shell], [], [], timeout_secs)
        return bool(readable)

    with self.nested("waiting for the VM to finish booting"):
        self.start()

        assert self.shell

        started_at = time.time()
        # TODO: do we want to bail after a set number of attempts?
        while not shell_ready(timeout_secs=30):
            self.log("Guest root shell did not produce any data yet...")
            self.log(
                " To debug, enter the VM and run 'systemctl status backdoor.service'."
            )

        announced = False
        while not announced:
            chunk = self.shell.recv(1024)
            # No need to print empty strings, it means we are waiting.
            if not chunk:
                continue
            self.log(f"Guest shell says: {chunk!r}")
            # NOTE: for this to work, nothing must be printed after this line!
            announced = b"Spawning backdoor root shell..." in chunk

        finished_at = time.time()

        self.log("connected to guest root shell")
        self.log(f"(connecting took {finished_at - started_at:.2f} seconds)")
        self.connected = True
def screenshot(self, filename: str) -> None:
    """
    Take a picture of the display of the virtual machine, in PNG format.
    The screenshot will be available in the derivation output.

    A name without a dot gets the `.png` suffix appended, and a name
    without a slash is placed inside `out_dir`.
    """
    if "." not in filename:
        filename += ".png"
    if "/" not in filename:
        filename = os.path.join(self.out_dir, filename)
    ppm_path = f"{filename}.ppm"

    section_attrs = {"image": os.path.basename(filename)}
    with self.nested(f"making screenshot {filename}", section_attrs):
        # QEMU dumps the screen as PPM, which is then converted to PNG.
        self.send_monitor_command(f"screendump {ppm_path}")
        conversion = subprocess.run(
            f"pnmtopng '{ppm_path}' > '{filename}'", shell=True
        )
        os.unlink(ppm_path)
        if conversion.returncode != 0:
            raise Exception("Cannot convert screenshot")
def copy_from_host_via_shell(self, source: str, target: str) -> None:
    """Copy a file from the host into the guest by piping it over the
    shell into the destination file. Works without host-guest shared folder.
    Prefer copy_from_host for whenever possible.
    """
    with open(source, "rb") as host_file:
        raw = host_file.read()
        # The file travels base64-encoded inside the command line.
        content_b64 = base64.b64encode(raw).decode()
        make_parent_dir = f"mkdir -p $(dirname {target})"
        write_file = f"echo -n {content_b64} | base64 -d > {target}"
        self.succeed(make_parent_dir, write_file)
def copy_from_host(self, source: str, target: str) -> None:
    """
    Copies a file from host to machine, e.g.,
    `copy_from_host("myfile", "/etc/my/important/file")`.

    The first argument is the file on the host. Note that the "host" refers
    to the environment in which the test driver runs, which is typically the
    Nix build sandbox.

    The second argument is the location of the file on the machine that will
    be written to.

    The file is copied via the `shared_dir` directory which is shared among
    all the VMs (using a temporary directory).
    The access rights bits will mimic the ones from the host file and
    user:group will be root:root.
    """
    host_src = Path(source)
    vm_target = Path(target)
    with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
        # The same temporary directory as seen from the host and from
        # inside the VM (below /tmp/shared).
        host_staging_dir = Path(shared_td)
        vm_staging_dir = Path("/tmp/shared") / host_staging_dir.name
        host_staged = host_staging_dir / host_src.name
        vm_staged = vm_staging_dir / host_src.name

        self.succeed(make_command(["mkdir", "-p", vm_staging_dir]))
        copy_to_staging = shutil.copytree if host_src.is_dir() else shutil.copy
        copy_to_staging(host_src, host_staged)
        self.succeed(make_command(["mkdir", "-p", vm_target.parent]))
        self.succeed(make_command(["cp", "-r", vm_staged, vm_target]))
def copy_from_vm(self, source: str, target_dir: str = "") -> None:
    """Copy a file from the VM (specified by an in-VM source path) to a path
    relative to `$out`. The file is copied via the `shared_dir` shared among
    all the VMs (using a temporary directory).
    """
    vm_src = Path(source)
    with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td:
        # The same temporary directory as seen from the host and from
        # inside the VM (below /tmp/shared).
        host_staging_dir = Path(shared_td)
        vm_staging_dir = Path("/tmp/shared") / host_staging_dir.name
        vm_staged = vm_staging_dir / vm_src.name
        host_staged = host_staging_dir / vm_src.name

        # Inside the VM: copy the source into the shared directory.
        self.succeed(make_command(["mkdir", "-p", vm_staging_dir]))
        self.succeed(make_command(["cp", "-r", vm_src, vm_staged]))

        # On the host: move it from the shared directory to its destination.
        abs_target = self.out_dir / target_dir / vm_src.name
        abs_target.parent.mkdir(exist_ok=True, parents=True)
        copy_to_target = shutil.copytree if host_staged.is_dir() else shutil.copy
        copy_to_target(host_staged, abs_target)
def dump_tty_contents(self, tty: str) -> None:
    """Debugging: Dump the contents of the TTY<n>"""
    dump_command = f"fold -w 80 /dev/vcs{tty} | systemd-cat"
    self.execute(dump_command)
def _get_screen_text_variants(self, model_ids: Iterable[int]) -> list[str]:
    """Dump the VM screen into a temporary directory and return the OCR
    result for each of the given model ids."""
    with tempfile.TemporaryDirectory() as scratch_dir:
        dump_path = os.path.join(scratch_dir, "ppm")
        self.send_monitor_command(f"screendump {dump_path}")
        recognised = _perform_ocr_on_screenshot(dump_path, model_ids)
        return recognised
def get_screen_text_variants(self) -> list[str]:
    """
    Return a list of different interpretations of what is currently
    visible on the machine's screen using optical character
    recognition. The number and order of the interpretations is not
    specified and is subject to change, but if no exception is raised at
    least one will be returned.

    ::: {.note}
    This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
    :::
    """
    all_model_ids = [0, 1, 2]
    return self._get_screen_text_variants(all_model_ids)
def get_screen_text(self) -> str:
    """
    Return a textual representation of what is currently visible on the
    machine's screen using optical character recognition.

    ::: {.note}
    This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
    :::
    """
    # Only model id 2 is requested, so there is exactly one variant to pick.
    variants = self._get_screen_text_variants([2])
    return variants[0]
def wait_for_text(self, regex: str, timeout: int = 900) -> None:
    """
    Wait until the supplied regular expressions matches the textual
    contents of the screen by using optical character recognition (see
    `get_screen_text` and `get_screen_text_variants`).
    Throws an exception on timeout.

    ::: {.note}
    This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
    :::
    """

    def screen_matches(last: bool) -> bool:
        variants = self.get_screen_text_variants()
        if any(re.search(regex, text) is not None for text in variants):
            return True

        if last:
            # Final attempt: record what OCR produced to ease debugging.
            self.log(f"Last OCR attempt failed. Text was: {variants}")

        return False

    with self.nested(f"waiting for {regex} to appear on screen"):
        retry(screen_matches, timeout)
1002 def wait_for_console_text(self, regex: str, timeout: int | None = None) -> None:
1004 Wait until the supplied regular expressions match a line of the
1005 serial console output.
1006 This method is useful when OCR is not possible or inaccurate.
1008 # Buffer the console output, this is needed
1009 # to match multiline regexes.
1010 console = io.StringIO()
1012 def console_matches(_: Any) -> bool:
1013 nonlocal console
1014 try:
1015 # This will return as soon as possible and
1016 # sleep 1 second.
1017 console.write(self.last_lines.get(block=False))
1018 except queue.Empty:
1019 pass
1020 console.seek(0)
1021 matches = re.search(regex, console.read())
1022 return matches is not None
1024 with self.nested(f"waiting for {regex} to appear on console"):
1025 if timeout is not None:
1026 retry(console_matches, timeout)
1027 else:
1028 while not console_matches(False):
1029 pass
def send_key(
    self, key: str, delay: float | None = 0.01, log: bool | None = True
) -> None:
    """
    Simulate pressing keys on the virtual keyboard, e.g.,
    `send_key("ctrl-alt-delete")`.

    Single characters listed in `CHAR_TO_KEY` are translated to their key
    name first. After the key has been sent this sleeps for `delay` seconds
    unless `delay` is `None`.

    Please also refer to the QEMU documentation for more information on the
    input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys
    """
    key = CHAR_TO_KEY.get(key, key)
    if log:
        context = self.nested(f"sending key {repr(key)}")
    else:
        context = nullcontext()
    with context:
        self.send_monitor_command(f"sendkey {key}")
        if delay is not None:
            time.sleep(delay)
def send_console(self, chars: str) -> None:
    r"""
    Send keys to the kernel console. This allows interaction with the systemd
    emergency mode, for example. Takes a string that is sent, e.g.,
    `send_console("\n\nsystemctl default\n")`.

    The string is encoded and written to the stdin of the VM process.
    """
    assert self.process
    assert self.process.stdin
    vm_stdin = self.process.stdin
    vm_stdin.write(chars.encode())
    vm_stdin.flush()
def start(self, allow_reboot: bool = False) -> None:
    """
    Start the virtual machine. This method is asynchronous --- it does
    not wait for the machine to finish booting.

    Does nothing if the machine has already been booted.
    """
    if self.booted:
        return

    self.log("starting vm")

    def listen_on(path: Path) -> socket.socket:
        # Remove a stale socket file, then listen on a fresh UNIX socket.
        if path.exists():
            path.unlink()
        listener = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
        listener.bind(str(path))
        listener.listen(1)
        return listener

    # Both listening sockets have to exist before the VM process is spawned.
    monitor_socket = listen_on(self.monitor_path)
    shell_socket = listen_on(self.shell_path)
    self.process = self.start_command.run(
        self.state_dir,
        self.shared_dir,
        self.monitor_path,
        self.qmp_path,
        self.shell_path,
        allow_reboot,
    )
    self.monitor, _ = monitor_socket.accept()
    self.shell, _ = shell_socket.accept()
    self.qmp_client = QMPSession.from_path(self.qmp_path)

    # Store last serial console lines for use
    # of wait_for_console_text
    self.last_lines: Queue = Queue()

    def process_serial_output() -> None:
        assert self.process
        assert self.process.stdout
        for raw_line in self.process.stdout:
            # Ignore undecodable bytes that may occur in boot menus
            line = raw_line.decode(errors="ignore").replace("\r", "").rstrip()
            self.last_lines.put(line)
            self.log_serial(line)

    # The serial console is consumed on a separate thread.
    self.serial_thread = threading.Thread(target=process_serial_output)
    self.serial_thread.start()

    self.wait_for_monitor_prompt()

    self.pid = self.process.pid
    self.booted = True

    self.log(f"QEMU running (pid {self.pid})")
def cleanup_statedir(self) -> None:
    """
    Recursively delete the VM state directory (`self.state_dir`) and log
    that it was removed, along with a hint about `--keep-vm-state`.
    """
    state_dir = self.state_dir
    shutil.rmtree(state_dir)
    notices = (
        f"deleting VM state directory {state_dir}",
        "if you want to keep the VM state, pass --keep-vm-state",
    )
    for notice in notices:
        self.logger.log(notice)
def shutdown(self) -> None:
    """
    Power off the machine and wait for the VM to exit.

    Does nothing when the machine is not booted. Otherwise `poweroff` is
    sent over the shell connection before waiting for the shutdown.
    """
    if self.booted:
        shell = self.shell
        assert shell
        shell.send(b"poweroff\n")
        self.wait_for_shutdown()
def crash(self) -> None:
    """
    Simulate a sudden power failure by telling the VM to exit immediately.

    Does nothing when the machine is not booted. Otherwise `quit` is sent
    to the monitor and the method waits for the shutdown.
    """
    if self.booted:
        self.log("forced crash")
        self.send_monitor_command("quit")
        self.wait_for_shutdown()
def reboot(self) -> None:
    """Send Ctrl+Alt+Delete to the guest.

    Afterwards the machine is flagged as no longer connected, which
    prepares it to be reconnected. This is useful if the machine was
    started with `allow_reboot = True`.
    """
    self.send_key("ctrl-alt-delete")
    # Drop the "connected" flag now that the key combination was sent.
    self.connected = False
def wait_for_x(self, timeout: int = 900) -> None:
    """
    Wait until it is possible to connect to the X server.

    Two guest commands have to exit with status 0, checked in this order:
    a grep of the systemd journal for "Reached target Current graphical",
    and a test for the existence of `/tmp/.X11-unix/X0`. The check is
    handed to `retry` together with `timeout`.
    """
    graphical_target_reached = (
        "journalctl -b SYSLOG_IDENTIFIER=systemd | "
        'grep "Reached target Current graphical"'
    )
    x_socket_present = "[ -e /tmp/.X11-unix/X0 ]"

    def check_x(_: Any) -> bool:
        # Stop at the first failing probe; the later one is then not run.
        for probe in (graphical_target_reached, x_socket_present):
            exit_code, _output = self.execute(probe)
            if exit_code != 0:
                return False
        return True

    with self.nested("waiting for the X11 server"):
        retry(check_x, timeout)
def get_window_names(self) -> list[str]:
    """
    Return the window names printed by `xwininfo -root -tree`.

    A sed filter keeps only the quoted name that follows each window id;
    the resulting output is split into one list entry per line.
    """
    listing = self.succeed(
        r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'"
    )
    return listing.splitlines()
def wait_for_window(self, regexp: str, timeout: int = 900) -> None:
    """
    Wait until an X11 window has appeared whose name matches the given
    regular expression, e.g., `wait_for_window("Terminal")`.

    The window list is fetched anew for every check and the check is
    handed to `retry` together with `timeout`. On the attempt flagged as
    the last one, the current window names are written to the log.
    """
    pattern = re.compile(regexp)

    def window_is_visible(last_try: bool) -> bool:
        titles = self.get_window_names()
        if last_try:
            listing = ", ".join(titles)
            self.log(
                f"Last chance to match {regexp} on the window list,"
                f" which currently contains: {listing}"
            )
        for title in titles:
            if pattern.search(title):
                return True
        return False

    with self.nested("waiting for a window to appear"):
        retry(window_is_visible, timeout)
def sleep(self, secs: int) -> None:
    """
    Pause for `secs` seconds by running `sleep` inside the guest.
    """
    # We want to sleep in *guest* time, not *host* time.
    guest_command = f"sleep {secs}"
    self.succeed(guest_command)
def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
    """
    Forward a TCP port on the host to a TCP port on the guest.
    Useful during interactive testing.

    The forwarding is requested with a `hostfwd_add` monitor command.
    """
    forwarding_rule = f"tcp::{host_port}-:{guest_port}"
    self.send_monitor_command(f"hostfwd_add {forwarding_rule}")
def block(self) -> None:
    """
    Simulate unplugging the Ethernet cable that connects the machine to
    the other machines.

    The link of `virtio-net-pci.1` is switched off through the monitor.
    This is eth1, the multicast interface used to talk to the other VMs.
    eth0 stays online so that the test driver can still communicate with
    the machine.
    """
    link_off_command = "set_link virtio-net-pci.1 off"
    self.send_monitor_command(link_off_command)
def unblock(self) -> None:
    """
    Undo the effect of `block` by switching the link of
    `virtio-net-pci.1` back on through the monitor.
    """
    link_on_command = "set_link virtio-net-pci.1 on"
    self.send_monitor_command(link_on_command)
def release(self) -> None:
    """
    Terminate the VM process and close the handles attached to it.

    Does nothing when no pid has been recorded. Otherwise the process is
    terminated, the shell and monitor connections are closed, the serial
    thread is joined and, if a QMP client is present, it is closed too.
    """
    if self.pid is None:
        return
    self.logger.info(f"kill machine (pid {self.pid})")

    # Verify that every handle is present before touching any of them.
    process = self.process
    assert process
    shell = self.shell
    assert shell
    monitor = self.monitor
    assert monitor
    serial_thread = self.serial_thread
    assert serial_thread

    process.terminate()
    shell.close()
    monitor.close()
    serial_thread.join()

    qmp_client = self.qmp_client
    if qmp_client:
        qmp_client.close()
def run_callbacks(self) -> None:
    """Invoke every callable in `self.callbacks`, in iteration order."""
    for hook in self.callbacks:
        hook()
def switch_root(self) -> None:
    """
    Transition from stage 1 to stage 2. This requires the
    machine to be configured with `testing.initrdBackdoor = true`
    and `boot.initrd.systemd.enable = true`.

    Waits for `initrd.target`, asks systemd to isolate
    `initrd-switch-root.target` without checking the command's return
    code or output, then marks the machine as disconnected and connects
    again.
    """
    self.wait_for_unit("initrd.target")
    isolate_command = (
        "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null"
    )
    self.execute(isolate_command, check_return=False, check_output=False)
    self.connected = False
    self.connect()