15 from contextlib
import _GeneratorContextManager
, nullcontext
16 from pathlib
import Path
17 from queue
import Queue
18 from typing
import Any
, Callable
, Dict
, Iterable
, List
, Optional
, Tuple
20 from test_driver
.logger
import AbstractLogger
22 from .qmp
import QMPSession
def make_command(args: list) -> str:
    """Render ``args`` as one shell-safe command line.

    Each element is converted with ``str()`` and escaped with
    ``shlex.quote``; the escaped words are then joined with single
    spaces. Non-string elements (for example ``Path`` objects or
    integers) are therefore accepted.
    """
    quoted_words = [shlex.quote(str(word)) for word in args]
    return " ".join(quoted_words)
92 def _perform_ocr_on_screenshot(
93 screenshot_path
: str, model_ids
: Iterable
[int]
95 if shutil
.which("tesseract") is None:
96 raise Exception("OCR requested but enableOCR is false")
99 "-filter Catrom -density 72 -resample 300 "
100 + "-contrast -normalize -despeckle -type grayscale "
101 + "-sharpen 1 -posterize 3 -negate -gamma 100 "
105 tess_args
= "-c debug_file=/dev/null --psm 11"
107 cmd
= f
"convert {magick_args} '{screenshot_path}' 'tiff:{screenshot_path}.tiff'"
108 ret
= subprocess
.run(cmd
, shell
=True, capture_output
=True)
109 if ret
.returncode
!= 0:
110 raise Exception(f
"TIFF conversion failed with exit code {ret.returncode}")
113 for model_id
in model_ids
:
114 cmd
= f
"tesseract '{screenshot_path}.tiff' - {tess_args} --oem '{model_id}'"
115 ret
= subprocess
.run(cmd
, shell
=True, capture_output
=True)
116 if ret
.returncode
!= 0:
117 raise Exception(f
"OCR failed with exit code {ret.returncode}")
118 model_results
.append(ret
.stdout
.decode("utf-8"))
123 def retry(fn
: Callable
, timeout
: int = 900) -> None:
124 """Call the given function repeatedly, with 1 second intervals,
125 until it returns True or a timeout is reached.
128 for _
in range(timeout
):
134 raise Exception(f
"action timed out after {timeout} seconds")
138 """The Base Start Command knows how to append the necessary
139 runtime qemu options as determined by a particular test driver
140 run. Any such start command is expected to happily receive and
141 append additional qemu args.
148 monitor_socket_path
: Path
,
149 qmp_socket_path
: Path
,
150 shell_socket_path
: Path
,
151 allow_reboot
: bool = False,
154 display_available
= any(x
in os
.environ
for x
in ["DISPLAY", "WAYLAND_DISPLAY"])
155 if not display_available
:
156 display_opts
+= " -nographic"
160 " -device virtio-serial"
161 # Note: virtconsole will map to /dev/hvc0 in Linux guests
162 " -device virtconsole,chardev=shell"
163 " -device virtio-rng-pci"
167 qemu_opts
+= " -no-reboot"
171 f
" -qmp unix:{qmp_socket_path},server=on,wait=off"
172 f
" -monitor unix:{monitor_socket_path}"
173 f
" -chardev socket,id=shell,path={shell_socket_path}"
179 def build_environment(
183 # We make a copy to not update the current environment
184 env
= dict(os
.environ
)
187 "TMPDIR": str(state_dir
),
188 "SHARED_DIR": str(shared_dir
),
198 monitor_socket_path
: Path
,
199 qmp_socket_path
: Path
,
200 shell_socket_path
: Path
,
202 ) -> subprocess
.Popen
:
203 return subprocess
.Popen(
205 monitor_socket_path
, qmp_socket_path
, shell_socket_path
, allow_reboot
207 stdin
=subprocess
.PIPE
,
208 stdout
=subprocess
.PIPE
,
211 env
=self
.build_environment(state_dir
, shared_dir
),
215 class NixStartScript(StartCommand
):
216 """A start script from nixos/modules/virtualiation/qemu-vm.nix
217 that also satisfies the requirement of the BaseStartCommand.
218 These Nix commands have the particular characteristic that the
219 machine name can be extracted out of them via a regex match.
220 (Admittedly a _very_ implicit contract, evtl. TODO fix)
223 def __init__(self
, script
: str):
227 def machine_name(self
) -> str:
228 match
= re
.search("run-(.+)-vm$", self
._cmd
)
231 name
= match
.group(1)
236 """A handle to the machine with this name, that also knows how to manage
237 the machine lifecycle with the help of a start script / command."""
248 start_command
: StartCommand
251 process
: Optional
[subprocess
.Popen
]
253 monitor
: Optional
[socket
.socket
]
254 qmp_client
: Optional
[QMPSession
]
255 shell
: Optional
[socket
.socket
]
256 serial_thread
: Optional
[threading
.Thread
]
260 # Store last serial console lines for use
261 # of wait_for_console_text
262 last_lines
: Queue
= Queue()
263 callbacks
: List
[Callable
]
def __repr__(self) -> str:
    """Return a short debug representation containing the machine name."""
    machine_name = self.name
    return f"<Machine '{machine_name}'>"
272 start_command
: StartCommand
,
273 logger
: AbstractLogger
,
274 name
: str = "machine",
275 keep_vm_state
: bool = False,
276 callbacks
: Optional
[List
[Callable
]] = None,
278 self
.out_dir
= out_dir
279 self
.tmp_dir
= tmp_dir
280 self
.keep_vm_state
= keep_vm_state
282 self
.start_command
= start_command
283 self
.callbacks
= callbacks
if callbacks
is not None else []
287 self
.shared_dir
= self
.tmp_dir
/ "shared-xchg"
288 self
.shared_dir
.mkdir(mode
=0o700, exist_ok
=True)
290 self
.state_dir
= self
.tmp_dir
/ f
"vm-state-{self.name}"
291 self
.monitor_path
= self
.state_dir
/ "monitor"
292 self
.qmp_path
= self
.state_dir
/ "qmp"
293 self
.shell_path
= self
.state_dir
/ "shell"
294 if (not self
.keep_vm_state
) and self
.state_dir
.exists():
295 self
.cleanup_statedir()
296 self
.state_dir
.mkdir(mode
=0o700, exist_ok
=True)
301 self
.qmp_client
= None
303 self
.serial_thread
= None
306 self
.connected
= False
def is_up(self) -> bool:
    """Report whether the machine is both booted and connected.

    ``self.connected`` is only consulted when ``self.booted`` is truthy,
    exactly like a short-circuiting ``and``.
    """
    state = self.booted
    if state:
        state = self.connected
    return state
def log(self, msg: str) -> None:
    """Forward ``msg`` to the logger, tagged with this machine's name.

    The message is passed to ``self.logger.log`` together with an
    attribute dict of the form ``{"machine": <name>}``.
    """
    machine_attrs = {"machine": self.name}
    self.logger.log(msg, machine_attrs)
def log_serial(self, msg: str) -> None:
    """Forward one serial-console message to the logger.

    Delegates to ``self.logger.log_serial`` with the message and this
    machine's name as positional arguments.
    """
    machine_name = self.name
    self.logger.log_serial(msg, machine_name)
def nested(
    self, msg: str, attrs: Optional[Dict[str, str]] = None
) -> _GeneratorContextManager:
    """Return the logger's nested context for ``msg``, tagged with this machine.

    Args:
        msg: Message handed to ``self.logger.nested``.
        attrs: Optional extra attributes. They are merged on top of
            ``{"machine": <name>}``, so a caller-supplied ``"machine"``
            key overrides the default one. ``None`` (the default) means
            no extra attributes.

    Returns:
        Whatever ``self.logger.nested`` returns for the merged attributes.
    """
    # A ``None`` sentinel replaces the former shared ``{}`` default so no
    # mutable object is reused between calls; the caller's dict is never
    # modified either, because a new dict is built for the merge.
    my_attrs: Dict[str, str] = {"machine": self.name, **(attrs or {})}
    return self.logger.nested(msg, my_attrs)
322 def wait_for_monitor_prompt(self
) -> str:
323 assert self
.monitor
is not None
326 undecoded_answer
= self
.monitor
.recv(1024)
327 if not undecoded_answer
:
329 answer
+= undecoded_answer
.decode()
330 if answer
.endswith("(qemu) "):
334 def send_monitor_command(self
, command
: str) -> str:
336 Send a command to the QEMU monitor. This allows attaching
337 virtual USB disks to a running machine, among other things.
340 message
= f
"{command}\n".encode()
341 assert self
.monitor
is not None
342 self
.monitor
.send(message
)
343 return self
.wait_for_monitor_prompt()
346 self
, unit
: str, user
: Optional
[str] = None, timeout
: int = 900
349 Wait for a systemd unit to get into "active" state.
350 Throws exceptions on "failed" and "inactive" states as well as after
354 def check_active(_
: Any
) -> bool:
355 state
= self
.get_unit_property(unit
, "ActiveState", user
)
356 if state
== "failed":
357 raise Exception(f
'unit "{unit}" reached state "{state}"')
359 if state
== "inactive":
360 status
, jobs
= self
.systemctl("list-jobs --full 2>&1", user
)
361 if "No jobs" in jobs
:
362 info
= self
.get_unit_info(unit
, user
)
363 if info
["ActiveState"] == state
:
365 f
'unit "{unit}" is inactive and there are no pending jobs'
368 return state
== "active"
371 f
"waiting for unit {unit}"
372 + (f
" with user {user}" if user
is not None else "")
374 retry(check_active
, timeout
)
376 def get_unit_info(self
, unit
: str, user
: Optional
[str] = None) -> Dict
[str, str]:
377 status
, lines
= self
.systemctl(f
'--no-pager show "{unit}"', user
)
380 f
'retrieving systemctl info for unit "{unit}"'
381 + ("" if user
is None else f
' under user "{user}"')
382 + f
" failed with exit code {status}"
385 line_pattern
= re
.compile(r
"^([^=]+)=(.*)$")
387 def tuple_from_line(line
: str) -> Tuple
[str, str]:
388 match
= line_pattern
.match(line
)
389 assert match
is not None
390 return match
[1], match
[2]
393 tuple_from_line(line
)
394 for line
in lines
.split("\n")
395 if line_pattern
.match(line
)
398 def get_unit_property(
402 user
: Optional
[str] = None,
404 status
, lines
= self
.systemctl(
405 f
'--no-pager show "{unit}" --property="{property}"',
410 f
'retrieving systemctl property "{property}" for unit "{unit}"'
411 + ("" if user
is None else f
' under user "{user}"')
412 + f
" failed with exit code {status}"
415 invalid_output_message
= (
416 f
'systemctl show --property "{property}" "{unit}"'
417 f
"produced invalid output: {lines}"
420 line_pattern
= re
.compile(r
"^([^=]+)=(.*)$")
421 match
= line_pattern
.match(lines
)
422 assert match
is not None, invalid_output_message
424 assert match
[1] == property, invalid_output_message
427 def systemctl(self
, q
: str, user
: Optional
[str] = None) -> Tuple
[int, str]:
429 Runs `systemctl` commands with optional support for
433 # run `systemctl list-jobs --no-pager`
434 machine.systemctl("list-jobs --no-pager")
436 # spawn a shell for `any-user` and run
437 # `systemctl --user list-jobs --no-pager`
438 machine.systemctl("list-jobs --no-pager", "any-user")
442 q
= q
.replace("'", "\\'")
444 f
"su -l {user} --shell /bin/sh -c "
445 "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
446 f
"systemctl --user {q}'"
448 return self
.execute(f
"systemctl {q}")
450 def require_unit_state(self
, unit
: str, require_state
: str = "active") -> None:
452 f
"checking if unit '{unit}' has reached state '{require_state}'"
454 info
= self
.get_unit_info(unit
)
455 state
= info
["ActiveState"]
456 if state
!= require_state
:
458 f
"Expected unit '{unit}' to to be in state "
459 f
"'{require_state}' but it is in state '{state}'"
462 def _next_newline_closed_block_from_shell(self
) -> str:
466 # This receives up to 4096 bytes from the socket
467 chunk
= self
.shell
.recv(4096)
469 # Probably a broken pipe, return the output we have
472 decoded
= chunk
.decode()
473 output_buffer
+= [decoded
]
474 if decoded
[-1] == "\n":
476 return "".join(output_buffer
)
481 check_return
: bool = True,
482 check_output
: bool = True,
483 timeout
: Optional
[int] = 900,
484 ) -> Tuple
[int, str]:
486 Execute a shell command, returning a list `(status, stdout)`.
488 Commands are run with `set -euo pipefail` set:
490 - If several commands are separated by `;` and one fails, the
491 command as a whole will fail.
493 - For pipelines, the last non-zero exit status will be returned
494 (if there is one; otherwise zero will be returned).
496 - Dereferencing unset variables fails the command.
498 - It will wait for stdout to be closed.
500 If the command detaches, it must close stdout, as `execute` will wait
501 for this to consume all output reliably. This can be achieved by
502 redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or
503 a file. Examples of detaching commands are `sleep 365d &`, where the
504 shell forks a new process that can write to stdout and `xclip -i`, where
505 the `xclip` command itself forks without closing stdout.
507 Takes an optional parameter `check_return` that defaults to `True`.
508 Setting this parameter to `False` will not check for the return code
509 and return -1 instead. This can be used for commands that shut down
510 the VM and would therefore break the pipe that would be used for
511 retrieving the return code.
513 A timeout for the command can be specified (in seconds) using the optional
514 `timeout` parameter, e.g., `execute(cmd, timeout=10)` or
515 `execute(cmd, timeout=None)`. The default is 900 seconds.
520 # Always run command with shell opts
521 command
= f
"set -euo pipefail; {command}"
524 if timeout
is not None:
525 timeout_str
= f
"timeout {timeout}"
527 # While sh is bash on NixOS, this is not the case for every distro.
528 # We explicitly call bash here to allow for the driver to boot other distros as well.
530 f
"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n"
534 self
.shell
.send(out_command
.encode())
540 output
= base64
.b64decode(self
._next
_newline
_closed
_block
_from
_shell
())
543 return (-1, output
.decode())
545 # Get the return code
546 self
.shell
.send(b
"echo ${PIPESTATUS[0]}\n")
547 rc
= int(self
._next
_newline
_closed
_block
_from
_shell
().strip())
549 return (rc
, output
.decode(errors
="replace"))
551 def shell_interact(self
, address
: Optional
[str] = None) -> None:
553 Allows you to directly interact with the guest shell. This should
554 only be used during test development, not in production tests.
555 Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends
561 address
= "READLINE,prompt=$ "
562 self
.log("Terminal is ready (there is no initial prompt):")
567 ["socat", address
, f
"FD:{self.shell.fileno()}"],
568 pass_fds
=[self
.shell
.fileno()],
570 # allow users to cancel this command without breaking the test
571 except KeyboardInterrupt:
574 def console_interact(self
) -> None:
576 Allows you to directly interact with QEMU's stdin, by forwarding
577 terminal input to the QEMU process.
578 This is for use with the interactive test driver, not for production
579 tests, which run unattended.
580 Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and
581 `Ctrl-d` closes console and returns to the test runner.
583 self
.log("Terminal is ready (there is no prompt):")
586 assert self
.process
.stdin
590 char
= sys
.stdin
.buffer.read(1)
591 except KeyboardInterrupt:
593 if char
== b
"": # ctrl+d
594 self
.log("Closing connection to the console")
596 self
.send_console(char
.decode())
598 def succeed(self
, *commands
: str, timeout
: Optional
[int] = None) -> str:
600 Execute a shell command, raising an exception if the exit status is
601 not zero, otherwise returning the standard output. Similar to `execute`,
602 except that the timeout is `None` by default. See `execute` for details on
606 for command
in commands
:
607 with self
.nested(f
"must succeed: {command}"):
608 (status
, out
) = self
.execute(command
, timeout
=timeout
)
610 self
.log(f
"output: {out}")
611 raise Exception(f
"command `{command}` failed (exit code {status})")
615 def fail(self
, *commands
: str, timeout
: Optional
[int] = None) -> str:
617 Like `succeed`, but raising an exception if the command returns a zero
621 for command
in commands
:
622 with self
.nested(f
"must fail: {command}"):
623 (status
, out
) = self
.execute(command
, timeout
=timeout
)
625 raise Exception(f
"command `{command}` unexpectedly succeeded")
629 def wait_until_succeeds(self
, command
: str, timeout
: int = 900) -> str:
631 Repeat a shell command with 1-second intervals until it succeeds.
632 Has a default timeout of 900 seconds which can be modified, e.g.
633 `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on
635 Throws an exception on timeout.
639 def check_success(_
: Any
) -> bool:
641 status
, output
= self
.execute(command
, timeout
=timeout
)
644 with self
.nested(f
"waiting for success: {command}"):
645 retry(check_success
, timeout
)
648 def wait_until_fails(self
, command
: str, timeout
: int = 900) -> str:
650 Like `wait_until_succeeds`, but repeating the command until it fails.
654 def check_failure(_
: Any
) -> bool:
656 status
, output
= self
.execute(command
, timeout
=timeout
)
659 with self
.nested(f
"waiting for failure: {command}"):
660 retry(check_failure
, timeout
)
663 def wait_for_shutdown(self
) -> None:
667 with self
.nested("waiting for the VM to power off"):
674 self
.connected
= False
676 def wait_for_qmp_event(
677 self
, event_filter
: Callable
[[dict[str, Any
]], bool], timeout
: int = 60 * 10
680 Wait for a QMP event which you can filter with the `event_filter` function.
681 The function takes as an input a dictionary of the event and if it returns True, we return that event,
682 if it does not, we wait for the next event and retry.
684 It will skip all events received in the meantime, if you want to keep them,
685 you have to do the bookkeeping yourself and store them somewhere.
687 By default, it will wait up to 10 minutes, `timeout` is in seconds.
689 if self
.qmp_client
is None:
690 raise RuntimeError("QMP API is not ready yet, is the VM ready?")
694 evt
= self
.qmp_client
.wait_for_event(timeout
=timeout
)
695 if event_filter(evt
):
698 elapsed
= time
.time() - start
699 if elapsed
>= timeout
:
702 def get_tty_text(self
, tty
: str) -> str:
703 status
, output
= self
.execute(
704 f
"fold -w$(stty -F /dev/tty{tty} size | "
705 f
"awk '{{print $2}}') /dev/vcs{tty}"
709 def wait_until_tty_matches(self
, tty
: str, regexp
: str, timeout
: int = 900) -> None:
710 """Wait until the visible output on the chosen TTY matches regular
711 expression. Throws an exception on timeout.
713 matcher
= re
.compile(regexp
)
715 def tty_matches(last
: bool) -> bool:
716 text
= self
.get_tty_text(tty
)
719 f
"Last chance to match /{regexp}/ on TTY{tty}, "
720 f
"which currently contains: {text}"
722 return len(matcher
.findall(text
)) > 0
724 with self
.nested(f
"waiting for {regexp} to appear on tty {tty}"):
725 retry(tty_matches
, timeout
)
727 def send_chars(self
, chars
: str, delay
: Optional
[float] = 0.01) -> None:
729 Simulate typing a sequence of characters on the virtual keyboard,
730 e.g., `send_chars("foobar\n")` will type the string `foobar`
731 followed by the Enter key.
733 with self
.nested(f
"sending keys {repr(chars)}"):
735 self
.send_key(char
, delay
, log
=False)
737 def wait_for_file(self
, filename
: str, timeout
: int = 900) -> None:
739 Waits until the file exists in the machine's file system.
742 def check_file(_
: Any
) -> bool:
743 status
, _
= self
.execute(f
"test -e {filename}")
746 with self
.nested(f
"waiting for file '{filename}'"):
747 retry(check_file
, timeout
)
749 def wait_for_open_port(
750 self
, port
: int, addr
: str = "localhost", timeout
: int = 900
753 Wait until a process is listening on the given TCP port and IP address
754 (default `localhost`).
757 def port_is_open(_
: Any
) -> bool:
758 status
, _
= self
.execute(f
"nc -z {addr} {port}")
761 with self
.nested(f
"waiting for TCP port {port} on {addr}"):
762 retry(port_is_open
, timeout
)
764 def wait_for_open_unix_socket(
765 self
, addr
: str, is_datagram
: bool = False, timeout
: int = 900
768 Wait until a process is listening on the given UNIX-domain socket
769 (default to a UNIX-domain stream socket).
774 "-uU" if is_datagram
else "-U",
777 def socket_is_open(_
: Any
) -> bool:
778 status
, _
= self
.execute(f
"nc {' '.join(nc_flags)} {addr}")
782 f
"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'"
784 retry(socket_is_open
, timeout
)
786 def wait_for_closed_port(
787 self
, port
: int, addr
: str = "localhost", timeout
: int = 900
790 Wait until nobody is listening on the given TCP port and IP address
791 (default `localhost`).
794 def port_is_closed(_
: Any
) -> bool:
795 status
, _
= self
.execute(f
"nc -z {addr} {port}")
798 with self
.nested(f
"waiting for TCP port {port} on {addr} to be closed"):
799 retry(port_is_closed
, timeout
)
def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
    """Run ``systemctl start <jobname>`` through ``self.systemctl``.

    ``user`` is passed through unchanged; the result of
    ``self.systemctl`` is returned as-is.
    """
    query = f"start {jobname}"
    return self.systemctl(query, user)
def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
    """Run ``systemctl stop <jobname>`` through ``self.systemctl``.

    ``user`` is passed through unchanged; the result of
    ``self.systemctl`` is returned as-is.
    """
    query = f"stop {jobname}"
    return self.systemctl(query, user)
def wait_for_job(self, jobname: str) -> None:
    """Wait for ``jobname`` by delegating to ``wait_for_unit``.

    Only the job name is forwarded, so ``wait_for_unit`` runs with its
    own defaults for every other parameter. Its result is discarded.
    """
    unit_name = jobname
    self.wait_for_unit(unit_name)
810 def connect(self
) -> None:
811 def shell_ready(timeout_secs
: int) -> bool:
812 """We sent some data from the backdoor service running on the guest
813 to indicate that the backdoor shell is ready.
814 As soon as we read some data from the socket here, we assume that
815 our root shell is operational.
817 (ready
, _
, _
) = select
.select([self
.shell
], [], [], timeout_secs
)
823 with self
.nested("waiting for the VM to finish booting"):
829 # TODO: do we want to bail after a set number of attempts?
830 while not shell_ready(timeout_secs
=30):
831 self
.log("Guest root shell did not produce any data yet...")
833 " To debug, enter the VM and run 'systemctl status backdoor.service'."
837 chunk
= self
.shell
.recv(1024)
838 # No need to print empty strings, it means we are waiting.
841 self
.log(f
"Guest shell says: {chunk!r}")
842 # NOTE: for this to work, nothing must be printed after this line!
843 if b
"Spawning backdoor root shell..." in chunk
:
848 self
.log("connected to guest root shell")
849 self
.log(f
"(connecting took {toc - tic:.2f} seconds)")
850 self
.connected
= True
852 def screenshot(self
, filename
: str) -> None:
854 Take a picture of the display of the virtual machine, in PNG format.
855 The screenshot will be available in the derivation output.
857 if "." not in filename
:
859 if "/" not in filename
:
860 filename
= os
.path
.join(self
.out_dir
, filename
)
861 tmp
= f
"{filename}.ppm"
864 f
"making screenshot {filename}",
865 {"image": os
.path
.basename(filename
)},
867 self
.send_monitor_command(f
"screendump {tmp}")
868 ret
= subprocess
.run(f
"pnmtopng '{tmp}' > '{filename}'", shell
=True)
870 if ret
.returncode
!= 0:
871 raise Exception("Cannot convert screenshot")
873 def copy_from_host_via_shell(self
, source
: str, target
: str) -> None:
874 """Copy a file from the host into the guest by piping it over the
875 shell into the destination file. Works without host-guest shared folder.
876 Prefer copy_from_host for whenever possible.
878 with
open(source
, "rb") as fh
:
879 content_b64
= base64
.b64encode(fh
.read()).decode()
881 f
"mkdir -p $(dirname {target})",
882 f
"echo -n {content_b64} | base64 -d > {target}",
885 def copy_from_host(self
, source
: str, target
: str) -> None:
887 Copies a file from host to machine, e.g.,
888 `copy_from_host("myfile", "/etc/my/important/file")`.
890 The first argument is the file on the host. Note that the "host" refers
891 to the environment in which the test driver runs, which is typically the
894 The second argument is the location of the file on the machine that will
897 The file is copied via the `shared_dir` directory which is shared among
898 all the VMs (using a temporary directory).
899 The access rights bits will mimic the ones from the host file and
900 user:group will be root:root.
902 host_src
= Path(source
)
903 vm_target
= Path(target
)
904 with tempfile
.TemporaryDirectory(dir=self
.shared_dir
) as shared_td
:
905 shared_temp
= Path(shared_td
)
906 host_intermediate
= shared_temp
/ host_src
.name
907 vm_shared_temp
= Path("/tmp/shared") / shared_temp
.name
908 vm_intermediate
= vm_shared_temp
/ host_src
.name
910 self
.succeed(make_command(["mkdir", "-p", vm_shared_temp
]))
911 if host_src
.is_dir():
912 shutil
.copytree(host_src
, host_intermediate
)
914 shutil
.copy(host_src
, host_intermediate
)
915 self
.succeed(make_command(["mkdir", "-p", vm_target
.parent
]))
916 self
.succeed(make_command(["cp", "-r", vm_intermediate
, vm_target
]))
918 def copy_from_vm(self
, source
: str, target_dir
: str = "") -> None:
919 """Copy a file from the VM (specified by an in-VM source path) to a path
920 relative to `$out`. The file is copied via the `shared_dir` shared among
921 all the VMs (using a temporary directory).
923 # Compute the source, target, and intermediate shared file names
924 vm_src
= Path(source
)
925 with tempfile
.TemporaryDirectory(dir=self
.shared_dir
) as shared_td
:
926 shared_temp
= Path(shared_td
)
927 vm_shared_temp
= Path("/tmp/shared") / shared_temp
.name
928 vm_intermediate
= vm_shared_temp
/ vm_src
.name
929 intermediate
= shared_temp
/ vm_src
.name
930 # Copy the file to the shared directory inside VM
931 self
.succeed(make_command(["mkdir", "-p", vm_shared_temp
]))
932 self
.succeed(make_command(["cp", "-r", vm_src
, vm_intermediate
]))
933 abs_target
= self
.out_dir
/ target_dir
/ vm_src
.name
934 abs_target
.parent
.mkdir(exist_ok
=True, parents
=True)
935 # Copy the file from the shared directory outside VM
936 if intermediate
.is_dir():
937 shutil
.copytree(intermediate
, abs_target
)
939 shutil
.copy(intermediate
, abs_target
)
def dump_tty_contents(self, tty: str) -> None:
    """Debugging aid: pipe the contents of ``/dev/vcs<tty>`` to ``systemd-cat``.

    The text is wrapped at 80 columns with ``fold`` inside the guest.
    The status and output returned by ``self.execute`` are discarded.
    """
    dump_command = f"fold -w 80 /dev/vcs{tty} | systemd-cat"
    self.execute(dump_command)
def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]:
    """Dump the screen to a temporary file and OCR it for each model id.

    A ``screendump`` monitor command is issued with a path named ``ppm``
    inside a fresh temporary directory. That path and ``model_ids`` are
    then handed to ``_perform_ocr_on_screenshot``. The temporary
    directory is removed once the OCR call has finished.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        dump_file = os.path.join(scratch_dir, "ppm")
        monitor_command = f"screendump {dump_file}"
        self.send_monitor_command(monitor_command)
        variants = _perform_ocr_on_screenshot(dump_file, model_ids)
    return variants
951 def get_screen_text_variants(self
) -> List
[str]:
953 Return a list of different interpretations of what is currently
954 visible on the machine's screen using optical character
955 recognition. The number and order of the interpretations is not
956 specified and is subject to change, but if no exception is raised at
957 least one will be returned.
960 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
963 return self
._get
_screen
_text
_variants
([0, 1, 2])
965 def get_screen_text(self
) -> str:
967 Return a textual representation of what is currently visible on the
968 machine's screen using optical character recognition.
971 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
974 return self
._get
_screen
_text
_variants
([2])[0]
976 def wait_for_text(self
, regex
: str, timeout
: int = 900) -> None:
978 Wait until the supplied regular expressions matches the textual
979 contents of the screen by using optical character recognition (see
980 `get_screen_text` and `get_screen_text_variants`).
983 This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`.
987 def screen_matches(last
: bool) -> bool:
988 variants
= self
.get_screen_text_variants()
989 for text
in variants
:
990 if re
.search(regex
, text
) is not None:
994 self
.log(f
"Last OCR attempt failed. Text was: {variants}")
998 with self
.nested(f
"waiting for {regex} to appear on screen"):
999 retry(screen_matches
, timeout
)
1001 def wait_for_console_text(self
, regex
: str, timeout
: int |
None = None) -> None:
1003 Wait until the supplied regular expressions match a line of the
1004 serial console output.
1005 This method is useful when OCR is not possible or inaccurate.
1007 # Buffer the console output, this is needed
1008 # to match multiline regexes.
1009 console
= io
.StringIO()
1011 def console_matches(_
: Any
) -> bool:
1014 # This will return as soon as possible and
1016 console
.write(self
.last_lines
.get(block
=False))
1020 matches
= re
.search(regex
, console
.read())
1021 return matches
is not None
1023 with self
.nested(f
"waiting for {regex} to appear on console"):
1024 if timeout
is not None:
1025 retry(console_matches
, timeout
)
1027 while not console_matches(False):
1031 self
, key
: str, delay
: Optional
[float] = 0.01, log
: Optional
[bool] = True
1034 Simulate pressing keys on the virtual keyboard, e.g.,
1035 `send_key("ctrl-alt-delete")`.
1037 Please also refer to the QEMU documentation for more information on the
1038 input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys
1040 key
= CHAR_TO_KEY
.get(key
, key
)
1041 context
= self
.nested(f
"sending key {repr(key)}") if log
else nullcontext()
1043 self
.send_monitor_command(f
"sendkey {key}")
1044 if delay
is not None:
1047 def send_console(self
, chars
: str) -> None:
1049 Send keys to the kernel console. This allows interaction with the systemd
1050 emergency mode, for example. Takes a string that is sent, e.g.,
1051 `send_console("\n\nsystemctl default\n")`.
1054 assert self
.process
.stdin
1055 self
.process
.stdin
.write(chars
.encode())
1056 self
.process
.stdin
.flush()
1058 def start(self
, allow_reboot
: bool = False) -> None:
1060 Start the virtual machine. This method is asynchronous --- it does
1061 not wait for the machine to finish booting.
1066 self
.log("starting vm")
1068 def clear(path
: Path
) -> Path
:
1073 def create_socket(path
: Path
) -> socket
.socket
:
1074 s
= socket
.socket(family
=socket
.AF_UNIX
, type=socket
.SOCK_STREAM
)
1079 monitor_socket
= create_socket(clear(self
.monitor_path
))
1080 shell_socket
= create_socket(clear(self
.shell_path
))
1081 self
.process
= self
.start_command
.run(
1089 self
.monitor
, _
= monitor_socket
.accept()
1090 self
.shell
, _
= shell_socket
.accept()
1091 self
.qmp_client
= QMPSession
.from_path(self
.qmp_path
)
1093 # Store last serial console lines for use
1094 # of wait_for_console_text
1095 self
.last_lines
: Queue
= Queue()
1097 def process_serial_output() -> None:
1099 assert self
.process
.stdout
1100 for _line
in self
.process
.stdout
:
1101 # Ignore undecodable bytes that may occur in boot menus
1102 line
= _line
.decode(errors
="ignore").replace("\r", "").rstrip()
1103 self
.last_lines
.put(line
)
1104 self
.log_serial(line
)
1106 self
.serial_thread
= threading
.Thread(target
=process_serial_output
)
1107 self
.serial_thread
.start()
1109 self
.wait_for_monitor_prompt()
1111 self
.pid
= self
.process
.pid
1114 self
.log(f
"QEMU running (pid {self.pid})")
def cleanup_statedir(self) -> None:
    """Delete the VM state directory tree and log that this happened.

    The directory is removed first; the two log messages are only
    emitted afterwards, so nothing is logged if the removal raises.
    """
    state_dir = self.state_dir
    shutil.rmtree(state_dir)
    notices = (
        f"deleting VM state directory {state_dir}",
        "if you want to keep the VM state, pass --keep-vm-state",
    )
    for notice in notices:
        self.logger.log(notice)
1121 def shutdown(self
) -> None:
1123 Shut down the machine, waiting for the VM to exit.
1129 self
.shell
.send(b
"poweroff\n")
1130 self
.wait_for_shutdown()
1132 def crash(self
) -> None:
1134 Simulate a sudden power failure, by telling the VM to exit immediately.
1139 self
.log("forced crash")
1140 self
.send_monitor_command("quit")
1141 self
.wait_for_shutdown()
1143 def reboot(self
) -> None:
1144 """Press Ctrl+Alt+Delete in the guest.
1146 Prepares the machine to be reconnected which is useful if the
1147 machine was started with `allow_reboot = True`
1149 self
.send_key("ctrl-alt-delete")
1150 self
.connected
= False
1152 def wait_for_x(self
, timeout
: int = 900) -> None:
1154 Wait until it is possible to connect to the X server.
1157 def check_x(_
: Any
) -> bool:
1159 "journalctl -b SYSLOG_IDENTIFIER=systemd | "
1160 + 'grep "Reached target Current graphical"'
1162 status
, _
= self
.execute(cmd
)
1165 status
, _
= self
.execute("[ -e /tmp/.X11-unix/X0 ]")
1168 with self
.nested("waiting for the X11 server"):
1169 retry(check_x
, timeout
)
1171 def get_window_names(self
) -> List
[str]:
1172 return self
.succeed(
1173 r
"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\
([^
\"]*\
)\".*/\
1/; t
; d
'"
1176 def wait_for_window(self, regexp: str, timeout: int = 900) -> None:
1178 Wait until an X11 window has appeared whose name matches the given
1179 regular expression, e.g., `wait_for_window("Terminal")`.
1181 pattern = re.compile(regexp)
1183 def window_is_visible(last_try: bool) -> bool:
1184 names = self.get_window_names()
1187 f"Last chance to match {regexp} on the window list,"
1188 + " which currently contains: "
1191 return any(pattern.search(name) for name in names)
1193 with self.nested("waiting for a window to appear"):
1194 retry(window_is_visible, timeout)
def sleep(self, secs: int) -> None:
    """Sleep for ``secs`` seconds of guest time.

    The delay is implemented by running ``sleep`` inside the machine via
    ``self.succeed`` rather than sleeping on the host, because the wait
    should be measured in *guest* time, not *host* time.
    """
    guest_command = f"sleep {secs}"
    self.succeed(guest_command)
1200 def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None:
1202 Forward a TCP port on the host to a TCP port on the guest.
1203 Useful during interactive testing.
1205 self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}")
1207 def block(self) -> None:
1209 Simulate unplugging the Ethernet cable that connects the machine to
1211 This happens by shutting down eth1 (the multicast interface used to talk
1212 to the other VMs). eth0 is kept online to still enable the test driver
1213 to communicate with the machine.
1215 self.send_monitor_command("set_link virtio-net-pci.1 off")
1217 def unblock(self) -> None:
1219 Undo the effect of `block`.
1221 self.send_monitor_command("set_link virtio-net-pci.1 on")
1223 def release(self) -> None:
1224 if self.pid is None:
1226 self.logger.info(f"kill machine (pid {self.pid})")
1230 assert self.serial_thread
1232 self.process.terminate()
1234 self.monitor.close()
1235 self.serial_thread.join()
1237 def run_callbacks(self) -> None:
1238 for callback in self.callbacks:
1241 def switch_root(self) -> None:
1243 Transition from stage 1 to stage 2. This requires the
1244 machine to be configured with `testing.initrdBackdoor = true`
1245 and `boot.initrd.systemd.enable = true`.
1247 self.wait_for_unit("initrd.target")
1249 "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null",
1253 self.connected = False