5 from collections
.abc
import Iterator
6 from pathlib
import Path
7 from queue
import Queue
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class QMPAPIError(RuntimeError):
    """Error response received from the QMP server.

    Attributes:
        class_name: Value of the error's ``class`` member.
        description: Value of the error's ``desc`` member.
        transaction_id: Value of the response's ``id`` member, or ``None``
            when the response carries no ``id``.
    """

    def __init__(self, message: dict[str, Any]):
        """Build the exception from a decoded QMP error response.

        Args:
            message: Decoded JSON object of the response. It must contain an
                ``"error"`` member.

        Raises:
            RuntimeError: If neither the ``"error"`` object nor the top level
                of ``message`` provides both ``class`` and ``desc``.
        """
        assert "error" in message, "Not an error message!"
        super().__init__(message)

        # QMP error responses have the shape
        #   { "error": { "class": ..., "desc": ... }, "id": ... }
        # so the details live in the nested object. Top-level keys are still
        # honoured as a fallback for callers that pass an already-flattened
        # message.
        error = message.get("error")
        details = {**message, **error} if isinstance(error, dict) else message

        try:
            self.class_name = details["class"]
            self.description = details["desc"]
        except KeyError as err:
            raise RuntimeError("Malformed QMP API error response") from err

        # NOTE: Some errors can occur before the Server is able to read the
        # id member; in these cases the id member will not be part of the
        # error response, even if provided by the client.
        self.transaction_id = message.get("id")

    def __str__(self) -> str:
        """Return a one-line summary with transaction id, class and description."""
        return f"<QMP API error related to transaction {self.transaction_id} [{self.class_name}]: {self.description}>"
def __init__(self, sock: socket.socket) -> None:
    """Set up a session on ``sock`` and perform the QMP handshake.

    Creates the result and event queues, wraps the socket in text-mode
    reader/writer file objects, waits for the server greeting and then
    sends ``qmp_capabilities``.

    Args:
        sock: The socket to talk QMP over.
    """
    self.results: Queue[dict[str, str]] = Queue()
    self.pending_events: Queue[dict[str, Any]] = Queue()
    self.reader = sock.makefile("r")
    self.writer = sock.makefile("w")

    # Make the reader non-blocking so we can kind of select on it.
    reader_fd = self.reader.fileno()
    os.set_blocking(reader_fd, False)

    greeting = self._wait_for_new_result()
    logger.debug(f"Got greeting from QMP API: {greeting}")

    # The greeting message format is:
    # { "QMP": { "version": json-object, "capabilities": json-array } }
    assert "QMP" in greeting, f"Unexpected result: {greeting}"

    self.send("qmp_capabilities")
def from_path(cls, path: Path) -> "QMPSession":
    """Connect to the UNIX socket at ``path`` and build a session on it.

    Args:
        path: Filesystem path of the UNIX stream socket to connect to.

    Returns:
        The object produced by calling ``cls`` with the connected socket.

    Raises:
        OSError: If the connection cannot be established.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.connect(str(path))
        return cls(sock)
    except BaseException:
        # Do not leak the file descriptor when connecting or the session
        # set-up fails; the error itself is propagated unchanged.
        sock.close()
        raise
52 def __del__(self
) -> None:
def _wait_for_new_result(self) -> dict[str, str]:
    """Poll for incoming messages until a result is queued, then return it.

    The result queue is expected to be empty on entry; anything already in
    it would be a reply nobody consumed.
    """
    result_queue = self.results
    assert result_queue.empty(), "Results set is not empty, missed results!"

    # Keep pulling messages off the wire until one of them is a result.
    while result_queue.empty():
        self.read_pending_messages()

    return result_queue.get()
def read_pending_messages(self) -> None:
    """Read at most one message from the reader and queue it by kind.

    Messages containing ``"return"`` or ``"QMP"`` are put on ``self.results``;
    messages containing ``"event"`` are put on ``self.pending_events``.
    When the reader yields no data this is a no-op.

    Raises:
        QMPAPIError: If the message is neither a result, a greeting nor an
            event.
    """
    line = self.reader.readline()
    if not line.strip():
        # Nothing to read right now (the reader is polled, so an empty read
        # is normal); feeding it to the JSON decoder would raise.
        return

    evt_or_result = json.loads(line)
    logger.debug("Received a message: %s", evt_or_result)

    if "return" in evt_or_result or "QMP" in evt_or_result:
        # It's a result (or the initial greeting).
        self.results.put(evt_or_result)
    elif "event" in evt_or_result:
        # It's an asynchronous event.
        self.pending_events.put(evt_or_result)
    else:
        # Only unrecognised messages are treated as errors.
        raise QMPAPIError(evt_or_result)
def wait_for_event(self, timeout: int = 10) -> dict[str, Any]:
    """Wait until an event is available and return it.

    Args:
        timeout: Maximum number of seconds to wait for an event. ``None``
            waits indefinitely.

    Returns:
        The oldest pending event.

    Raises:
        queue.Empty: If no event arrived within ``timeout`` seconds.
    """
    import time
    from queue import Empty

    deadline = None if timeout is None else time.monotonic() + timeout

    while self.pending_events.empty():
        # Always poll at least once, so a zero timeout still picks up a
        # message that is already waiting on the socket.
        self.read_pending_messages()
        if (
            deadline is not None
            and self.pending_events.empty()
            and time.monotonic() >= deadline
        ):
            # Without this check the polling loop never terminates and the
            # timeout passed to get() below is never reached.
            raise Empty(f"No QMP event received within {timeout} seconds")

    return self.pending_events.get(timeout=timeout)
def events(self, timeout: int = 10) -> Iterator[dict[str, Any]]:
    """Yield the events currently queued, oldest first, until none remain.

    Only events already in ``self.pending_events`` are produced; this does
    not read new messages itself.

    Args:
        timeout: Passed to ``Queue.get`` for each event taken off the queue.
    """
    event_queue = self.pending_events
    while True:
        if event_queue.empty():
            return
        yield event_queue.get(timeout=timeout)
def send(self, cmd: str, args: "dict[str, str] | None" = None) -> dict[str, str]:
    """Send a command and return the server's reply.

    Args:
        cmd: Name of the command, sent as the ``execute`` member.
        args: Command arguments, sent as the ``arguments`` member. The
            member is omitted when ``args`` is empty or ``None``.

    Returns:
        The next result produced by ``_wait_for_new_result``.
    """
    # Drain one pending message first so a stale reply is noticed before the
    # new command goes out.
    self.read_pending_messages()
    assert self.results.empty(), "Results set is not empty, missed results!"

    data: dict[str, Any] = {"execute": cmd}
    if args:
        data["arguments"] = args

    logger.debug("Sending %s to QMP...", data)
    json.dump(data, self.writer)
    self.writer.write("\n")
    # The writer is buffered: without an explicit flush the command may never
    # leave the process and the wait below would never see a reply.
    self.writer.flush()

    return self._wait_for_new_result()