qapi/parser: enable pylint checks
[qemu/armbru.git] / tests / qemu-iotests / iotests.py
blobce06cf5630489a3e86b232a0bf48c8bce684ce36
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 import bz2
21 from collections import OrderedDict
22 import faulthandler
23 import json
24 import logging
25 import os
26 import re
27 import shutil
28 import signal
29 import struct
30 import subprocess
31 import sys
32 import time
33 from typing import (Any, Callable, Dict, Iterable,
34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35 import unittest
37 from contextlib import contextmanager
39 # pylint: disable=import-error, wrong-import-position
40 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
41 from qemu.machine import qtest
42 from qemu.qmp import QMPMessage
44 # Use this logger for logging messages directly from the iotests module
45 logger = logging.getLogger('qemu.iotests')
46 logger.addHandler(logging.NullHandler())
48 # Use this logger for messages that ought to be used for diff output.
49 test_logger = logging.getLogger('qemu.iotests.diff_io')
52 faulthandler.enable()
54 # This will not work if arguments contain spaces but is necessary if we
55 # want to support the override options that ./check supports.
56 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
57 if os.environ.get('QEMU_IMG_OPTIONS'):
58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
60 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
61 if os.environ.get('QEMU_IO_OPTIONS'):
62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
64 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
65 if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
66 qemu_io_args_no_fmt += \
67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69 qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
70 qemu_nbd_args = [qemu_nbd_prog]
71 if os.environ.get('QEMU_NBD_OPTIONS'):
72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
74 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
77 gdb_qemu_env = os.environ.get('GDB_OPTIONS')
78 qemu_gdb = []
79 if gdb_qemu_env:
80 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
82 qemu_print = os.environ.get('PRINT_QEMU', False)
84 imgfmt = os.environ.get('IMGFMT', 'raw')
85 imgproto = os.environ.get('IMGPROTO', 'file')
86 output_dir = os.environ.get('OUTPUT_DIR', '.')
88 try:
89 test_dir = os.environ['TEST_DIR']
90 sock_dir = os.environ['SOCK_DIR']
91 cachemode = os.environ['CACHEMODE']
92 aiomode = os.environ['AIOMODE']
93 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
94 except KeyError:
95 # We are using these variables as proxies to indicate that we're
96 # not being run via "check". There may be other things set up by
97 # "check" that individual test cases rely on.
98 sys.stderr.write('Please run this test via the "check" script\n')
99 sys.exit(os.EX_USAGE)
101 qemu_valgrind = []
102 if os.environ.get('VALGRIND_QEMU') == "y" and \
103 os.environ.get('NO_VALGRIND') != "y":
104 valgrind_logfile = "--log-file=" + test_dir
105 # %p allows to put the valgrind process PID, since
106 # we don't know it a priori (subprocess.Popen is
107 # not yet invoked)
108 valgrind_logfile += "/%p.valgrind"
110 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
112 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
114 luks_default_secret_object = 'secret,id=keysec0,data=' + \
115 os.environ.get('IMGKEYSECRET', '')
116 luks_default_key_secret_opt = 'key-secret=keysec0'
118 sample_img_dir = os.environ['SAMPLE_IMG_DIR']
121 def unarchive_sample_image(sample, fname):
122 sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
123 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
124 shutil.copyfileobj(f_in, f_out)
127 def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
128 connect_stderr: bool = True) -> Tuple[str, int]:
130 Run a tool and return both its output and its exit code
132 stderr = subprocess.STDOUT if connect_stderr else None
133 with subprocess.Popen(args, stdout=subprocess.PIPE,
134 stderr=stderr, universal_newlines=True) as subp:
135 output = subp.communicate()[0]
136 if subp.returncode < 0:
137 cmd = ' '.join(args)
138 sys.stderr.write(f'{tool} received signal \
139 {-subp.returncode}: {cmd}\n')
140 return (output, subp.returncode)
142 def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
144 Run qemu-img and return both its output and its exit code
146 full_args = qemu_img_args + list(args)
147 return qemu_tool_pipe_and_status('qemu-img', full_args)
149 def qemu_img(*args: str) -> int:
150 '''Run qemu-img and return the exit code'''
151 return qemu_img_pipe_and_status(*args)[1]
153 def ordered_qmp(qmsg, conv_keys=True):
154 # Dictionaries are not ordered prior to 3.6, therefore:
155 if isinstance(qmsg, list):
156 return [ordered_qmp(atom) for atom in qmsg]
157 if isinstance(qmsg, dict):
158 od = OrderedDict()
159 for k, v in sorted(qmsg.items()):
160 if conv_keys:
161 k = k.replace('_', '-')
162 od[k] = ordered_qmp(v, conv_keys=False)
163 return od
164 return qmsg
166 def qemu_img_create(*args):
167 args = list(args)
169 # default luks support
170 if '-f' in args and args[args.index('-f') + 1] == 'luks':
171 if '-o' in args:
172 i = args.index('-o')
173 if 'key-secret' not in args[i + 1]:
174 args[i + 1].append(luks_default_key_secret_opt)
175 args.insert(i + 2, '--object')
176 args.insert(i + 3, luks_default_secret_object)
177 else:
178 args = ['-o', luks_default_key_secret_opt,
179 '--object', luks_default_secret_object] + args
181 args.insert(0, 'create')
183 return qemu_img(*args)
185 def qemu_img_measure(*args):
186 return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
188 def qemu_img_check(*args):
189 return json.loads(qemu_img_pipe("check", "--output", "json", *args))
191 def qemu_img_verbose(*args):
192 '''Run qemu-img without suppressing its output and return the exit code'''
193 exitcode = subprocess.call(qemu_img_args + list(args))
194 if exitcode < 0:
195 sys.stderr.write('qemu-img received signal %i: %s\n'
196 % (-exitcode, ' '.join(qemu_img_args + list(args))))
197 return exitcode
199 def qemu_img_pipe(*args: str) -> str:
200 '''Run qemu-img and return its output'''
201 return qemu_img_pipe_and_status(*args)[0]
203 def qemu_img_log(*args):
204 result = qemu_img_pipe(*args)
205 log(result, filters=[filter_testfiles])
206 return result
208 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
209 args = ['info']
210 if imgopts:
211 args.append('--image-opts')
212 else:
213 args += ['-f', imgfmt]
214 args += extra_args
215 args.append(filename)
217 output = qemu_img_pipe(*args)
218 if not filter_path:
219 filter_path = filename
220 log(filter_img_info(output, filter_path))
222 def qemu_io(*args):
223 '''Run qemu-io and return the stdout data'''
224 args = qemu_io_args + list(args)
225 return qemu_tool_pipe_and_status('qemu-io', args)[0]
227 def qemu_io_log(*args):
228 result = qemu_io(*args)
229 log(result, filters=[filter_testfiles, filter_qemu_io])
230 return result
232 def qemu_io_silent(*args):
233 '''Run qemu-io and return the exit code, suppressing stdout'''
234 if '-f' in args or '--image-opts' in args:
235 default_args = qemu_io_args_no_fmt
236 else:
237 default_args = qemu_io_args
239 args = default_args + list(args)
240 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False)
241 if result.returncode < 0:
242 sys.stderr.write('qemu-io received signal %i: %s\n' %
243 (-result.returncode, ' '.join(args)))
244 return result.returncode
246 def qemu_io_silent_check(*args):
247 '''Run qemu-io and return the true if subprocess returned 0'''
248 args = qemu_io_args + list(args)
249 result = subprocess.run(args, stdout=subprocess.DEVNULL,
250 stderr=subprocess.STDOUT, check=False)
251 return result.returncode == 0
253 class QemuIoInteractive:
254 def __init__(self, *args):
255 self.args = qemu_io_args_no_fmt + list(args)
256 # We need to keep the Popen objext around, and not
257 # close it immediately. Therefore, disable the pylint check:
258 # pylint: disable=consider-using-with
259 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
260 stdout=subprocess.PIPE,
261 stderr=subprocess.STDOUT,
262 universal_newlines=True)
263 out = self._p.stdout.read(9)
264 if out != 'qemu-io> ':
265 # Most probably qemu-io just failed to start.
266 # Let's collect the whole output and exit.
267 out += self._p.stdout.read()
268 self._p.wait(timeout=1)
269 raise ValueError(out)
271 def close(self):
272 self._p.communicate('q\n')
274 def _read_output(self):
275 pattern = 'qemu-io> '
276 n = len(pattern)
277 pos = 0
278 s = []
279 while pos != n:
280 c = self._p.stdout.read(1)
281 # check unexpected EOF
282 assert c != ''
283 s.append(c)
284 if c == pattern[pos]:
285 pos += 1
286 else:
287 pos = 0
289 return ''.join(s[:-n])
291 def cmd(self, cmd):
292 # quit command is in close(), '\n' is added automatically
293 assert '\n' not in cmd
294 cmd = cmd.strip()
295 assert cmd not in ('q', 'quit')
296 self._p.stdin.write(cmd + '\n')
297 self._p.stdin.flush()
298 return self._read_output()
301 def qemu_nbd(*args):
302 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
303 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
305 def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
306 '''Run qemu-nbd in daemon mode and return both the parent's exit code
307 and its output in case of an error'''
308 full_args = qemu_nbd_args + ['--fork'] + list(args)
309 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
310 connect_stderr=False)
311 return returncode, output if returncode else ''
313 def qemu_nbd_list_log(*args: str) -> str:
314 '''Run qemu-nbd to list remote exports'''
315 full_args = [qemu_nbd_prog, '-L'] + list(args)
316 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
317 log(output, filters=[filter_testfiles, filter_nbd_exports])
318 return output
320 @contextmanager
321 def qemu_nbd_popen(*args):
322 '''Context manager running qemu-nbd within the context'''
323 pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
325 assert not os.path.exists(pid_file)
327 cmd = list(qemu_nbd_args)
328 cmd.extend(('--persistent', '--pid-file', pid_file))
329 cmd.extend(args)
331 log('Start NBD server')
332 with subprocess.Popen(cmd) as p:
333 try:
334 while not os.path.exists(pid_file):
335 if p.poll() is not None:
336 raise RuntimeError(
337 "qemu-nbd terminated with exit code {}: {}"
338 .format(p.returncode, ' '.join(cmd)))
340 time.sleep(0.01)
341 yield
342 finally:
343 if os.path.exists(pid_file):
344 os.remove(pid_file)
345 log('Kill NBD server')
346 p.kill()
347 p.wait()
349 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
350 '''Return True if two image files are identical'''
351 return qemu_img('compare', '-f', fmt1,
352 '-F', fmt2, img1, img2) == 0
354 def create_image(name, size):
355 '''Create a fully-allocated raw image with sector markers'''
356 with open(name, 'wb') as file:
357 i = 0
358 while i < size:
359 sector = struct.pack('>l504xl', i // 512, i // 512)
360 file.write(sector)
361 i = i + 512
363 def image_size(img):
364 '''Return image's virtual size'''
365 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
366 return json.loads(r)['virtual-size']
368 def is_str(val):
369 return isinstance(val, str)
371 test_dir_re = re.compile(r"%s" % test_dir)
372 def filter_test_dir(msg):
373 return test_dir_re.sub("TEST_DIR", msg)
375 win32_re = re.compile(r"\r")
376 def filter_win32(msg):
377 return win32_re.sub("", msg)
379 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
380 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
381 r"and [0-9\/.inf]* ops\/sec\)")
382 def filter_qemu_io(msg):
383 msg = filter_win32(msg)
384 return qemu_io_re.sub("X ops; XX:XX:XX.X "
385 "(XXX YYY/sec and XXX ops/sec)", msg)
387 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
388 def filter_chown(msg):
389 return chown_re.sub("chown UID:GID", msg)
391 def filter_qmp_event(event):
392 '''Filter a QMP event dict'''
393 event = dict(event)
394 if 'timestamp' in event:
395 event['timestamp']['seconds'] = 'SECS'
396 event['timestamp']['microseconds'] = 'USECS'
397 return event
399 def filter_qmp(qmsg, filter_fn):
400 '''Given a string filter, filter a QMP object's values.
401 filter_fn takes a (key, value) pair.'''
402 # Iterate through either lists or dicts;
403 if isinstance(qmsg, list):
404 items = enumerate(qmsg)
405 else:
406 items = qmsg.items()
408 for k, v in items:
409 if isinstance(v, (dict, list)):
410 qmsg[k] = filter_qmp(v, filter_fn)
411 else:
412 qmsg[k] = filter_fn(k, v)
413 return qmsg
415 def filter_testfiles(msg):
416 pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
417 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
418 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
420 def filter_qmp_testfiles(qmsg):
421 def _filter(_key, value):
422 if is_str(value):
423 return filter_testfiles(value)
424 return value
425 return filter_qmp(qmsg, _filter)
427 def filter_virtio_scsi(output: str) -> str:
428 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
430 def filter_qmp_virtio_scsi(qmsg):
431 def _filter(_key, value):
432 if is_str(value):
433 return filter_virtio_scsi(value)
434 return value
435 return filter_qmp(qmsg, _filter)
437 def filter_generated_node_ids(msg):
438 return re.sub("#block[0-9]+", "NODE_NAME", msg)
440 def filter_img_info(output, filename):
441 lines = []
442 for line in output.split('\n'):
443 if 'disk size' in line or 'actual-size' in line:
444 continue
445 line = line.replace(filename, 'TEST_IMG')
446 line = filter_testfiles(line)
447 line = line.replace(imgfmt, 'IMGFMT')
448 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
449 line = re.sub('uuid: [-a-f0-9]+',
450 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
451 line)
452 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
453 lines.append(line)
454 return '\n'.join(lines)
456 def filter_imgfmt(msg):
457 return msg.replace(imgfmt, 'IMGFMT')
459 def filter_qmp_imgfmt(qmsg):
460 def _filter(_key, value):
461 if is_str(value):
462 return filter_imgfmt(value)
463 return value
464 return filter_qmp(qmsg, _filter)
466 def filter_nbd_exports(output: str) -> str:
467 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
470 Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
472 def log(msg: Msg,
473 filters: Iterable[Callable[[Msg], Msg]] = (),
474 indent: Optional[int] = None) -> None:
476 Logs either a string message or a JSON serializable message (like QMP).
477 If indent is provided, JSON serializable messages are pretty-printed.
479 for flt in filters:
480 msg = flt(msg)
481 if isinstance(msg, (dict, list)):
482 # Don't sort if it's already sorted
483 do_sort = not isinstance(msg, OrderedDict)
484 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
485 else:
486 test_logger.info(msg)
488 class Timeout:
489 def __init__(self, seconds, errmsg="Timeout"):
490 self.seconds = seconds
491 self.errmsg = errmsg
492 def __enter__(self):
493 if qemu_gdb or qemu_valgrind:
494 return self
495 signal.signal(signal.SIGALRM, self.timeout)
496 signal.setitimer(signal.ITIMER_REAL, self.seconds)
497 return self
498 def __exit__(self, exc_type, value, traceback):
499 if qemu_gdb or qemu_valgrind:
500 return False
501 signal.setitimer(signal.ITIMER_REAL, 0)
502 return False
503 def timeout(self, signum, frame):
504 raise Exception(self.errmsg)
506 def file_pattern(name):
507 return "{0}-{1}".format(os.getpid(), name)
509 class FilePath:
511 Context manager generating multiple file names. The generated files are
512 removed when exiting the context.
514 Example usage:
516 with FilePath('a.img', 'b.img') as (img_a, img_b):
517 # Use img_a and img_b here...
519 # a.img and b.img are automatically removed here.
521 By default images are created in iotests.test_dir. To create sockets use
522 iotests.sock_dir:
524 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
526 For convenience, calling with one argument yields a single file instead of
527 a tuple with one item.
530 def __init__(self, *names, base_dir=test_dir):
531 self.paths = [os.path.join(base_dir, file_pattern(name))
532 for name in names]
534 def __enter__(self):
535 if len(self.paths) == 1:
536 return self.paths[0]
537 else:
538 return self.paths
540 def __exit__(self, exc_type, exc_val, exc_tb):
541 for path in self.paths:
542 try:
543 os.remove(path)
544 except OSError:
545 pass
546 return False
549 def try_remove(img):
550 try:
551 os.remove(img)
552 except OSError:
553 pass
555 def file_path_remover():
556 for path in reversed(file_path_remover.paths):
557 try_remove(path)
560 def file_path(*names, base_dir=test_dir):
561 ''' Another way to get auto-generated filename that cleans itself up.
563 Use is as simple as:
565 img_a, img_b = file_path('a.img', 'b.img')
566 sock = file_path('socket')
569 if not hasattr(file_path_remover, 'paths'):
570 file_path_remover.paths = []
571 atexit.register(file_path_remover)
573 paths = []
574 for name in names:
575 filename = file_pattern(name)
576 path = os.path.join(base_dir, filename)
577 file_path_remover.paths.append(path)
578 paths.append(path)
580 return paths[0] if len(paths) == 1 else paths
582 def remote_filename(path):
583 if imgproto == 'file':
584 return path
585 elif imgproto == 'ssh':
586 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
587 else:
588 raise Exception("Protocol %s not supported" % (imgproto))
590 class VM(qtest.QEMUQtestMachine):
591 '''A QEMU VM'''
593 def __init__(self, path_suffix=''):
594 name = "qemu%s-%d" % (path_suffix, os.getpid())
595 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
596 if qemu_gdb and qemu_valgrind:
597 sys.stderr.write('gdb and valgrind are mutually exclusive\n')
598 sys.exit(1)
599 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
600 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
601 name=name,
602 base_temp_dir=test_dir,
603 socket_scm_helper=socket_scm_helper,
604 sock_dir=sock_dir, qmp_timer=timer)
605 self._num_drives = 0
607 def _post_shutdown(self) -> None:
608 super()._post_shutdown()
609 if not qemu_valgrind or not self._popen:
610 return
611 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
612 if self.exitcode() == 99:
613 with open(valgrind_filename, encoding='utf-8') as f:
614 print(f.read())
615 else:
616 os.remove(valgrind_filename)
618 def _pre_launch(self) -> None:
619 super()._pre_launch()
620 if qemu_print:
621 # set QEMU binary output to stdout
622 self._close_qemu_log_file()
624 def add_object(self, opts):
625 self._args.append('-object')
626 self._args.append(opts)
627 return self
629 def add_device(self, opts):
630 self._args.append('-device')
631 self._args.append(opts)
632 return self
634 def add_drive_raw(self, opts):
635 self._args.append('-drive')
636 self._args.append(opts)
637 return self
639 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
640 '''Add a virtio-blk drive to the VM'''
641 options = ['if=%s' % interface,
642 'id=drive%d' % self._num_drives]
644 if path is not None:
645 options.append('file=%s' % path)
646 options.append('format=%s' % img_format)
647 options.append('cache=%s' % cachemode)
648 options.append('aio=%s' % aiomode)
650 if opts:
651 options.append(opts)
653 if img_format == 'luks' and 'key-secret' not in opts:
654 # default luks support
655 if luks_default_secret_object not in self._args:
656 self.add_object(luks_default_secret_object)
658 options.append(luks_default_key_secret_opt)
660 self._args.append('-drive')
661 self._args.append(','.join(options))
662 self._num_drives += 1
663 return self
665 def add_blockdev(self, opts):
666 self._args.append('-blockdev')
667 if isinstance(opts, str):
668 self._args.append(opts)
669 else:
670 self._args.append(','.join(opts))
671 return self
673 def add_incoming(self, addr):
674 self._args.append('-incoming')
675 self._args.append(addr)
676 return self
678 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
679 cmd = 'human-monitor-command'
680 kwargs: Dict[str, Any] = {'command-line': command_line}
681 if use_log:
682 return self.qmp_log(cmd, **kwargs)
683 else:
684 return self.qmp(cmd, **kwargs)
686 def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
687 """Pause drive r/w operations"""
688 if not event:
689 self.pause_drive(drive, "read_aio")
690 self.pause_drive(drive, "write_aio")
691 return
692 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
694 def resume_drive(self, drive: str) -> None:
695 """Resume drive r/w operations"""
696 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
698 def hmp_qemu_io(self, drive: str, cmd: str,
699 use_log: bool = False, qdev: bool = False) -> QMPMessage:
700 """Write to a given drive using an HMP command"""
701 d = '-d ' if qdev else ''
702 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
704 def flatten_qmp_object(self, obj, output=None, basestr=''):
705 if output is None:
706 output = {}
707 if isinstance(obj, list):
708 for i, item in enumerate(obj):
709 self.flatten_qmp_object(item, output, basestr + str(i) + '.')
710 elif isinstance(obj, dict):
711 for key in obj:
712 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
713 else:
714 output[basestr[:-1]] = obj # Strip trailing '.'
715 return output
717 def qmp_to_opts(self, obj):
718 obj = self.flatten_qmp_object(obj)
719 output_list = []
720 for key in obj:
721 output_list += [key + '=' + obj[key]]
722 return ','.join(output_list)
724 def get_qmp_events_filtered(self, wait=60.0):
725 result = []
726 for ev in self.get_qmp_events(wait=wait):
727 result.append(filter_qmp_event(ev))
728 return result
730 def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
731 full_cmd = OrderedDict((
732 ("execute", cmd),
733 ("arguments", ordered_qmp(kwargs))
735 log(full_cmd, filters, indent=indent)
736 result = self.qmp(cmd, **kwargs)
737 log(result, filters, indent=indent)
738 return result
740 # Returns None on success, and an error string on failure
741 def run_job(self, job, auto_finalize=True, auto_dismiss=False,
742 pre_finalize=None, cancel=False, wait=60.0):
744 run_job moves a job from creation through to dismissal.
746 :param job: String. ID of recently-launched job
747 :param auto_finalize: Bool. True if the job was launched with
748 auto_finalize. Defaults to True.
749 :param auto_dismiss: Bool. True if the job was launched with
750 auto_dismiss=True. Defaults to False.
751 :param pre_finalize: Callback. A callable that takes no arguments to be
752 invoked prior to issuing job-finalize, if any.
753 :param cancel: Bool. When true, cancels the job after the pre_finalize
754 callback.
755 :param wait: Float. Timeout value specifying how long to wait for any
756 event, in seconds. Defaults to 60.0.
758 match_device = {'data': {'device': job}}
759 match_id = {'data': {'id': job}}
760 events = [
761 ('BLOCK_JOB_COMPLETED', match_device),
762 ('BLOCK_JOB_CANCELLED', match_device),
763 ('BLOCK_JOB_ERROR', match_device),
764 ('BLOCK_JOB_READY', match_device),
765 ('BLOCK_JOB_PENDING', match_id),
766 ('JOB_STATUS_CHANGE', match_id)
768 error = None
769 while True:
770 ev = filter_qmp_event(self.events_wait(events, timeout=wait))
771 if ev['event'] != 'JOB_STATUS_CHANGE':
772 log(ev)
773 continue
774 status = ev['data']['status']
775 if status == 'aborting':
776 result = self.qmp('query-jobs')
777 for j in result['return']:
778 if j['id'] == job:
779 error = j['error']
780 log('Job failed: %s' % (j['error']))
781 elif status == 'ready':
782 self.qmp_log('job-complete', id=job)
783 elif status == 'pending' and not auto_finalize:
784 if pre_finalize:
785 pre_finalize()
786 if cancel:
787 self.qmp_log('job-cancel', id=job)
788 else:
789 self.qmp_log('job-finalize', id=job)
790 elif status == 'concluded' and not auto_dismiss:
791 self.qmp_log('job-dismiss', id=job)
792 elif status == 'null':
793 return error
795 # Returns None on success, and an error string on failure
796 def blockdev_create(self, options, job_id='job0', filters=None):
797 if filters is None:
798 filters = [filter_qmp_testfiles]
799 result = self.qmp_log('blockdev-create', filters=filters,
800 job_id=job_id, options=options)
802 if 'return' in result:
803 assert result['return'] == {}
804 job_result = self.run_job(job_id)
805 else:
806 job_result = result['error']
808 log("")
809 return job_result
811 def enable_migration_events(self, name):
812 log('Enabling migration QMP events on %s...' % name)
813 log(self.qmp('migrate-set-capabilities', capabilities=[
815 'capability': 'events',
816 'state': True
820 def wait_migration(self, expect_runstate: Optional[str]) -> bool:
821 while True:
822 event = self.event_wait('MIGRATION')
823 # We use the default timeout, and with a timeout, event_wait()
824 # never returns None
825 assert event
827 log(event, filters=[filter_qmp_event])
828 if event['data']['status'] in ('completed', 'failed'):
829 break
831 if event['data']['status'] == 'completed':
832 # The event may occur in finish-migrate, so wait for the expected
833 # post-migration runstate
834 runstate = None
835 while runstate != expect_runstate:
836 runstate = self.qmp('query-status')['return']['status']
837 return True
838 else:
839 return False
841 def node_info(self, node_name):
842 nodes = self.qmp('query-named-block-nodes')
843 for x in nodes['return']:
844 if x['node-name'] == node_name:
845 return x
846 return None
848 def query_bitmaps(self):
849 res = self.qmp("query-named-block-nodes")
850 return {device['node-name']: device['dirty-bitmaps']
851 for device in res['return'] if 'dirty-bitmaps' in device}
853 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
855 get a specific bitmap from the object returned by query_bitmaps.
856 :param recording: If specified, filter results by the specified value.
857 :param bitmaps: If specified, use it instead of call query_bitmaps()
859 if bitmaps is None:
860 bitmaps = self.query_bitmaps()
862 for bitmap in bitmaps[node_name]:
863 if bitmap.get('name', '') == bitmap_name:
864 if recording is None or bitmap.get('recording') == recording:
865 return bitmap
866 return None
868 def check_bitmap_status(self, node_name, bitmap_name, fields):
869 ret = self.get_bitmap(node_name, bitmap_name)
871 return fields.items() <= ret.items()
873 def assert_block_path(self, root, path, expected_node, graph=None):
875 Check whether the node under the given path in the block graph
876 is @expected_node.
878 @root is the node name of the node where the @path is rooted.
880 @path is a string that consists of child names separated by
881 slashes. It must begin with a slash.
883 Examples for @root + @path:
884 - root="qcow2-node", path="/backing/file"
885 - root="quorum-node", path="/children.2/file"
887 Hypothetically, @path could be empty, in which case it would
888 point to @root. However, in practice this case is not useful
889 and hence not allowed.
891 @expected_node may be None. (All elements of the path but the
892 leaf must still exist.)
894 @graph may be None or the result of an x-debug-query-block-graph
895 call that has already been performed.
897 if graph is None:
898 graph = self.qmp('x-debug-query-block-graph')['return']
900 iter_path = iter(path.split('/'))
902 # Must start with a /
903 assert next(iter_path) == ''
905 node = next((node for node in graph['nodes'] if node['name'] == root),
906 None)
908 # An empty @path is not allowed, so the root node must be present
909 assert node is not None, 'Root node %s not found' % root
911 for child_name in iter_path:
912 assert node is not None, 'Cannot follow path %s%s' % (root, path)
914 try:
915 node_id = next(edge['child'] for edge in graph['edges']
916 if (edge['parent'] == node['id'] and
917 edge['name'] == child_name))
919 node = next(node for node in graph['nodes']
920 if node['id'] == node_id)
922 except StopIteration:
923 node = None
925 if node is None:
926 assert expected_node is None, \
927 'No node found under %s (but expected %s)' % \
928 (path, expected_node)
929 else:
930 assert node['name'] == expected_node, \
931 'Found node %s under %s (but expected %s)' % \
932 (node['name'], path, expected_node)
934 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
936 class QMPTestCase(unittest.TestCase):
937 '''Abstract base class for QMP test cases'''
939 def __init__(self, *args, **kwargs):
940 super().__init__(*args, **kwargs)
941 # Many users of this class set a VM property we rely on heavily
942 # in the methods below.
943 self.vm = None
945 def dictpath(self, d, path):
946 '''Traverse a path in a nested dict'''
947 for component in path.split('/'):
948 m = index_re.match(component)
949 if m:
950 component, idx = m.groups()
951 idx = int(idx)
953 if not isinstance(d, dict) or component not in d:
954 self.fail(f'failed path traversal for "{path}" in "{d}"')
955 d = d[component]
957 if m:
958 if not isinstance(d, list):
959 self.fail(f'path component "{component}" in "{path}" '
960 f'is not a list in "{d}"')
961 try:
962 d = d[idx]
963 except IndexError:
964 self.fail(f'invalid index "{idx}" in path "{path}" '
965 f'in "{d}"')
966 return d
968 def assert_qmp_absent(self, d, path):
969 try:
970 result = self.dictpath(d, path)
971 except AssertionError:
972 return
973 self.fail('path "%s" has value "%s"' % (path, str(result)))
975 def assert_qmp(self, d, path, value):
976 '''Assert that the value for a specific path in a QMP dict
977 matches. When given a list of values, assert that any of
978 them matches.'''
980 result = self.dictpath(d, path)
982 # [] makes no sense as a list of valid values, so treat it as
983 # an actual single value.
984 if isinstance(value, list) and value != []:
985 for v in value:
986 if result == v:
987 return
988 self.fail('no match for "%s" in %s' % (str(result), str(value)))
989 else:
990 self.assertEqual(result, value,
991 '"%s" is "%s", expected "%s"'
992 % (path, str(result), str(value)))
994 def assert_no_active_block_jobs(self):
995 result = self.vm.qmp('query-block-jobs')
996 self.assert_qmp(result, 'return', [])
998 def assert_has_block_node(self, node_name=None, file_name=None):
999 """Issue a query-named-block-nodes and assert node_name and/or
1000 file_name is present in the result"""
1001 def check_equal_or_none(a, b):
1002 return a is None or b is None or a == b
1003 assert node_name or file_name
1004 result = self.vm.qmp('query-named-block-nodes')
1005 for x in result["return"]:
1006 if check_equal_or_none(x.get("node-name"), node_name) and \
1007 check_equal_or_none(x.get("file"), file_name):
1008 return
1009 self.fail("Cannot find %s %s in result:\n%s" %
1010 (node_name, file_name, result))
1012 def assert_json_filename_equal(self, json_filename, reference):
1013 '''Asserts that the given filename is a json: filename and that its
1014 content is equal to the given reference object'''
1015 self.assertEqual(json_filename[:5], 'json:')
1016 self.assertEqual(
1017 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1018 self.vm.flatten_qmp_object(reference)
1021 def cancel_and_wait(self, drive='drive0', force=False,
1022 resume=False, wait=60.0):
1023 '''Cancel a block job and wait for it to finish, returning the event'''
1024 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1025 self.assert_qmp(result, 'return', {})
1027 if resume:
1028 self.vm.resume_drive(drive)
1030 cancelled = False
1031 result = None
1032 while not cancelled:
1033 for event in self.vm.get_qmp_events(wait=wait):
1034 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1035 event['event'] == 'BLOCK_JOB_CANCELLED':
1036 self.assert_qmp(event, 'data/device', drive)
1037 result = event
1038 cancelled = True
1039 elif event['event'] == 'JOB_STATUS_CHANGE':
1040 self.assert_qmp(event, 'data/id', drive)
1043 self.assert_no_active_block_jobs()
1044 return result
1046 def wait_until_completed(self, drive='drive0', check_offset=True,
1047 wait=60.0, error=None):
1048 '''Wait for a block job to finish, returning the event'''
1049 while True:
1050 for event in self.vm.get_qmp_events(wait=wait):
1051 if event['event'] == 'BLOCK_JOB_COMPLETED':
1052 self.assert_qmp(event, 'data/device', drive)
1053 if error is None:
1054 self.assert_qmp_absent(event, 'data/error')
1055 if check_offset:
1056 self.assert_qmp(event, 'data/offset',
1057 event['data']['len'])
1058 else:
1059 self.assert_qmp(event, 'data/error', error)
1060 self.assert_no_active_block_jobs()
1061 return event
1062 if event['event'] == 'JOB_STATUS_CHANGE':
1063 self.assert_qmp(event, 'data/id', drive)
1065 def wait_ready(self, drive='drive0'):
1066 """Wait until a BLOCK_JOB_READY event, and return the event."""
1067 return self.vm.events_wait([
1068 ('BLOCK_JOB_READY',
1069 {'data': {'type': 'mirror', 'device': drive}}),
1070 ('BLOCK_JOB_READY',
1071 {'data': {'type': 'commit', 'device': drive}})
1074 def wait_ready_and_cancel(self, drive='drive0'):
1075 self.wait_ready(drive=drive)
1076 event = self.cancel_and_wait(drive=drive)
1077 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1078 self.assert_qmp(event, 'data/type', 'mirror')
1079 self.assert_qmp(event, 'data/offset', event['data']['len'])
1081 def complete_and_wait(self, drive='drive0', wait_ready=True,
1082 completion_error=None):
1083 '''Complete a block job and wait for it to finish'''
1084 if wait_ready:
1085 self.wait_ready(drive=drive)
1087 result = self.vm.qmp('block-job-complete', device=drive)
1088 self.assert_qmp(result, 'return', {})
1090 event = self.wait_until_completed(drive=drive, error=completion_error)
1091 self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1093 def pause_wait(self, job_id='job0'):
1094 with Timeout(3, "Timeout waiting for job to pause"):
1095 while True:
1096 result = self.vm.qmp('query-block-jobs')
1097 found = False
1098 for job in result['return']:
1099 if job['device'] == job_id:
1100 found = True
1101 if job['paused'] and not job['busy']:
1102 return job
1103 break
1104 assert found
1106 def pause_job(self, job_id='job0', wait=True):
1107 result = self.vm.qmp('block-job-pause', device=job_id)
1108 self.assert_qmp(result, 'return', {})
1109 if wait:
1110 return self.pause_wait(job_id)
1111 return result
1113 def case_skip(self, reason):
1114 '''Skip this test case'''
1115 case_notrun(reason)
1116 self.skipTest(reason)
1119 def notrun(reason):
1120 '''Skip this test suite'''
1121 # Each test in qemu-iotests has a number ("seq")
1122 seq = os.path.basename(sys.argv[0])
1124 with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \
1125 as outfile:
1126 outfile.write(reason + '\n')
1127 logger.warning("%s not run: %s", seq, reason)
1128 sys.exit(0)
1130 def case_notrun(reason):
1131 '''Mark this test case as not having been run (without actually
1132 skipping it, that is left to the caller). See
1133 QMPTestCase.case_skip() for a variant that actually skips the
1134 current test case.'''
1136 # Each test in qemu-iotests has a number ("seq")
1137 seq = os.path.basename(sys.argv[0])
1139 with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \
1140 as outfile:
1141 outfile.write(' [case not run] ' + reason + '\n')
1143 def _verify_image_format(supported_fmts: Sequence[str] = (),
1144 unsupported_fmts: Sequence[str] = ()) -> None:
1145 if 'generic' in supported_fmts and \
1146 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1147 # similar to
1148 # _supported_fmt generic
1149 # for bash tests
1150 supported_fmts = ()
1152 not_sup = supported_fmts and (imgfmt not in supported_fmts)
1153 if not_sup or (imgfmt in unsupported_fmts):
1154 notrun('not suitable for this image format: %s' % imgfmt)
1156 if imgfmt == 'luks':
1157 verify_working_luks()
1159 def _verify_protocol(supported: Sequence[str] = (),
1160 unsupported: Sequence[str] = ()) -> None:
1161 assert not (supported and unsupported)
1163 if 'generic' in supported:
1164 return
1166 not_sup = supported and (imgproto not in supported)
1167 if not_sup or (imgproto in unsupported):
1168 notrun('not suitable for this protocol: %s' % imgproto)
1170 def _verify_platform(supported: Sequence[str] = (),
1171 unsupported: Sequence[str] = ()) -> None:
1172 if any((sys.platform.startswith(x) for x in unsupported)):
1173 notrun('not suitable for this OS: %s' % sys.platform)
1175 if supported:
1176 if not any((sys.platform.startswith(x) for x in supported)):
1177 notrun('not suitable for this OS: %s' % sys.platform)
1179 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1180 if supported_cache_modes and (cachemode not in supported_cache_modes):
1181 notrun('not suitable for this cache mode: %s' % cachemode)
1183 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1184 if supported_aio_modes and (aiomode not in supported_aio_modes):
1185 notrun('not suitable for this aio mode: %s' % aiomode)
1187 def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1188 usf_list = list(set(required_formats) - set(supported_formats()))
1189 if usf_list:
1190 notrun(f'formats {usf_list} are not whitelisted')
1193 def _verify_virtio_blk() -> None:
1194 out = qemu_pipe('-M', 'none', '-device', 'help')
1195 if 'virtio-blk' not in out:
1196 notrun('Missing virtio-blk in QEMU binary')
1198 def _verify_virtio_scsi_pci_or_ccw() -> None:
1199 out = qemu_pipe('-M', 'none', '-device', 'help')
1200 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1201 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1204 def supports_quorum():
1205 return 'quorum' in qemu_img_pipe('--help')
1207 def verify_quorum():
1208 '''Skip test suite if quorum support is not available'''
1209 if not supports_quorum():
1210 notrun('quorum support missing')
1212 def has_working_luks() -> Tuple[bool, str]:
1214 Check whether our LUKS driver can actually create images
1215 (this extends to LUKS encryption for qcow2).
1217 If not, return the reason why.
1220 img_file = f'{test_dir}/luks-test.luks'
1221 (output, status) = \
1222 qemu_img_pipe_and_status('create', '-f', 'luks',
1223 '--object', luks_default_secret_object,
1224 '-o', luks_default_key_secret_opt,
1225 '-o', 'iter-time=10',
1226 img_file, '1G')
1227 try:
1228 os.remove(img_file)
1229 except OSError:
1230 pass
1232 if status != 0:
1233 reason = output
1234 for line in output.splitlines():
1235 if img_file + ':' in line:
1236 reason = line.split(img_file + ':', 1)[1].strip()
1237 break
1239 return (False, reason)
1240 else:
1241 return (True, '')
1243 def verify_working_luks():
1245 Skip test suite if LUKS does not work
1247 (working, reason) = has_working_luks()
1248 if not working:
1249 notrun(reason)
1251 def qemu_pipe(*args: str) -> str:
1253 Run qemu with an option to print something and exit (e.g. a help option).
1255 :return: QEMU's stdout output.
1257 full_args = [qemu_prog] + qemu_opts + list(args)
1258 output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1259 return output
1261 def supported_formats(read_only=False):
1262 '''Set 'read_only' to True to check ro-whitelist
1263 Otherwise, rw-whitelist is checked'''
1265 if not hasattr(supported_formats, "formats"):
1266 supported_formats.formats = {}
1268 if read_only not in supported_formats.formats:
1269 format_message = qemu_pipe("-drive", "format=help")
1270 line = 1 if read_only else 0
1271 supported_formats.formats[read_only] = \
1272 format_message.splitlines()[line].split(":")[1].split()
1274 return supported_formats.formats[read_only]
1276 def skip_if_unsupported(required_formats=(), read_only=False):
1277 '''Skip Test Decorator
1278 Runs the test if all the required formats are whitelisted'''
1279 def skip_test_decorator(func):
1280 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1281 **kwargs: Dict[str, Any]) -> None:
1282 if callable(required_formats):
1283 fmts = required_formats(test_case)
1284 else:
1285 fmts = required_formats
1287 usf_list = list(set(fmts) - set(supported_formats(read_only)))
1288 if usf_list:
1289 msg = f'{test_case}: formats {usf_list} are not whitelisted'
1290 test_case.case_skip(msg)
1291 else:
1292 func(test_case, *args, **kwargs)
1293 return func_wrapper
1294 return skip_test_decorator
1296 def skip_for_formats(formats: Sequence[str] = ()) \
1297 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1298 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1299 '''Skip Test Decorator
1300 Skips the test for the given formats'''
1301 def skip_test_decorator(func):
1302 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1303 **kwargs: Dict[str, Any]) -> None:
1304 if imgfmt in formats:
1305 msg = f'{test_case}: Skipped for format {imgfmt}'
1306 test_case.case_skip(msg)
1307 else:
1308 func(test_case, *args, **kwargs)
1309 return func_wrapper
1310 return skip_test_decorator
1312 def skip_if_user_is_root(func):
1313 '''Skip Test Decorator
1314 Runs the test only without root permissions'''
1315 def func_wrapper(*args, **kwargs):
1316 if os.getuid() == 0:
1317 case_notrun('{}: cannot be run as root'.format(args[0]))
1318 return None
1319 else:
1320 return func(*args, **kwargs)
1321 return func_wrapper
1323 # We need to filter out the time taken from the output so that
1324 # qemu-iotest can reliably diff the results against master output,
1325 # and hide skipped tests from the reference output.
1327 class ReproducibleTestResult(unittest.TextTestResult):
1328 def addSkip(self, test, reason):
1329 # Same as TextTestResult, but print dot instead of "s"
1330 unittest.TestResult.addSkip(self, test, reason)
1331 if self.showAll:
1332 self.stream.writeln("skipped {0!r}".format(reason))
1333 elif self.dots:
1334 self.stream.write(".")
1335 self.stream.flush()
1337 class ReproducibleStreamWrapper:
1338 def __init__(self, stream: TextIO):
1339 self.stream = stream
1341 def __getattr__(self, attr):
1342 if attr in ('stream', '__getstate__'):
1343 raise AttributeError(attr)
1344 return getattr(self.stream, attr)
1346 def write(self, arg=None):
1347 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1348 arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1349 self.stream.write(arg)
1351 class ReproducibleTestRunner(unittest.TextTestRunner):
1352 def __init__(self, stream: Optional[TextIO] = None,
1353 resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
1354 **kwargs: Any) -> None:
1355 rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1356 super().__init__(stream=rstream, # type: ignore
1357 descriptions=True,
1358 resultclass=resultclass,
1359 **kwargs)
1361 def execute_unittest(argv: List[str], debug: bool = False) -> None:
1362 """Executes unittests within the calling module."""
1364 # Some tests have warnings, especially ResourceWarnings for unclosed
1365 # files and sockets. Ignore them for now to ensure reproducibility of
1366 # the test output.
1367 unittest.main(argv=argv,
1368 testRunner=ReproducibleTestRunner,
1369 verbosity=2 if debug else 1,
1370 warnings=None if sys.warnoptions else 'ignore')
1372 def execute_setup_common(supported_fmts: Sequence[str] = (),
1373 supported_platforms: Sequence[str] = (),
1374 supported_cache_modes: Sequence[str] = (),
1375 supported_aio_modes: Sequence[str] = (),
1376 unsupported_fmts: Sequence[str] = (),
1377 supported_protocols: Sequence[str] = (),
1378 unsupported_protocols: Sequence[str] = (),
1379 required_fmts: Sequence[str] = ()) -> bool:
1381 Perform necessary setup for either script-style or unittest-style tests.
1383 :return: Bool; Whether or not debug mode has been requested via the CLI.
1385 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1387 debug = '-d' in sys.argv
1388 if debug:
1389 sys.argv.remove('-d')
1390 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1392 _verify_image_format(supported_fmts, unsupported_fmts)
1393 _verify_protocol(supported_protocols, unsupported_protocols)
1394 _verify_platform(supported=supported_platforms)
1395 _verify_cache_mode(supported_cache_modes)
1396 _verify_aio_mode(supported_aio_modes)
1397 _verify_formats(required_fmts)
1398 _verify_virtio_blk()
1400 return debug
1402 def execute_test(*args, test_function=None, **kwargs):
1403 """Run either unittest or script-style tests."""
1405 debug = execute_setup_common(*args, **kwargs)
1406 if not test_function:
1407 execute_unittest(sys.argv, debug)
1408 else:
1409 test_function()
1411 def activate_logging():
1412 """Activate iotests.log() output to stdout for script-style tests."""
1413 handler = logging.StreamHandler(stream=sys.stdout)
1414 formatter = logging.Formatter('%(message)s')
1415 handler.setFormatter(formatter)
1416 test_logger.addHandler(handler)
1417 test_logger.setLevel(logging.INFO)
1418 test_logger.propagate = False
1420 # This is called from script-style iotests without a single point of entry
1421 def script_initialize(*args, **kwargs):
1422 """Initialize script-style tests without running any tests."""
1423 activate_logging()
1424 execute_setup_common(*args, **kwargs)
1426 # This is called from script-style iotests with a single point of entry
1427 def script_main(test_function, *args, **kwargs):
1428 """Run script-style tests outside of the unittest framework"""
1429 activate_logging()
1430 execute_test(*args, test_function=test_function, **kwargs)
1432 # This is called from unittest style iotests
1433 def main(*args, **kwargs):
1434 """Run tests using the unittest framework"""
1435 execute_test(*args, **kwargs)