// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */
6 #include "open-chapter.h"
8 #include <linux/log2.h>
11 #include "memory-alloc.h"
13 #include "permassert.h"
16 #include "hash-utils.h"
/*
 * Each index zone has a dedicated open chapter zone structure which gets an equal share of the
 * open chapter space. Records are assigned to zones based on their record name. Within each zone,
 * records are stored in an array in the order they arrive. Additionally, a reference to each
 * record is stored in a hash table to help determine if a new record duplicates an existing one.
 * If new metadata for an existing name arrives, the record is altered in place. The array of
 * records is 1-based so that record number 0 can be used to indicate an unused hash slot.
 *
 * Deleted records are marked with a flag rather than actually removed to simplify hash table
 * management. The array of deleted flags overlays the array of hash slots, but the flags are
 * indexed by record number instead of by record name. The number of hash slots will always be a
 * power of two that is greater than the number of records to be indexed, guaranteeing that hash
 * insertion cannot fail, and that there are sufficient flags for all records.
 *
 * Once any open chapter zone fills its available space, the chapter is closed. The records from
 * each zone are interleaved to attempt to preserve temporal locality and assigned to record pages.
 * Empty or deleted records are replaced by copies of a valid record so that the record pages only
 * contain valid records. The chapter then constructs a delta index which maps each record name to
 * the record page on which that record can be found, which is split into index pages. These
 * structures are then passed to the volume to be recorded on storage.
 *
 * When the index is saved, the open chapter records are saved in a single array, once again
 * interleaved to attempt to preserve temporal locality. When the index is reloaded, there may be a
 * different number of zones than previously, so the records must be parcelled out to their new
 * zones. In addition, depending on the distribution of record names, a new zone may have more
 * records than it has space. In this case, the latest records for that zone will be discarded.
 */
46 static const u8 OPEN_CHAPTER_MAGIC
[] = "ALBOC";
47 static const u8 OPEN_CHAPTER_VERSION
[] = "02.00";
49 #define OPEN_CHAPTER_MAGIC_LENGTH (sizeof(OPEN_CHAPTER_MAGIC) - 1)
50 #define OPEN_CHAPTER_VERSION_LENGTH (sizeof(OPEN_CHAPTER_VERSION) - 1)
53 static inline size_t records_size(const struct open_chapter_zone
*open_chapter
)
55 return sizeof(struct uds_volume_record
) * (1 + open_chapter
->capacity
);
58 static inline size_t slots_size(size_t slot_count
)
60 return sizeof(struct open_chapter_zone_slot
) * slot_count
;
63 int uds_make_open_chapter(const struct index_geometry
*geometry
, unsigned int zone_count
,
64 struct open_chapter_zone
**open_chapter_ptr
)
67 struct open_chapter_zone
*open_chapter
;
68 size_t capacity
= geometry
->records_per_chapter
/ zone_count
;
69 size_t slot_count
= (1 << bits_per(capacity
* LOAD_RATIO
));
71 result
= vdo_allocate_extended(struct open_chapter_zone
, slot_count
,
72 struct open_chapter_zone_slot
, "open chapter",
74 if (result
!= VDO_SUCCESS
)
77 open_chapter
->slot_count
= slot_count
;
78 open_chapter
->capacity
= capacity
;
79 result
= vdo_allocate_cache_aligned(records_size(open_chapter
), "record pages",
80 &open_chapter
->records
);
81 if (result
!= VDO_SUCCESS
) {
82 uds_free_open_chapter(open_chapter
);
86 *open_chapter_ptr
= open_chapter
;
90 void uds_reset_open_chapter(struct open_chapter_zone
*open_chapter
)
92 open_chapter
->size
= 0;
93 open_chapter
->deletions
= 0;
95 memset(open_chapter
->records
, 0, records_size(open_chapter
));
96 memset(open_chapter
->slots
, 0, slots_size(open_chapter
->slot_count
));
99 static unsigned int probe_chapter_slots(struct open_chapter_zone
*open_chapter
,
100 const struct uds_record_name
*name
)
102 struct uds_volume_record
*record
;
103 unsigned int slot_count
= open_chapter
->slot_count
;
104 unsigned int slot
= uds_name_to_hash_slot(name
, slot_count
);
105 unsigned int record_number
;
106 unsigned int attempts
= 1;
109 record_number
= open_chapter
->slots
[slot
].record_number
;
112 * If the hash slot is empty, we've reached the end of a chain without finding the
113 * record and should terminate the search.
115 if (record_number
== 0)
119 * If the name of the record referenced by the slot matches and has not been
120 * deleted, then we've found the requested name.
122 record
= &open_chapter
->records
[record_number
];
123 if ((memcmp(&record
->name
, name
, UDS_RECORD_NAME_SIZE
) == 0) &&
124 !open_chapter
->slots
[record_number
].deleted
)
128 * Quadratic probing: advance the probe by 1, 2, 3, etc. and try again. This
129 * performs better than linear probing and works best for 2^N slots.
131 slot
= (slot
+ attempts
++) % slot_count
;
135 void uds_search_open_chapter(struct open_chapter_zone
*open_chapter
,
136 const struct uds_record_name
*name
,
137 struct uds_record_data
*metadata
, bool *found
)
140 unsigned int record_number
;
142 slot
= probe_chapter_slots(open_chapter
, name
);
143 record_number
= open_chapter
->slots
[slot
].record_number
;
144 if (record_number
== 0) {
148 *metadata
= open_chapter
->records
[record_number
].data
;
152 /* Add a record to the open chapter zone and return the remaining space. */
153 int uds_put_open_chapter(struct open_chapter_zone
*open_chapter
,
154 const struct uds_record_name
*name
,
155 const struct uds_record_data
*metadata
)
158 unsigned int record_number
;
159 struct uds_volume_record
*record
;
161 if (open_chapter
->size
>= open_chapter
->capacity
)
164 slot
= probe_chapter_slots(open_chapter
, name
);
165 record_number
= open_chapter
->slots
[slot
].record_number
;
167 if (record_number
== 0) {
168 record_number
= ++open_chapter
->size
;
169 open_chapter
->slots
[slot
].record_number
= record_number
;
172 record
= &open_chapter
->records
[record_number
];
173 record
->name
= *name
;
174 record
->data
= *metadata
;
176 return open_chapter
->capacity
- open_chapter
->size
;
179 void uds_remove_from_open_chapter(struct open_chapter_zone
*open_chapter
,
180 const struct uds_record_name
*name
)
183 unsigned int record_number
;
185 slot
= probe_chapter_slots(open_chapter
, name
);
186 record_number
= open_chapter
->slots
[slot
].record_number
;
188 if (record_number
> 0) {
189 open_chapter
->slots
[record_number
].deleted
= true;
190 open_chapter
->deletions
+= 1;
194 void uds_free_open_chapter(struct open_chapter_zone
*open_chapter
)
196 if (open_chapter
!= NULL
) {
197 vdo_free(open_chapter
->records
);
198 vdo_free(open_chapter
);
202 /* Map each record name to its record page number in the delta chapter index. */
203 static int fill_delta_chapter_index(struct open_chapter_zone
**chapter_zones
,
204 unsigned int zone_count
,
205 struct open_chapter_index
*index
,
206 struct uds_volume_record
*collated_records
)
209 unsigned int records_per_chapter
;
210 unsigned int records_per_page
;
211 unsigned int record_index
;
212 unsigned int records
= 0;
215 int overflow_count
= 0;
216 struct uds_volume_record
*fill_record
= NULL
;
219 * The record pages should not have any empty space, so find a record with which to fill
220 * the chapter zone if it was closed early, and also to replace any deleted records. The
221 * last record in any filled zone is guaranteed to not have been deleted, so use one of
224 for (z
= 0; z
< zone_count
; z
++) {
225 struct open_chapter_zone
*zone
= chapter_zones
[z
];
227 if (zone
->size
== zone
->capacity
) {
228 fill_record
= &zone
->records
[zone
->size
];
233 records_per_chapter
= index
->geometry
->records_per_chapter
;
234 records_per_page
= index
->geometry
->records_per_page
;
236 for (records
= 0; records
< records_per_chapter
; records
++) {
237 struct uds_volume_record
*record
= &collated_records
[records
];
238 struct open_chapter_zone
*open_chapter
;
240 /* The record arrays in the zones are 1-based. */
241 record_index
= 1 + (records
/ zone_count
);
242 page_number
= records
/ records_per_page
;
243 open_chapter
= chapter_zones
[records
% zone_count
];
245 /* Use the fill record in place of an unused record. */
246 if (record_index
> open_chapter
->size
||
247 open_chapter
->slots
[record_index
].deleted
) {
248 *record
= *fill_record
;
252 *record
= open_chapter
->records
[record_index
];
253 result
= uds_put_open_chapter_index_record(index
, &record
->name
,
262 vdo_log_error_strerror(result
,
263 "failed to build open chapter index");
268 if (overflow_count
> 0)
269 vdo_log_warning("Failed to add %d entries to chapter index",
275 int uds_close_open_chapter(struct open_chapter_zone
**chapter_zones
,
276 unsigned int zone_count
, struct volume
*volume
,
277 struct open_chapter_index
*chapter_index
,
278 struct uds_volume_record
*collated_records
,
279 u64 virtual_chapter_number
)
283 uds_empty_open_chapter_index(chapter_index
, virtual_chapter_number
);
284 result
= fill_delta_chapter_index(chapter_zones
, zone_count
, chapter_index
,
286 if (result
!= UDS_SUCCESS
)
289 return uds_write_chapter(volume
, chapter_index
, collated_records
);
292 int uds_save_open_chapter(struct uds_index
*index
, struct buffered_writer
*writer
)
295 struct open_chapter_zone
*open_chapter
;
296 struct uds_volume_record
*record
;
297 u8 record_count_data
[sizeof(u32
)];
298 u32 record_count
= 0;
299 unsigned int record_index
;
302 result
= uds_write_to_buffered_writer(writer
, OPEN_CHAPTER_MAGIC
,
303 OPEN_CHAPTER_MAGIC_LENGTH
);
304 if (result
!= UDS_SUCCESS
)
307 result
= uds_write_to_buffered_writer(writer
, OPEN_CHAPTER_VERSION
,
308 OPEN_CHAPTER_VERSION_LENGTH
);
309 if (result
!= UDS_SUCCESS
)
312 for (z
= 0; z
< index
->zone_count
; z
++) {
313 open_chapter
= index
->zones
[z
]->open_chapter
;
314 record_count
+= open_chapter
->size
- open_chapter
->deletions
;
317 put_unaligned_le32(record_count
, record_count_data
);
318 result
= uds_write_to_buffered_writer(writer
, record_count_data
,
319 sizeof(record_count_data
));
320 if (result
!= UDS_SUCCESS
)
324 while (record_count
> 0) {
325 for (z
= 0; z
< index
->zone_count
; z
++) {
326 open_chapter
= index
->zones
[z
]->open_chapter
;
327 if (record_index
> open_chapter
->size
)
330 if (open_chapter
->slots
[record_index
].deleted
)
333 record
= &open_chapter
->records
[record_index
];
334 result
= uds_write_to_buffered_writer(writer
, (u8
*) record
,
336 if (result
!= UDS_SUCCESS
)
345 return uds_flush_buffered_writer(writer
);
348 u64
uds_compute_saved_open_chapter_size(struct index_geometry
*geometry
)
350 unsigned int records_per_chapter
= geometry
->records_per_chapter
;
352 return OPEN_CHAPTER_MAGIC_LENGTH
+ OPEN_CHAPTER_VERSION_LENGTH
+ sizeof(u32
) +
353 records_per_chapter
* sizeof(struct uds_volume_record
);
356 static int load_version20(struct uds_index
*index
, struct buffered_reader
*reader
)
360 u8 record_count_data
[sizeof(u32
)];
361 struct uds_volume_record record
;
364 * Track which zones cannot accept any more records. If the open chapter had a different
365 * number of zones previously, some new zones may have more records than they have space
366 * for. These overflow records will be discarded.
368 bool full_flags
[MAX_ZONES
] = {
372 result
= uds_read_from_buffered_reader(reader
, (u8
*) &record_count_data
,
373 sizeof(record_count_data
));
374 if (result
!= UDS_SUCCESS
)
377 record_count
= get_unaligned_le32(record_count_data
);
378 while (record_count
-- > 0) {
379 unsigned int zone
= 0;
381 result
= uds_read_from_buffered_reader(reader
, (u8
*) &record
,
383 if (result
!= UDS_SUCCESS
)
386 if (index
->zone_count
> 1)
387 zone
= uds_get_volume_index_zone(index
->volume_index
,
390 if (!full_flags
[zone
]) {
391 struct open_chapter_zone
*open_chapter
;
392 unsigned int remaining
;
394 open_chapter
= index
->zones
[zone
]->open_chapter
;
395 remaining
= uds_put_open_chapter(open_chapter
, &record
.name
,
397 /* Do not allow any zone to fill completely. */
398 full_flags
[zone
] = (remaining
<= 1);
405 int uds_load_open_chapter(struct uds_index
*index
, struct buffered_reader
*reader
)
407 u8 version
[OPEN_CHAPTER_VERSION_LENGTH
];
410 result
= uds_verify_buffered_data(reader
, OPEN_CHAPTER_MAGIC
,
411 OPEN_CHAPTER_MAGIC_LENGTH
);
412 if (result
!= UDS_SUCCESS
)
415 result
= uds_read_from_buffered_reader(reader
, version
, sizeof(version
));
416 if (result
!= UDS_SUCCESS
)
419 if (memcmp(OPEN_CHAPTER_VERSION
, version
, sizeof(version
)) != 0) {
420 return vdo_log_error_strerror(UDS_CORRUPT_DATA
,
421 "Invalid open chapter version: %.*s",
422 (int) sizeof(version
), version
);
425 return load_version20(index
, reader
);