sd/milkymist-memcard: Fix format string
[qemu/armbru.git] / tests / qemu-iotests / iotests.py
blob3590ed78a0e4ea1f8602de63c49d8c7b6a10f707
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 from typing import (Any, Callable, Dict, Iterable,
32 List, Optional, Sequence, Tuple, TypeVar)
33 import unittest
35 # pylint: disable=import-error, wrong-import-position
36 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
37 from qemu import qtest
38 from qemu.qmp import QMPMessage
40 assert sys.version_info >= (3, 6)
42 # Use this logger for logging messages directly from the iotests module
43 logger = logging.getLogger('qemu.iotests')
44 logger.addHandler(logging.NullHandler())
46 # Use this logger for messages that ought to be used for diff output.
47 test_logger = logging.getLogger('qemu.iotests.diff_io')
50 faulthandler.enable()
52 # This will not work if arguments contain spaces but is necessary if we
53 # want to support the override options that ./check supports.
54 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
55 if os.environ.get('QEMU_IMG_OPTIONS'):
56 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
58 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
59 if os.environ.get('QEMU_IO_OPTIONS'):
60 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
62 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
63 if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
64 qemu_io_args_no_fmt += \
65 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
67 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
68 if os.environ.get('QEMU_NBD_OPTIONS'):
69 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
71 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
72 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
74 imgfmt = os.environ.get('IMGFMT', 'raw')
75 imgproto = os.environ.get('IMGPROTO', 'file')
76 test_dir = os.environ.get('TEST_DIR')
77 sock_dir = os.environ.get('SOCK_DIR')
78 output_dir = os.environ.get('OUTPUT_DIR', '.')
79 cachemode = os.environ.get('CACHEMODE')
80 aiomode = os.environ.get('AIOMODE')
81 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
83 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
85 luks_default_secret_object = 'secret,id=keysec0,data=' + \
86 os.environ.get('IMGKEYSECRET', '')
87 luks_default_key_secret_opt = 'key-secret=keysec0'
90 def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
91 """
92 Run qemu-img and return both its output and its exit code
93 """
94 subp = subprocess.Popen(qemu_img_args + list(args),
95 stdout=subprocess.PIPE,
96 stderr=subprocess.STDOUT,
97 universal_newlines=True)
98 output = subp.communicate()[0]
99 if subp.returncode < 0:
100 sys.stderr.write('qemu-img received signal %i: %s\n'
101 % (-subp.returncode,
102 ' '.join(qemu_img_args + list(args))))
103 return (output, subp.returncode)
105 def qemu_img(*args: str) -> int:
106 '''Run qemu-img and return the exit code'''
107 return qemu_img_pipe_and_status(*args)[1]
109 def ordered_qmp(qmsg, conv_keys=True):
110 # Dictionaries are not ordered prior to 3.6, therefore:
111 if isinstance(qmsg, list):
112 return [ordered_qmp(atom) for atom in qmsg]
113 if isinstance(qmsg, dict):
114 od = OrderedDict()
115 for k, v in sorted(qmsg.items()):
116 if conv_keys:
117 k = k.replace('_', '-')
118 od[k] = ordered_qmp(v, conv_keys=False)
119 return od
120 return qmsg
122 def qemu_img_create(*args):
123 args = list(args)
125 # default luks support
126 if '-f' in args and args[args.index('-f') + 1] == 'luks':
127 if '-o' in args:
128 i = args.index('-o')
129 if 'key-secret' not in args[i + 1]:
130 args[i + 1].append(luks_default_key_secret_opt)
131 args.insert(i + 2, '--object')
132 args.insert(i + 3, luks_default_secret_object)
133 else:
134 args = ['-o', luks_default_key_secret_opt,
135 '--object', luks_default_secret_object] + args
137 args.insert(0, 'create')
139 return qemu_img(*args)
141 def qemu_img_verbose(*args):
142 '''Run qemu-img without suppressing its output and return the exit code'''
143 exitcode = subprocess.call(qemu_img_args + list(args))
144 if exitcode < 0:
145 sys.stderr.write('qemu-img received signal %i: %s\n'
146 % (-exitcode, ' '.join(qemu_img_args + list(args))))
147 return exitcode
149 def qemu_img_pipe(*args: str) -> str:
150 '''Run qemu-img and return its output'''
151 return qemu_img_pipe_and_status(*args)[0]
153 def qemu_img_log(*args):
154 result = qemu_img_pipe(*args)
155 log(result, filters=[filter_testfiles])
156 return result
158 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
159 args = ['info']
160 if imgopts:
161 args.append('--image-opts')
162 else:
163 args += ['-f', imgfmt]
164 args += extra_args
165 args.append(filename)
167 output = qemu_img_pipe(*args)
168 if not filter_path:
169 filter_path = filename
170 log(filter_img_info(output, filter_path))
172 def qemu_io(*args):
173 '''Run qemu-io and return the stdout data'''
174 args = qemu_io_args + list(args)
175 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
176 stderr=subprocess.STDOUT,
177 universal_newlines=True)
178 output = subp.communicate()[0]
179 if subp.returncode < 0:
180 sys.stderr.write('qemu-io received signal %i: %s\n'
181 % (-subp.returncode, ' '.join(args)))
182 return output
184 def qemu_io_log(*args):
185 result = qemu_io(*args)
186 log(result, filters=[filter_testfiles, filter_qemu_io])
187 return result
189 def qemu_io_silent(*args):
190 '''Run qemu-io and return the exit code, suppressing stdout'''
191 args = qemu_io_args + list(args)
192 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
193 if exitcode < 0:
194 sys.stderr.write('qemu-io received signal %i: %s\n' %
195 (-exitcode, ' '.join(args)))
196 return exitcode
198 def qemu_io_silent_check(*args):
199 '''Run qemu-io and return the true if subprocess returned 0'''
200 args = qemu_io_args + list(args)
201 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
202 stderr=subprocess.STDOUT)
203 return exitcode == 0
205 def get_virtio_scsi_device():
206 if qemu_default_machine == 's390-ccw-virtio':
207 return 'virtio-scsi-ccw'
208 return 'virtio-scsi-pci'
210 class QemuIoInteractive:
211 def __init__(self, *args):
212 self.args = qemu_io_args_no_fmt + list(args)
213 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
214 stdout=subprocess.PIPE,
215 stderr=subprocess.STDOUT,
216 universal_newlines=True)
217 out = self._p.stdout.read(9)
218 if out != 'qemu-io> ':
219 # Most probably qemu-io just failed to start.
220 # Let's collect the whole output and exit.
221 out += self._p.stdout.read()
222 self._p.wait(timeout=1)
223 raise ValueError(out)
225 def close(self):
226 self._p.communicate('q\n')
228 def _read_output(self):
229 pattern = 'qemu-io> '
230 n = len(pattern)
231 pos = 0
232 s = []
233 while pos != n:
234 c = self._p.stdout.read(1)
235 # check unexpected EOF
236 assert c != ''
237 s.append(c)
238 if c == pattern[pos]:
239 pos += 1
240 else:
241 pos = 0
243 return ''.join(s[:-n])
245 def cmd(self, cmd):
246 # quit command is in close(), '\n' is added automatically
247 assert '\n' not in cmd
248 cmd = cmd.strip()
249 assert cmd not in ('q', 'quit')
250 self._p.stdin.write(cmd + '\n')
251 self._p.stdin.flush()
252 return self._read_output()
255 def qemu_nbd(*args):
256 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
257 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
259 def qemu_nbd_early_pipe(*args):
260 '''Run qemu-nbd in daemon mode and return both the parent's exit code
261 and its output in case of an error'''
262 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
263 stdout=subprocess.PIPE,
264 universal_newlines=True)
265 output = subp.communicate()[0]
266 if subp.returncode < 0:
267 sys.stderr.write('qemu-nbd received signal %i: %s\n' %
268 (-subp.returncode,
269 ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
271 return subp.returncode, output if subp.returncode else ''
273 def qemu_nbd_popen(*args):
274 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
275 return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args))
277 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
278 '''Return True if two image files are identical'''
279 return qemu_img('compare', '-f', fmt1,
280 '-F', fmt2, img1, img2) == 0
282 def create_image(name, size):
283 '''Create a fully-allocated raw image with sector markers'''
284 file = open(name, 'wb')
285 i = 0
286 while i < size:
287 sector = struct.pack('>l504xl', i // 512, i // 512)
288 file.write(sector)
289 i = i + 512
290 file.close()
292 def image_size(img):
293 '''Return image's virtual size'''
294 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
295 return json.loads(r)['virtual-size']
297 def is_str(val):
298 return isinstance(val, str)
300 test_dir_re = re.compile(r"%s" % test_dir)
301 def filter_test_dir(msg):
302 return test_dir_re.sub("TEST_DIR", msg)
304 win32_re = re.compile(r"\r")
305 def filter_win32(msg):
306 return win32_re.sub("", msg)
308 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
309 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
310 r"and [0-9\/.inf]* ops\/sec\)")
311 def filter_qemu_io(msg):
312 msg = filter_win32(msg)
313 return qemu_io_re.sub("X ops; XX:XX:XX.X "
314 "(XXX YYY/sec and XXX ops/sec)", msg)
316 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
317 def filter_chown(msg):
318 return chown_re.sub("chown UID:GID", msg)
320 def filter_qmp_event(event):
321 '''Filter a QMP event dict'''
322 event = dict(event)
323 if 'timestamp' in event:
324 event['timestamp']['seconds'] = 'SECS'
325 event['timestamp']['microseconds'] = 'USECS'
326 return event
328 def filter_qmp(qmsg, filter_fn):
329 '''Given a string filter, filter a QMP object's values.
330 filter_fn takes a (key, value) pair.'''
331 # Iterate through either lists or dicts;
332 if isinstance(qmsg, list):
333 items = enumerate(qmsg)
334 else:
335 items = qmsg.items()
337 for k, v in items:
338 if isinstance(v, (dict, list)):
339 qmsg[k] = filter_qmp(v, filter_fn)
340 else:
341 qmsg[k] = filter_fn(k, v)
342 return qmsg
344 def filter_testfiles(msg):
345 pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
346 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
347 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
349 def filter_qmp_testfiles(qmsg):
350 def _filter(_key, value):
351 if is_str(value):
352 return filter_testfiles(value)
353 return value
354 return filter_qmp(qmsg, _filter)
356 def filter_generated_node_ids(msg):
357 return re.sub("#block[0-9]+", "NODE_NAME", msg)
359 def filter_img_info(output, filename):
360 lines = []
361 for line in output.split('\n'):
362 if 'disk size' in line or 'actual-size' in line:
363 continue
364 line = line.replace(filename, 'TEST_IMG')
365 line = filter_testfiles(line)
366 line = line.replace(imgfmt, 'IMGFMT')
367 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
368 line = re.sub('uuid: [-a-f0-9]+',
369 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
370 line)
371 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
372 lines.append(line)
373 return '\n'.join(lines)
375 def filter_imgfmt(msg):
376 return msg.replace(imgfmt, 'IMGFMT')
378 def filter_qmp_imgfmt(qmsg):
379 def _filter(_key, value):
380 if is_str(value):
381 return filter_imgfmt(value)
382 return value
383 return filter_qmp(qmsg, _filter)
386 Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
388 def log(msg: Msg,
389 filters: Iterable[Callable[[Msg], Msg]] = (),
390 indent: Optional[int] = None) -> None:
392 Logs either a string message or a JSON serializable message (like QMP).
393 If indent is provided, JSON serializable messages are pretty-printed.
395 for flt in filters:
396 msg = flt(msg)
397 if isinstance(msg, (dict, list)):
398 # Don't sort if it's already sorted
399 do_sort = not isinstance(msg, OrderedDict)
400 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
401 else:
402 test_logger.info(msg)
404 class Timeout:
405 def __init__(self, seconds, errmsg="Timeout"):
406 self.seconds = seconds
407 self.errmsg = errmsg
408 def __enter__(self):
409 signal.signal(signal.SIGALRM, self.timeout)
410 signal.setitimer(signal.ITIMER_REAL, self.seconds)
411 return self
412 def __exit__(self, exc_type, value, traceback):
413 signal.setitimer(signal.ITIMER_REAL, 0)
414 return False
415 def timeout(self, signum, frame):
416 raise Exception(self.errmsg)
418 def file_pattern(name):
419 return "{0}-{1}".format(os.getpid(), name)
421 class FilePaths:
423 FilePaths is an auto-generated filename that cleans itself up.
425 Use this context manager to generate filenames and ensure that the file
426 gets deleted::
428 with FilePaths(['test.img']) as img_path:
429 qemu_img('create', img_path, '1G')
430 # migration_sock_path is automatically deleted
432 def __init__(self, names, base_dir=test_dir):
433 self.paths = []
434 for name in names:
435 self.paths.append(os.path.join(base_dir, file_pattern(name)))
437 def __enter__(self):
438 return self.paths
440 def __exit__(self, exc_type, exc_val, exc_tb):
441 try:
442 for path in self.paths:
443 os.remove(path)
444 except OSError:
445 pass
446 return False
448 class FilePath(FilePaths):
450 FilePath is a specialization of FilePaths that takes a single filename.
452 def __init__(self, name, base_dir=test_dir):
453 super(FilePath, self).__init__([name], base_dir)
455 def __enter__(self):
456 return self.paths[0]
458 def file_path_remover():
459 for path in reversed(file_path_remover.paths):
460 try:
461 os.remove(path)
462 except OSError:
463 pass
466 def file_path(*names, base_dir=test_dir):
467 ''' Another way to get auto-generated filename that cleans itself up.
469 Use is as simple as:
471 img_a, img_b = file_path('a.img', 'b.img')
472 sock = file_path('socket')
475 if not hasattr(file_path_remover, 'paths'):
476 file_path_remover.paths = []
477 atexit.register(file_path_remover)
479 paths = []
480 for name in names:
481 filename = file_pattern(name)
482 path = os.path.join(base_dir, filename)
483 file_path_remover.paths.append(path)
484 paths.append(path)
486 return paths[0] if len(paths) == 1 else paths
488 def remote_filename(path):
489 if imgproto == 'file':
490 return path
491 elif imgproto == 'ssh':
492 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
493 else:
494 raise Exception("Protocol %s not supported" % (imgproto))
496 class VM(qtest.QEMUQtestMachine):
497 '''A QEMU VM'''
499 def __init__(self, path_suffix=''):
500 name = "qemu%s-%d" % (path_suffix, os.getpid())
501 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
502 test_dir=test_dir,
503 socket_scm_helper=socket_scm_helper,
504 sock_dir=sock_dir)
505 self._num_drives = 0
507 def add_object(self, opts):
508 self._args.append('-object')
509 self._args.append(opts)
510 return self
512 def add_device(self, opts):
513 self._args.append('-device')
514 self._args.append(opts)
515 return self
517 def add_drive_raw(self, opts):
518 self._args.append('-drive')
519 self._args.append(opts)
520 return self
522 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
523 '''Add a virtio-blk drive to the VM'''
524 options = ['if=%s' % interface,
525 'id=drive%d' % self._num_drives]
527 if path is not None:
528 options.append('file=%s' % path)
529 options.append('format=%s' % img_format)
530 options.append('cache=%s' % cachemode)
531 options.append('aio=%s' % aiomode)
533 if opts:
534 options.append(opts)
536 if img_format == 'luks' and 'key-secret' not in opts:
537 # default luks support
538 if luks_default_secret_object not in self._args:
539 self.add_object(luks_default_secret_object)
541 options.append(luks_default_key_secret_opt)
543 self._args.append('-drive')
544 self._args.append(','.join(options))
545 self._num_drives += 1
546 return self
548 def add_blockdev(self, opts):
549 self._args.append('-blockdev')
550 if isinstance(opts, str):
551 self._args.append(opts)
552 else:
553 self._args.append(','.join(opts))
554 return self
556 def add_incoming(self, addr):
557 self._args.append('-incoming')
558 self._args.append(addr)
559 return self
561 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
562 cmd = 'human-monitor-command'
563 kwargs = {'command-line': command_line}
564 if use_log:
565 return self.qmp_log(cmd, **kwargs)
566 else:
567 return self.qmp(cmd, **kwargs)
569 def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
570 """Pause drive r/w operations"""
571 if not event:
572 self.pause_drive(drive, "read_aio")
573 self.pause_drive(drive, "write_aio")
574 return
575 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
577 def resume_drive(self, drive: str) -> None:
578 """Resume drive r/w operations"""
579 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
581 def hmp_qemu_io(self, drive: str, cmd: str,
582 use_log: bool = False) -> QMPMessage:
583 """Write to a given drive using an HMP command"""
584 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
586 def flatten_qmp_object(self, obj, output=None, basestr=''):
587 if output is None:
588 output = dict()
589 if isinstance(obj, list):
590 for i, item in enumerate(obj):
591 self.flatten_qmp_object(item, output, basestr + str(i) + '.')
592 elif isinstance(obj, dict):
593 for key in obj:
594 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
595 else:
596 output[basestr[:-1]] = obj # Strip trailing '.'
597 return output
599 def qmp_to_opts(self, obj):
600 obj = self.flatten_qmp_object(obj)
601 output_list = list()
602 for key in obj:
603 output_list += [key + '=' + obj[key]]
604 return ','.join(output_list)
606 def get_qmp_events_filtered(self, wait=60.0):
607 result = []
608 for ev in self.get_qmp_events(wait=wait):
609 result.append(filter_qmp_event(ev))
610 return result
612 def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
613 full_cmd = OrderedDict((
614 ("execute", cmd),
615 ("arguments", ordered_qmp(kwargs))
617 log(full_cmd, filters, indent=indent)
618 result = self.qmp(cmd, **kwargs)
619 log(result, filters, indent=indent)
620 return result
622 # Returns None on success, and an error string on failure
623 def run_job(self, job, auto_finalize=True, auto_dismiss=False,
624 pre_finalize=None, cancel=False, wait=60.0):
626 run_job moves a job from creation through to dismissal.
628 :param job: String. ID of recently-launched job
629 :param auto_finalize: Bool. True if the job was launched with
630 auto_finalize. Defaults to True.
631 :param auto_dismiss: Bool. True if the job was launched with
632 auto_dismiss=True. Defaults to False.
633 :param pre_finalize: Callback. A callable that takes no arguments to be
634 invoked prior to issuing job-finalize, if any.
635 :param cancel: Bool. When true, cancels the job after the pre_finalize
636 callback.
637 :param wait: Float. Timeout value specifying how long to wait for any
638 event, in seconds. Defaults to 60.0.
640 match_device = {'data': {'device': job}}
641 match_id = {'data': {'id': job}}
642 events = [
643 ('BLOCK_JOB_COMPLETED', match_device),
644 ('BLOCK_JOB_CANCELLED', match_device),
645 ('BLOCK_JOB_ERROR', match_device),
646 ('BLOCK_JOB_READY', match_device),
647 ('BLOCK_JOB_PENDING', match_id),
648 ('JOB_STATUS_CHANGE', match_id)
650 error = None
651 while True:
652 ev = filter_qmp_event(self.events_wait(events, timeout=wait))
653 if ev['event'] != 'JOB_STATUS_CHANGE':
654 log(ev)
655 continue
656 status = ev['data']['status']
657 if status == 'aborting':
658 result = self.qmp('query-jobs')
659 for j in result['return']:
660 if j['id'] == job:
661 error = j['error']
662 log('Job failed: %s' % (j['error']))
663 elif status == 'ready':
664 self.qmp_log('job-complete', id=job)
665 elif status == 'pending' and not auto_finalize:
666 if pre_finalize:
667 pre_finalize()
668 if cancel:
669 self.qmp_log('job-cancel', id=job)
670 else:
671 self.qmp_log('job-finalize', id=job)
672 elif status == 'concluded' and not auto_dismiss:
673 self.qmp_log('job-dismiss', id=job)
674 elif status == 'null':
675 return error
677 # Returns None on success, and an error string on failure
678 def blockdev_create(self, options, job_id='job0', filters=None):
679 if filters is None:
680 filters = [filter_qmp_testfiles]
681 result = self.qmp_log('blockdev-create', filters=filters,
682 job_id=job_id, options=options)
684 if 'return' in result:
685 assert result['return'] == {}
686 job_result = self.run_job(job_id)
687 else:
688 job_result = result['error']
690 log("")
691 return job_result
693 def enable_migration_events(self, name):
694 log('Enabling migration QMP events on %s...' % name)
695 log(self.qmp('migrate-set-capabilities', capabilities=[
697 'capability': 'events',
698 'state': True
702 def wait_migration(self, expect_runstate):
703 while True:
704 event = self.event_wait('MIGRATION')
705 log(event, filters=[filter_qmp_event])
706 if event['data']['status'] == 'completed':
707 break
708 # The event may occur in finish-migrate, so wait for the expected
709 # post-migration runstate
710 while self.qmp('query-status')['return']['status'] != expect_runstate:
711 pass
713 def node_info(self, node_name):
714 nodes = self.qmp('query-named-block-nodes')
715 for x in nodes['return']:
716 if x['node-name'] == node_name:
717 return x
718 return None
720 def query_bitmaps(self):
721 res = self.qmp("query-named-block-nodes")
722 return {device['node-name']: device['dirty-bitmaps']
723 for device in res['return'] if 'dirty-bitmaps' in device}
725 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
727 get a specific bitmap from the object returned by query_bitmaps.
728 :param recording: If specified, filter results by the specified value.
729 :param bitmaps: If specified, use it instead of call query_bitmaps()
731 if bitmaps is None:
732 bitmaps = self.query_bitmaps()
734 for bitmap in bitmaps[node_name]:
735 if bitmap.get('name', '') == bitmap_name:
736 if recording is None or bitmap.get('recording') == recording:
737 return bitmap
738 return None
740 def check_bitmap_status(self, node_name, bitmap_name, fields):
741 ret = self.get_bitmap(node_name, bitmap_name)
743 return fields.items() <= ret.items()
745 def assert_block_path(self, root, path, expected_node, graph=None):
747 Check whether the node under the given path in the block graph
748 is @expected_node.
750 @root is the node name of the node where the @path is rooted.
752 @path is a string that consists of child names separated by
753 slashes. It must begin with a slash.
755 Examples for @root + @path:
756 - root="qcow2-node", path="/backing/file"
757 - root="quorum-node", path="/children.2/file"
759 Hypothetically, @path could be empty, in which case it would
760 point to @root. However, in practice this case is not useful
761 and hence not allowed.
763 @expected_node may be None. (All elements of the path but the
764 leaf must still exist.)
766 @graph may be None or the result of an x-debug-query-block-graph
767 call that has already been performed.
769 if graph is None:
770 graph = self.qmp('x-debug-query-block-graph')['return']
772 iter_path = iter(path.split('/'))
774 # Must start with a /
775 assert next(iter_path) == ''
777 node = next((node for node in graph['nodes'] if node['name'] == root),
778 None)
780 # An empty @path is not allowed, so the root node must be present
781 assert node is not None, 'Root node %s not found' % root
783 for child_name in iter_path:
784 assert node is not None, 'Cannot follow path %s%s' % (root, path)
786 try:
787 node_id = next(edge['child'] for edge in graph['edges']
788 if (edge['parent'] == node['id'] and
789 edge['name'] == child_name))
791 node = next(node for node in graph['nodes']
792 if node['id'] == node_id)
794 except StopIteration:
795 node = None
797 if node is None:
798 assert expected_node is None, \
799 'No node found under %s (but expected %s)' % \
800 (path, expected_node)
801 else:
802 assert node['name'] == expected_node, \
803 'Found node %s under %s (but expected %s)' % \
804 (node['name'], path, expected_node)
806 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
808 class QMPTestCase(unittest.TestCase):
809 '''Abstract base class for QMP test cases'''
811 def __init__(self, *args, **kwargs):
812 super().__init__(*args, **kwargs)
813 # Many users of this class set a VM property we rely on heavily
814 # in the methods below.
815 self.vm = None
817 def dictpath(self, d, path):
818 '''Traverse a path in a nested dict'''
819 for component in path.split('/'):
820 m = index_re.match(component)
821 if m:
822 component, idx = m.groups()
823 idx = int(idx)
825 if not isinstance(d, dict) or component not in d:
826 self.fail(f'failed path traversal for "{path}" in "{d}"')
827 d = d[component]
829 if m:
830 if not isinstance(d, list):
831 self.fail(f'path component "{component}" in "{path}" '
832 f'is not a list in "{d}"')
833 try:
834 d = d[idx]
835 except IndexError:
836 self.fail(f'invalid index "{idx}" in path "{path}" '
837 f'in "{d}"')
838 return d
840 def assert_qmp_absent(self, d, path):
841 try:
842 result = self.dictpath(d, path)
843 except AssertionError:
844 return
845 self.fail('path "%s" has value "%s"' % (path, str(result)))
847 def assert_qmp(self, d, path, value):
848 '''Assert that the value for a specific path in a QMP dict
849 matches. When given a list of values, assert that any of
850 them matches.'''
852 result = self.dictpath(d, path)
854 # [] makes no sense as a list of valid values, so treat it as
855 # an actual single value.
856 if isinstance(value, list) and value != []:
857 for v in value:
858 if result == v:
859 return
860 self.fail('no match for "%s" in %s' % (str(result), str(value)))
861 else:
862 self.assertEqual(result, value,
863 '"%s" is "%s", expected "%s"'
864 % (path, str(result), str(value)))
866 def assert_no_active_block_jobs(self):
867 result = self.vm.qmp('query-block-jobs')
868 self.assert_qmp(result, 'return', [])
870 def assert_has_block_node(self, node_name=None, file_name=None):
871 """Issue a query-named-block-nodes and assert node_name and/or
872 file_name is present in the result"""
873 def check_equal_or_none(a, b):
874 return a is None or b is None or a == b
875 assert node_name or file_name
876 result = self.vm.qmp('query-named-block-nodes')
877 for x in result["return"]:
878 if check_equal_or_none(x.get("node-name"), node_name) and \
879 check_equal_or_none(x.get("file"), file_name):
880 return
881 self.fail("Cannot find %s %s in result:\n%s" %
882 (node_name, file_name, result))
884 def assert_json_filename_equal(self, json_filename, reference):
885 '''Asserts that the given filename is a json: filename and that its
886 content is equal to the given reference object'''
887 self.assertEqual(json_filename[:5], 'json:')
888 self.assertEqual(
889 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
890 self.vm.flatten_qmp_object(reference)
893 def cancel_and_wait(self, drive='drive0', force=False,
894 resume=False, wait=60.0):
895 '''Cancel a block job and wait for it to finish, returning the event'''
896 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
897 self.assert_qmp(result, 'return', {})
899 if resume:
900 self.vm.resume_drive(drive)
902 cancelled = False
903 result = None
904 while not cancelled:
905 for event in self.vm.get_qmp_events(wait=wait):
906 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
907 event['event'] == 'BLOCK_JOB_CANCELLED':
908 self.assert_qmp(event, 'data/device', drive)
909 result = event
910 cancelled = True
911 elif event['event'] == 'JOB_STATUS_CHANGE':
912 self.assert_qmp(event, 'data/id', drive)
915 self.assert_no_active_block_jobs()
916 return result
918 def wait_until_completed(self, drive='drive0', check_offset=True,
919 wait=60.0, error=None):
920 '''Wait for a block job to finish, returning the event'''
921 while True:
922 for event in self.vm.get_qmp_events(wait=wait):
923 if event['event'] == 'BLOCK_JOB_COMPLETED':
924 self.assert_qmp(event, 'data/device', drive)
925 if error is None:
926 self.assert_qmp_absent(event, 'data/error')
927 if check_offset:
928 self.assert_qmp(event, 'data/offset',
929 event['data']['len'])
930 else:
931 self.assert_qmp(event, 'data/error', error)
932 self.assert_no_active_block_jobs()
933 return event
934 if event['event'] == 'JOB_STATUS_CHANGE':
935 self.assert_qmp(event, 'data/id', drive)
937 def wait_ready(self, drive='drive0'):
938 """Wait until a BLOCK_JOB_READY event, and return the event."""
939 f = {'data': {'type': 'mirror', 'device': drive}}
940 return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
942 def wait_ready_and_cancel(self, drive='drive0'):
943 self.wait_ready(drive=drive)
944 event = self.cancel_and_wait(drive=drive)
945 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
946 self.assert_qmp(event, 'data/type', 'mirror')
947 self.assert_qmp(event, 'data/offset', event['data']['len'])
949 def complete_and_wait(self, drive='drive0', wait_ready=True,
950 completion_error=None):
951 '''Complete a block job and wait for it to finish'''
952 if wait_ready:
953 self.wait_ready(drive=drive)
955 result = self.vm.qmp('block-job-complete', device=drive)
956 self.assert_qmp(result, 'return', {})
958 event = self.wait_until_completed(drive=drive, error=completion_error)
959 self.assert_qmp(event, 'data/type', 'mirror')
961 def pause_wait(self, job_id='job0'):
962 with Timeout(3, "Timeout waiting for job to pause"):
963 while True:
964 result = self.vm.qmp('query-block-jobs')
965 found = False
966 for job in result['return']:
967 if job['device'] == job_id:
968 found = True
969 if job['paused'] and not job['busy']:
970 return job
971 break
972 assert found
974 def pause_job(self, job_id='job0', wait=True):
975 result = self.vm.qmp('block-job-pause', device=job_id)
976 self.assert_qmp(result, 'return', {})
977 if wait:
978 return self.pause_wait(job_id)
979 return result
981 def case_skip(self, reason):
982 '''Skip this test case'''
983 case_notrun(reason)
984 self.skipTest(reason)
987 def notrun(reason):
988 '''Skip this test suite'''
989 # Each test in qemu-iotests has a number ("seq")
990 seq = os.path.basename(sys.argv[0])
992 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
993 logger.warning("%s not run: %s", seq, reason)
994 sys.exit(0)
996 def case_notrun(reason):
997 '''Mark this test case as not having been run (without actually
998 skipping it, that is left to the caller). See
999 QMPTestCase.case_skip() for a variant that actually skips the
1000 current test case.'''
1002 # Each test in qemu-iotests has a number ("seq")
1003 seq = os.path.basename(sys.argv[0])
1005 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1006 ' [case not run] ' + reason + '\n')
1008 def _verify_image_format(supported_fmts: Sequence[str] = (),
1009 unsupported_fmts: Sequence[str] = ()) -> None:
1010 assert not (supported_fmts and unsupported_fmts)
1012 if 'generic' in supported_fmts and \
1013 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1014 # similar to
1015 # _supported_fmt generic
1016 # for bash tests
1017 if imgfmt == 'luks':
1018 verify_working_luks()
1019 return
1021 not_sup = supported_fmts and (imgfmt not in supported_fmts)
1022 if not_sup or (imgfmt in unsupported_fmts):
1023 notrun('not suitable for this image format: %s' % imgfmt)
1025 if imgfmt == 'luks':
1026 verify_working_luks()
1028 def _verify_protocol(supported: Sequence[str] = (),
1029 unsupported: Sequence[str] = ()) -> None:
1030 assert not (supported and unsupported)
1032 if 'generic' in supported:
1033 return
1035 not_sup = supported and (imgproto not in supported)
1036 if not_sup or (imgproto in unsupported):
1037 notrun('not suitable for this protocol: %s' % imgproto)
1039 def _verify_platform(supported: Sequence[str] = (),
1040 unsupported: Sequence[str] = ()) -> None:
1041 if any((sys.platform.startswith(x) for x in unsupported)):
1042 notrun('not suitable for this OS: %s' % sys.platform)
1044 if supported:
1045 if not any((sys.platform.startswith(x) for x in supported)):
1046 notrun('not suitable for this OS: %s' % sys.platform)
1048 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1049 if supported_cache_modes and (cachemode not in supported_cache_modes):
1050 notrun('not suitable for this cache mode: %s' % cachemode)
1052 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1053 if supported_aio_modes and (aiomode not in supported_aio_modes):
1054 notrun('not suitable for this aio mode: %s' % aiomode)
1056 def supports_quorum():
1057 return 'quorum' in qemu_img_pipe('--help')
1059 def verify_quorum():
1060 '''Skip test suite if quorum support is not available'''
1061 if not supports_quorum():
1062 notrun('quorum support missing')
1064 def has_working_luks() -> Tuple[bool, str]:
1066 Check whether our LUKS driver can actually create images
1067 (this extends to LUKS encryption for qcow2).
1069 If not, return the reason why.
1072 img_file = f'{test_dir}/luks-test.luks'
1073 (output, status) = \
1074 qemu_img_pipe_and_status('create', '-f', 'luks',
1075 '--object', luks_default_secret_object,
1076 '-o', luks_default_key_secret_opt,
1077 '-o', 'iter-time=10',
1078 img_file, '1G')
1079 try:
1080 os.remove(img_file)
1081 except OSError:
1082 pass
1084 if status != 0:
1085 reason = output
1086 for line in output.splitlines():
1087 if img_file + ':' in line:
1088 reason = line.split(img_file + ':', 1)[1].strip()
1089 break
1091 return (False, reason)
1092 else:
1093 return (True, '')
1095 def verify_working_luks():
1097 Skip test suite if LUKS does not work
1099 (working, reason) = has_working_luks()
1100 if not working:
1101 notrun(reason)
1103 def qemu_pipe(*args):
1105 Run qemu with an option to print something and exit (e.g. a help option).
1107 :return: QEMU's stdout output.
1109 args = [qemu_prog] + qemu_opts + list(args)
1110 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1111 stderr=subprocess.STDOUT,
1112 universal_newlines=True)
1113 output = subp.communicate()[0]
1114 if subp.returncode < 0:
1115 sys.stderr.write('qemu received signal %i: %s\n' %
1116 (-subp.returncode, ' '.join(args)))
1117 return output
1119 def supported_formats(read_only=False):
1120 '''Set 'read_only' to True to check ro-whitelist
1121 Otherwise, rw-whitelist is checked'''
1123 if not hasattr(supported_formats, "formats"):
1124 supported_formats.formats = {}
1126 if read_only not in supported_formats.formats:
1127 format_message = qemu_pipe("-drive", "format=help")
1128 line = 1 if read_only else 0
1129 supported_formats.formats[read_only] = \
1130 format_message.splitlines()[line].split(":")[1].split()
1132 return supported_formats.formats[read_only]
1134 def skip_if_unsupported(required_formats=(), read_only=False):
1135 '''Skip Test Decorator
1136 Runs the test if all the required formats are whitelisted'''
1137 def skip_test_decorator(func):
1138 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1139 **kwargs: Dict[str, Any]) -> None:
1140 if callable(required_formats):
1141 fmts = required_formats(test_case)
1142 else:
1143 fmts = required_formats
1145 usf_list = list(set(fmts) - set(supported_formats(read_only)))
1146 if usf_list:
1147 msg = f'{test_case}: formats {usf_list} are not whitelisted'
1148 test_case.case_skip(msg)
1149 else:
1150 func(test_case, *args, **kwargs)
1151 return func_wrapper
1152 return skip_test_decorator
1154 def skip_for_formats(formats: Sequence[str] = ()) \
1155 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1156 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1157 '''Skip Test Decorator
1158 Skips the test for the given formats'''
1159 def skip_test_decorator(func):
1160 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1161 **kwargs: Dict[str, Any]) -> None:
1162 if imgfmt in formats:
1163 msg = f'{test_case}: Skipped for format {imgfmt}'
1164 test_case.case_skip(msg)
1165 else:
1166 func(test_case, *args, **kwargs)
1167 return func_wrapper
1168 return skip_test_decorator
1170 def skip_if_user_is_root(func):
1171 '''Skip Test Decorator
1172 Runs the test only without root permissions'''
1173 def func_wrapper(*args, **kwargs):
1174 if os.getuid() == 0:
1175 case_notrun('{}: cannot be run as root'.format(args[0]))
1176 return None
1177 else:
1178 return func(*args, **kwargs)
1179 return func_wrapper
1181 def execute_unittest(debug=False):
1182 """Executes unittests within the calling module."""
1184 verbosity = 2 if debug else 1
1186 if debug:
1187 output = sys.stdout
1188 else:
1189 # We need to filter out the time taken from the output so that
1190 # qemu-iotest can reliably diff the results against master output.
1191 output = io.StringIO()
1193 runner = unittest.TextTestRunner(stream=output, descriptions=True,
1194 verbosity=verbosity)
1195 try:
1196 # unittest.main() will use sys.exit(); so expect a SystemExit
1197 # exception
1198 unittest.main(testRunner=runner)
1199 finally:
1200 # We need to filter out the time taken from the output so that
1201 # qemu-iotest can reliably diff the results against master output.
1202 if not debug:
1203 out = output.getvalue()
1204 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1206 # Hide skipped tests from the reference output
1207 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1208 out_first_line, out_rest = out.split('\n', 1)
1209 out = out_first_line.replace('s', '.') + '\n' + out_rest
1211 sys.stderr.write(out)
1213 def execute_setup_common(supported_fmts: Sequence[str] = (),
1214 supported_platforms: Sequence[str] = (),
1215 supported_cache_modes: Sequence[str] = (),
1216 supported_aio_modes: Sequence[str] = (),
1217 unsupported_fmts: Sequence[str] = (),
1218 supported_protocols: Sequence[str] = (),
1219 unsupported_protocols: Sequence[str] = ()) -> bool:
1221 Perform necessary setup for either script-style or unittest-style tests.
1223 :return: Bool; Whether or not debug mode has been requested via the CLI.
1225 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1227 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1228 # indicate that we're not being run via "check". There may be
1229 # other things set up by "check" that individual test cases rely
1230 # on.
1231 if test_dir is None or qemu_default_machine is None:
1232 sys.stderr.write('Please run this test via the "check" script\n')
1233 sys.exit(os.EX_USAGE)
1235 debug = '-d' in sys.argv
1236 if debug:
1237 sys.argv.remove('-d')
1238 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1240 _verify_image_format(supported_fmts, unsupported_fmts)
1241 _verify_protocol(supported_protocols, unsupported_protocols)
1242 _verify_platform(supported=supported_platforms)
1243 _verify_cache_mode(supported_cache_modes)
1244 _verify_aio_mode(supported_aio_modes)
1246 return debug
1248 def execute_test(*args, test_function=None, **kwargs):
1249 """Run either unittest or script-style tests."""
1251 debug = execute_setup_common(*args, **kwargs)
1252 if not test_function:
1253 execute_unittest(debug)
1254 else:
1255 test_function()
1257 def activate_logging():
1258 """Activate iotests.log() output to stdout for script-style tests."""
1259 handler = logging.StreamHandler(stream=sys.stdout)
1260 formatter = logging.Formatter('%(message)s')
1261 handler.setFormatter(formatter)
1262 test_logger.addHandler(handler)
1263 test_logger.setLevel(logging.INFO)
1264 test_logger.propagate = False
1266 # This is called from script-style iotests without a single point of entry
1267 def script_initialize(*args, **kwargs):
1268 """Initialize script-style tests without running any tests."""
1269 activate_logging()
1270 execute_setup_common(*args, **kwargs)
1272 # This is called from script-style iotests with a single point of entry
1273 def script_main(test_function, *args, **kwargs):
1274 """Run script-style tests outside of the unittest framework"""
1275 activate_logging()
1276 execute_test(*args, test_function=test_function, **kwargs)
1278 # This is called from unittest style iotests
1279 def main(*args, **kwargs):
1280 """Run tests using the unittest framework"""
1281 execute_test(*args, **kwargs)