Merge tag 'qemu-macppc-20230206' of https://github.com/mcayland/qemu into staging
[qemu.git] / tests / avocado / avocado_qemu / __init__.py
blob25a546842fab23e79c24e2099d2a02551a894f24
1 # Test class and utilities for functional tests
3 # Copyright (c) 2018 Red Hat, Inc.
5 # Author:
6 # Cleber Rosa <crosa@redhat.com>
8 # This work is licensed under the terms of the GNU GPL, version 2 or
9 # later. See the COPYING file in the top-level directory.
11 import logging
12 import os
13 import shutil
14 import subprocess
15 import sys
16 import tempfile
17 import time
18 import uuid
20 import avocado
21 from avocado.utils import cloudinit, datadrainer, process, ssh, vmimage
22 from avocado.utils.path import find_command
24 from qemu.machine import QEMUMachine
25 from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
26 tcg_available)
# Directory two levels above this file; in a build tree this may be a
# symbolic link to the avocado tests dir in the source code directory.
_AVOCADO_TESTS_DIR = os.path.dirname(os.path.dirname(__file__))

#: The QEMU build root directory, i.e. four directory levels above this
#: file.  It may also be the source directory if building from the source
#: dir, but it's safer to use BUILD_DIR for that purpose.  Be aware that if
#: this code is moved outside of a source and build tree, it will not be
#: accurate.
BUILD_DIR = os.path.dirname(os.path.dirname(_AVOCADO_TESTS_DIR))

if os.path.islink(_AVOCADO_TESTS_DIR):
    # The link to the avocado tests dir in the source code directory
    lnk = _AVOCADO_TESTS_DIR
    #: The QEMU root source directory, two levels above the link target
    SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
else:
    #: The QEMU root source directory (same as the build directory when the
    #: tests dir is not a symbolic link)
    SOURCE_DIR = BUILD_DIR
def has_cmd(name, args=None):
    """
    Check whether a command can be run successfully.

    This function is for use in a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: name of the requirement; used in the error message and,
                 when args is None, looked up with "which"
    :type name: str
    :param args: command line to execute as the check; defaults to
                 ('which', name)
    :returns: (True, '') if the command exits with status 0, otherwise
              (False, message) where message describes the failure
    :rtype: tuple
    """
    probe = ('which', name) if args is None else args

    try:
        _, stderr, exitcode = run_cmd(probe)
    except Exception as exc:
        # Failing to even start the command counts as "not available"
        exitcode = -1
        stderr = str(exc)

    if exitcode == 0:
        return (True, '')

    cmd_line = ' '.join(probe)
    return (False,
            f'{name} required, but "{cmd_line}" failed: {stderr.strip()}')
def has_cmds(*cmds):
    """
    Check the availability of several commands at once.

    This function is for use in a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each item is either a command name (str) or a tuple of
                 positional arguments to be passed to has_cmd()
    :returns: (True, '') if every check passes, otherwise the
              (False, message) result of the first check that fails
    :rtype: tuple
    """
    for cmd in cmds:
        # A bare string is shorthand for has_cmd(cmd)
        spec = (cmd,) if isinstance(cmd, str) else cmd

        ok, errstr = has_cmd(*spec)
        if not ok:
            return (False, errstr)

    return (True, '')
def run_cmd(args):
    """
    Run a command to completion, capturing its output as text.

    :param args: the command line, passed as-is to subprocess.Popen
    :returns: a (stdout, stderr, exit status) tuple
    :rtype: tuple
    """
    proc = subprocess.Popen(args,
                            universal_newlines=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() waits for the process, so returncode is set afterwards
    out, err = proc.communicate()

    return (out, err, proc.returncode)
def is_readable_executable_file(path):
    """
    Tell whether path is a regular file that is both readable and executable.

    :param path: the path to check
    :rtype: bool
    """
    if not os.path.isfile(path):
        return False
    return os.access(path, os.R_OK | os.X_OK)
def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
    """
    Picks the path of a QEMU binary, looking in the current working
    directory, then in BUILD_DIR and finally in BUILD_DIR's "build"
    subdirectory.

    :param bin_prefix: the prefix of the binary name; the arch is appended
                       to it to form the file name searched for
    :type bin_prefix: str
    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    if arch is None:
        arch = os.uname()[4]
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in arch:
        arch = 'ppc64'
    qemu_bin_name = bin_prefix + arch

    search_dirs = (".", BUILD_DIR, os.path.join(BUILD_DIR, "build"))
    candidates = (os.path.join(directory, qemu_bin_name)
                  for directory in search_dirs)
    # First candidate that is a readable, executable file wins
    return next((path for path in candidates
                 if is_readable_executable_file(path)), None)
135 def _console_interaction(test, success_message, failure_message,
136 send_string, keep_sending=False, vm=None):
137 assert not keep_sending or send_string
138 if vm is None:
139 vm = test.vm
140 console = vm.console_socket.makefile(mode='rb', encoding='utf-8')
141 console_logger = logging.getLogger('console')
142 while True:
143 if send_string:
144 vm.console_socket.sendall(send_string.encode())
145 if not keep_sending:
146 send_string = None # send only once
147 try:
148 msg = console.readline().decode().strip()
149 except UnicodeDecodeError:
150 msg = None
151 if not msg:
152 continue
153 console_logger.debug(msg)
154 if success_message is None or success_message in msg:
155 break
156 if failure_message and failure_message in msg:
157 console.close()
158 fail = 'Failure message found in console: "%s". Expected: "%s"' % \
159 (failure_message, success_message)
160 test.fail(fail)
def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Repeatedly send a string to the console, before each line is read,
    until a pattern shows up; console output is logged along the way.
    The typical use case is breaking into a boot loader prompt such as:

        Press a key within 5 seconds to interrupt boot process.
        ...
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=interrupt_string, keep_sending=True)
def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Read the console, logging its content, until a message appears.
    Nothing is sent to the console.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; when None, the test's
               default VM is used
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=None, vm=vm)
def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return), while
    logging the content.  Returns once one non-empty line has been read
    back from the console.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    command_line = command + '\r'
    _console_interaction(test, None, None, command_line)
def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return), then wait
    for success_message to appear on the console, while logging the
    content.  Mark the test as failed if failure_message is found instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    command_line = command + '\r'
    _console_interaction(test, success_message, failure_message,
                         command_line)
class QemuBaseTest(avocado.Test):
    """
    Common base for QEMU tests: resolves the target arch, the CPU model and
    the QEMU binary from test parameters, test tags and the build tree.
    """

    # default timeout for all tests, can be overridden
    timeout = 120

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        :param tag_name: the tag key to look up in the test's tags
        :returns: the value if the tag has exactly one value, else None
        """
        vals = self.tags.get(tag_name, [])
        if len(vals) == 1:
            # Read the single value without pop(): popping would remove it
            # from the collection held in self.tags, so a later lookup of
            # the same tag would no longer find it.
            return next(iter(vals))
        return None

    def setUp(self, bin_prefix):
        """
        Set ``arch``, ``cpu`` and ``qemu_bin``; each test parameter takes
        precedence over the matching tag / the binary found by
        pick_default_qemu_bin().  Cancels the test if no binary is known.

        :param bin_prefix: QEMU binary name prefix, such as 'qemu-system-'
        :type bin_prefix: str
        """
        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def fetch_asset(self, name,
                    asset_hash=None, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Forward to the parent class' fetch_asset(), passing every argument
        through unchanged; this override only makes cancel_on_missing
        default to True.
        """
        return super().fetch_asset(name,
                                   asset_hash=asset_hash,
                                   algorithm=algorithm,
                                   locations=locations,
                                   expire=expire,
                                   find_only=find_only,
                                   cancel_on_missing=cancel_on_missing)
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    def setUp(self):
        """
        Initialize the VM registry and resolve the machine type from the
        'machine' test parameter, falling back to the 'machine' tag.
        """
        self._vms = {}
        # Socket directories of all VMs created by this test; they are kept
        # referenced here so none is removed before tearDown().
        self._sock_dirs = []

        super().setUp('qemu-system-')

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Cancel the test unless the QEMU binary lists the given network
        backend in the output of "-netdev help".

        :param netdevname: name of the backend, such as "user"
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        # Match a whole line, so a backend whose name merely contains
        # netdevname does not count.
        if '\n' + netdevname + '\n' not in netdevhelp:
            # Name the backend that was actually asked for
            self.cancel('no support for ' + netdevname + ' networking')

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine with its own temporary socket directory.

        :param name: name used in the debug log messages
        :param args: extra QEMU command line arguments for the VM
        :returns: the new, not yet launched, QEMUMachine
        """
        sock_dir = tempfile.TemporaryDirectory(prefix="qemu_")
        # Hold on to every directory: dropping the last reference to a
        # TemporaryDirectory removes it from disk, which would pull the
        # socket directory from under a previously created VM.
        self._sock_dirs.append(sock_dir)
        self._sd = sock_dir  # most recent directory, kept for compatibility
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         sock_dir=sock_dir.name, log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under name, creating it if needed.

        :param args: extra QEMU arguments, only used when the VM is created
        :param name: the VM name; a random UUID is used when not given, so
                     each unnamed call creates a new VM
        :returns: the QEMUMachine registered under the name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary.  If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # arg is the last item and has no value yet
            vm_args.append(value)

    def tearDown(self):
        """Shut down every VM and release the socket directories."""
        for vm in self._vms.values():
            vm.shutdown()
        # Dropping the references lets the temporary directories be removed
        self._sock_dirs = []
        self._sd = None
        super().tearDown()
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Start with no library paths and look for a 'qemu-' binary."""
        self._ldpath = []
        super().setUp('qemu-')

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed to QEMU with "-L".

        :param ldpath: the directory; stored as an absolute path
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run(self, bin_path, args=()):
        """
        Run a program under the QEMU user-mode binary.

        :param bin_path: path of the program to run
        :param args: command line arguments (strings) for the program
        :returns: the result of :func:`avocado.utils.process.run`
        """
        # NOTE: the pieces are joined with plain spaces into a single
        # command string, without any quoting.
        qemu_args = " ".join("-L %s" % ldpath for ldpath in self._ldpath)
        bin_args = " ".join(args)
        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
                                            bin_path, bin_args))
class LinuxSSHMixIn:
    """Contains utility methods for interacting with a guest via SSH."""

    def ssh_connect(self, username, credential, credential_is_key=True):
        """
        Open an SSH session to the guest through the host port reported by
        the monitor's "info usernet" command, storing it in
        ``self.ssh_session``.  Up to 10 connection attempts are made, with
        a growing pause between them; the test fails if all of them raise.

        :param username: the user to log in as
        :param credential: the key or the password, see credential_is_key
        :param credential_is_key: whether credential is passed to the
                                  session as ``key`` (True) or as
                                  ``password`` (False)
        """
        self.ssh_logger = logging.getLogger('ssh')
        res = self.vm.command('human-monitor-command',
                              command_line='info usernet')
        port = get_info_usernet_hostfwd_port(res)
        self.assertIsNotNone(port)
        self.assertGreater(port, 0)
        self.log.debug('sshd listening on port: %d', port)
        if credential_is_key:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, key=credential)
        else:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, password=credential)
        attempts = 10
        for i in range(attempts):
            try:
                self.ssh_session.connect()
                return
            except Exception:
                # Only ordinary errors are retried; KeyboardInterrupt and
                # SystemExit must not be swallowed by the retry loop.
                if i < attempts - 1:
                    # Back off a little longer after each failure, but do
                    # not wait once there is no attempt left.
                    time.sleep(i)
        self.fail('ssh connection timeout')

    def ssh_command(self, command):
        """
        Run a command in the guest over the established SSH session,
        logging the command and its output to the 'ssh' logger (stdout as
        info, stderr as warning).  The test fails if the command's exit
        status is not 0.

        :param command: the command to run
        :returns: (stdout lines, stderr lines), each line stripped of
                  trailing whitespace
        :rtype: tuple of lists
        """
        self.ssh_logger.info(command)
        result = self.ssh_session.cmd(command)
        stdout_lines = [line.rstrip() for line
                        in result.stdout_text.splitlines()]
        for line in stdout_lines:
            self.ssh_logger.info(line)
        stderr_lines = [line.rstrip() for line
                        in result.stderr_text.splitlines()]
        for line in stderr_lines:
            self.ssh_logger.warning(line)

        self.assertEqual(result.exit_status, 0,
                         f'Guest command failed: {command}')
        return stdout_lines, stderr_lines
class LinuxDistro:
    """Represents a Linux distribution

    Holds information of known distros.
    """
    #: A collection of known distros and their respective image checksum,
    #: indexed by distro name, then version, then arch
    KNOWN_DISTROS = {
        'fedora': {
            '31': {
                'x86_64':
                {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
                              '8a309c2d46215d8fc026954f3c5c27a0'),
                 'pxeboot_url': ('https://archives.fedoraproject.org/'
                                 'pub/archive/fedora/linux/releases/31/'
                                 'Everything/x86_64/os/images/pxeboot/'),
                 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
                                   '08a96687f73c ro no_timer_check '
                                   'net.ifnames=0 console=tty1 '
                                   'console=ttyS0,115200n8'),
                 },
                'aarch64':
                {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
                              'd2af0ad0329383d5639c997fdf16fe49'),
                 'pxeboot_url': 'https://archives.fedoraproject.org/'
                                'pub/archive/fedora/linux/releases/31/'
                                'Everything/aarch64/os/images/pxeboot/',
                 'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
                                   '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
                                   ' ignore_loglevel no_timer_check'
                                   ' printk.time=1 rd_NO_PLYMOUTH'
                                   ' console=ttyAMA0'),
                 },
                'ppc64':
                {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
                              '3f991c506f2cc390dc4efa2026ad2f58')},
                's390x':
                {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
                              '1e354f834d355069ca982fdcaf5a122d')},
            },
            '32': {
                'aarch64':
                {'checksum': ('b367755c664a2d7a26955bbfff985855'
                              'adfa2ca15e908baf15b4b176d68d3967'),
                 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                 'releases/32/Server/aarch64/os/images/'
                                 'pxeboot/'),
                 'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
                                   '14d95c0e90c5 ro no_timer_check net.ifnames=0'
                                   ' console=tty1 console=ttyS0,115200n8'),
                 },
            },
            '33': {
                'aarch64':
                {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
                              'a81f386a17f969c1d1c7c87031008a6b'),
                 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                 'releases/33/Server/aarch64/os/images/'
                                 'pxeboot/'),
                 'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
                                   '1126a0208f8a ro no_timer_check net.ifnames=0'
                                   ' console=tty1 console=ttyS0,115200n8'
                                   ' console=tty0'),
                 },
            },
        }
    }

    def __init__(self, name, version, arch):
        """
        :param name: the distro name, such as 'fedora'
        :param version: the distro version, such as '31'
        :param arch: the architecture, such as 'x86_64'

        A combination not present in KNOWN_DISTROS is accepted; it simply
        has no checksum, pxeboot URL or kernel parameters.
        """
        self.name = name
        self.version = version
        self.arch = arch
        try:
            info = self.KNOWN_DISTROS.get(name).get(version).get(arch)
        except AttributeError:
            # Unknown distro
            info = None
        # Work on a private copy: the checksum setter writes into _info,
        # and must not alter the class-level KNOWN_DISTROS table that is
        # shared by every instance.
        self._info = dict(info) if info else {}

    @property
    def checksum(self):
        """Gets the cloud-image file checksum"""
        return self._info.get('checksum', None)

    @checksum.setter
    def checksum(self, value):
        # Overrides the checksum for this instance only
        self._info['checksum'] = value

    @property
    def pxeboot_url(self):
        """Gets the repository url where pxeboot files can be found"""
        return self._info.get('pxeboot_url', None)

    @property
    def default_kernel_params(self):
        """Gets the default kernel parameters"""
        return self._info.get('kernel_params', None)
class LinuxTest(LinuxSSHMixIn, QemuSystemTest):
    """Facilitates having a cloud-image Linux based available.

    For tests that intend to interact with guests, this is a better choice
    to start with than the more vanilla `QemuSystemTest` class.
    """

    # The LinuxDistro in use; set by _set_distro() during setUp()
    distro = None
    # Credentials written into the cloudinit image
    username = 'root'
    password = 'password'
    # Values given to QEMU's -smp and -m options
    smp = '2'
    memory = '1024'

    def _set_distro(self):
        """
        Set ``self.distro`` from the 'distro' and 'distro_version' test
        parameters, falling back to the tags of the same names and then to
        Fedora 31.
        """
        distro_name = self.params.get(
            'distro',
            default=self._get_unique_tag_val('distro'))
        if not distro_name:
            distro_name = 'fedora'

        distro_version = self.params.get(
            'distro_version',
            default=self._get_unique_tag_val('distro_version'))
        if not distro_version:
            distro_version = '31'

        self.distro = LinuxDistro(distro_name, distro_version, self.arch)

        # The distro checksum behaves differently than distro name and
        # version. First, it does not respect a tag with the same
        # name, given that it's not expected to be used for filtering
        # (distro name versions are the natural choice).  Second, the
        # order of precedence is: parameter, attribute and then value
        # from KNOWN_DISTROS.
        distro_checksum = self.params.get('distro_checksum',
                                          default=None)
        if distro_checksum:
            self.distro.checksum = distro_checksum

    def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
        """
        Prepare the default VM: CPU count, memory, a user-mode network
        device with an SSH port forward, the boot image and the cloudinit
        image.

        :param ssh_pubkey: path of the public key to authorize in the
                           guest; when None, the keys from the source tree
                           are used and ``self.ssh_key`` is set to a
                           private copy of the private key
        :param network_device_type: the QEMU device model attached to the
                                    user-mode network backend
        """
        super().setUp()
        self.require_netdev('user')
        self._set_distro()
        self.vm.add_args('-smp', self.smp)
        self.vm.add_args('-m', self.memory)
        # The following network device allows for SSH connections
        self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
                         '-device', '%s,netdev=vnet' % network_device_type)
        self.set_up_boot()
        if ssh_pubkey is None:
            ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
        self.set_up_cloudinit(ssh_pubkey)

    def set_up_existing_ssh_keys(self):
        """
        Copy the private key shipped in the source tree into a '.ssh'
        directory under the test's workdir, with owner-only permissions.

        :returns: (path of the public key in the source tree,
                   path of the private key copy)
        :rtype: tuple
        """
        ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
        source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
        ssh_dir = os.path.join(self.workdir, '.ssh')
        os.mkdir(ssh_dir, mode=0o700)
        ssh_private_key = os.path.join(ssh_dir,
                                       os.path.basename(source_private_key))
        shutil.copyfile(source_private_key, ssh_private_key)
        os.chmod(ssh_private_key, 0o600)
        return (ssh_public_key, ssh_private_key)

    def download_boot(self):
        """
        Fetch the distro's cloud image and prepare a bootable snapshot of
        it.  The test is canceled if qemu-img cannot be found or if the
        image cannot be downloaded/prepared.

        :returns: the path of the bootable image
        """
        self.log.debug('Looking for and selecting a qemu-img binary to be '
                       'used to create the bootable snapshot image')
        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.  If none is available, the test will cancel.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img", which is required to '
                        'create the bootable image')
        vmimage.QEMU_IMG = qemu_img

        self.log.info('Downloading/preparing boot image')
        # Fedora 31 only provides ppc64le images
        image_arch = self.arch
        if self.distro.name == 'fedora':
            if image_arch == 'ppc64':
                image_arch = 'ppc64le'

        try:
            boot = vmimage.get(
                self.distro.name, arch=image_arch, version=self.distro.version,
                checksum=self.distro.checksum,
                algorithm='sha256',
                cache_dir=self.cache_dirs[0],
                snapshot_dir=self.workdir)
        except Exception:
            # Only ordinary errors mean "image not available"; a
            # KeyboardInterrupt or SystemExit must not be turned into a
            # test cancel.
            self.cancel('Failed to download/prepare boot image')
        return boot.path

    def prepare_cloudinit(self, ssh_pubkey=None):
        """
        Build the cloudinit ISO in the test's workdir.  The test is
        canceled if anything goes wrong.

        :param ssh_pubkey: path of a public key file whose content is
                           authorized in the guest, or None for no key
        :returns: the path of the cloudinit ISO
        """
        self.log.info('Preparing cloudinit image')
        try:
            cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
            pubkey_content = None
            if ssh_pubkey:
                with open(ssh_pubkey) as pubkey:
                    pubkey_content = pubkey.read()
            cloudinit.iso(cloudinit_iso, self.name,
                          username=self.username,
                          password=self.password,
                          # QEMU's hard coded usermode router address
                          phone_home_host='10.0.2.2',
                          phone_home_port=self.phone_server.server_port,
                          authorized_key=pubkey_content)
        except Exception:
            self.cancel('Failed to prepare the cloudinit image')
        return cloudinit_iso

    def set_up_boot(self):
        """Attach the downloaded boot image to the default VM as a drive."""
        path = self.download_boot()
        self.vm.add_args('-drive', 'file=%s' % path)

    def set_up_cloudinit(self, ssh_pubkey=None):
        """
        Create the phone home server (on a port picked by the system) and
        attach the cloudinit ISO to the default VM as a raw drive.

        :param ssh_pubkey: passed on to prepare_cloudinit()
        """
        self.phone_server = cloudinit.PhoneHomeServer(('0.0.0.0', 0),
                                                      self.name)
        cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
        self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)

    def launch_and_wait(self, set_up_ssh_connection=True):
        """
        Launch the default VM, log its console output and block until the
        guest phones home.

        :param set_up_ssh_connection: whether to open the SSH session once
                                      the guest is up; this uses
                                      ``self.ssh_key``, which setUp() only
                                      sets when it was not given a public
                                      key
        """
        self.vm.set_console()
        self.vm.launch()
        console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
                                                 logger=self.log.getChild('console'))
        console_drainer.start()
        self.log.info('VM launched, waiting for boot confirmation from guest')
        while not self.phone_server.instance_phoned_back:
            self.phone_server.handle_request()

        if set_up_ssh_connection:
            self.log.info('Setting up the SSH connection')
            self.ssh_connect(self.username, self.ssh_key)