1 # Library for manipulations with qcow2 image
3 # Copyright (c) 2020 Virtuozzo International GmbH.
4 # Copyright (C) 2012 Red Hat, Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 def __init__(self
, value
):
30 return str(self
.value
)
33 class Flags64(Qcow2Field
):
38 if self
.value
& (1 << bit
):
43 class Enum(Qcow2Field
):
46 return f
'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'
49 class Qcow2StructMeta(type):
51 # Mapping from c types to python struct format
59 def __init__(self
, name
, bases
, attrs
):
61 self
.fmt
= '>' + ''.join(self
.ctypes
[f
[0]] for f
in self
.fields
)
64 class Qcow2Struct(metaclass
=Qcow2StructMeta
):
66 """Qcow2Struct: base class for qcow2 data structures
68 Successors should define fields class variable, which is: list of tuples,
69 each of three elements:
70 - c-type (one of 'u8', 'u16', 'u32', 'u64')
71 - format (format_spec to use with .format() when dump or 'mask' to dump
76 def __init__(self
, fd
=None, offset
=None, data
=None):
79 1. Specify data. fd and offset must be None.
80 2. Specify fd and offset, data must be None. offset may be omitted
81 in this case, than current position of fd is used.
85 buf_size
= struct
.calcsize(self
.fmt
)
86 if offset
is not None:
88 data
= fd
.read(buf_size
)
90 assert fd
is None and offset
is None
92 values
= struct
.unpack(self
.fmt
, data
)
93 self
.__dict
__ = dict((field
[2], values
[i
])
94 for i
, field
in enumerate(self
.fields
))
98 value
= self
.__dict
__[f
[2]]
99 if isinstance(f
[1], str):
100 value_str
= f
[1].format(value
)
102 value_str
= str(f
[1](value
))
104 print('{:<25} {}'.format(f
[2], value_str
))
107 class Qcow2BitmapExt(Qcow2Struct
):
110 ('u32', '{}', 'nb_bitmaps'),
111 ('u32', '{}', 'reserved32'),
112 ('u64', '{:#x}', 'bitmap_directory_size'),
113 ('u64', '{:#x}', 'bitmap_directory_offset')
117 QCOW2_EXT_MAGIC_BITMAPS
= 0x23852875
120 class QcowHeaderExtension(Qcow2Struct
):
124 0xe2792aca: 'Backing format',
125 0x6803f857: 'Feature table',
126 0x0537be77: 'Crypto header',
127 QCOW2_EXT_MAGIC_BITMAPS
: 'Bitmaps',
128 0x44415441: 'Data file'
132 ('u32', Magic
, 'magic'),
133 ('u32', '{}', 'length')
134 # length bytes of data follows
135 # then padding to next multiply of 8
138 def __init__(self
, magic
=None, length
=None, data
=None, fd
=None):
140 Support both loading from fd and creation from user data.
141 For fd-based creation current position in a file will be used to read
144 This should be somehow refactored and functionality should be moved to
145 superclass (to allow creation of any qcow2 struct), but then, fields
146 of variable length (data here) should be supported in base class
147 somehow. Note also, that we probably want to parse different
148 extensions. Should they be subclasses of this class, or how to do it
149 better? Should it be something like QAPI union with discriminator field
150 (magic here). So, it's a TODO. We'll see how to properly refactor this
151 when we have more qcow2 structures.
154 assert all(v
is not None for v
in (magic
, length
, data
))
158 padding
= 8 - (length
% 8)
159 data
+= b
'\0' * padding
162 assert all(v
is None for v
in (magic
, length
, data
))
163 super().__init
__(fd
=fd
)
164 padded
= (self
.length
+ 7) & ~
7
165 self
.data
= fd
.read(padded
)
166 assert self
.data
is not None
168 if self
.magic
== QCOW2_EXT_MAGIC_BITMAPS
:
169 self
.obj
= Qcow2BitmapExt(data
=self
.data
)
177 data
= self
.data
[:self
.length
]
178 if all(c
in string
.printable
.encode('ascii') for c
in data
):
179 data
= f
"'{ data.decode('ascii') }'"
182 print(f
'{"data":<25} {data}')
187 def create(cls
, magic
, data
):
188 return QcowHeaderExtension(magic
, len(data
), data
)
191 class QcowHeader(Qcow2Struct
):
194 # Version 2 header fields
195 ('u32', '{:#x}', 'magic'),
196 ('u32', '{}', 'version'),
197 ('u64', '{:#x}', 'backing_file_offset'),
198 ('u32', '{:#x}', 'backing_file_size'),
199 ('u32', '{}', 'cluster_bits'),
200 ('u64', '{}', 'size'),
201 ('u32', '{}', 'crypt_method'),
202 ('u32', '{}', 'l1_size'),
203 ('u64', '{:#x}', 'l1_table_offset'),
204 ('u64', '{:#x}', 'refcount_table_offset'),
205 ('u32', '{}', 'refcount_table_clusters'),
206 ('u32', '{}', 'nb_snapshots'),
207 ('u64', '{:#x}', 'snapshot_offset'),
209 # Version 3 header fields
210 ('u64', Flags64
, 'incompatible_features'),
211 ('u64', Flags64
, 'compatible_features'),
212 ('u64', Flags64
, 'autoclear_features'),
213 ('u32', '{}', 'refcount_order'),
214 ('u32', '{}', 'header_length'),
217 def __init__(self
, fd
):
218 super().__init
__(fd
=fd
, offset
=0)
221 self
.cluster_size
= 1 << self
.cluster_bits
223 fd
.seek(self
.header_length
)
224 self
.load_extensions(fd
)
226 if self
.backing_file_offset
:
227 fd
.seek(self
.backing_file_offset
)
228 self
.backing_file
= fd
.read(self
.backing_file_size
)
230 self
.backing_file
= None
232 def set_defaults(self
):
233 if self
.version
== 2:
234 self
.incompatible_features
= 0
235 self
.compatible_features
= 0
236 self
.autoclear_features
= 0
237 self
.refcount_order
= 4
238 self
.header_length
= 72
240 def load_extensions(self
, fd
):
243 if self
.backing_file_offset
!= 0:
244 end
= min(self
.cluster_size
, self
.backing_file_offset
)
246 end
= self
.cluster_size
248 while fd
.tell() < end
:
249 ext
= QcowHeaderExtension(fd
=fd
)
253 self
.extensions
.append(ext
)
255 def update_extensions(self
, fd
):
257 fd
.seek(self
.header_length
)
258 extensions
= self
.extensions
259 extensions
.append(QcowHeaderExtension(0, 0, b
''))
260 for ex
in extensions
:
261 buf
= struct
.pack('>II', ex
.magic
, ex
.length
)
265 if self
.backing_file
is not None:
266 self
.backing_file_offset
= fd
.tell()
267 fd
.write(self
.backing_file
)
269 if fd
.tell() > self
.cluster_size
:
270 raise Exception('I think I just broke the image...')
272 def update(self
, fd
):
273 header_bytes
= self
.header_length
275 self
.update_extensions(fd
)
278 header
= tuple(self
.__dict
__[f
] for t
, p
, f
in QcowHeader
.fields
)
279 buf
= struct
.pack(QcowHeader
.fmt
, *header
)
280 buf
= buf
[0:header_bytes
-1]
283 def dump_extensions(self
):
284 for ex
in self
.extensions
:
285 print('Header extension:')