vuls: init at 0.27.0
[NixPkgs.git] / nixos / lib / test-driver / test_driver / qmp.py
blob62ca6d7d5b802c257360d8cb0980b08a144f044a
import json
import logging
import os
import select
import socket
import time
from collections.abc import Iterator
from pathlib import Path
from queue import Queue
from typing import Any
10 logger = logging.getLogger(__name__)
class QMPAPIError(RuntimeError):
    """Error reported by the QMP server in reply to a command.

    Attributes:
        class_name: Value of the ``class`` member of the error.
        description: Value of the ``desc`` member of the error.
        transaction_id: Value of the ``id`` member, or ``None`` when the
            response carries no ``id``.

    Raises:
        RuntimeError: If ``class`` or ``desc`` cannot be found in *message*.
    """

    def __init__(self, message: dict[str, Any]):
        assert "error" in message, "Not an error message!"

        # The QMP specification nests the details under the "error" member:
        #   { "error": { "class": ..., "desc": ... }, "id": ... }
        # Members of that nested object take precedence; a flat layout with
        # "class"/"desc" at the top level is still accepted as a fallback.
        error = message["error"]
        details = {**message, **error} if isinstance(error, dict) else message

        try:
            class_name = details["class"]
            description = details["desc"]
        except KeyError as err:
            raise RuntimeError("Malformed QMP API error response") from err

        super().__init__(description)
        self.class_name = class_name
        self.description = description
        # NOTE: Some errors can occur before the Server is able to read the
        # id member; in these cases the id member will not be part of the
        # error response, even if provided by the client.
        self.transaction_id = message.get("id")

    def __str__(self) -> str:
        return f"<QMP API error related to transaction {self.transaction_id} [{self.class_name}]: {self.description}>"
class QMPSession:
    """Line-oriented JSON client for a QMP (QEMU Machine Protocol) socket.

    Incoming messages are sorted into two queues: ``results`` (command
    replies and the initial greeting) and ``pending_events`` (asynchronous
    events).  Any other message is raised as a ``QMPAPIError``.
    """

    def __init__(self, sock: socket.socket) -> None:
        """Wrap *sock*, consume the greeting and send ``qmp_capabilities``."""
        self.sock = sock
        self.results: Queue[dict[str, str]] = Queue()
        self.pending_events: Queue[dict[str, Any]] = Queue()
        self.reader = sock.makefile("r")
        self.writer = sock.makefile("w")
        # Text of a line whose terminating newline has not arrived yet.
        self._partial_line = ""
        # Make the reader non-blocking so we can select on the socket.
        os.set_blocking(self.reader.fileno(), False)
        hello = self._wait_for_new_result()
        logger.debug("Got greeting from QMP API: %s", hello)
        # The greeting message format is:
        # { "QMP": { "version": json-object, "capabilities": json-array } }
        assert "QMP" in hello, f"Unexpected result: {hello}"
        self.send("qmp_capabilities")

    @classmethod
    def from_path(cls, path: Path) -> "QMPSession":
        """Connect to the UNIX stream socket at *path* and open a session."""
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(str(path))
        except Exception:
            # Do not leak the descriptor when the connection fails.
            sock.close()
            raise
        return cls(sock)

    def __del__(self) -> None:
        # Best-effort cleanup: attributes may be missing if __init__ failed
        # early, and closing the writer may fail to flush to a dead peer.
        for name in ("writer", "reader", "sock"):
            resource = getattr(self, name, None)
            if resource is None:
                continue
            try:
                resource.close()
            except OSError:
                pass

    def _wait_readable(self, timeout: "float | None" = None) -> None:
        """Sleep until the socket is readable or *timeout* seconds elapse."""
        select.select([self.sock], [], [], timeout)

    def _consume_line(self) -> bool:
        """Read at most one line and dispatch it if it is complete.

        Returns:
            ``True`` if any input was consumed, ``False`` if nothing was
            available to read.

        Raises:
            QMPAPIError: If the message is neither a result nor an event.
        """
        chunk = self.reader.readline()
        if not chunk:
            return False

        line = self._partial_line + chunk
        if not line.endswith("\n"):
            # The non-blocking reader handed back an incomplete line; keep
            # it until the rest arrives instead of feeding it to the parser.
            self._partial_line = line
            return True
        self._partial_line = ""
        if not line.strip():
            return True

        evt_or_result = json.loads(line)
        logger.debug("Received a message: %s", evt_or_result)

        # It's a result
        if "return" in evt_or_result or "QMP" in evt_or_result:
            self.results.put(evt_or_result)
        # It's an event
        elif "event" in evt_or_result:
            self.pending_events.put(evt_or_result)
        else:
            raise QMPAPIError(evt_or_result)
        return True

    def _wait_for_new_result(self) -> dict[str, str]:
        """Block until a result (or the greeting) arrives and return it."""
        assert self.results.empty(), "Results set is not empty, missed results!"
        while self.results.empty():
            if not self._consume_line():
                # Nothing buffered: sleep on the socket instead of spinning.
                self._wait_readable()
        return self.results.get()

    def read_pending_messages(self) -> None:
        """Read at most one available message and queue it.

        Returns immediately when no data is available.

        Raises:
            QMPAPIError: If the message is neither a result nor an event.
        """
        self._consume_line()

    def wait_for_event(self, timeout: int = 10) -> dict[str, Any]:
        """Return the next event, waiting up to *timeout* seconds for one.

        Passing ``None`` as *timeout* waits indefinitely.

        Raises:
            queue.Empty: If no event arrived before the timeout expired.
            QMPAPIError: If an unexpected message is received while waiting.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while self.pending_events.empty():
            if self._consume_line():
                continue
            if deadline is None:
                self._wait_readable()
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            self._wait_readable(remaining)

        # Raises queue.Empty when the deadline passed without an event.
        return self.pending_events.get_nowait()

    def events(self, timeout: int = 10) -> Iterator[dict[str, Any]]:
        """Yield the events that are already queued, without reading more."""
        while not self.pending_events.empty():
            yield self.pending_events.get(timeout=timeout)

    def send(self, cmd: str, args: "dict[str, str] | None" = None) -> dict[str, str]:
        """Execute *cmd* with optional *args* and return the server's result.

        Raises:
            QMPAPIError: If the server answers with an error.
        """
        self.read_pending_messages()
        assert self.results.empty(), "Results set is not empty, missed results!"
        data: dict[str, Any] = {"execute": cmd}
        if args:
            data["arguments"] = args

        logger.debug("Sending %s to QMP...", data)
        json.dump(data, self.writer)
        self.writer.write("\n")
        self.writer.flush()
        return self._wait_for_new_result()