1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from collections
import OrderedDict
34 from typing
import (Any
, Callable
, Dict
, Iterable
, Iterator
,
35 List
, Optional
, Sequence
, TextIO
, Tuple
, Type
, TypeVar
)
38 from contextlib
import contextmanager
40 from qemu
.machine
import qtest
41 from qemu
.qmp
.legacy
import QMPMessage
, QEMUMonitorProtocol
42 from qemu
.utils
import VerboseProcessError
# Use this logger for logging messages directly from the iotests module.
# A NullHandler is attached so that this logger always has at least one
# handler of its own.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')
# Command lines for the QEMU tools, assembled from the environment.
#
# Each *_PROG variable names the binary (with a built-in default) and each
# *_OPTIONS variable holds extra arguments as a single string.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
#
# The option strings are split on runs of whitespace, so leading, trailing
# or repeated blanks never produce empty-string arguments, and an unset or
# empty variable contributes no arguments at all.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

# Same program as qemu_io_args, but with the option set that is read from
# QEMU_IO_OPTIONS_NO_FMT instead of QEMU_IO_OPTIONS.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += os.environ.get('QEMU_IO_OPTIONS_NO_FMT', '').split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# An unset or empty QEMU_OPTIONS yields an empty list (not ['']).
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')
79 gdb_qemu_env
= os
.environ
.get('GDB_OPTIONS')
82 qemu_gdb
= ['gdbserver'] + gdb_qemu_env
.strip().split(' ')
84 qemu_print
= os
.environ
.get('PRINT_QEMU', False)
86 imgfmt
= os
.environ
.get('IMGFMT', 'raw')
87 imgproto
= os
.environ
.get('IMGPROTO', 'file')
90 test_dir
= os
.environ
['TEST_DIR']
91 sock_dir
= os
.environ
['SOCK_DIR']
92 cachemode
= os
.environ
['CACHEMODE']
93 aiomode
= os
.environ
['AIOMODE']
94 qemu_default_machine
= os
.environ
['QEMU_DEFAULT_MACHINE']
96 # We are using these variables as proxies to indicate that we're
97 # not being run via "check". There may be other things set up by
98 # "check" that individual test cases rely on.
99 sys
.stderr
.write('Please run this test via the "check" script\n')
100 sys
.exit(os
.EX_USAGE
)
103 if os
.environ
.get('VALGRIND_QEMU') == "y" and \
104 os
.environ
.get('NO_VALGRIND') != "y":
105 valgrind_logfile
= "--log-file=" + test_dir
106 # %p allows to put the valgrind process PID, since
107 # we don't know it a priori (subprocess.Popen is
109 valgrind_logfile
+= "/%p.valgrind"
111 qemu_valgrind
= ['valgrind', valgrind_logfile
, '--error-exitcode=99']
113 luks_default_secret_object
= 'secret,id=keysec0,data=' + \
114 os
.environ
.get('IMGKEYSECRET', '')
115 luks_default_key_secret_opt
= 'key-secret=keysec0'
117 sample_img_dir
= os
.environ
['SAMPLE_IMG_DIR']
121 def change_log_level(
122 logger_name
: str, level
: int = logging
.CRITICAL
) -> Iterator
[None]:
124 Utility function for temporarily changing the log level of a logger.
126 This can be used to silence errors that are expected or uninteresting.
128 _logger
= logging
.getLogger(logger_name
)
129 current_level
= _logger
.level
130 _logger
.setLevel(level
)
135 _logger
.setLevel(current_level
)
def unarchive_sample_image(sample, fname):
    """Decompress a bzip2-compressed sample image into `fname`.

    The compressed input is looked up as `sample` + '.bz2' inside
    sample_img_dir; `fname` is created or overwritten with the
    decompressed bytes.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
144 def qemu_tool_popen(args
: Sequence
[str],
145 connect_stderr
: bool = True) -> 'subprocess.Popen[str]':
146 stderr
= subprocess
.STDOUT
if connect_stderr
else None
147 # pylint: disable=consider-using-with
148 return subprocess
.Popen(args
,
149 stdout
=subprocess
.PIPE
,
151 universal_newlines
=True)
154 def qemu_tool_pipe_and_status(tool
: str, args
: Sequence
[str],
155 connect_stderr
: bool = True,
156 drop_successful_output
: bool = False) \
159 Run a tool and return both its output and its exit code
161 with
qemu_tool_popen(args
, connect_stderr
) as subp
:
162 output
= subp
.communicate()[0]
163 if subp
.returncode
< 0:
165 sys
.stderr
.write(f
'{tool} received signal \
166 {-subp.returncode}: {cmd}\n')
167 if drop_successful_output
and subp
.returncode
== 0:
169 return (output
, subp
.returncode
)
171 def qemu_img_create_prepare_args(args
: List
[str]) -> List
[str]:
172 if not args
or args
[0] != 'create':
176 p
= argparse
.ArgumentParser(allow_abbrev
=False)
177 # -o option may be specified several times
178 p
.add_argument('-o', action
='append', default
=[])
180 parsed
, remaining
= p
.parse_known_args(args
)
185 if parsed
.f
is not None:
186 result
+= ['-f', parsed
.f
]
188 # IMGOPTS most probably contain options specific for the selected format,
189 # like extended_l2 or compression_type for qcow2. Test may want to create
# additional images in other formats that don't support these options.
191 # So, use IMGOPTS only for images created in imgfmt format.
192 imgopts
= os
.environ
.get('IMGOPTS')
193 if imgopts
and parsed
.f
== imgfmt
:
194 opts_list
.insert(0, imgopts
)
196 # default luks support
197 if parsed
.f
== 'luks' and \
198 all('key-secret' not in opts
for opts
in opts_list
):
199 result
+= ['--object', luks_default_secret_object
]
200 opts_list
.append(luks_default_key_secret_opt
)
202 for opts
in opts_list
:
203 result
+= ['-o', opts
]
210 def qemu_tool(*args
: str, check
: bool = True, combine_stdio
: bool = True
211 ) -> 'subprocess.CompletedProcess[str]':
213 Run a qemu tool and return its status code and console output.
215 :param args: full command line to run.
216 :param check: Enforce a return code of zero.
217 :param combine_stdio: set to False to keep stdout/stderr separated.
219 :raise VerboseProcessError:
220 When the return code is negative, or on any non-zero exit code
221 when 'check=True' was provided (the default). This exception has
222 'stdout', 'stderr', and 'returncode' properties that may be
223 inspected to show greater detail. If this exception is not
224 handled, the command-line, return code, and all console output
225 will be included at the bottom of the stack trace.
228 a CompletedProcess. This object has args, returncode, and stdout
229 properties. If streams are not combined, it will also have a
232 subp
= subprocess
.run(
234 stdout
=subprocess
.PIPE
,
235 stderr
=subprocess
.STDOUT
if combine_stdio
else subprocess
.PIPE
,
236 universal_newlines
=True,
240 if check
and subp
.returncode
or (subp
.returncode
< 0):
241 raise VerboseProcessError(
242 subp
.returncode
, args
,
def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Invoke QEMU_IMG_PROG; return its status code and console output.

    The QEMU_IMG_OPTIONS are always placed in front of the given
    arguments, and the arguments of a 'create' command may be adjusted
    further by qemu_img_create_prepare_args().

    See `qemu_tool()` for greater detail, including the meaning of
    `check` and `combine_stdio`, which are passed through unchanged.
    """
    command = qemu_img_args + qemu_img_create_prepare_args(list(args))
    completed = qemu_tool(*command,
                          check=check,
                          combine_stdio=combine_stdio)
    return completed
264 def ordered_qmp(qmsg
, conv_keys
=True):
265 # Dictionaries are not ordered prior to 3.6, therefore:
266 if isinstance(qmsg
, list):
267 return [ordered_qmp(atom
) for atom
in qmsg
]
268 if isinstance(qmsg
, dict):
270 for k
, v
in sorted(qmsg
.items()):
272 k
= k
.replace('_', '-')
273 od
[k
] = ordered_qmp(v
, conv_keys
=False)
def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """Run the 'create' subcommand of qemu-img through qemu_img()."""
    create_args = ('create',) + args
    return qemu_img(*create_args)
280 def qemu_img_json(*args
: str) -> Any
:
282 Run qemu-img and return its output as deserialized JSON.
284 :raise CalledProcessError:
285 When qemu-img crashes, or returns a non-zero exit code without
286 producing a valid JSON document to stdout.
287 :raise JSONDecoderError:
288 When qemu-img returns 0, but failed to produce a valid JSON document.
290 :return: A deserialized JSON object; probably a dict[str, Any].
293 res
= qemu_img(*args
, combine_stdio
=False)
294 except subprocess
.CalledProcessError
as exc
:
295 # Terminated due to signal. Don't bother.
296 if exc
.returncode
< 0:
299 # Commands like 'check' can return failure (exit codes 2 and 3)
300 # to indicate command completion, but with errors found. For
301 # multi-command flexibility, ignore the exact error codes and
302 # *try* to load JSON.
304 return json
.loads(exc
.stdout
)
305 except json
.JSONDecodeError
:
306 # Nope. This thing is toast. Raise the /process/ error.
310 return json
.loads(res
.stdout
)
def qemu_img_measure(*args: str) -> Any:
    """Run 'qemu-img measure' with JSON output via qemu_img_json()."""
    subcommand = ("measure", "--output", "json")
    return qemu_img_json(*subcommand, *args)
def qemu_img_check(*args: str) -> Any:
    """Run 'qemu-img check' with JSON output via qemu_img_json()."""
    subcommand = ("check", "--output", "json")
    return qemu_img_json(*subcommand, *args)
def qemu_img_info(*args: str) -> Any:
    """Run 'qemu-img info' with JSON output via qemu_img_json()."""
    subcommand = ('info', "--output", "json")
    return qemu_img_json(*subcommand, *args)
def qemu_img_map(*args: str) -> Any:
    """Run 'qemu-img map' with JSON output via qemu_img_json()."""
    subcommand = ('map', "--output", "json")
    return qemu_img_json(*subcommand, *args)
324 def qemu_img_log(*args
: str, check
: bool = True
325 ) -> 'subprocess.CompletedProcess[str]':
326 result
= qemu_img(*args
, check
=check
)
327 log(result
.stdout
, filters
=[filter_testfiles
])
330 def img_info_log(filename
: str, filter_path
: Optional
[str] = None,
331 use_image_opts
: bool = False, extra_args
: Sequence
[str] = (),
332 check
: bool = True, drop_child_info
: bool = True,
336 args
.append('--image-opts')
338 args
+= ['-f', imgfmt
]
340 args
.append(filename
)
342 output
= qemu_img(*args
, check
=check
).stdout
344 filter_path
= filename
345 log(filter_img_info(output
, filter_path
, drop_child_info
))
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """Build a full qemu-io command line for the given arguments.

    When `args` already contains '-f' or '--image-opts', the command is
    prefixed with qemu_io_args_no_fmt; otherwise it is prefixed with
    qemu_io_args.  A new list is returned.
    """
    format_given = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if format_given else qemu_io_args
    return prefix + list(args)
def qemu_io_popen(*args):
    """Start qemu-io through qemu_tool_popen() and return its result.

    The arguments are first completed by qemu_io_wrap_args().
    """
    command = qemu_io_wrap_args(args)
    return qemu_tool_popen(command)
def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
            ) -> 'subprocess.CompletedProcess[str]':
    """
    Invoke QEMU_IO_PROG; return the status code and console output.

    Depending on the arguments, either QEMU_IO_OPTIONS or
    QEMU_IO_OPTIONS_NO_FMT is always placed in front of them (see
    qemu_io_wrap_args()).  `check` and `combine_stdio` are handed to
    qemu_tool() unchanged.
    """
    command = qemu_io_wrap_args(args)
    return qemu_tool(*command, check=check, combine_stdio=combine_stdio)
367 def qemu_io_log(*args
: str, check
: bool = True
368 ) -> 'subprocess.CompletedProcess[str]':
369 result
= qemu_io(*args
, check
=check
)
370 log(result
.stdout
, filters
=[filter_testfiles
, filter_qemu_io
])
373 class QemuIoInteractive
:
374 def __init__(self
, *args
):
375 self
.args
= qemu_io_wrap_args(args
)
# We need to keep the Popen object around, and not
377 # close it immediately. Therefore, disable the pylint check:
378 # pylint: disable=consider-using-with
379 self
._p
= subprocess
.Popen(self
.args
, stdin
=subprocess
.PIPE
,
380 stdout
=subprocess
.PIPE
,
381 stderr
=subprocess
.STDOUT
,
382 universal_newlines
=True)
383 out
= self
._p
.stdout
.read(9)
384 if out
!= 'qemu-io> ':
385 # Most probably qemu-io just failed to start.
386 # Let's collect the whole output and exit.
387 out
+= self
._p
.stdout
.read()
388 self
._p
.wait(timeout
=1)
389 raise ValueError(out
)
392 self
._p
.communicate('q\n')
394 def _read_output(self
):
395 pattern
= 'qemu-io> '
400 c
= self
._p
.stdout
.read(1)
401 # check unexpected EOF
404 if c
== pattern
[pos
]:
409 return ''.join(s
[:-n
])
412 # quit command is in close(), '\n' is added automatically
413 assert '\n' not in cmd
415 assert cmd
not in ('q', 'quit')
416 self
._p
.stdin
.write(cmd
+ '\n')
417 self
._p
.stdin
.flush()
418 return self
._read
_output
()
421 class QemuStorageDaemon
:
422 _qmp
: Optional
[QEMUMonitorProtocol
] = None
423 _qmpsock
: Optional
[str] = None
424 # Python < 3.8 would complain if this type were not a string literal
425 # (importing `annotations` from `__future__` would work; but not on <= 3.6)
426 _p
: 'Optional[subprocess.Popen[bytes]]' = None
428 def __init__(self
, *args
: str, instance_id
: str = 'a', qmp
: bool = False):
429 assert '--pidfile' not in args
430 self
.pidfile
= os
.path
.join(test_dir
, f
'qsd-{instance_id}-pid')
431 all_args
= [qsd_prog
] + list(args
) + ['--pidfile', self
.pidfile
]
434 self
._qmpsock
= os
.path
.join(sock_dir
, f
'qsd-{instance_id}.sock')
435 all_args
+= ['--chardev',
436 f
'socket,id=qmp-sock,path={self._qmpsock}',
437 '--monitor', 'qmp-sock']
439 self
._qmp
= QEMUMonitorProtocol(self
._qmpsock
, server
=True)
441 # Cannot use with here, we want the subprocess to stay around
442 # pylint: disable=consider-using-with
443 self
._p
= subprocess
.Popen(all_args
)
444 if self
._qmp
is not None:
446 while not os
.path
.exists(self
.pidfile
):
447 if self
._p
.poll() is not None:
448 cmd
= ' '.join(all_args
)
450 'qemu-storage-daemon terminated with exit code ' +
451 f
'{self._p.returncode}: {cmd}')
455 with
open(self
.pidfile
, encoding
='utf-8') as f
:
456 self
._pid
= int(f
.read().strip())
458 assert self
._pid
== self
._p
.pid
460 def qmp(self
, cmd
: str, args
: Optional
[Dict
[str, object]] = None) \
462 assert self
._qmp
is not None
463 return self
._qmp
.cmd(cmd
, args
)
465 def stop(self
, kill_signal
=15):
466 self
._p
.send_signal(kill_signal
)
473 if self
._qmpsock
is not None:
475 os
.remove(self
._qmpsock
)
479 os
.remove(self
.pidfile
)
484 if self
._p
is not None:
485 self
.stop(kill_signal
=9)
489 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
490 return subprocess
.call(qemu_nbd_args
+ ['--fork'] + list(args
))
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode ('--fork') and report how it went.

    Returns a (returncode, output) pair.  The output is only passed on
    when the return code is non-zero; otherwise an empty string is
    returned in its place.'''
    command = qemu_nbd_args + ['--fork'] + list(args)
    output, status = qemu_tool_pipe_and_status('qemu-nbd', command,
                                               connect_stderr=False)
    if status:
        return status, output
    return status, ''
500 def qemu_nbd_list_log(*args
: str) -> str:
501 '''Run qemu-nbd to list remote exports'''
502 full_args
= [qemu_nbd_prog
, '-L'] + list(args
)
503 output
, _
= qemu_tool_pipe_and_status('qemu-nbd', full_args
)
504 log(output
, filters
=[filter_testfiles
, filter_nbd_exports
])
508 def qemu_nbd_popen(*args
):
509 '''Context manager running qemu-nbd within the context'''
510 pid_file
= file_path("qemu_nbd_popen-nbd-pid-file")
512 assert not os
.path
.exists(pid_file
)
514 cmd
= list(qemu_nbd_args
)
515 cmd
.extend(('--persistent', '--pid-file', pid_file
))
518 log('Start NBD server')
519 with subprocess
.Popen(cmd
) as p
:
521 while not os
.path
.exists(pid_file
):
522 if p
.poll() is not None:
524 "qemu-nbd terminated with exit code {}: {}"
525 .format(p
.returncode
, ' '.join(cmd
)))
530 if os
.path
.exists(pid_file
):
532 log('Kill NBD server')
536 def compare_images(img1
: str, img2
: str,
537 fmt1
: str = imgfmt
, fmt2
: str = imgfmt
) -> bool:
539 Compare two images with QEMU_IMG; return True if they are identical.
541 :raise CalledProcessError:
542 when qemu-img crashes or returns a status code of anything other
543 than 0 (identical) or 1 (different).
546 qemu_img('compare', '-f', fmt1
, '-F', fmt2
, img1
, img2
)
548 except subprocess
.CalledProcessError
as exc
:
549 if exc
.returncode
== 1:
553 def create_image(name
, size
):
554 '''Create a fully-allocated raw image with sector markers'''
555 with
open(name
, 'wb') as file:
558 sector
= struct
.pack('>l504xl', i
// 512, i
// 512)
562 def image_size(img
: str) -> int:
563 """Return image's virtual size"""
564 value
= qemu_img_info('-f', imgfmt
, img
)['virtual-size']
565 if not isinstance(value
, int):
566 type_name
= type(value
).__name
__
567 raise TypeError("Expected 'int' for 'virtual-size', "
568 f
"got '{value}' of type '{type_name}'")
572 return isinstance(val
, str)
# Pattern matching the test directory path literally.  The path is escaped
# because it is arbitrary text, not a regular expression: characters such
# as '.', '+' or '(' in the directory name must not be interpreted as
# regex syntax.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory path in `msg`
    with the placeholder 'TEST_DIR' and return the result."""
    return test_dir_re.sub("TEST_DIR", msg)
# Matches a single carriage return character.
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Return `msg` with every carriage return removed."""
    cleaned, _removed = win32_re.subn("", msg)
    return cleaned
# Matches the statistics that qemu-io prints after an operation: the
# operation count, the elapsed time, and the throughput in both
# size/sec and ops/sec.
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """Normalize qemu-io output for comparison.

    Carriage returns are removed first (filter_win32()), then each
    statistics line is replaced by a fixed placeholder.
    """
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
# Matches "chown <numeric uid>:<numeric gid>".
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric 'chown UID:GID' pairs in `msg` by a placeholder."""
    filtered, _hits = chown_re.subn("chown UID:GID", msg)
    return filtered
594 def filter_qmp_event(event
):
595 '''Filter a QMP event dict'''
597 if 'timestamp' in event
:
598 event
['timestamp']['seconds'] = 'SECS'
599 event
['timestamp']['microseconds'] = 'USECS'
602 def filter_qmp(qmsg
, filter_fn
):
603 '''Given a string filter, filter a QMP object's values.
604 filter_fn takes a (key, value) pair.'''
605 # Iterate through either lists or dicts;
606 if isinstance(qmsg
, list):
607 items
= enumerate(qmsg
)
608 elif isinstance(qmsg
, dict):
611 return filter_fn(None, qmsg
)
614 if isinstance(v
, (dict, list)):
615 qmsg
[k
] = filter_qmp(v
, filter_fn
)
617 qmsg
[k
] = filter_fn(k
, v
)
def filter_testfiles(msg):
    """Hide per-process file name prefixes in `msg`.

    Paths starting with '<pid>-' inside test_dir or sock_dir (where
    <pid> is the current process ID) have that prefix replaced by
    'TEST_DIR/PID-' or 'SOCK_DIR/PID-' respectively.
    """
    pid_tag = "%s-" % (os.getpid())
    substitutions = (
        (os.path.join(test_dir, pid_tag), 'TEST_DIR/PID-'),
        (os.path.join(sock_dir, pid_tag), 'SOCK_DIR/PID-'),
    )
    for prefix, placeholder in substitutions:
        msg = msg.replace(prefix, placeholder)
    return msg
625 def filter_qmp_testfiles(qmsg
):
626 def _filter(_key
, value
):
628 return filter_testfiles(value
)
630 return filter_qmp(qmsg
, _filter
)
def filter_virtio_scsi(output: str) -> str:
    """Drop the '-ccw' / '-pci' suffix from 'virtio-scsi' in `output`."""
    suffixed_name = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return suffixed_name.sub(r'\1', output)
635 def filter_qmp_virtio_scsi(qmsg
):
636 def _filter(_key
, value
):
638 return filter_virtio_scsi(value
)
640 return filter_qmp(qmsg
, _filter
)
def filter_generated_node_ids(msg):
    """Replace each '#block<digits>' name in `msg` by 'NODE_NAME'."""
    generated_id = re.compile("#block[0-9]+")
    return generated_id.sub("NODE_NAME", msg)
645 def filter_img_info(output
: str, filename
: str,
646 drop_child_info
: bool = True) -> str:
648 drop_indented
= False
649 for line
in output
.split('\n'):
650 if 'disk size' in line
or 'actual-size' in line
:
653 # Drop child node info
655 if line
.startswith(' '):
657 drop_indented
= False
658 if drop_child_info
and "Child node '/" in line
:
662 line
= line
.replace(filename
, 'TEST_IMG')
663 line
= filter_testfiles(line
)
664 line
= line
.replace(imgfmt
, 'IMGFMT')
665 line
= re
.sub('iters: [0-9]+', 'iters: XXX', line
)
666 line
= re
.sub('uuid: [-a-f0-9]+',
667 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
669 line
= re
.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line
)
670 line
= re
.sub('(compression type: )(zlib|zstd)', r
'\1COMPRESSION_TYPE',
673 return '\n'.join(lines
)
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name (imgfmt) in
    `msg` by the placeholder 'IMGFMT'."""
    placeholder = 'IMGFMT'
    filtered = msg.replace(imgfmt, placeholder)
    return filtered
678 def filter_qmp_imgfmt(qmsg
):
679 def _filter(_key
, value
):
681 return filter_imgfmt(value
)
683 return filter_qmp(qmsg
, _filter
)
def filter_nbd_exports(output: str) -> str:
    """Mask the numeric values of 'min block', 'opt block' and
    'max block' in `output` with 'XXX'."""
    block_size = re.compile(r'((min|opt|max) block): [0-9]+')
    return block_size.sub(r'\1: XXX', output)
# Type variable constrained to the message shapes used with filters below:
# a dict with string keys, a list, or a plain string.
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
692 filters
: Iterable
[Callable
[[Msg
], Msg
]] = (),
693 indent
: Optional
[int] = None) -> None:
695 Logs either a string message or a JSON serializable message (like QMP).
696 If indent is provided, JSON serializable messages are pretty-printed.
700 if isinstance(msg
, (dict, list)):
701 # Don't sort if it's already sorted
702 do_sort
= not isinstance(msg
, OrderedDict
)
703 test_logger
.info(json
.dumps(msg
, sort_keys
=do_sort
, indent
=indent
))
705 test_logger
.info(msg
)
708 def __init__(self
, seconds
, errmsg
="Timeout"):
709 self
.seconds
= seconds
712 if qemu_gdb
or qemu_valgrind
:
714 signal
.signal(signal
.SIGALRM
, self
.timeout
)
715 signal
.setitimer(signal
.ITIMER_REAL
, self
.seconds
)
717 def __exit__(self
, exc_type
, value
, traceback
):
718 if qemu_gdb
or qemu_valgrind
:
720 signal
.setitimer(signal
.ITIMER_REAL
, 0)
722 def timeout(self
, signum
, frame
):
723 raise Exception(self
.errmsg
)
def file_pattern(name):
    """Return `name` prefixed with the current process ID and a dash."""
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
730 Context manager generating multiple file names. The generated files are
731 removed when exiting the context.
735 with FilePath('a.img', 'b.img') as (img_a, img_b):
736 # Use img_a and img_b here...
738 # a.img and b.img are automatically removed here.
740 By default images are created in iotests.test_dir. To create sockets use
743 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
745 For convenience, calling with one argument yields a single file instead of
746 a tuple with one item.
749 def __init__(self
, *names
, base_dir
=test_dir
):
750 self
.paths
= [os
.path
.join(base_dir
, file_pattern(name
))
754 if len(self
.paths
) == 1:
759 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
760 for path
in self
.paths
:
774 def file_path_remover():
775 for path
in reversed(file_path_remover
.paths
):
779 def file_path(*names
, base_dir
=test_dir
):
780 ''' Another way to get auto-generated filename that cleans itself up.
784 img_a, img_b = file_path('a.img', 'b.img')
785 sock = file_path('socket')
788 if not hasattr(file_path_remover
, 'paths'):
789 file_path_remover
.paths
= []
790 atexit
.register(file_path_remover
)
794 filename
= file_pattern(name
)
795 path
= os
.path
.join(base_dir
, filename
)
796 file_path_remover
.paths
.append(path
)
799 return paths
[0] if len(paths
) == 1 else paths
801 def remote_filename(path
):
802 if imgproto
== 'file':
804 elif imgproto
== 'ssh':
805 return "ssh://%s@127.0.0.1:22%s" % (os
.environ
.get('USER'), path
)
807 raise Exception("Protocol %s not supported" % (imgproto
))
809 class VM(qtest
.QEMUQtestMachine
):
812 def __init__(self
, path_suffix
=''):
813 name
= "qemu%s-%d" % (path_suffix
, os
.getpid())
814 timer
= 15.0 if not (qemu_gdb
or qemu_valgrind
) else None
815 if qemu_gdb
and qemu_valgrind
:
816 sys
.stderr
.write('gdb and valgrind are mutually exclusive\n')
818 wrapper
= qemu_gdb
if qemu_gdb
else qemu_valgrind
819 super().__init
__(qemu_prog
, qemu_opts
, wrapper
=wrapper
,
821 base_temp_dir
=test_dir
,
822 sock_dir
=sock_dir
, qmp_timer
=timer
)
825 def _post_shutdown(self
) -> None:
826 super()._post
_shutdown
()
827 if not qemu_valgrind
or not self
._popen
:
829 valgrind_filename
= f
"{test_dir}/{self._popen.pid}.valgrind"
830 if self
.exitcode() == 99:
831 with
open(valgrind_filename
, encoding
='utf-8') as f
:
834 os
.remove(valgrind_filename
)
836 def _pre_launch(self
) -> None:
837 super()._pre
_launch
()
839 # set QEMU binary output to stdout
840 self
._close
_qemu
_log
_file
()
842 def add_object(self
, opts
):
843 self
._args
.append('-object')
844 self
._args
.append(opts
)
847 def add_device(self
, opts
):
848 self
._args
.append('-device')
849 self
._args
.append(opts
)
852 def add_drive_raw(self
, opts
):
853 self
._args
.append('-drive')
854 self
._args
.append(opts
)
857 def add_drive(self
, path
, opts
='', interface
='virtio', img_format
=imgfmt
):
858 '''Add a virtio-blk drive to the VM'''
859 options
= ['if=%s' % interface
,
860 'id=drive%d' % self
._num
_drives
]
863 options
.append('file=%s' % path
)
864 options
.append('format=%s' % img_format
)
865 options
.append('cache=%s' % cachemode
)
866 options
.append('aio=%s' % aiomode
)
871 if img_format
== 'luks' and 'key-secret' not in opts
:
872 # default luks support
873 if luks_default_secret_object
not in self
._args
:
874 self
.add_object(luks_default_secret_object
)
876 options
.append(luks_default_key_secret_opt
)
878 self
._args
.append('-drive')
879 self
._args
.append(','.join(options
))
880 self
._num
_drives
+= 1
883 def add_blockdev(self
, opts
):
884 self
._args
.append('-blockdev')
885 if isinstance(opts
, str):
886 self
._args
.append(opts
)
888 self
._args
.append(','.join(opts
))
891 def add_incoming(self
, addr
):
892 self
._args
.append('-incoming')
893 self
._args
.append(addr
)
896 def hmp(self
, command_line
: str, use_log
: bool = False) -> QMPMessage
:
897 cmd
= 'human-monitor-command'
898 kwargs
: Dict
[str, Any
] = {'command-line': command_line
}
900 return self
.qmp_log(cmd
, **kwargs
)
902 return self
.qmp(cmd
, **kwargs
)
904 def pause_drive(self
, drive
: str, event
: Optional
[str] = None) -> None:
905 """Pause drive r/w operations"""
907 self
.pause_drive(drive
, "read_aio")
908 self
.pause_drive(drive
, "write_aio")
910 self
.hmp(f
'qemu-io {drive} "break {event} bp_{drive}"')
def resume_drive(self, drive: str) -> None:
    """Resume drive r/w operations

    Issues the qemu-io 'remove_break' command for the breakpoint named
    'bp_<drive>' through the human monitor.
    """
    command = f'qemu-io {drive} "remove_break bp_{drive}"'
    self.hmp(command)
def hmp_qemu_io(self, drive: str, cmd: str,
                use_log: bool = False, qdev: bool = False) -> QMPMessage:
    """Write to a given drive using an HMP command

    :param drive: drive name placed after 'qemu-io' on the HMP line.
    :param cmd: qemu-io command, passed inside double quotes.
    :param use_log: forwarded to hmp().
    :param qdev: when true, '-d ' is inserted in front of the drive name.
    :return: whatever hmp() returns for the command.
    """
    if qdev:
        d = '-d '
    else:
        d = ''
    return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
922 def flatten_qmp_object(self
, obj
, output
=None, basestr
=''):
925 if isinstance(obj
, list):
926 for i
, item
in enumerate(obj
):
927 self
.flatten_qmp_object(item
, output
, basestr
+ str(i
) + '.')
928 elif isinstance(obj
, dict):
930 self
.flatten_qmp_object(obj
[key
], output
, basestr
+ key
+ '.')
932 output
[basestr
[:-1]] = obj
# Strip trailing '.'
935 def qmp_to_opts(self
, obj
):
936 obj
= self
.flatten_qmp_object(obj
)
939 output_list
+= [key
+ '=' + obj
[key
]]
940 return ','.join(output_list
)
942 def get_qmp_events_filtered(self
, wait
=60.0):
944 for ev
in self
.get_qmp_events(wait
=wait
):
945 result
.append(filter_qmp_event(ev
))
948 def qmp_log(self
, cmd
, filters
=(), indent
=None, **kwargs
):
949 full_cmd
= OrderedDict((
951 ("arguments", ordered_qmp(kwargs
))
953 log(full_cmd
, filters
, indent
=indent
)
954 result
= self
.qmp(cmd
, **kwargs
)
955 log(result
, filters
, indent
=indent
)
958 # Returns None on success, and an error string on failure
959 def run_job(self
, job
: str, auto_finalize
: bool = True,
960 auto_dismiss
: bool = False,
961 pre_finalize
: Optional
[Callable
[[], None]] = None,
962 cancel
: bool = False, wait
: float = 60.0,
963 filters
: Iterable
[Callable
[[Any
], Any
]] = (),
966 run_job moves a job from creation through to dismissal.
968 :param job: String. ID of recently-launched job
969 :param auto_finalize: Bool. True if the job was launched with
970 auto_finalize. Defaults to True.
971 :param auto_dismiss: Bool. True if the job was launched with
972 auto_dismiss=True. Defaults to False.
973 :param pre_finalize: Callback. A callable that takes no arguments to be
974 invoked prior to issuing job-finalize, if any.
975 :param cancel: Bool. When true, cancels the job after the pre_finalize
977 :param wait: Float. Timeout value specifying how long to wait for any
978 event, in seconds. Defaults to 60.0.
980 match_device
= {'data': {'device': job
}}
981 match_id
= {'data': {'id': job
}}
983 ('BLOCK_JOB_COMPLETED', match_device
),
984 ('BLOCK_JOB_CANCELLED', match_device
),
985 ('BLOCK_JOB_ERROR', match_device
),
986 ('BLOCK_JOB_READY', match_device
),
987 ('BLOCK_JOB_PENDING', match_id
),
988 ('JOB_STATUS_CHANGE', match_id
)
992 ev
= filter_qmp_event(self
.events_wait(events
, timeout
=wait
))
993 if ev
['event'] != 'JOB_STATUS_CHANGE':
994 log(ev
, filters
=filters
)
996 status
= ev
['data']['status']
997 if status
== 'aborting':
998 result
= self
.qmp('query-jobs')
999 for j
in result
['return']:
1002 log('Job failed: %s' % (j
['error']), filters
=filters
)
1003 elif status
== 'ready':
1004 self
.qmp_log('job-complete', id=job
, filters
=filters
)
1005 elif status
== 'pending' and not auto_finalize
:
1009 self
.qmp_log('job-cancel', id=job
, filters
=filters
)
1011 self
.qmp_log('job-finalize', id=job
, filters
=filters
)
1012 elif status
== 'concluded' and not auto_dismiss
:
1013 self
.qmp_log('job-dismiss', id=job
, filters
=filters
)
1014 elif status
== 'null':
1017 # Returns None on success, and an error string on failure
1018 def blockdev_create(self
, options
, job_id
='job0', filters
=None):
1020 filters
= [filter_qmp_testfiles
]
1021 result
= self
.qmp_log('blockdev-create', filters
=filters
,
1022 job_id
=job_id
, options
=options
)
1024 if 'return' in result
:
1025 assert result
['return'] == {}
1026 job_result
= self
.run_job(job_id
, filters
=filters
)
1028 job_result
= result
['error']
1033 def enable_migration_events(self
, name
):
1034 log('Enabling migration QMP events on %s...' % name
)
1035 log(self
.qmp('migrate-set-capabilities', capabilities
=[
1037 'capability': 'events',
1042 def wait_migration(self
, expect_runstate
: Optional
[str]) -> bool:
1044 event
= self
.event_wait('MIGRATION')
1045 # We use the default timeout, and with a timeout, event_wait()
1046 # never returns None
1049 log(event
, filters
=[filter_qmp_event
])
1050 if event
['data']['status'] in ('completed', 'failed'):
1053 if event
['data']['status'] == 'completed':
1054 # The event may occur in finish-migrate, so wait for the expected
1055 # post-migration runstate
1057 while runstate
!= expect_runstate
:
1058 runstate
= self
.qmp('query-status')['return']['status']
1063 def node_info(self
, node_name
):
1064 nodes
= self
.qmp('query-named-block-nodes')
1065 for x
in nodes
['return']:
1066 if x
['node-name'] == node_name
:
def query_bitmaps(self):
    """Return a dict mapping node names to their 'dirty-bitmaps' entry.

    The data comes from 'query-named-block-nodes'; nodes without a
    'dirty-bitmaps' key are left out.
    """
    bitmaps_by_node = {}
    for node in self.qmp("query-named-block-nodes")['return']:
        if 'dirty-bitmaps' in node:
            bitmaps_by_node[node['node-name']] = node['dirty-bitmaps']
    return bitmaps_by_node
1075 def get_bitmap(self
, node_name
, bitmap_name
, recording
=None, bitmaps
=None):
1077 get a specific bitmap from the object returned by query_bitmaps.
1078 :param recording: If specified, filter results by the specified value.
1079 :param bitmaps: If specified, use it instead of call query_bitmaps()
1082 bitmaps
= self
.query_bitmaps()
1084 for bitmap
in bitmaps
[node_name
]:
1085 if bitmap
.get('name', '') == bitmap_name
:
1086 if recording
is None or bitmap
.get('recording') == recording
:
def check_bitmap_status(self, node_name, bitmap_name, fields):
    """Return True if every key/value pair of `fields` is also present
    in the bitmap that get_bitmap() returns for the given node and
    bitmap name."""
    bitmap = self.get_bitmap(node_name, bitmap_name)
    wanted, present = fields.items(), bitmap.items()
    return wanted <= present
1095 def assert_block_path(self
, root
, path
, expected_node
, graph
=None):
1097 Check whether the node under the given path in the block graph
1100 @root is the node name of the node where the @path is rooted.
1102 @path is a string that consists of child names separated by
1103 slashes. It must begin with a slash.
1105 Examples for @root + @path:
1106 - root="qcow2-node", path="/backing/file"
1107 - root="quorum-node", path="/children.2/file"
1109 Hypothetically, @path could be empty, in which case it would
1110 point to @root. However, in practice this case is not useful
1111 and hence not allowed.
1113 @expected_node may be None. (All elements of the path but the
1114 leaf must still exist.)
1116 @graph may be None or the result of an x-debug-query-block-graph
1117 call that has already been performed.
1120 graph
= self
.qmp('x-debug-query-block-graph')['return']
1122 iter_path
= iter(path
.split('/'))
1124 # Must start with a /
1125 assert next(iter_path
) == ''
1127 node
= next((node
for node
in graph
['nodes'] if node
['name'] == root
),
1130 # An empty @path is not allowed, so the root node must be present
1131 assert node
is not None, 'Root node %s not found' % root
1133 for child_name
in iter_path
:
1134 assert node
is not None, 'Cannot follow path %s%s' % (root
, path
)
1137 node_id
= next(edge
['child'] for edge
in graph
['edges']
1138 if (edge
['parent'] == node
['id'] and
1139 edge
['name'] == child_name
))
1141 node
= next(node
for node
in graph
['nodes']
1142 if node
['id'] == node_id
)
1144 except StopIteration:
1148 assert expected_node
is None, \
1149 'No node found under %s (but expected %s)' % \
1150 (path
, expected_node
)
1152 assert node
['name'] == expected_node
, \
1153 'Found node %s under %s (but expected %s)' % \
1154 (node
['name'], path
, expected_node
)
# Matches a path component of the form "name[idx]": group 1 is the text
# before the opening bracket, group 2 the text between the brackets.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1158 class QMPTestCase(unittest
.TestCase
):
1159 '''Abstract base class for QMP test cases'''
1161 def __init__(self
, *args
, **kwargs
):
1162 super().__init
__(*args
, **kwargs
)
1163 # Many users of this class set a VM property we rely on heavily
1164 # in the methods below.
1167 def dictpath(self
, d
, path
):
1168 '''Traverse a path in a nested dict'''
1169 for component
in path
.split('/'):
1170 m
= index_re
.match(component
)
1172 component
, idx
= m
.groups()
1175 if not isinstance(d
, dict) or component
not in d
:
1176 self
.fail(f
'failed path traversal for "{path}" in "{d}"')
1180 if not isinstance(d
, list):
1181 self
.fail(f
'path component "{component}" in "{path}" '
1182 f
'is not a list in "{d}"')
1186 self
.fail(f
'invalid index "{idx}" in path "{path}" '
1190 def assert_qmp_absent(self
, d
, path
):
1192 result
= self
.dictpath(d
, path
)
1193 except AssertionError:
1195 self
.fail('path "%s" has value "%s"' % (path
, str(result
)))
1197 def assert_qmp(self
, d
, path
, value
):
1198 '''Assert that the value for a specific path in a QMP dict
1199 matches. When given a list of values, assert that any of
1202 result
= self
.dictpath(d
, path
)
1204 # [] makes no sense as a list of valid values, so treat it as
1205 # an actual single value.
1206 if isinstance(value
, list) and value
!= []:
1210 self
.fail('no match for "%s" in %s' % (str(result
), str(value
)))
1212 self
.assertEqual(result
, value
,
1213 '"%s" is "%s", expected "%s"'
1214 % (path
, str(result
), str(value
)))
def assert_no_active_block_jobs(self):
    '''Assert that query-block-jobs returns an empty job list'''
    self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])
1220 def assert_has_block_node(self
, node_name
=None, file_name
=None):
1221 """Issue a query-named-block-nodes and assert node_name and/or
1222 file_name is present in the result"""
1223 def check_equal_or_none(a
, b
):
1224 return a
is None or b
is None or a
== b
1225 assert node_name
or file_name
1226 result
= self
.vm
.qmp('query-named-block-nodes')
1227 for x
in result
["return"]:
1228 if check_equal_or_none(x
.get("node-name"), node_name
) and \
1229 check_equal_or_none(x
.get("file"), file_name
):
1231 self
.fail("Cannot find %s %s in result:\n%s" %
1232 (node_name
, file_name
, result
))
1234 def assert_json_filename_equal(self
, json_filename
, reference
):
1235 '''Asserts that the given filename is a json: filename and that its
1236 content is equal to the given reference object'''
1237 self
.assertEqual(json_filename
[:5], 'json:')
1239 self
.vm
.flatten_qmp_object(json
.loads(json_filename
[5:])),
1240 self
.vm
.flatten_qmp_object(reference
)
1243 def cancel_and_wait(self
, drive
='drive0', force
=False,
1244 resume
=False, wait
=60.0):
1245 '''Cancel a block job and wait for it to finish, returning the event'''
1246 result
= self
.vm
.qmp('block-job-cancel', device
=drive
, force
=force
)
1247 self
.assert_qmp(result
, 'return', {})
1250 self
.vm
.resume_drive(drive
)
1254 while not cancelled
:
1255 for event
in self
.vm
.get_qmp_events(wait
=wait
):
1256 if event
['event'] == 'BLOCK_JOB_COMPLETED' or \
1257 event
['event'] == 'BLOCK_JOB_CANCELLED':
1258 self
.assert_qmp(event
, 'data/device', drive
)
1261 elif event
['event'] == 'JOB_STATUS_CHANGE':
1262 self
.assert_qmp(event
, 'data/id', drive
)
1265 self
.assert_no_active_block_jobs()
1268 def wait_until_completed(self
, drive
='drive0', check_offset
=True,
1269 wait
=60.0, error
=None):
1270 '''Wait for a block job to finish, returning the event'''
1272 for event
in self
.vm
.get_qmp_events(wait
=wait
):
1273 if event
['event'] == 'BLOCK_JOB_COMPLETED':
1274 self
.assert_qmp(event
, 'data/device', drive
)
1276 self
.assert_qmp_absent(event
, 'data/error')
1278 self
.assert_qmp(event
, 'data/offset',
1279 event
['data']['len'])
1281 self
.assert_qmp(event
, 'data/error', error
)
1282 self
.assert_no_active_block_jobs()
1284 if event
['event'] == 'JOB_STATUS_CHANGE':
1285 self
.assert_qmp(event
, 'data/id', drive
)
1287 def wait_ready(self
, drive
='drive0'):
1288 """Wait until a BLOCK_JOB_READY event, and return the event."""
1289 return self
.vm
.events_wait([
1291 {'data': {'type': 'mirror', 'device': drive
}}),
1293 {'data': {'type': 'commit', 'device': drive
}})
def wait_ready_and_cancel(self, drive='drive0'):
    '''Wait for the job on @drive to become ready, cancel it, and check
    that it ended as a completed mirror job whose offset equals its
    length'''
    self.wait_ready(drive=drive)
    completion = self.cancel_and_wait(drive=drive)

    self.assertEqual(completion['event'], 'BLOCK_JOB_COMPLETED')
    self.assert_qmp(completion, 'data/type', 'mirror')
    # The job must have processed everything it set out to process
    job_length = completion['data']['len']
    self.assert_qmp(completion, 'data/offset', job_length)
1303 def complete_and_wait(self
, drive
='drive0', wait_ready
=True,
1304 completion_error
=None):
1305 '''Complete a block job and wait for it to finish'''
1307 self
.wait_ready(drive
=drive
)
1309 result
= self
.vm
.qmp('block-job-complete', device
=drive
)
1310 self
.assert_qmp(result
, 'return', {})
1312 event
= self
.wait_until_completed(drive
=drive
, error
=completion_error
)
1313 self
.assertTrue(event
['data']['type'] in ['mirror', 'commit'])
1315 def pause_wait(self
, job_id
='job0'):
1316 with
Timeout(3, "Timeout waiting for job to pause"):
1318 result
= self
.vm
.qmp('query-block-jobs')
1320 for job
in result
['return']:
1321 if job
['device'] == job_id
:
1323 if job
['paused'] and not job
['busy']:
1328 def pause_job(self
, job_id
='job0', wait
=True):
1329 result
= self
.vm
.qmp('block-job-pause', device
=job_id
)
1330 self
.assert_qmp(result
, 'return', {})
1332 return self
.pause_wait(job_id
)
1335 def case_skip(self
, reason
):
1336 '''Skip this test case'''
1338 self
.skipTest(reason
)
1342 '''Skip this test suite'''
1343 # Each test in qemu-iotests has a number ("seq")
1344 seq
= os
.path
.basename(sys
.argv
[0])
1346 with
open('%s/%s.notrun' % (test_dir
, seq
), 'w', encoding
='utf-8') \
1348 outfile
.write(reason
+ '\n')
1349 logger
.warning("%s not run: %s", seq
, reason
)
1352 def case_notrun(reason
):
1353 '''Mark this test case as not having been run (without actually
1354 skipping it, that is left to the caller). See
1355 QMPTestCase.case_skip() for a variant that actually skips the
1356 current test case.'''
1358 # Each test in qemu-iotests has a number ("seq")
1359 seq
= os
.path
.basename(sys
.argv
[0])
1361 with
open('%s/%s.casenotrun' % (test_dir
, seq
), 'a', encoding
='utf-8') \
1363 outfile
.write(' [case not run] ' + reason
+ '\n')
1365 def _verify_image_format(supported_fmts
: Sequence
[str] = (),
1366 unsupported_fmts
: Sequence
[str] = ()) -> None:
1367 if 'generic' in supported_fmts
and \
1368 os
.environ
.get('IMGFMT_GENERIC', 'true') == 'true':
1370 # _supported_fmt generic
1374 not_sup
= supported_fmts
and (imgfmt
not in supported_fmts
)
1375 if not_sup
or (imgfmt
in unsupported_fmts
):
1376 notrun('not suitable for this image format: %s' % imgfmt
)
1378 if imgfmt
== 'luks':
1379 verify_working_luks()
1381 def _verify_protocol(supported
: Sequence
[str] = (),
1382 unsupported
: Sequence
[str] = ()) -> None:
1383 assert not (supported
and unsupported
)
1385 if 'generic' in supported
:
1388 not_sup
= supported
and (imgproto
not in supported
)
1389 if not_sup
or (imgproto
in unsupported
):
1390 notrun('not suitable for this protocol: %s' % imgproto
)
1392 def _verify_platform(supported
: Sequence
[str] = (),
1393 unsupported
: Sequence
[str] = ()) -> None:
1394 if any((sys
.platform
.startswith(x
) for x
in unsupported
)):
1395 notrun('not suitable for this OS: %s' % sys
.platform
)
1398 if not any((sys
.platform
.startswith(x
) for x
in supported
)):
1399 notrun('not suitable for this OS: %s' % sys
.platform
)
1401 def _verify_cache_mode(supported_cache_modes
: Sequence
[str] = ()) -> None:
1402 if supported_cache_modes
and (cachemode
not in supported_cache_modes
):
1403 notrun('not suitable for this cache mode: %s' % cachemode
)
1405 def _verify_aio_mode(supported_aio_modes
: Sequence
[str] = ()) -> None:
1406 if supported_aio_modes
and (aiomode
not in supported_aio_modes
):
1407 notrun('not suitable for this aio mode: %s' % aiomode
)
1409 def _verify_formats(required_formats
: Sequence
[str] = ()) -> None:
1410 usf_list
= list(set(required_formats
) - set(supported_formats()))
1412 notrun(f
'formats {usf_list} are not whitelisted')
def _verify_virtio_blk() -> None:
    """
    Call notrun() if 'virtio-blk' does not appear in the output of
    QEMU's "-device help".
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """
    Call notrun() if neither 'virtio-scsi-pci' nor 'virtio-scsi-ccw'
    appears in the output of QEMU's "-device help".
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(name in device_help for name in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1426 def _verify_imgopts(unsupported
: Sequence
[str] = ()) -> None:
1427 imgopts
= os
.environ
.get('IMGOPTS')
1428 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1429 # but it supported only for bash tests. We don't have a concept of global
1430 # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1431 # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1432 unsup
= list(unsupported
) + ['$']
1433 if imgopts
and any(x
in imgopts
for x
in unsup
):
1434 notrun(f
'not suitable for this imgopts: {imgopts}')
def supports_quorum() -> bool:
    """Return True if 'quorum' appears in the output of qemu-img --help."""
    help_text = qemu_img('--help').stdout
    return 'quorum' in help_text
def verify_quorum():
    '''Call notrun() when supports_quorum() reports no quorum support'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1445 def has_working_luks() -> Tuple
[bool, str]:
1447 Check whether our LUKS driver can actually create images
1448 (this extends to LUKS encryption for qcow2).
1450 If not, return the reason why.
1453 img_file
= f
'{test_dir}/luks-test.luks'
1454 res
= qemu_img('create', '-f', 'luks',
1455 '--object', luks_default_secret_object
,
1456 '-o', luks_default_key_secret_opt
,
1457 '-o', 'iter-time=10',
1467 for line
in res
.stdout
.splitlines():
1468 if img_file
+ ':' in line
:
1469 reason
= line
.split(img_file
+ ':', 1)[1].strip()
1472 return (False, reason
)
1476 def verify_working_luks():
1478 Skip test suite if LUKS does not work
1480 (working
, reason
) = has_working_luks()
1484 def supports_qcow2_zstd_compression() -> bool:
1485 img_file
= f
'{test_dir}/qcow2-zstd-test.qcow2'
1486 res
= qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
1494 if res
.returncode
== 1 and \
1495 "'compression-type' does not accept value 'zstd'" in res
.stdout
:
def verify_qcow2_zstd_compression():
    '''Call notrun() when supports_qcow2_zstd_compression() is false'''
    if supports_qcow2_zstd_compression():
        return
    notrun('zstd compression not supported')
1504 def qemu_pipe(*args
: str) -> str:
1506 Run qemu with an option to print something and exit (e.g. a help option).
1508 :return: QEMU's stdout output.
1510 full_args
= [qemu_prog
] + qemu_opts
+ list(args
)
1511 output
, _
= qemu_tool_pipe_and_status('qemu', full_args
)
def supported_formats(read_only=False):
    '''Return the list of whitelisted block formats.

    Set 'read_only' to True to check the ro-whitelist; otherwise the
    rw-whitelist is checked.

    The answer is parsed from the output of "-drive format=help" (first
    line when read_only is false, second line when it is true) and is
    cached in the function attribute supported_formats.formats, keyed
    by read_only, so that QEMU is queried at most once per key.'''
    try:
        cache = supported_formats.formats
    except AttributeError:
        # First call: create the cache
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        wanted_line = help_lines[1 if read_only else 0]
        # Everything after the first ':' is the whitespace-separated list
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
1529 def skip_if_unsupported(required_formats
=(), read_only
=False):
1530 '''Skip Test Decorator
1531 Runs the test if all the required formats are whitelisted'''
1532 def skip_test_decorator(func
):
1533 def func_wrapper(test_case
: QMPTestCase
, *args
: List
[Any
],
1534 **kwargs
: Dict
[str, Any
]) -> None:
1535 if callable(required_formats
):
1536 fmts
= required_formats(test_case
)
1538 fmts
= required_formats
1540 usf_list
= list(set(fmts
) - set(supported_formats(read_only
)))
1542 msg
= f
'{test_case}: formats {usf_list} are not whitelisted'
1543 test_case
.case_skip(msg
)
1545 func(test_case
, *args
, **kwargs
)
1547 return skip_test_decorator
1549 def skip_for_formats(formats
: Sequence
[str] = ()) \
1550 -> Callable
[[Callable
[[QMPTestCase
, List
[Any
], Dict
[str, Any
]], None]],
1551 Callable
[[QMPTestCase
, List
[Any
], Dict
[str, Any
]], None]]:
1552 '''Skip Test Decorator
1553 Skips the test for the given formats'''
1554 def skip_test_decorator(func
):
1555 def func_wrapper(test_case
: QMPTestCase
, *args
: List
[Any
],
1556 **kwargs
: Dict
[str, Any
]) -> None:
1557 if imgfmt
in formats
:
1558 msg
= f
'{test_case}: Skipped for format {imgfmt}'
1559 test_case
.case_skip(msg
)
1561 func(test_case
, *args
, **kwargs
)
1563 return skip_test_decorator
1565 def skip_if_user_is_root(func
):
1566 '''Skip Test Decorator
1567 Runs the test only without root permissions'''
1568 def func_wrapper(*args
, **kwargs
):
1569 if os
.getuid() == 0:
1570 case_notrun('{}: cannot be run as root'.format(args
[0]))
1573 return func(*args
, **kwargs
)
1576 # We need to filter out the time taken from the output so that
1577 # qemu-iotest can reliably diff the results against master output,
1578 # and hide skipped tests from the reference output.
1580 class ReproducibleTestResult(unittest
.TextTestResult
):
1581 def addSkip(self
, test
, reason
):
1582 # Same as TextTestResult, but print dot instead of "s"
1583 unittest
.TestResult
.addSkip(self
, test
, reason
)
1585 self
.stream
.writeln("skipped {0!r}".format(reason
))
1587 self
.stream
.write(".")
class ReproducibleStreamWrapper:
    """
    Proxy around a text stream that rewrites unittest's summary output.

    write() drops the elapsed time from "Ran N tests in X.XXXs" and
    removes " (skipped=N)" before passing the text on.  Every other
    attribute is forwarded to the wrapped stream unchanged.
    """

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # __getattr__ only runs when normal lookup fails.  If 'stream'
        # itself is missing, reading self.stream below would re-enter
        # this method forever, so fail right away for it.
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """Write @arg to the wrapped stream after filtering it."""
        without_time = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                              r'Ran \1 tests', arg)
        filtered = re.sub(r' \(skipped=\d+\)', r'', without_time)
        self.stream.write(filtered)
1604 class ReproducibleTestRunner(unittest
.TextTestRunner
):
1605 def __init__(self
, stream
: Optional
[TextIO
] = None,
1606 resultclass
: Type
[unittest
.TestResult
] =
1607 ReproducibleTestResult
,
1608 **kwargs
: Any
) -> None:
1609 rstream
= ReproducibleStreamWrapper(stream
or sys
.stdout
)
1610 super().__init
__(stream
=rstream
, # type: ignore
1612 resultclass
=resultclass
,
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Executes unittests within the calling module.

    :param argv: Command line handed to unittest.main().
    :param debug: If true, run with verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests emit warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now so that the output stays
    # reproducible, unless warning options were given on the command line.
    warning_filter = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_filter)
1626 def execute_setup_common(supported_fmts
: Sequence
[str] = (),
1627 supported_platforms
: Sequence
[str] = (),
1628 supported_cache_modes
: Sequence
[str] = (),
1629 supported_aio_modes
: Sequence
[str] = (),
1630 unsupported_fmts
: Sequence
[str] = (),
1631 supported_protocols
: Sequence
[str] = (),
1632 unsupported_protocols
: Sequence
[str] = (),
1633 required_fmts
: Sequence
[str] = (),
1634 unsupported_imgopts
: Sequence
[str] = ()) -> bool:
1636 Perform necessary setup for either script-style or unittest-style tests.
1638 :return: Bool; Whether or not debug mode has been requested via the CLI.
1640 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1642 debug
= '-d' in sys
.argv
1644 sys
.argv
.remove('-d')
1645 logging
.basicConfig(level
=(logging
.DEBUG
if debug
else logging
.WARN
))
1647 _verify_image_format(supported_fmts
, unsupported_fmts
)
1648 _verify_protocol(supported_protocols
, unsupported_protocols
)
1649 _verify_platform(supported
=supported_platforms
)
1650 _verify_cache_mode(supported_cache_modes
)
1651 _verify_aio_mode(supported_aio_modes
)
1652 _verify_formats(required_fmts
)
1653 _verify_virtio_blk()
1654 _verify_imgopts(unsupported_imgopts
)
1658 def execute_test(*args
, test_function
=None, **kwargs
):
1659 """Run either unittest or script-style tests."""
1661 debug
= execute_setup_common(*args
, **kwargs
)
1662 if not test_function
:
1663 execute_unittest(sys
.argv
, debug
)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only, without any prefix
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1676 # This is called from script-style iotests without a single point of entry
1677 def script_initialize(*args
, **kwargs
):
1678 """Initialize script-style tests without running any tests."""
1680 execute_setup_common(*args
, **kwargs
)
1682 # This is called from script-style iotests with a single point of entry
1683 def script_main(test_function
, *args
, **kwargs
):
1684 """Run script-style tests outside of the unittest framework"""
1686 execute_test(*args
, test_function
=test_function
, **kwargs
)
1688 # This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework.

    All arguments are passed through to execute_test() unchanged; no
    test_function is supplied here.
    """
    execute_test(*args, **kwargs)