15 from collections
.abc
import Callable
, Iterable
16 from contextlib
import _GeneratorContextManager
, nullcontext
17 from pathlib
import Path
18 from queue
import Queue
19 from typing
import Any
21 from test_driver
.logger
import AbstractLogger
23 from .qmp
import QMPSession
89 def make_command(args
: list) -> str:
90 return " ".join(map(shlex
.quote
, (map(str, args
))))
93 def _perform_ocr_on_screenshot(
94 screenshot_path
: str, model_ids
: Iterable
[int]
96 if shutil
.which("tesseract") is None:
97 raise Exception("OCR requested but enableOCR is false")
100 "-filter Catrom -density 72 -resample 300 "
101 + "-contrast -normalize -despeckle -type grayscale "
102 + "-sharpen 1 -posterize 3 -negate -gamma 100 "
106 tess_args
= "-c debug_file=/dev/null --psm 11"
108 cmd
= f
"convert {magick_args} '{screenshot_path}' 'tiff:{screenshot_path}.tiff'"
109 ret
= subprocess
.run(cmd
, shell
=True, capture_output
=True)
110 if ret
.returncode
!= 0:
111 raise Exception(f
"TIFF conversion failed with exit code {ret.returncode}")
114 for model_id
in model_ids
:
115 cmd
= f
"tesseract '{screenshot_path}.tiff' - {tess_args} --oem '{model_id}'"
116 ret
= subprocess
.run(cmd
, shell
=True, capture_output
=True)
117 if ret
.returncode
!= 0:
118 raise Exception(f
"OCR failed with exit code {ret.returncode}")
119 model_results
.append(ret
.stdout
.decode("utf-8"))
124 def retry(fn
: Callable
, timeout
: int = 900) -> None:
125 """Call the given function repeatedly, with 1 second intervals,
126 until it returns True or a timeout is reached.
129 for _
in range(timeout
):
135 raise Exception(f
"action timed out after {timeout} seconds")
139 """The Base Start Command knows how to append the necessary
140 runtime qemu options as determined by a particular test driver
141 run. Any such start command is expected to happily receive and
142 append additional qemu args.
149 monitor_socket_path
: Path
,
150 qmp_socket_path
: Path
,
151 shell_socket_path
: Path
,
152 allow_reboot
: bool = False,
155 display_available
= any(x
in os
.environ
for x
in ["DISPLAY", "WAYLAND_DISPLAY"])
156 if not display_available
:
157 display_opts
+= " -nographic"
161 " -device virtio-serial"
162 # Note: virtconsole will map to /dev/hvc0 in Linux guests
163 " -device virtconsole,chardev=shell"
164 " -device virtio-rng-pci"
168 qemu_opts
+= " -no-reboot"
172 f
" -qmp unix:{qmp_socket_path},server=on,wait=off"
173 f
" -monitor unix:{monitor_socket_path}"
174 f
" -chardev socket,id=shell,path={shell_socket_path}"
180 def build_environment(
184 # We make a copy to not update the current environment
185 env
= dict(os
.environ
)
188 "TMPDIR": str(state_dir
),
189 "SHARED_DIR": str(shared_dir
),
199 monitor_socket_path
: Path
,
200 qmp_socket_path
: Path
,
201 shell_socket_path
: Path
,
203 ) -> subprocess
.Popen
:
204 return subprocess
.Popen(
206 monitor_socket_path
, qmp_socket_path
, shell_socket_path
, allow_reboot
208 stdin
=subprocess
.PIPE
,
209 stdout
=subprocess
.PIPE
,
212 env
=self
.build_environment(state_dir
, shared_dir
),
216 class NixStartScript(StartCommand
):
217 """A start script from nixos/modules/virtualiation/qemu-vm.nix
218 that also satisfies the requirement of the BaseStartCommand.
219 These Nix commands have the particular characteristic that the
220 machine name can be extracted out of them via a regex match.
221 (Admittedly a _very_ implicit contract, evtl. TODO fix)
224 def __init__(self
, script
: str):
228 def machine_name(self
) -> str:
229 match
= re
.search("run-(.+)-vm$", self
._cmd
)
232 name
= match
.group(1)
237 """A handle to the machine with this name, that also knows how to manage
238 the machine lifecycle with the help of a start script / command."""
249 start_command
: StartCommand
252 process
: subprocess
.Popen |
None
254 monitor
: socket
.socket |
None
255 qmp_client
: QMPSession |
None
256 shell
: socket
.socket |
None
257 serial_thread
: threading
.Thread |
None
261 # Store last serial console lines for use
262 # of wait_for_console_text
263 last_lines
: Queue
= Queue()
264 callbacks
: list[Callable
]
266 def __repr__(self
) -> str:
267 return f
"<Machine '{self.name}'>"
273 start_command
: StartCommand
,
274 logger
: AbstractLogger
,
275 name
: str = "machine",
276 keep_vm_state
: bool = False,
277 callbacks
: list[Callable
] |
None = None,
279 self
.out_dir
= out_dir
280 self
.tmp_dir
= tmp_dir
281 self
.keep_vm_state
= keep_vm_state
283 self
.start_command
= start_command
284 self
.callbacks
= callbacks
if callbacks
is not None else []
288 self
.shared_dir
= self
.tmp_dir
/ "shared-xchg"
289 self
.shared_dir
.mkdir(mode
=0o700, exist_ok
=True)
291 self
.state_dir
= self
.tmp_dir
/ f
"vm-state-{self.name}"
292 self
.monitor_path
= self
.state_dir
/ "monitor"
293 self
.qmp_path
= self
.state_dir
/ "qmp"
294 self
.shell_path
= self
.state_dir
/ "shell"
295 if (not self
.keep_vm_state
) and self
.state_dir
.exists():
296 self
.cleanup_statedir()
297 self
.state_dir
.mkdir(mode
=0o700, exist_ok
=True)
302 self
.qmp_client
= None
304 self
.serial_thread
= None
307 self
.connected
= False
309 def is_up(self
) -> bool:
310 return self
.booted
and self
.connected
312 def log(self
, msg
: str) -> None:
313 self
.logger
.log(msg
, {"machine": self
.name
})
315 def log_serial(self
, msg
: str) -> None:
316 self
.logger
.log_serial(msg
, self
.name
)
318 def nested(self
, msg
: str, attrs
: dict[str, str] = {}) -> _GeneratorContextManager
:
319 my_attrs
= {"machine": self
.name
}
320 my_attrs
.update(attrs
)
321 return self
.logger
.nested(msg
, my_attrs
)
323 def wait_for_monitor_prompt(self
) -> str:
324 assert self
.monitor
is not None
327 undecoded_answer
= self
.monitor
.recv(1024)
328 if not undecoded_answer
:
330 answer
+= undecoded_answer
.decode()
331 if answer
.endswith("(qemu) "):
335 def send_monitor_command(self
, command
: str) -> str:
337 Send a command to the QEMU monitor. This allows attaching
338 virtual USB disks to a running machine, among other things.
341 message
= f
"{command}\n".encode()
342 assert self
.monitor
is not None
343 self
.monitor
.send(message
)
344 return self
.wait_for_monitor_prompt()
347 self
, unit
: str, user
: str |
None = None, timeout
: int = 900
350 Wait for a systemd unit to get into "active" state.
351 Throws exceptions on "failed" and "inactive" states as well as after
355 def check_active(_
: Any
) -> bool:
356 state
= self
.get_unit_property(unit
, "ActiveState", user
)
357 if state
== "failed":
358 raise Exception(f
'unit "{unit}" reached state "{state}"')
360 if state
== "inactive":
361 status
, jobs
= self
.systemctl("list-jobs --full 2>&1", user
)
362 if "No jobs" in jobs
:
363 info
= self
.get_unit_info(unit
, user
)
364 if info
["ActiveState"] == state
:
366 f
'unit "{unit}" is inactive and there are no pending jobs'
369 return state
== "active"
372 f
"waiting for unit {unit}"
373 + (f
" with user {user}" if user
is not None else "")
375 retry(check_active
, timeout
)
377 def get_unit_info(self
, unit
: str, user
: str |
None = None) -> dict[str, str]:
378 status
, lines
= self
.systemctl(f
'--no-pager show "{unit}"', user
)
381 f
'retrieving systemctl info for unit "{unit}"'
382 + ("" if user
is None else f
' under user "{user}"')
383 + f
" failed with exit code {status}"
386 line_pattern
= re
.compile(r
"^([^=]+)=(.*)$")
388 def tuple_from_line(line
: str) -> tuple[str, str]:
389 match
= line_pattern
.match(line
)
390 assert match
is not None
391 return match
[1], match
[2]
394 tuple_from_line(line
)
395 for line
in lines
.split("\n")
396 if line_pattern
.match(line
)
399 def get_unit_property(
403 user
: str |
None = None,
405 status
, lines
= self
.systemctl(
406 f
'--no-pager show "{unit}" --property="{property}"',
411 f
'retrieving systemctl property "{property}" for unit "{unit}"'
412 + ("" if user
is None else f
' under user "{user}"')
413 + f
" failed with exit code {status}"
416 invalid_output_message
= (
417 f
'systemctl show --property "{property}" "{unit}"'
418 f
"produced invalid output: {lines}"
421 line_pattern
= re
.compile(r
"^([^=]+)=(.*)$")
422 match
= line_pattern
.match(lines
)
423 assert match
is not None, invalid_output_message
425 assert match
[1] == property, invalid_output_message
428 def systemctl(self
, q
: str, user
: str |
None = None) -> tuple[int, str]:
430 Runs `systemctl` commands with optional support for
434 # run `systemctl list-jobs --no-pager`
435 machine.systemctl("list-jobs --no-pager")
437 # spawn a shell for `any-user` and run
438 # `systemctl --user list-jobs --no-pager`
439 machine.systemctl("list-jobs --no-pager", "any-user")
443 q
= q
.replace("'", "\\'")
445 f
"su -l {user} --shell /bin/sh -c "
446 "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
447 f
"systemctl --user {q}'"
449 return self
.execute(f
"systemctl {q}")
451 def require_unit_state(self
, unit
: str, require_state
: str = "active") -> None:
453 f
"checking if unit '{unit}' has reached state '{require_state}'"
455 info
= self
.get_unit_info(unit
)
456 state
= info
["ActiveState"]
457 if state
!= require_state
:
459 f
"Expected unit '{unit}' to to be in state "
460 f
"'{require_state}' but it is in state '{state}'"
463 def _next_newline_closed_block_from_shell(self
) -> str:
467 # This receives up to 4096 bytes from the socket
468 chunk
= self
.shell
.recv(4096)
470 # Probably a broken pipe, return the output we have
473 decoded
= chunk
.decode()
474 output_buffer
+= [decoded
]
475 if decoded
[-1] == "\n":
477 return "".join(output_buffer
)
482 check_return
: bool = True,
483 check_output
: bool = True,
484 timeout
: int |
None = 900,
485 ) -> tuple[int, str]:
487 Execute a shell command, returning a list `(status, stdout)`.
489 Commands are run with `set -euo pipefail` set:
491 - If several commands are separated by `;` and one fails, the
492 command as a whole will fail.
494 - For pipelines, the last non-zero exit status will be returned
495 (if there is one; otherwise zero will be returned).
497 - Dereferencing unset variables fails the command.
499 - It will wait for stdout to be closed.
501 If the command detaches, it must close stdout, as `execute` will wait
502 for this to consume all output reliably. This can be achieved by
503 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or
504 a file. Examples of detaching commands are `sleep 365d &`, where the
505 shell forks a new process that can write to stdout and `xclip -i`, where
506 the `xclip` command itself forks without closing stdout.
508 Takes an optional parameter `check_return` that defaults to `True`.
509 Setting this parameter to `False` will not check for the return code
510 and return -1 instead. This can be used for commands that shut down
511 the VM and would therefore break the pipe that would be used for
512 retrieving the return code.
514 A timeout for the command can be specified (in seconds) using the optional
515 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or
516 `execute(cmd, timeout=None)`. The default is 900 seconds.
521 # Always run command with shell opts
522 command
= f
"set -euo pipefail; {command}"
525 if timeout
is not None:
526 timeout_str
= f
"timeout {timeout}"
528 # While sh is bash on NixOS, this is not the case for every distro.
529 # We explicitly call bash here to allow for the driver to boot other distros as well.
531 f
"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n"
535 self
.shell
.send(out_command
.encode())
541 output
= base64
.b64decode(self
._next
_newline
_closed
_block
_from
_shell
())
544 return (-1, output
.decode())
546 # Get the return code
547 self
.shell
.send(b
"echo ${PIPESTATUS[0]}\n")
548 rc
= int(self
._next
_newline
_closed
_block
_from
_shell
().strip())
550 return (rc
, output
.decode(errors
="replace"))
552 def shell_interact(self
, address
: str |
None = None) -> None:
554 Allows you to directly interact with the guest shell. This should
555 only be used during test development, not in production tests.
556 Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends
562 address
= "READLINE,prompt=$ "
563 self
.log("Terminal is ready (there is no initial prompt):")
568 ["socat", address
, f
"FD:{self.shell.fileno()}"],
569 pass_fds
=[self
.shell
.fileno()],
571 # allow users to cancel this command without breaking the test
572 except KeyboardInterrupt:
575 def console_interact(self
) -> None:
577 Allows you to directly interact with QEMU's stdin, by forwarding
578 terminal input to the QEMU process.
579 This is for use with the interactive test driver, not for production
580 tests, which run unattended.
581 Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and
582 `Ctrl-d` closes console and returns to the test runner.
584 self
.log("Terminal is ready (there is no prompt):")
587 assert self
.process
.stdin
591 char
= sys
.stdin
.buffer.read(1)
592 except KeyboardInterrupt:
594 if char
== b
"": # ctrl+d
595 self
.log("Closing connection to the console")
597 self
.send_console(char
.decode())
599 def succeed(self
, *commands
: str, timeout
: int |
None = None) -> str:
601 Execute a shell command, raising an exception if the exit status is
602 not zero, otherwise returning the standard output. Similar to `execute`,
603 except that the timeout is `None` by default. See `execute` for details on
607 for command
in commands
:
608 with self
.nested(f
"must succeed: {command}"):
609 (status
, out
) = self
.execute(command
, timeout
=timeout
)
611 self
.log(f
"output: {out}")
612 raise Exception(f
"command `{command}` failed (exit code {status})")
616 def fail(self
, *commands
: str, timeout
: int |
None = None) -> str:
618 Like `succeed`, but raising an exception if the command returns a zero
622 for command
in commands
:
623 with self
.nested(f
"must fail: {command}"):
624 (status
, out
) = self
.execute(command
, timeout
=timeout
)
626 raise Exception(f
"command `{command}` unexpectedly succeeded")
630 def wait_until_succeeds(self
, command
: str, timeout
: int = 900) -> str:
632 Repeat a shell command with 1-second intervals until it succeeds.
633 Has a default timeout of 900 seconds which can be modified, e.g.
634 `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on
636 Throws an exception on timeout.
640 def check_success(_
: Any
) -> bool:
642 status
, output
= self
.execute(command
, timeout
=timeout
)
645 with self
.nested(f
"waiting for success: {command}"):
646 retry(check_success
, timeout
)
649 def wait_until_fails(self
, command
: str, timeout
: int = 900) -> str:
651 Like `wait_until_succeeds`, but repeating the command until it fails.
655 def check_failure(_
: Any
) -> bool:
657 status
, output
= self
.execute(command
, timeout
=timeout
)
660 with self
.nested(f
"waiting for failure: {command}"):
661 retry(check_failure
, timeout
)
664 def wait_for_shutdown(self
) -> None:
668 with self
.nested("waiting for the VM to power off"):
675 self
.connected
= False
677 def wait_for_qmp_event(
678 self
, event_filter
: Callable
[[dict[str, Any
]], bool], timeout
: int = 60 * 10
681 Wait for a QMP event which you can filter with the `event_filter` function.
682 The function takes as an input a dictionary of the event and if it returns True, we return that event,
683 if it does not, we wait for the next event and retry.
685 It will skip all events received in the meantime, if you want to keep them,
686 you have to do the bookkeeping yourself and store them somewhere.
688 By default, it will wait up to 10 minutes, `timeout` is in seconds.
690 if self
.qmp_client
is None:
691 raise RuntimeError("QMP API is not ready yet, is the VM ready?")
695 evt
= self
.qmp_client
.wait_for_event(timeout
=timeout
)
696 if event_filter(evt
):
699 elapsed
= time
.time() - start
700 if elapsed
>= timeout
:
703 def get_tty_text(self
, tty
: str) -> str:
704 status
, output
= self
.execute(
705 f
"fold -w$(stty -F /dev/tty{tty} size | "
706 f
"awk '{{print $2}}') /dev/vcs{tty}"
710 def wait_until_tty_matches(self
, tty
: str, regexp
: str, timeout
: int = 900) -> None:
711 """Wait until the visible output on the chosen TTY matches regular
712 expression. Throws an exception on timeout.
714 matcher
= re
.compile(regexp
)
716 def tty_matches(last
: bool) -> bool:
717 text
= self
.get_tty_text(tty
)
720 f
"Last chance to match /{regexp}/ on TTY{tty}, "
721 f
"which currently contains: {text}"
723 return len(matcher
.findall(text
)) > 0
725 with self
.nested(f
"waiting for {regexp} to appear on tty {tty}"):
726 retry(tty_matches
, timeout
)
728 def send_chars(self
, chars
: str, delay
: float |
None = 0.01) -> None:
730 Simulate typing a sequence of characters on the virtual keyboard,
731 e.g., `send_chars("foobar\n")` will type the string `foobar`
732 followed by the Enter key.
734 with self
.nested(f
"sending keys {repr(chars)}"):
736 self
.send_key(char
, delay
, log
=False)
738 def wait_for_file(self
, filename
: str, timeout
: int = 900) -> None:
740 Waits until the file exists in the machine's file system.
743 def check_file(_
: Any
) -> bool:
744 status
, _
= self
.execute(f
"test -e {filename}")
747 with self
.nested(f
"waiting for file '{filename}'"):
748 retry(check_file
, timeout
)
750 def wait_for_open_port(
751 self
, port
: int, addr
: str = "localhost", timeout
: int = 900
754 Wait until a process is listening on the given TCP port and IP address
755 (default `localhost`).
758 def port_is_open(_
: Any
) -> bool:
759 status
, _
= self
.execute(f
"nc -z {addr} {port}")
762 with self
.nested(f
"waiting for TCP port {port} on {addr}"):
763 retry(port_is_open
, timeout
)
765 def wait_for_open_unix_socket(
766 self
, addr
: str, is_datagram
: bool = False, timeout
: int = 900
769 Wait until a process is listening on the given UNIX-domain socket
770 (default to a UNIX-domain stream socket).
775 "-uU" if is_datagram
else "-U",
778 def socket_is_open(_
: Any
) -> bool:
779 status
, _
= self
.execute(f
"nc {' '.join(nc_flags)} {addr}")
783 f
"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'"
785 retry(socket_is_open
, timeout
)
787 def wait_for_closed_port(
788 self
, port
: int, addr
: str = "localhost", timeout
: int = 900
791 Wait until nobody is listening on the given TCP port and IP address
792 (default `localhost`).
795 def port_is_closed(_
: Any
) -> bool:
796 status
, _
= self
.execute(f
"nc -z {addr} {port}")
799 with self
.nested(f
"waiting for TCP port {port} on {addr} to be closed"):
800 retry(port_is_closed
, timeout
)
802 def start_job(self
, jobname
: str, user
: str |
None = None) -> tuple[int, str]:
803 return self
.systemctl(f
"start {jobname}", user
)
805 def stop_job(self
, jobname
: str, user
: str |
None = None) -> tuple[int, str]:
806 return self
.systemctl(f
"stop {jobname}", user
)
808 def wait_for_job(self
, jobname
: str) -> None:
809 self
.wait_for_unit(jobname
)
811 def connect(self
) -> None:
812 def shell_ready(timeout_secs
: int) -> bool:
813 """We sent some data from the backdoor service running on the guest
814 to indicate that the backdoor shell is ready.
815 As soon as we read some data from the socket here, we assume that
816 our root shell is operational.
818 (ready
, _
, _
) = select
.select([self
.shell
], [], [], timeout_secs
)
824 with self
.nested("waiting for the VM to finish booting"):
830 # TODO: do we want to bail after a set number of attempts?
831 while not shell_ready(timeout_secs
=30):
832 self
.log("Guest root shell did not produce any data yet...")
834 " To debug, enter the VM and run 'systemctl status backdoor.service'."
838 chunk
= self
.shell
.recv(1024)
839 # No need to print empty strings, it means we are waiting.
842 self
.log(f
"Guest shell says: {chunk!r}")
843 # NOTE: for this to work, nothing must be printed after this line!
844 if b
"Spawning backdoor root shell..." in chunk
:
849 self
.log("connected to guest root shell")
850 self
.log(f
"(connecting took {toc - tic:.2f} seconds)")
851 self
.connected
= True
853 def screenshot(self
, filename
: str) -> None:
855 Take a picture of the display of the virtual machine, in PNG format.
856 The screenshot will be available in the derivation output.
858 if "." not in filename
:
860 if "/" not in filename
:
861 filename
= os
.path
.join(self
.out_dir
, filename
)
862 tmp
= f
"{filename}.ppm"
865 f
"making screenshot {filename}",
866 {"image": os
.path
.basename(filename
)},
868 self
.send_monitor_command(f
"screendump {tmp}")
869 ret
= subprocess
.run(f
"pnmtopng '{tmp}' > '{filename}'", shell
=True)
871 if ret
.returncode
!= 0:
872 raise Exception("Cannot convert screenshot")
874 def copy_from_host_via_shell(self
, source
: str, target
: str) -> None:
875 """Copy a file from the host into the guest by piping it over the
876 shell into the destination file. Works without host-guest shared folder.
877 Prefer copy_from_host for whenever possible.
879 with
open(source
, "rb") as fh
:
880 content_b64
= base64
.b64encode(fh
.read()).decode()
882 f
"mkdir -p $(dirname {target})",
883 f
"echo -n {content_b64} | base64 -d > {target}",
886 def copy_from_host(self
, source
: str, target
: str) -> None:
888 Copies a file from host to machine, e.g.,
889 `copy_from_host("myfile", "/etc/my/important/file")`.
891 The first argument is the file on the host. Note that the "host" refers
892 to the environment in which the test driver runs, which is typically the
895 The second argument is the location of the file on the machine that will
898 The file is copied via the `shared_dir` directory which is shared among
899 all the VMs (using a temporary directory).
900 The access rights bits will mimic the ones from the host file and
901 user:group will be root:root.
903 host_src
= Path(source
)
904 vm_target
= Path(target
)
905 with tempfile
.TemporaryDirectory(dir=self
.shared_dir
) as shared_td
:
906 shared_temp
= Path(shared_td
)
907 host_intermediate
= shared_temp
/ host_src
.name
908 vm_shared_temp
= Path("/tmp/shared") / shared_temp
.name
909 vm_intermediate
= vm_shared_temp
/ host_src
.name
911 self
.succeed(make_command(["mkdir", "-p", vm_shared_temp
]))
912 if host_src
.is_dir():
913 shutil
.copytree(host_src
, host_intermediate
)
915 shutil
.copy(host_src
, host_intermediate
)
916 self
.succeed(make_command(["mkdir", "-p", vm_target
.parent
]))
917 self
.succeed(make_command(["cp", "-r", vm_intermediate
, vm_target
]))
919 def copy_from_vm(self
, source
: str, target_dir
: str = "") -> None:
920 """Copy a file from the VM (specified by an in-VM source path) to a path
921 relative to `$out`. The file is copied via the `shared_dir` shared among
922 all the VMs (using a temporary directory).
924 # Compute the source, target, and intermediate shared file names
925 vm_src
= Path(source
)
926 with tempfile
.TemporaryDirectory(dir=self
.shared_dir
) as shared_td
:
927 shared_temp
= Path(shared_td
)
928 vm_shared_temp
= Path("/tmp/shared") / shared_temp
.name
929 vm_intermediate
= vm_shared_temp
/ vm_src
.name
930 intermediate
= shared_temp
/ vm_src
.name
931 # Copy the file to the shared directory inside VM
932 self
.succeed(make_command(["mkdir", "-p", vm_shared_temp
]))
933 self
.succeed(make_command(["cp", "-r", vm_src
, vm_intermediate
]))
934 abs_target
= self
.out_dir
/ target_dir
/ vm_src
.name
935 abs_target
.parent
.mkdir(exist_ok
=True, parents
=True)
936 # Copy the file from the shared directory outside VM
937 if intermediate
.is_dir():
938 shutil
.copytree(intermediate
, abs_target
)
940 shutil
.copy(intermediate
, abs_target
)
942 def dump_tty_contents(self
, tty
: str) -> None:
943 """Debugging: Dump the contents of the TTY<n>"""
944 self
.execute(f
"fold -w 80 /dev/vcs{tty} | systemd-cat")
946 def _get_screen_text_variants(self
, model_ids
: Iterable
[int]) -> list[str]:
947 with tempfile
.TemporaryDirectory() as tmpdir
:
948 screenshot_path
= os
.path
.join(tmpdir
, "ppm")
949 self
.send_monitor_command(f
"screendump {screenshot_path}")
950 return _perform_ocr_on_screenshot(screenshot_path
, model_ids
)
952 def get_screen_text_variants(self
) -> list[str]:
954 Return a list of different interpretations of what is currently
955 visible on the machine's screen using optical character
956 recognition. The number and order of the interpretations is not
957 specified and is subject to change, but if no exception is raised at
958 least one will be returned.
961 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
964 return self
._get
_screen
_text
_variants
([0, 1, 2])
966 def get_screen_text(self
) -> str:
968 Return a textual representation of what is currently visible on the
969 machine's screen using optical character recognition.
972 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
975 return self
._get
_screen
_text
_variants
([2])[0]
977 def wait_for_text(self
, regex
: str, timeout
: int = 900) -> None:
979 Wait until the supplied regular expressions matches the textual
980 contents of the screen by using optical character recognition (see
981 `get_screen_text` and `get_screen_text_variants`).
984 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
988 def screen_matches(last
: bool) -> bool:
989 variants
= self
.get_screen_text_variants()
990 for text
in variants
:
991 if re
.search(regex
, text
) is not None:
995 self
.log(f
"Last OCR attempt failed. Text was: {variants}")
999 with self
.nested(f
"waiting for {regex} to appear on screen"):
1000 retry(screen_matches
, timeout
)
1002 def wait_for_console_text(self
, regex
: str, timeout
: int |
None = None) -> None:
1004 Wait until the supplied regular expressions match a line of the
1005 serial console output.
1006 This method is useful when OCR is not possible or inaccurate.
1008 # Buffer the console output, this is needed
1009 # to match multiline regexes.
1010 console
= io
.StringIO()
1012 def console_matches(_
: Any
) -> bool:
1015 # This will return as soon as possible and
1017 console
.write(self
.last_lines
.get(block
=False))
1021 matches
= re
.search(regex
, console
.read())
1022 return matches
is not None
1024 with self
.nested(f
"waiting for {regex} to appear on console"):
1025 if timeout
is not None:
1026 retry(console_matches
, timeout
)
1028 while not console_matches(False):
1032 self
, key
: str, delay
: float |
None = 0.01, log
: bool |
None = True
1035 Simulate pressing keys on the virtual keyboard, e.g.,
1036 `send_key("ctrl-alt-delete")`.
1038 Please also refer to the QEMU documentation for more information on the
1039 input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys
1041 key
= CHAR_TO_KEY
.get(key
, key
)
1042 context
= self
.nested(f
"sending key {repr(key)}") if log
else nullcontext()
1044 self
.send_monitor_command(f
"sendkey {key}")
1045 if delay
is not None:
1048 def send_console(self
, chars
: str) -> None:
1050 Send keys to the kernel console. This allows interaction with the systemd
1051 emergency mode, for example. Takes a string that is sent, e.g.,
1052 `send_console("\n\nsystemctl default\n")`.
1055 assert self
.process
.stdin
1056 self
.process
.stdin
.write(chars
.encode())
1057 self
.process
.stdin
.flush()
1059 def start(self
, allow_reboot
: bool = False) -> None:
1061 Start the virtual machine. This method is asynchronous --- it does
1062 not wait for the machine to finish booting.
1067 self
.log("starting vm")
1069 def clear(path
: Path
) -> Path
:
1074 def create_socket(path
: Path
) -> socket
.socket
:
1075 s
= socket
.socket(family
=socket
.AF_UNIX
, type=socket
.SOCK_STREAM
)
1080 monitor_socket
= create_socket(clear(self
.monitor_path
))
1081 shell_socket
= create_socket(clear(self
.shell_path
))
1082 self
.process
= self
.start_command
.run(
1090 self
.monitor
, _
= monitor_socket
.accept()
1091 self
.shell
, _
= shell_socket
.accept()
1092 self
.qmp_client
= QMPSession
.from_path(self
.qmp_path
)
1094 # Store last serial console lines for use
1095 # of wait_for_console_text
1096 self
.last_lines
: Queue
= Queue()
1098 def process_serial_output() -> None:
1100 assert self
.process
.stdout
1101 for _line
in self
.process
.stdout
:
1102 # Ignore undecodable bytes that may occur in boot menus
1103 line
= _line
.decode(errors
="ignore").replace("\r", "").rstrip()
1104 self
.last_lines
.put(line
)
1105 self
.log_serial(line
)
1107 self
.serial_thread
= threading
.Thread(target
=process_serial_output
)
1108 self
.serial_thread
.start()
1110 self
.wait_for_monitor_prompt()
1112 self
.pid
= self
.process
.pid
1115 self
.log(f
"QEMU running (pid {self.pid})")
1117 def cleanup_statedir(self
) -> None:
1118 shutil
.rmtree(self
.state_dir
)
1119 self
.logger
.log(f
"deleting VM state directory {self.state_dir}")
1120 self
.logger
.log("if you want to keep the VM state, pass --keep-vm-state")
1122 def shutdown(self
) -> None:
1124 Shut down the machine, waiting for the VM to exit.
1130 self
.shell
.send(b
"poweroff\n")
1131 self
.wait_for_shutdown()
1133 def crash(self
) -> None:
1135 Simulate a sudden power failure, by telling the VM to exit immediately.
1140 self
.log("forced crash")
1141 self
.send_monitor_command("quit")
1142 self
.wait_for_shutdown()
1144 def reboot(self
) -> None:
1145 """Press Ctrl+Alt+Delete in the guest.
1147 Prepares the machine to be reconnected which is useful if the
1148 machine was started with `allow_reboot = True`
1150 self
.send_key("ctrl-alt-delete")
1151 self
.connected
= False
1153 def wait_for_x(self
, timeout
: int = 900) -> None:
1155 Wait until it is possible to connect to the X server.
1158 def check_x(_
: Any
) -> bool:
1160 "journalctl -b SYSLOG_IDENTIFIER=systemd | "
1161 + 'grep "Reached target Current graphical"'
1163 status
, _
= self
.execute(cmd
)
1166 status
, _
= self
.execute("[ -e /tmp/.X11-unix/X0 ]")
1169 with self
.nested("waiting for the X11 server"):
1170 retry(check_x
, timeout
)
1172 def get_window_names(self
) -> list[str]:
1173 return self
.succeed(
1174 r
"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\
([^
\"]*\
)\".*/\
1/; t
; d
'"
1177 def wait_for_window(self, regexp: str, timeout: int = 900) -> None:
1179 Wait until an X11 window has appeared whose name matches the given
1180 regular expression, e.g., `wait_for_window("Terminal")`.
1182 pattern = re.compile(regexp)
1184 def window_is_visible(last_try: bool) -> bool:
1185 names = self.get_window_names()
1188 f"Last chance to match {regexp} on the window list,"
1189 + " which currently contains: "
1192 return any(pattern.search(name) for name in names)
1194 with self.nested("waiting for a window to appear"):
1195 retry(window_is_visible, timeout)
1197 def sleep(self, secs: int) -> None:
1198 # We want to sleep in *guest* time, not *host* time.
1199 self.succeed(f"sleep {secs}")
1201 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
1203 Forward a TCP port on the host to a TCP port on the guest.
1204 Useful during interactive testing.
1206 self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}")
1208 def block(self) -> None:
1210 Simulate unplugging the Ethernet cable that connects the machine to
1212 This happens by shutting down eth1 (the multicast interface used to talk
1213 to the other VMs). eth0 is kept online to still enable the test driver
1214 to communicate with the machine.
1216 self.send_monitor_command("set_link virtio-net-pci.1 off")
1218 def unblock(self) -> None:
1220 Undo the effect of `block`.
1222 self.send_monitor_command("set_link virtio-net-pci.1 on")
1224 def release(self) -> None:
1225 if self.pid is None:
1227 self.logger.info(f"kill machine (pid {self.pid})")
1231 assert self.serial_thread
1233 self.process.terminate()
1235 self.monitor.close()
1236 self.serial_thread.join()
1239 self.qmp_client.close()
1241 def run_callbacks(self) -> None:
1242 for callback in self.callbacks:
1245 def switch_root(self) -> None:
1247 Transition from stage 1 to stage 2. This requires the
1248 machine to be configured with `testing.initrdBackdoor = true`
1249 and `boot.initrd.systemd.enable = true`.
1251 self.wait_for_unit("initrd.target")
1253 "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null",
1257 self.connected = False