Post-release version bump to 11.0.0
[libvirt.git] / tools / virt-qemu-sev-validate
blob115266976bbeab898add38ea7a7725cce5afad21
1 #!/usr/bin/env python3
3 # SPDX-License-Identifier: LGPL-2.1-or-later
5 # Validates a guest AMD SEV launch measurement
7 # A general principle in writing this tool is that it must calculate the
8 # expected measurement based entirely on information it receives on the CLI
9 # from the guest owner.
11 # It cannot generally trust information obtained from the guest XML or from the
12 # virtualization host OS. The main exceptions are:
14 #  - The guest measurement
16 #    This is a result of cryptographic operation using a shared secret known
17 #    only to the guest owner and SEV platform, not the host OS.
19 #  - The guest policy
21 #    This is encoded in the launch session blob that is encrypted with a shared
22 #    secret known only to the guest owner and SEV platform, not the host OS. It
23 #    is impossible for the host OS to maliciously launch a guest with different
24 #    policy and the user provided launch session blob.
26 #    CAVEAT: the user must ALWAYS create a launch blob with freshly generated
27 #    TIK/TEK for every new VM. Re-use of the same TIK/TEK for multiple VMs
28 #    is insecure.
30 #  - The SEV API version / build ID
32 #    This does not have an impact on the security of the measurement, unless
33 #    the guest owner needs a guarantee that the host is not using specific
34 #    firmware versions with known flaws.
37 import abc
38 import argparse
39 import hmac
40 import logging
41 import os
42 import re
43 import socket
44 import sys
45 import traceback
46 from base64 import b64decode, b64encode
47 from hashlib import sha256
48 from struct import pack
49 from uuid import UUID
51 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
53 import libvirt
55 from lxml import etree
57 log = logging.getLogger()
60 class AttestationFailedException(Exception):
61     pass
64 class UnsupportedUsageException(Exception):
65     pass
68 class InsecureUsageException(Exception):
69     pass
72 class IncorrectConfigException(Exception):
73     pass
76 class InvalidStateException(Exception):
77     pass
80 class Field(object):
81     U8 = 0
82     U16 = 2
83     U32 = 4
84     U64 = 8
86     SCALAR = 0
87     BITMASK = 1
88     ARRAY = 2
90     def __init__(self, name, size, fmt, value, order):
91         self.name = name
92         self.size = size
93         self.value = value
94         self.fmt = fmt
95         self.order = order
98 class Struct(object):
99     def __init__(self, size):
100         self._fields = {}
101         self.size = size
103     def register_field(self, name, size, fmt=Field.SCALAR, defvalue=0):
104         self._fields[name] = Field(name, size, fmt,
105                                    defvalue, len(self.fields))
107     @property
108     def fields(self):
109         return sorted(self._fields.values(), key=lambda f: f.order)
111     def __getattr__(self, name):
112         return self._fields[name]
114     def __setattr__(self, name, value):
115         if name in ["_fields", "size"]:
116             super().__setattr__(name, value)
117         else:
118             self._fields[name].value = value
120     def binary_format(self):
121         fmt = ["<"]
122         datalen = 0
123         for field in self.fields:
124             if field.size == Field.U8:
125                 if field.fmt == Field.ARRAY:
126                     datalen += len(field.value)
127                     fmt += ["%dB" % len(field.value)]
128                 else:
129                     datalen += 1
130                     fmt += ["B"]
131             elif field.size == Field.U16:
132                 datalen += 2
133                 fmt += ["H"]
134             elif field.size == Field.U32:
135                 datalen += 4
136                 fmt += ["L"]
137             elif field.size == Field.U64:
138                 datalen += 8
139                 fmt += ["Q"]
141         pad = self.size - datalen
142         assert self.size >= 1
143         fmt += ["%dB" % pad]
145         return "".join(fmt), pad
147     def pack(self):
148         fmt, pad = self.binary_format()
150         values = []
151         for field in self.fields:
152             if field.size == Field.U8 and field.fmt == Field.ARRAY:
153                 for _, k in enumerate(field.value):
154                     values.append(k)
155             else:
156                 values.append(field.value)
157         values.extend([0] * pad)
159         return pack(fmt, *values)
162 class VMSA(Struct):
163     ATTR_G_SHIFT = 23
164     ATTR_G_MASK = (1 << ATTR_G_SHIFT)
165     ATTR_B_SHIFT = 22
166     ATTR_B_MASK = (1 << ATTR_B_SHIFT)
167     ATTR_L_SHIFT = 21
168     ATTR_L_MASK = (1 << ATTR_L_SHIFT)
169     ATTR_AVL_SHIFT = 20
170     ATTR_AVL_MASK = (1 << ATTR_AVL_SHIFT)
171     ATTR_P_SHIFT = 15
172     ATTR_P_MASK = (1 << ATTR_P_SHIFT)
173     ATTR_DPL_SHIFT = 13
174     ATTR_DPL_MASK = (3 << ATTR_DPL_SHIFT)
175     ATTR_S_SHIFT = 12
176     ATTR_S_MASK = (1 << ATTR_S_SHIFT)
177     ATTR_TYPE_SHIFT = 8
178     ATTR_TYPE_MASK = (15 << ATTR_TYPE_SHIFT)
179     ATTR_A_MASK = (1 << 8)
181     ATTR_CS_MASK = (1 << 11)
182     ATTR_C_MASK = (1 << 10)
183     ATTR_R_MASK = (1 << 9)
185     ATTR_E_MASK = (1 << 10)
186     ATTR_W_MASK = (1 << 9)
188     def __init__(self):
189         super().__init__(4096)
191         # From Linux arch/x86/include/asm/svm.h, we're unpacking the
192         # struct vmcb_save_area
194         self.register_field("es_selector", Field.U16)
195         self.register_field("es_attrib", Field.U16, Field.BITMASK)
196         self.register_field("es_limit", Field.U32)
197         self.register_field("es_base", Field.U64)
199         self.register_field("cs_selector", Field.U16)
200         self.register_field("cs_attrib", Field.U16, Field.BITMASK)
201         self.register_field("cs_limit", Field.U32)
202         self.register_field("cs_base", Field.U64)
204         self.register_field("ss_selector", Field.U16)
205         self.register_field("ss_attrib", Field.U16, Field.BITMASK)
206         self.register_field("ss_limit", Field.U32)
207         self.register_field("ss_base", Field.U64)
209         self.register_field("ds_selector", Field.U16)
210         self.register_field("ds_attrib", Field.U16, Field.BITMASK)
211         self.register_field("ds_limit", Field.U32)
212         self.register_field("ds_base", Field.U64)
214         self.register_field("fs_selector", Field.U16)
215         self.register_field("fs_attrib", Field.U16, Field.BITMASK)
216         self.register_field("fs_limit", Field.U32)
217         self.register_field("fs_base", Field.U64)
219         self.register_field("gs_selector", Field.U16)
220         self.register_field("gs_attrib", Field.U16, Field.BITMASK)
221         self.register_field("gs_limit", Field.U32)
222         self.register_field("gs_base", Field.U64)
224         self.register_field("gdtr_selector", Field.U16)
225         self.register_field("gdtr_attrib", Field.U16, Field.BITMASK)
226         self.register_field("gdtr_limit", Field.U32)
227         self.register_field("gdtr_base", Field.U64)
229         self.register_field("ldtr_selector", Field.U16)
230         self.register_field("ldtr_attrib", Field.U16, Field.BITMASK)
231         self.register_field("ldtr_limit", Field.U32)
232         self.register_field("ldtr_base", Field.U64)
234         self.register_field("idtr_selector", Field.U16)
235         self.register_field("idtr_attrib", Field.U16, Field.BITMASK)
236         self.register_field("idtr_limit", Field.U32)
237         self.register_field("idtr_base", Field.U64)
239         self.register_field("tr_selector", Field.U16)
240         self.register_field("tr_attrib", Field.U16, Field.BITMASK)
241         self.register_field("tr_limit", Field.U32)
242         self.register_field("tr_base", Field.U64)
244         self.register_field("reserved_1",
245                             Field.U8, Field.ARRAY, bytearray([0] * 43))
247         self.register_field("cpl", Field.U8)
249         self.register_field("reserved_2",
250                             Field.U8, Field.ARRAY, bytearray([0] * 4))
252         self.register_field("efer", Field.U64)
254         self.register_field("reserved_3",
255                             Field.U8, Field.ARRAY, bytearray([0] * 104))
257         self.register_field("xss", Field.U64)
258         self.register_field("cr4", Field.U64)
259         self.register_field("cr3", Field.U64)
260         self.register_field("cr0", Field.U64)
261         self.register_field("dr7", Field.U64)
262         self.register_field("dr6", Field.U64)
263         self.register_field("rflags", Field.U64)
264         self.register_field("rip", Field.U64)
266         self.register_field("reserved_4",
267                             Field.U8, Field.ARRAY, bytearray([0] * 88))
269         self.register_field("rsp", Field.U64)
271         self.register_field("reserved_5",
272                             Field.U8, Field.ARRAY, bytearray([0] * 24))
274         self.register_field("rax", Field.U64)
275         self.register_field("star", Field.U64)
276         self.register_field("lstar", Field.U64)
277         self.register_field("cstar", Field.U64)
278         self.register_field("sfmask", Field.U64)
279         self.register_field("kernel_gs_base", Field.U64)
280         self.register_field("sysenter_cs", Field.U64)
281         self.register_field("sysenter_esp", Field.U64)
282         self.register_field("sysenter_eip", Field.U64)
283         self.register_field("cr2", Field.U64)
285         self.register_field("reserved_6",
286                             Field.U8, Field.ARRAY, bytearray([0] * 32))
288         self.register_field("g_pat", Field.U64)
289         self.register_field("dbgctl", Field.U64)
290         self.register_field("br_from", Field.U64)
291         self.register_field("br_to", Field.U64)
292         self.register_field("last_excp_from", Field.U64)
293         self.register_field("last_excp_to", Field.U64)
295         self.register_field("reserved_7",
296                             Field.U8, Field.ARRAY, bytearray([0] * 72))
298         self.register_field("spec_ctrl", Field.U32)
300         self.register_field("reserved_7b",
301                             Field.U8, Field.ARRAY, bytearray([0] * 4))
303         self.register_field("pkru", Field.U32)
305         self.register_field("reserved_7a",
306                             Field.U8, Field.ARRAY, bytearray([0] * 20))
308         self.register_field("reserved_8", Field.U64)  # rax duplicate
310         self.register_field("rcx", Field.U64)
311         self.register_field("rdx", Field.U64, Field.BITMASK)
312         self.register_field("rbx", Field.U64)
314         self.register_field("reserved_9", Field.U64)  # rsp duplicate
316         self.register_field("rbp", Field.U64)
317         self.register_field("rsi", Field.U64)
318         self.register_field("rdi", Field.U64)
319         self.register_field("r8", Field.U64)
320         self.register_field("r9", Field.U64)
321         self.register_field("r10", Field.U64)
322         self.register_field("r11", Field.U64)
323         self.register_field("r12", Field.U64)
324         self.register_field("r13", Field.U64)
325         self.register_field("r14", Field.U64)
326         self.register_field("r15", Field.U64)
328         self.register_field("reserved_10",
329                             Field.U8, Field.ARRAY, bytearray([0] * 16))
331         self.register_field("sw_exit_code", Field.U64)
332         self.register_field("sw_exit_info_1", Field.U64)
333         self.register_field("sw_exit_info_2", Field.U64)
334         self.register_field("sw_scratch", Field.U64)
336         self.register_field("reserved_11",
337                             Field.U8, Field.ARRAY, bytearray([0] * 56))
339         self.register_field("xcr0", Field.U64)
340         self.register_field("valid_bitmap",
341                             Field.U8, Field.ARRAY, bytearray([0] * 16))
342         self.register_field("x87_state_gpa",
343                             Field.U64)
345     def amd64_cpu_init(self):
346         # AMD64 Architecture Programmer’s Manual
347         # Volume 2: System Programming.
348         #
349         # 14.1.3 Processor Initialization State
350         #
351         # Values after INIT
353         self.cr0 = (1 << 4)
354         self.rip = 0xfff0
356         self.cs_selector = 0xf000
357         self.cs_base = 0xffff0000
358         self.cs_limit = 0xffff
360         self.ds_limit = 0xffff
362         self.es_limit = 0xffff
363         self.fs_limit = 0xffff
364         self.gs_limit = 0xffff
365         self.ss_limit = 0xffff
367         self.gdtr_limit = 0xffff
368         self.idtr_limit = 0xffff
370         self.ldtr_limit = 0xffff
371         self.tr_limit = 0xffff
373         self.dr6 = 0xffff0ff0
374         self.dr7 = 0x0400
375         self.rflags = 0x2
376         self.xcr0 = 0x1
378     def kvm_cpu_init(self):
379         # svm_set_cr4() sets guest X86_CR4_MCE bit if host
380         # has X86_CR4_MCE enabled
381         self.cr4 = 0x40
383         # svm_set_efer sets guest EFER_SVME (Secure Virtual Machine enable)
384         self.efer = 0x1000
386         # init_vmcb + init_sys_seg() sets
387         # SVM_SELECTOR_P_MASK | SEG_TYPE_LDT
388         self.ldtr_attrib = 0x0082
390         # init_vmcb + init_sys_seg() sets
391         # SVM_SELECTOR_P_MASK | SEG_TYPE_BUSY_TSS16
392         self.tr_attrib = 0x0083
394         # kvm_arch_vcpu_create() in arch/x86/kvm/x86.c
395         self.g_pat = 0x0007040600070406
397     def qemu_cpu_init(self):
398         # Based on logic in  x86_cpu_reset()
399         #
400         # file target/i386/cpu.c
402         def attr(mask):
403             return (mask >> VMSA.ATTR_TYPE_SHIFT)
405         self.ldtr_attrib = attr(VMSA.ATTR_P_MASK |
406                                 (2 << VMSA.ATTR_TYPE_SHIFT))
407         self.tr_attrib = attr(VMSA.ATTR_P_MASK |
408                               (11 << VMSA.ATTR_TYPE_SHIFT))
409         self.cs_attrib = attr(VMSA.ATTR_P_MASK |
410                               VMSA.ATTR_S_MASK |
411                               VMSA.ATTR_CS_MASK |
412                               VMSA.ATTR_R_MASK |
413                               VMSA.ATTR_A_MASK)
414         self.ds_attrib = attr(VMSA.ATTR_P_MASK |
415                               VMSA.ATTR_S_MASK |
416                               VMSA.ATTR_W_MASK |
417                               VMSA.ATTR_A_MASK)
418         self.es_attrib = attr(VMSA.ATTR_P_MASK |
419                               VMSA.ATTR_S_MASK |
420                               VMSA.ATTR_W_MASK |
421                               VMSA.ATTR_A_MASK)
422         self.ss_attrib = attr(VMSA.ATTR_P_MASK |
423                               VMSA.ATTR_S_MASK |
424                               VMSA.ATTR_W_MASK |
425                               VMSA.ATTR_A_MASK)
426         self.fs_attrib = attr(VMSA.ATTR_P_MASK |
427                               VMSA.ATTR_S_MASK |
428                               VMSA.ATTR_W_MASK |
429                               VMSA.ATTR_A_MASK)
430         self.gs_attrib = attr(VMSA.ATTR_P_MASK |
431                               VMSA.ATTR_S_MASK |
432                               VMSA.ATTR_W_MASK |
433                               VMSA.ATTR_A_MASK)
435         self.g_pat = 0x0007040600070406
437     def cpu_sku(self, family, model, stepping):
438         stepping &= 0xf
439         model &= 0xff
440         family &= 0xfff
442         self.rdx.value = stepping
444         if family > 0xf:
445             self.rdx.value |= 0xf00 | ((family - 0x0f) << 20)
446         else:
447             self.rdx.value |= family << 8
449         self.rdx.value |= ((model & 0xf) << 4) | ((model >> 4) << 16)
451     def reset_addr(self, reset_addr):
452         reset_cs = reset_addr & 0xffff0000
453         reset_ip = reset_addr & 0x0000ffff
455         self.rip.value = reset_ip
456         self.cs_base.value = reset_cs
459 class OVMF(object):
461     OVMF_TABLE_FOOTER_GUID = UUID("96b582de-1fb2-45f7-baea-a366c55a082d")
462     SEV_INFO_BLOCK_GUID = UUID("00f771de-1a7e-4fcb-890e-68c77e2fb44e")
464     def __init__(self):
465         self.entries = {}
467     def load(self, content):
468         expect = OVMF.OVMF_TABLE_FOOTER_GUID.bytes_le
469         actual = content[-48:-32]
470         if expect != actual:
471             raise Exception("OVMF footer GUID not found")
473         tablelen = int.from_bytes(content[-50:-48], byteorder='little')
475         if tablelen == 0:
476             raise Exception("OVMF tables zero length")
478         table = content[-(32 + tablelen):-50]
480         self.parse_table(table)
482     def parse_table(self, data):
483         while len(data) > 0:
484             entryuuid = UUID(bytes_le=data[-16:])
485             entrylen = int.from_bytes(data[-18:-16], byteorder='little')
486             entrydata = data[-entrylen:-18]
488             self.entries[str(entryuuid)] = entrydata
490             data = data[0:-entrylen]
492     def reset_addr(self):
493         if str(OVMF.SEV_INFO_BLOCK_GUID) not in self.entries:
494             raise Exception("SEV info block GUID not found")
496         reset_addr = int.from_bytes(
497             self.entries[str(OVMF.SEV_INFO_BLOCK_GUID)], "little")
498         return reset_addr
501 class GUIDTable(abc.ABC):
502     GUID_LEN = 16
504     def __init__(self, guid, lenlen=2):
505         self.guid = guid
506         self.lenlen = lenlen
508     @abc.abstractmethod
509     def entries(self):
510         pass
512     def build_entry(self, guid, payload, lenlen):
513         dummylen = int(0).to_bytes(lenlen, 'little')
514         entry = bytearray(guid + dummylen + payload)
516         lenle = len(entry).to_bytes(lenlen, 'little')
517         entry[self.GUID_LEN:(self.GUID_LEN + lenlen)] = lenle
519         return bytes(entry)
521     def build(self):
522         payload = self.entries()
524         if len(payload) == 0:
525             return bytes([])
527         dummylen = int(0).to_bytes(self.lenlen, 'little')
528         table = bytearray(self.guid + dummylen + payload)
530         guidlen = len(table).to_bytes(self.lenlen, 'little')
531         table[self.GUID_LEN:(self.GUID_LEN + self.lenlen)] = guidlen
533         pad = 16 - (len(table) % 16)
534         table += bytes([0]) * pad
536         log.debug("Table(hex): %s", bytes(table).hex())
537         return bytes(table)
540 class KernelTable(GUIDTable):
542     TABLE_GUID = UUID('{9438d606-4f22-4cc9-b479-a793-d411fd21}').bytes_le
543     KERNEL_GUID = UUID('{4de79437-abd2-427f-b835-d5b1-72d2045b}').bytes_le
544     INITRD_GUID = UUID('{44baf731-3a2f-4bd7-9af1-41e2-9169781d}').bytes_le
545     CMDLINE_GUID = UUID('{97d02dd8-bd20-4c94-aa78-e771-4d36ab2a}').bytes_le
547     def __init__(self):
548         super().__init__(guid=self.TABLE_GUID,
549                          lenlen=2)
551         self.kernel = None
552         self.initrd = None
553         self.cmdline = None
555     def load_kernel(self, path):
556         with open(path, "rb") as fh:
557             self.kernel = sha256(fh.read()).digest()
559     def load_initrd(self, path):
560         with open(path, "rb") as fh:
561             self.initrd = sha256(fh.read()).digest()
563     def load_cmdline(self, val):
564         self.cmdline = sha256(val.encode("utf8") + bytes([0])).digest()
566     def entries(self):
567         entries = bytes([])
568         if self.kernel is None:
569             return entries
571         if self.initrd is None:
572             self.initrd = sha256(bytes([])).digest()
573         if self.cmdline is None:
574             self.cmdline = sha256(bytes([0])).digest()
576         log.debug("Kernel(sha256): %s", self.kernel.hex())
577         log.debug("Initrd(sha256): %s", self.initrd.hex())
578         log.debug("Cmdline(sha256): %s", self.cmdline.hex())
579         entries += self.build_entry(self.CMDLINE_GUID, self.cmdline, 2)
580         entries += self.build_entry(self.INITRD_GUID, self.initrd, 2)
581         entries += self.build_entry(self.KERNEL_GUID, self.kernel, 2)
583         return entries
586 class SecretsTable(GUIDTable):
588     TABLE_GUID = UUID('{1e74f542-71dd-4d66-963e-ef4287ff173b}').bytes_le
590     GUID_ALIASES = {
591         "luks-key": UUID('{736869e5-84f0-4973-92ec-06879ce3da0b}')
592     }
594     def __init__(self):
595         super().__init__(guid=self.TABLE_GUID,
596                          lenlen=4)
597         self.secrets = {}
599     def load_secret(self, alias_or_guid, path):
600         guid = None
601         if re.match(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$",
602                     alias_or_guid):
603             guid = UUID(alias_or_guid)
604         else:
605             if alias_or_guid not in self.GUID_ALIASES:
606                 raise UnsupportedUsageException(
607                     "Secret alias '%s' is not known" % alias_or_guid)
609             guid = self.GUID_ALIASES[alias_or_guid]
611         if guid in self.secrets:
612             raise UnsupportedUsageException(
613                 "Secret for GUID %s already loaded" % guid)
615         with open(path, 'rb') as fh:
616             self.secrets[guid] = fh.read()
618     def entries(self):
619         entries = bytes([])
620         for guid, value in self.secrets.items():
621             entries += self.build_entry(guid.bytes_le, value, 4)
622         return entries
625 class ConfidentialVM(abc.ABC):
626     POLICY_BIT_SEV_ES = 2
627     POLICY_VAL_SEV_ES = (1 << POLICY_BIT_SEV_ES)
629     def __init__(self,
630                  measurement=None,
631                  api_major=None,
632                  api_minor=None,
633                  build_id=None,
634                  policy=None,
635                  num_cpus=None):
636         self.measurement = measurement
637         self.api_major = api_major
638         self.api_minor = api_minor
639         self.build_id = build_id
640         self.policy = policy
642         self.firmware = None
643         self.tik = None
644         self.tek = None
646         self.num_cpus = num_cpus
647         self.vmsa_cpu0 = None
648         self.vmsa_cpu1 = None
650         self.kernel_table = KernelTable()
651         self.secrets_table = SecretsTable()
653     def is_sev_es(self):
654         return self.policy & self.POLICY_VAL_SEV_ES
656     def load_tik_tek(self, tik_path, tek_path):
657         with open(tik_path, 'rb') as fh:
658             self.tik = fh.read()
659         log.debug("TIK(hex): %s", self.tik.hex())
661         if len(self.tik) != 16:
662             raise UnsupportedUsageException(
663                 "Expected 16 bytes in TIK file, but got %d" % len(self.tik))
665         with open(tek_path, 'rb') as fh:
666             self.tek = fh.read()
667         log.debug("TEK(hex): %s", self.tek.hex())
669         if len(self.tek) != 16:
670             raise UnsupportedUsageException(
671                 "Expected 16 bytes in TEK file, but got %d" % len(self.tek))
673     def load_tk(self, tk_path):
674         with open(tk_path, 'rb') as fh:
675             tk = fh.read()
677         if len(tk) != 32:
678             raise UnsupportedUsageException(
679                 "Expected 32 bytes in TIK/TEK file, but got %d" % len(tk))
681         self.tek = tk[0:16]
682         self.tik = tk[16:32]
683         log.debug("TIK(hex): %s", self.tik.hex())
684         log.debug("TEK(hex): %s", self.tek.hex())
686     def load_firmware(self, firmware_path):
687         with open(firmware_path, 'rb') as fh:
688             self.firmware = fh.read()
689         log.debug("Firmware(sha256): %s", sha256(self.firmware).hexdigest())
691     @staticmethod
692     def _load_vmsa(path):
693         with open(path, 'rb') as fh:
694             vmsa = fh.read()
696         if len(vmsa) != 4096:
697             raise UnsupportedUsageException(
698                 "VMSA must be 4096 bytes in length")
699         return vmsa
701     def load_vmsa_cpu0(self, path):
702         self.vmsa_cpu0 = self._load_vmsa(path)
703         log.debug("VMSA CPU 0(sha256): %s",
704                   sha256(self.vmsa_cpu0).hexdigest())
706     def load_vmsa_cpu1(self, path):
707         self.vmsa_cpu1 = self._load_vmsa(path)
708         log.debug("VMSA CPU 1(sha256): %s",
709                   sha256(self.vmsa_cpu1).hexdigest())
711     def build_vmsas(self, family, model, stepping):
712         ovmf = OVMF()
713         ovmf.load(self.firmware)
715         vmsa = VMSA()
716         vmsa.amd64_cpu_init()
717         vmsa.kvm_cpu_init()
718         vmsa.qemu_cpu_init()
720         vmsa.cpu_sku(family, model, stepping)
722         self.vmsa_cpu0 = vmsa.pack()
723         log.debug("VMSA CPU 0(sha256): %s",
724                   sha256(self.vmsa_cpu0).hexdigest())
726         vmsa.reset_addr(ovmf.reset_addr())
727         self.vmsa_cpu1 = vmsa.pack()
728         log.debug("VMSA CPU 1(sha256): %s",
729                   sha256(self.vmsa_cpu1).hexdigest())
731     def get_cpu_state(self):
732         if self.num_cpus is None:
733             raise UnsupportedUsageException(
734                 "Number of virtual CPUs must be specified for SEV-ES domain")
736         if self.vmsa_cpu0 is None:
737             raise UnsupportedUsageException(
738                 "VMSA for boot CPU required for SEV-ES domain")
740         if self.num_cpus > 1 and self.vmsa_cpu1 is None:
741             raise UnsupportedUsageException(
742                 "VMSA for additional CPUs required for SEV-ES domain with SMP")
744         vmsa = self.vmsa_cpu0 + (self.vmsa_cpu1 * (self.num_cpus - 1))
745         log.debug("VMSA(sha256): %s", sha256(vmsa).hexdigest())
746         return vmsa
748     # Get the full set of measured launch data for the domain
749     #
750     # The measured data that the guest is initialized with is the concatenation
751     # of the following:
752     #
753     #  - The firmware blob
754     #  - The kernel GUID table
755     def get_measured_data(self):
756         measured_data = (self.firmware +
757                          self.kernel_table.build())
758         if self.is_sev_es():
759             measured_data += self.get_cpu_state()
760         log.debug("Measured-data(sha256): %s",
761                   sha256(measured_data).hexdigest())
762         return measured_data
764     # Get the reported and computed launch measurements for the domain
765     #
766     # AMD Secure Encrypted Virtualization API , section 6.5:
767     #
768     # measurement = HMAC(0x04 || API_MAJOR || API_MINOR || BUILD ||
769     #                    GCTX.POLICY || GCTX.LD || MNONCE; GCTX.TIK)
770     #
771     # Where GCTX.LD covers all the measured data the guest is initialized with
772     # per get_measured_data().
773     def get_measurements(self):
774         measurement = b64decode(self.measurement)
775         reported = measurement[0:32]
776         nonce = measurement[32:48]
778         measured_data = self.get_measured_data()
779         msg = (
780             bytes([0x4]) +
781             self.api_major.to_bytes(1, 'little') +
782             self.api_minor.to_bytes(1, 'little') +
783             self.build_id.to_bytes(1, 'little') +
784             self.policy.to_bytes(4, 'little') +
785             sha256(measured_data).digest() +
786             nonce
787         )
788         log.debug("Measured-msg(hex): %s", msg.hex())
790         computed = hmac.new(self.tik, msg, 'sha256').digest()
792         log.debug("Measurement reported(hex): %s", reported.hex())
793         log.debug("Measurement computed(hex): %s", computed.hex())
795         return reported, computed
797     def attest(self):
798         reported, computed = self.get_measurements()
800         if reported != computed:
801             raise AttestationFailedException(
802                 "Measurement does not match, VM is not trustworthy")
804     def build_secrets(self):
805         measurement, _ = self.get_measurements()
807         iv = os.urandom(16)
809         secret_table = self.secrets_table.build()
811         cipher = Cipher(algorithms.AES(self.tek), modes.CTR(iv))
812         enc = cipher.encryptor()
813         secret_table_ciphertext = (enc.update(secret_table) +
814                                    enc.finalize())
816         flags = 0
818         ##
819         # Table 55. LAUNCH_SECRET Packet Header Buffer
820         ##
821         header = (
822             flags.to_bytes(4, byteorder='little') +
823             iv
824         )
826         # AMD Secure Encrypted Virtualization API , section 6.6
827         #
828         #  hdrmac = HMAC(0x01 || FLAGS || IV || GUEST_LENGTH ||
829         #                TRANS_LENGTH || DATA ||
830         #                MEASURE; GCTX.TIK)
831         #
832         msg = (
833             bytes([0x01]) +
834             flags.to_bytes(4, byteorder='little') +
835             iv +
836             len(secret_table).to_bytes(4, byteorder='little') +
837             len(secret_table).to_bytes(4, byteorder='little') +
838             secret_table_ciphertext +
839             measurement
840         )
842         h = hmac.new(self.tik, msg, 'sha256')
843         header = (
844             flags.to_bytes(4, byteorder='little') +
845             iv +
846             h.digest()
847         )
849         header64 = b64encode(header).decode('utf8')
850         secret64 = b64encode(secret_table_ciphertext).decode('utf8')
851         log.debug("Header: %s (%d bytes)", header64, len(header))
852         log.debug("Secret: %s (%d bytes)",
853                   secret64, len(secret_table_ciphertext))
855         return header64, secret64
857     @abc.abstractmethod
858     def inject_secrets(self):
859         pass
862 class OfflineConfidentialVM(ConfidentialVM):
863     def __init__(self,
864                  secret_header=None,
865                  secret_payload=None,
866                  **kwargs):
867         super().__init__(**kwargs)
869         self.secret_header = secret_header
870         self.secret_payload = secret_payload
872     def inject_secrets(self):
873         header64, secret64 = self.build_secrets()
875         with open(self.secret_header, "wb") as fh:
876             fh.write(header64.encode('utf8'))
877         with open(self.secret_payload, "wb") as fh:
878             fh.write(secret64.encode('utf8'))
881 class LibvirtConfidentialVM(ConfidentialVM):
882     def __init__(self, **kwargs):
883         super().__init__(**kwargs)
885         self.conn = None
886         self.dom = None
888     def check_domain(self, doc, secure):
889         ls = doc.xpath("/domain/launchSecurity[@type='sev']")
890         if len(ls) != 1:
891             raise IncorrectConfigException(
892                 "Domain is not configured with SEV launch security")
894         dh = doc.xpath("/domain/launchSecurity[@type='sev']/dhCert")
895         if len(dh) != 1:
896             raise IncorrectConfigException(
897                 "Domain must have SEV owner cert to validate measurement")
899         session = doc.xpath("/domain/launchSecurity[@type='sev']/session")
900         if len(session) != 1:
901             raise IncorrectConfigException(
902                 "Domain must have SEV session data to validate measurement")
904         nvramnodes = doc.xpath("/domain/os/nvram")
905         if len(nvramnodes) != 0 and secure:
906             raise InsecureUsageException(
907                 "Domain firmware with NVRAM cannot be securely measured")
909         loadernodes = doc.xpath("/domain/os/loader")
910         if len(loadernodes) != 1:
911             raise IncorrectConfigException(
912                 "Domain must have one firmware path")
914         measure_kernel_nodes = doc.xpath(
915             "/domain/launchSecurity[@type='sev']/@kernelHashes")
916         measure_kernel = False
917         if len(measure_kernel_nodes) == 1:
918             if measure_kernel_nodes[0] == "yes":
919                 measure_kernel = True
921         xp_kernel = "/domain/os/kernel"
922         xp_initrd = "/domain/os/initrd"
923         xp_cmdline = "/domain/os/cmdline"
924         kern_nodes = (doc.xpath(xp_kernel) +
925                       doc.xpath(xp_initrd) +
926                       doc.xpath(xp_cmdline))
927         if not measure_kernel:
928             if len(self.kernel_table.entries()) != 0:
929                 raise UnsupportedUsageException(
930                     "kernel/initrd/cmdline provided but kernel "
931                     "measurement not enabled")
933             # Check for an insecure scenario
934             if len(kern_nodes) != 0 and secure:
935                 raise InsecureUsageException(
936                     "direct kernel boot present without measurement")
937         else:
938             if len(kern_nodes) == 0:
939                 raise IncorrectConfigException(
940                     "kernel/initrd/cmdline not provided but kernel "
941                     "measurement is enabled")
943     def load_domain(self, uri, id_name_uuid, build_vmsa, secure, ignore_config):
944         self.conn = libvirt.open(uri)
946         remote = socket.getfqdn() != self.conn.getHostname()
947         if not remote and secure:
948             raise InsecureUsageException(
949                 "running locally on the hypervisor host is not secure")
951         if re.match(r'^\d+$', id_name_uuid):
952             self.dom = self.conn.lookupByID(int(id_name_uuid))
953         elif re.match(r'^[-a-f0-9]+$', id_name_uuid):
954             self.dom = self.conn.lookupByUUIDString(id_name_uuid)
955         else:
956             self.dom = self.conn.lookupByName(id_name_uuid)
958         log.debug("VM: id=%d name=%s uuid=%s",
959                   self.dom.ID(), self.dom.name(), self.dom.UUIDString())
961         if not self.dom.isActive():
962             raise InvalidStateException(
963                 "Domain must be running to validate measurement")
965         state = self.dom.info()[0]
966         if state != libvirt.VIR_DOMAIN_PAUSED and secure:
967             raise InvalidStateException(
968                 "Domain must be paused to validate measurement")
970         xml = self.dom.XMLDesc()
972         doc = etree.fromstring(xml)
973         if not ignore_config:
974             self.check_domain(doc, secure)
976         # See comments at top of file wrt why we are OK to trust the
977         # sev-api-major, sev-api-minor, sev-build-id and sev-policy data
978         # reported by the host
979         sevinfo = self.dom.launchSecurityInfo()
981         if "sev-api-major" not in sevinfo:
982             raise UnsupportedUsageException(
983                 "'api-major' not reported in domain launch security info")
985         if self.measurement is None:
986             self.measurement = sevinfo["sev-measurement"]
987         if self.api_major is None:
988             self.api_major = sevinfo["sev-api-major"]
989         if self.api_minor is None:
990             self.api_minor = sevinfo["sev-api-minor"]
991         if self.build_id is None:
992             self.build_id = sevinfo["sev-build-id"]
993         if self.policy is None:
994             self.policy = sevinfo["sev-policy"]
996         if self.is_sev_es() and self.num_cpus is None:
997             if secure:
998                 raise InsecureUsageException(
999                     "Using CPU count from guest is not secure")
1001             info = self.dom.info()
1002             self.num_cpus = info[3]
1004         if self.firmware is None:
1005             if remote:
1006                 raise UnsupportedUsageException(
1007                     "Cannot access firmware path remotely")
1008             if secure:
1009                 raise InsecureUsageException(
1010                     "Using firmware path from XML is not secure")
1012             loadernodes = doc.xpath("/domain/os/loader")
1013             if len(loadernodes) == 0:
1014                 raise UnsupportedUsageException(
1015                     "--firmware not specified and no firmware path found")
1017             self.load_firmware(loadernodes[0].text)
1019         if self.kernel_table.kernel is None:
1020             kernelnodes = doc.xpath("/domain/os/kernel")
1021             if len(kernelnodes) != 0:
1022                 if remote:
1023                     raise UnsupportedUsageException(
1024                         "Cannot access kernel path remotely")
1025                 if secure:
1026                     raise InsecureUsageException(
1027                         "Using kernel path from XML is not secure")
1028                 self.kernel_table.load_kernel(kernelnodes[0].text)
1030         if self.kernel_table.initrd is None:
1031             initrdnodes = doc.xpath("/domain/os/initrd")
1032             if len(initrdnodes) != 0:
1033                 if remote:
1034                     raise UnsupportedUsageException(
1035                         "Cannot access initrd path remotely")
1036                 if secure:
1037                     raise InsecureUsageException(
1038                         "Using initrd path from XML is not secure")
1039                 self.kernel_table.load_initrd(initrdnodes[0].text)
1041         if self.kernel_table.cmdline is None:
1042             cmdlinenodes = doc.xpath("/domain/os/cmdline")
1043             if len(cmdlinenodes) != 0:
1044                 if secure:
1045                     raise InsecureUsageException(
1046                         "Using cmdline string from XML is not secure")
1047                 self.kernel_table.load_cmdline(cmdlinenodes[0].text)
1049         capsxml = self.conn.getCapabilities()
1050         capsdoc = etree.fromstring(capsxml)
1052         if self.is_sev_es() and build_vmsa:
1053             if secure:
1054                 raise InsecureUsageException(
1055                     "Using CPU SKU from capabilities is not secure")
1057             mode = doc.xpath("string(/domain/cpu/@mode)")
1058             if mode != "host-passthrough":
1059                 raise UnsupportedUsageException(
1060                     "Using CPU family/model/stepping from host not possible unless 'host-passthrough' is used")
1062             sig = capsdoc.xpath("/capabilities/host/cpu/signature")
1063             if len(sig) != 1:
1064                 raise UnsupportedUsageException(
1065                     "Libvirt is too old to report host CPU signature")
1067             cpu_family = int(sig[0].get("family"))
1068             cpu_model = int(sig[0].get("model"))
1069             cpu_stepping = int(sig[0].get("stepping"))
1070             self.build_vmsas(cpu_family, cpu_model, cpu_stepping)
1072     def inject_secrets(self):
1073         header64, secret64 = self.build_secrets()
1075         params = {"sev-secret": secret64,
1076                   "sev-secret-header": header64}
1077         self.dom.setLaunchSecurityState(params, 0)
1078         self.dom.resume()
1081 def parse_command_line():
1082     parser = argparse.ArgumentParser(
1083         description='Validate guest AMD SEV launch measurement')
1084     parser.add_argument('--debug', '-d', action='store_true',
1085                         help='Show debug information')
1086     parser.add_argument('--quiet', '-q', action='store_true',
1087                         help='Do not display status')
1089     # Arguments related to the state of the launched guest
1090     vmstate = parser.add_argument_group("Virtual machine launch state")
1091     vmstate.add_argument('--measurement', '-m',
1092                          help='Measurement for the running domain')
1093     vmstate.add_argument('--api-major', type=int,
1094                          help='SEV API major version for the running domain')
1095     vmstate.add_argument('--api-minor', type=int,
1096                          help='SEV API minor version for the running domain')
1097     vmstate.add_argument('--build-id', type=int,
1098                          help='SEV build ID for the running domain')
1099     vmstate.add_argument('--policy', type=int,
1100                          help='SEV policy for the running domain')
1102     # Arguments related to calculation of the expected launch measurement
1103     vmconfig = parser.add_argument_group("Virtual machine config")
1104     vmconfig.add_argument('--firmware', '-f',
1105                           help='Path to the firmware binary')
1106     vmconfig.add_argument('--kernel', '-k',
1107                           help='Path to the kernel binary')
1108     vmconfig.add_argument('--initrd', '-r',
1109                           help='Path to the initrd binary')
1110     vmconfig.add_argument('--cmdline', '-e',
1111                           help='Cmdline string booted with')
1112     vmconfig.add_argument('--num-cpus', '-n', type=int,
1113                           help='Number of virtual CPUs')
1114     vmconfig.add_argument('--vmsa-cpu0', '-0',
1115                           help='VMSA state for the boot CPU')
1116     vmconfig.add_argument('--vmsa-cpu1', '-1',
1117                           help='VMSA state for the additional CPUs')
1118     vmconfig.add_argument('--cpu-family', type=int,
1119                           help='Hypervisor host CPU family number')
1120     vmconfig.add_argument('--cpu-model', type=int,
1121                           help='Hypervisor host CPU model number')
1122     vmconfig.add_argument('--cpu-stepping', type=int,
1123                           help='Hypervisor host CPU stepping number')
1124     vmconfig.add_argument('--tik',
1125                           help='TIK file for domain')
1126     vmconfig.add_argument('--tek',
1127                           help='TEK file for domain')
1128     vmconfig.add_argument('--tk',
1129                           help='TEK/TIK combined file for domain')
1131     # Arguments related to the connection to libvirt
1132     vmconn = parser.add_argument_group("Libvirt guest connection")
1133     vmconn.add_argument('--connect', '-c', default="qemu:///system",
1134                         help='libvirt connection URI')
1135     vmconn.add_argument('--domain', '-o',
1136                         help='domain ID / Name / UUID')
1137     vmconn.add_argument('--insecure', '-i', action='store_true',
1138                         help='Proceed even if usage scenario is insecure')
1139     vmconn.add_argument('--ignore-config', '-g', action='store_true',
1140                         help='Do not attempt to sanity check the guest config')
1142     # Arguments related to secret injection
1143     inject = parser.add_argument_group("Secret injection parameters")
1144     inject.add_argument('--inject-secret', '-s', action='append', default=[],
1145                         help='ALIAS-OR-GUID:PATH file containing secret to inject')
1146     inject.add_argument('--secret-payload',
1147                         help='Path to file to write secret data payload to')
1148     inject.add_argument('--secret-header',
1149                         help='Path to file to write secret data header to')
1151     return parser.parse_args()
1154 # Sanity check the set of CLI args specified provide enough info for us to do
1155 # the job
1156 def check_usage(args):
1157     if args.tk is not None:
1158         if args.tik is not None or args.tek is not None:
1159             raise UnsupportedUsageException(
1160                 "--tk is mutually exclusive with --tek/--tik")
1161     else:
1162         if args.tik is None or args.tek is None:
1163             raise UnsupportedUsageException(
1164                 "Either --tk or both of --tek/--tik are required")
1166     if args.domain is None:
1167         if args.measurement is None:
1168             raise UnsupportedUsageException(
1169                 "Either --measurement or --domain is required")
1171         if args.api_major is None:
1172             raise UnsupportedUsageException(
1173                 "Either --api-major or --domain is required")
1175         if args.api_minor is None:
1176             raise UnsupportedUsageException(
1177                 "Either --api-minor or --domain is required")
1179         if args.build_id is None:
1180             raise UnsupportedUsageException(
1181                 "Either --build-id or --domain is required")
1183         if args.policy is None:
1184             raise UnsupportedUsageException(
1185                 "Either --policy or --domain is required")
1187         if args.firmware is None:
1188             raise UnsupportedUsageException(
1189                 "Either --firmware or --domain is required")
1191         if len(args.inject_secret) > 0:
1192             if args.secret_header is None:
1193                 raise UnsupportedUsageException(
1194                     "Either --secret-header or --domain is required")
1196             if args.secret_payload is None:
1197                 raise UnsupportedUsageException(
1198                     "Either --secret-payload or --domain is required")
1200     if args.kernel is None:
1201         if args.initrd is not None or args.cmdline is not None:
1202             raise UnsupportedUsageException(
1203                 "--initrd/--cmdline require --kernel")
1205     sku = [args.cpu_family, args.cpu_model, args.cpu_stepping]
1206     if sku.count(None) == len(sku):
1207         if args.vmsa_cpu1 is not None and args.vmsa_cpu0 is None:
1208             raise UnsupportedUsageException(
1209                 "VMSA for additional CPU also requires VMSA for boot CPU")
1210     else:
1211         if args.vmsa_cpu0 is not None or args.vmsa_cpu1 is not None:
1212             raise UnsupportedUsageException(
1213                 "VMSA files are mutually exclusive with CPU SKU")
1215         if sku.count(None) != 0:
1216             raise UnsupportedUsageException(
1217                 "CPU SKU needs family, model and stepping for SEV-ES domain")
1219     secret = [args.secret_payload, args.secret_header]
1220     if secret.count(None) > 0 and secret.count(None) != len(secret):
1221         raise UnsupportedUsageException(
1222             "Both --secret-payload and --secret-header are required")
1225 def attest(args):
1226     if args.domain is None:
1227         cvm = OfflineConfidentialVM(measurement=args.measurement,
1228                                     api_major=args.api_major,
1229                                     api_minor=args.api_minor,
1230                                     build_id=args.build_id,
1231                                     policy=args.policy,
1232                                     num_cpus=args.num_cpus,
1233                                     secret_header=args.secret_header,
1234                                     secret_payload=args.secret_payload)
1235     else:
1236         cvm = LibvirtConfidentialVM(measurement=args.measurement,
1237                                     api_major=args.api_major,
1238                                     api_minor=args.api_minor,
1239                                     build_id=args.build_id,
1240                                     policy=args.policy,
1241                                     num_cpus=args.num_cpus)
1243     if args.firmware is not None:
1244         cvm.load_firmware(args.firmware)
1246     if args.tk is not None:
1247         cvm.load_tk(args.tk)
1248     else:
1249         cvm.load_tik_tek(args.tik, args.tek)
1251     if args.kernel is not None:
1252         cvm.kernel_table.load_kernel(args.kernel)
1254     if args.initrd is not None:
1255         cvm.kernel_table.load_initrd(args.initrd)
1257     if args.cmdline is not None:
1258         cvm.kernel_table.load_cmdline(args.cmdline)
1260     if args.vmsa_cpu0 is not None:
1261         cvm.load_vmsa_cpu0(args.vmsa_cpu0)
1263     if args.vmsa_cpu1 is not None:
1264         cvm.load_vmsa_cpu1(args.vmsa_cpu1)
1266     if args.domain is not None:
1267         build_vmsa = args.vmsa_cpu0 is None and args.cpu_family is None
1268         cvm.load_domain(args.connect,
1269                         args.domain,
1270                         build_vmsa,
1271                         not args.insecure,
1272                         args.ignore_config)
1274     if args.cpu_family is not None:
1275         cvm.build_vmsas(args.cpu_family,
1276                         args.cpu_model,
1277                         args.cpu_stepping)
1279     cvm.attest()
1280     if not args.quiet:
1281         print("OK: Looks good to me")
1283     for secret in args.inject_secret:
1284         bits = secret.split(":")
1285         if len(bits) != 2:
1286             raise UnsupportedUsageException(
1287                 "Expecting ALIAS-OR-GUID:PATH for injected secret")
1289         cvm.secrets_table.load_secret(bits[0], bits[1])
1291     if len(args.inject_secret) > 0:
1292         cvm.inject_secrets()
1293         if not args.quiet:
1294             print("OK: Injected %d secrets" % len(args.inject_secret))
1297 def main():
1298     args = parse_command_line()
1299     if args.debug:
1300         logging.basicConfig(level="DEBUG")
1301         formatter = logging.Formatter("[%(levelname)s]: %(message)s")
1302         handler = log.handlers[0]
1303         handler.setFormatter(formatter)
1305     try:
1306         check_usage(args)
1308         attest(args)
1310         sys.exit(0)
1311     except AttestationFailedException as e:
1312         if args.debug:
1313             traceback.print_tb(e.__traceback__)
1314         if not args.quiet:
1315             print("ERROR: %s" % e, file=sys.stderr)
1316         sys.exit(1)
1317     except UnsupportedUsageException as e:
1318         if args.debug:
1319             traceback.print_tb(e.__traceback__)
1320         if not args.quiet:
1321             print("ERROR: %s" % e, file=sys.stderr)
1322         sys.exit(2)
1323     except InsecureUsageException as e:
1324         if not args.quiet:
1325             print("ERROR: %s" % e, file=sys.stderr)
1326         sys.exit(3)
1327     except IncorrectConfigException as e:
1328         if not args.quiet:
1329             print("ERROR: %s" % e, file=sys.stderr)
1330         sys.exit(4)
1331     except InvalidStateException as e:
1332         if not args.quiet:
1333             print("ERROR: %s" % e, file=sys.stderr)
1334         sys.exit(5)
1335     except Exception as e:
1336         if args.debug:
1337             traceback.print_tb(e.__traceback__)
1338         if not args.quiet:
1339             print("ERROR: %s" % e, file=sys.stderr)
1340         sys.exit(6)
1343 if __name__ == "__main__":
1344     main()