// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "index.h"

#include "logger.h"
#include "memory-alloc.h"

#include "funnel-requestqueue.h"
#include "hash-utils.h"
#include "sparse-cache.h"

static const u64 NO_LAST_SAVE = U64_MAX;

/*
 * When searching for deduplication records, the index first searches the volume index, and then
 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
 * committed (either the open chapter or a recently closed one), the index searches the in-memory
 * representation of the chapter. Finally, if the volume index does not find a record and the index
 * is sparse, the index will search the sparse cache.
 *
 * The index sends two kinds of messages to coordinate between zones: chapter close messages for
 * the chapter writer, and sparse cache barrier messages for the sparse cache.
 *
 * The chapter writer is responsible for committing chapters of records to storage. Since zones can
 * get different numbers of records, some zones may fall behind others. Each time a zone fills up
 * its available space in a chapter, it informs the chapter writer that the chapter is complete,
 * and also informs all other zones that it has closed the chapter. Each other zone will then close
 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
 *
 * The last zone to close the chapter also removes the oldest chapter from the volume index.
 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
 * means that those zones will never ask the volume index about it. No zone is allowed to get more
 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
 * chapter before the previous one has been closed by all zones, it is forced to wait.
 *
 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
 * paused its operations, the cache membership is changed and each zone is then informed that it
 * can proceed. More details can be found in the sparse cache documentation.
 *
 * If a sparse index has only one zone, it will not create a triage queue, but it still needs the
 * barrier message to change the sparse cache membership, so the index simulates the message by
 * invoking the handler directly.
 */

struct chapter_writer {
        /* The index to which we belong */
        struct uds_index *index;
        /* The thread to do the writing */
        struct thread *thread;
        /* The lock protecting the following fields */
        struct mutex mutex;
        /* The condition signalled on state changes */
        struct cond_var cond;
        /* Set to true to stop the thread */
        bool stop;
        /* The result from the most recent write */
        int result;
        /* The number of bytes allocated by the chapter writer */
        size_t memory_size;
        /* The number of zones which have submitted a chapter for writing */
        unsigned int zones_to_write;
        /* Open chapter index used by uds_close_open_chapter() */
        struct open_chapter_index *open_chapter_index;
        /* Collated records used by uds_close_open_chapter() */
        struct uds_volume_record *collated_records;
        /* The chapters to write (one per zone) */
        struct open_chapter_zone *chapters[];
};

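/* Determine whether a virtual chapter is sparse, using this zone's view of the chapter range. */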
static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
{
        return uds_is_chapter_sparse(zone->index->volume->geometry,
                                     zone->oldest_virtual_chapter,
                                     zone->newest_virtual_chapter, virtual_chapter);
}

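/* Allocate a zone control request and enqueue it as a message to the specified zone. */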
static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
                               struct uds_index *index)
{
        int result;
        struct uds_request *request;

        result = vdo_allocate(1, struct uds_request, __func__, &request);
        if (result != VDO_SUCCESS)
                return result;

        request->index = index;
        request->unbatched = true;
        request->zone_number = zone;
        request->zone_message = message;

        uds_enqueue_request(request, STAGE_MESSAGE);
        return UDS_SUCCESS;
}

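/* Send a sparse cache barrier message to every zone of the index. */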
static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
{
        struct uds_zone_message message = {
                .type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
                .virtual_chapter = virtual_chapter,
        };
        unsigned int zone;

        for (zone = 0; zone < index->zone_count; zone++) {
                int result = launch_zone_message(message, zone, index);

                VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
        }
}

/*
 * Determine whether this request should trigger a sparse cache barrier message to change the
 * membership of the sparse cache. If a change in membership is desired, the function returns the
 * chapter number to add.
 */
static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
{
        u64 virtual_chapter;
        struct index_zone *zone;

        virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
                                                       &request->record_name);
        if (virtual_chapter == NO_CHAPTER)
                return NO_CHAPTER;

        zone = index->zones[request->zone_number];
        if (!is_zone_chapter_sparse(zone, virtual_chapter))
                return NO_CHAPTER;

        /*
         * FIXME: Optimize for a common case by remembering the chapter from the most recent
         * barrier message and skipping this chapter if it is the same.
         */

        return virtual_chapter;
}

/*
 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
 * of index does nothing here.
 */
static int simulate_index_zone_barrier_message(struct index_zone *zone,
                                               struct uds_request *request)
{
        u64 sparse_virtual_chapter;

        if ((zone->index->zone_count > 1) ||
            !uds_is_sparse_index_geometry(zone->index->volume->geometry))
                return UDS_SUCCESS;

        sparse_virtual_chapter = triage_index_request(zone->index, request);
        if (sparse_virtual_chapter == NO_CHAPTER)
                return UDS_SUCCESS;

        return uds_update_sparse_cache(zone, sparse_virtual_chapter);
}

/* This is the request processing function for the triage queue. */
static void triage_request(struct uds_request *request)
{
        struct uds_index *index = request->index;
        u64 sparse_virtual_chapter = triage_index_request(index, request);

        if (sparse_virtual_chapter != NO_CHAPTER)
                enqueue_barrier_messages(index, sparse_virtual_chapter);

        uds_enqueue_request(request, STAGE_INDEX);
}

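/* Wait for the chapter writer to finish writing every chapter before the specified one. */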
static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
{
        int result;
        struct chapter_writer *writer = index->chapter_writer;

        mutex_lock(&writer->mutex);
        while (index->newest_virtual_chapter < current_chapter_number)
                uds_wait_cond(&writer->cond, &writer->mutex);
        result = writer->result;
        mutex_unlock(&writer->mutex);

        if (result != UDS_SUCCESS)
                return vdo_log_error_strerror(result,
                                              "Writing of previous open chapter failed");

        return UDS_SUCCESS;
}

static int swap_open_chapter(struct index_zone *zone)
{
        int result;

        result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
        if (result != UDS_SUCCESS)
                return result;

        swap(zone->open_chapter, zone->writing_chapter);
        return UDS_SUCCESS;
}

/*
 * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
 * writing until all zones have closed it.
 */
static unsigned int start_closing_chapter(struct uds_index *index,
                                          unsigned int zone_number,
                                          struct open_chapter_zone *chapter)
{
        unsigned int finished_zones;
        struct chapter_writer *writer = index->chapter_writer;

        mutex_lock(&writer->mutex);
        finished_zones = ++writer->zones_to_write;
        writer->chapters[zone_number] = chapter;
        uds_broadcast_cond(&writer->cond);
        mutex_unlock(&writer->mutex);

        return finished_zones;
}

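/* Inform each of the other zones that this zone has closed the given chapter. */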
static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
{
        int result;
        unsigned int i;
        struct uds_zone_message zone_message = {
                .type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
                .virtual_chapter = closed_chapter,
        };

        for (i = 0; i < zone->index->zone_count; i++) {
                if (zone->id == i)
                        continue;

                result = launch_zone_message(zone_message, i, zone->index);
                if (result != UDS_SUCCESS)
                        return result;
        }

        return UDS_SUCCESS;
}

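/* Close this zone's open chapter, hand it to the chapter writer, and expire old chapters. */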
static int open_next_chapter(struct index_zone *zone)
{
        int result;
        u64 closed_chapter;
        u64 expiring;
        unsigned int finished_zones;
        u32 expire_chapters;

        vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
                      (unsigned long long) zone->newest_virtual_chapter, zone->id,
                      zone->open_chapter->size,
                      zone->open_chapter->capacity - zone->open_chapter->size);

        result = swap_open_chapter(zone);
        if (result != UDS_SUCCESS)
                return result;

        closed_chapter = zone->newest_virtual_chapter++;
        uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
                                               zone->newest_virtual_chapter);
        uds_reset_open_chapter(zone->open_chapter);

        finished_zones = start_closing_chapter(zone->index, zone->id,
                                               zone->writing_chapter);
        if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
                result = announce_chapter_closed(zone, closed_chapter);
                if (result != UDS_SUCCESS)
                        return result;
        }

        expiring = zone->oldest_virtual_chapter;
        expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
                                                 zone->newest_virtual_chapter);
        zone->oldest_virtual_chapter += expire_chapters;

        if (finished_zones < zone->index->zone_count)
                return UDS_SUCCESS;

        while (expire_chapters-- > 0)
                uds_forget_chapter(zone->index->volume, expiring++);

        return UDS_SUCCESS;
}

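/* Close this zone's open chapter if it matches the chapter another zone just closed. */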
static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
{
        if (zone->newest_virtual_chapter == virtual_chapter)
                return open_next_chapter(zone);

        return UDS_SUCCESS;
}

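/* Dispatch a zone control message to its handler. */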
static int dispatch_index_zone_control_request(struct uds_request *request)
{
        struct uds_zone_message *message = &request->zone_message;
        struct index_zone *zone = request->index->zones[request->zone_number];

        switch (message->type) {
        case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
                return uds_update_sparse_cache(zone, message->virtual_chapter);

        case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
                return handle_chapter_closed(zone, message->virtual_chapter);

        default:
                vdo_log_error("invalid message type: %d", message->type);
                return UDS_INVALID_ARGUMENT;
        }
}

static void set_request_location(struct uds_request *request,
                                 enum uds_index_region new_location)
{
        request->location = new_location;
        request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
                          (new_location == UDS_LOCATION_IN_DENSE) ||
                          (new_location == UDS_LOCATION_IN_SPARSE));
}

static void set_chapter_location(struct uds_request *request,
                                 const struct index_zone *zone, u64 virtual_chapter)
{
        request->found = true;
        if (virtual_chapter == zone->newest_virtual_chapter)
                request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
        else if (is_zone_chapter_sparse(zone, virtual_chapter))
                request->location = UDS_LOCATION_IN_SPARSE;
        else
                request->location = UDS_LOCATION_IN_DENSE;
}

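/* Search the sparse cache for a record name, then search the cached record page it maps to. */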
static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
                                       u64 virtual_chapter, bool *found)
{
        int result;
        struct volume *volume;
        u16 record_page_number;
        u32 chapter;

        result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
                                         &record_page_number);
        if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
                return result;

        request->virtual_chapter = virtual_chapter;
        volume = zone->index->volume;
        chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
        return uds_search_cached_record_page(volume, request, chapter,
                                             record_page_number, found);
}

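/*
 * Search the appropriate location for a record: the open chapter, the writing chapter, the
 * sparse cache, or the volume page cache.
 */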
static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
                                bool *found)
{
        struct volume *volume;

        if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
                *found = true;
                return UDS_SUCCESS;
        } else if (request->location == UDS_LOCATION_UNAVAILABLE) {
                *found = false;
                return UDS_SUCCESS;
        }

        *found = false;
        if (request->virtual_chapter == zone->newest_virtual_chapter) {
                uds_search_open_chapter(zone->open_chapter, &request->record_name,
                                        &request->old_metadata, found);
                return UDS_SUCCESS;
        }

        if ((zone->newest_virtual_chapter > 0) &&
            (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
            (zone->writing_chapter->size > 0)) {
                uds_search_open_chapter(zone->writing_chapter, &request->record_name,
                                        &request->old_metadata, found);
                return UDS_SUCCESS;
        }

        volume = zone->index->volume;
        if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
            uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
                                      request->zone_number))
                return search_sparse_cache_in_zone(zone, request,
                                                   request->virtual_chapter, found);

        return uds_search_volume_page_cache(volume, request, found);
}

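/* Add a record to the open chapter, closing the chapter if it is now full. */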
static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
                              const struct uds_record_data *metadata)
{
        unsigned int remaining;

        remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
                                         metadata);
        if (remaining == 0)
                return open_next_chapter(zone);

        return UDS_SUCCESS;
}

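/*
 * Search the zone for a record, adding or updating volume index and open chapter entries as
 * required by the request type.
 */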
static int search_index_zone(struct index_zone *zone, struct uds_request *request)
{
        int result;
        struct volume_index_record record;
        bool overflow_record, found = false;
        struct uds_record_data *metadata;
        u64 chapter;

        result = uds_get_volume_index_record(zone->index->volume_index,
                                             &request->record_name, &record);
        if (result != UDS_SUCCESS)
                return result;

        if (record.is_found) {
                if (request->requeued && request->virtual_chapter != record.virtual_chapter)
                        set_request_location(request, UDS_LOCATION_UNKNOWN);

                request->virtual_chapter = record.virtual_chapter;
                result = get_record_from_zone(zone, request, &found);
                if (result != UDS_SUCCESS)
                        return result;
        }

        if (found)
                set_chapter_location(request, zone, record.virtual_chapter);

        /*
         * If a record has overflowed a chapter index in more than one chapter (or overflowed in
         * one chapter and collided with an existing record), it will exist as a collision record
         * in the volume index, but we won't find it in the volume. This case needs special
         * handling.
         */
        overflow_record = (record.is_found && record.is_collision && !found);
        chapter = zone->newest_virtual_chapter;
        if (found || overflow_record) {
                if ((request->type == UDS_QUERY_NO_UPDATE) ||
                    ((request->type == UDS_QUERY) && overflow_record)) {
                        /* There is nothing left to do. */
                        return UDS_SUCCESS;
                }

                if (record.virtual_chapter != chapter) {
                        /*
                         * Update the volume index to reference the new chapter for the block. If
                         * the record had been deleted or dropped from the chapter index, it will
                         * be back.
                         */
                        result = uds_set_volume_index_record_chapter(&record, chapter);
                } else if (request->type != UDS_UPDATE) {
                        /* The record is already in the open chapter. */
                        return UDS_SUCCESS;
                }
        } else {
                /*
                 * The record wasn't in the volume index, so check whether the
                 * name is in a cached sparse chapter. If we found the name on
                 * a previous search, use that result instead.
                 */
                if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
                        found = true;
                } else if (request->location == UDS_LOCATION_UNAVAILABLE) {
                        found = false;
                } else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
                           !uds_is_volume_index_sample(zone->index->volume_index,
                                                       &request->record_name)) {
                        result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
                                                             &found);
                        if (result != UDS_SUCCESS)
                                return result;
                }

                if (found)
                        set_request_location(request, UDS_LOCATION_IN_SPARSE);

                if ((request->type == UDS_QUERY_NO_UPDATE) ||
                    ((request->type == UDS_QUERY) && !found)) {
                        /* There is nothing left to do. */
                        return UDS_SUCCESS;
                }

                /*
                 * Add a new entry to the volume index referencing the open chapter. This needs to
                 * be done both for new records, and for records from cached sparse chapters.
                 */
                result = uds_put_volume_index_record(&record, chapter);
        }

        if (result == UDS_OVERFLOW) {
                /*
                 * The volume index encountered a delta list overflow. The condition was already
                 * logged. We will go on without adding the record to the open chapter.
                 */
                return UDS_SUCCESS;
        }

        if (result != UDS_SUCCESS)
                return result;

        if (!found || (request->type == UDS_UPDATE)) {
                /* This is a new record or we're updating an existing record. */
                metadata = &request->new_metadata;
        } else {
                /* Move the existing record to the open chapter. */
                metadata = &request->old_metadata;
        }

        return put_record_in_zone(zone, request, metadata);
}

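/* Remove a record from the volume index, and from the open chapter if it resides there. */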
static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
{
        int result;
        struct volume_index_record record;

        result = uds_get_volume_index_record(zone->index->volume_index,
                                             &request->record_name, &record);
        if (result != UDS_SUCCESS)
                return result;

        if (!record.is_found)
                return UDS_SUCCESS;

        /* If the request was requeued, check whether the saved state is still valid. */
        if (record.is_collision) {
                set_chapter_location(request, zone, record.virtual_chapter);
        } else {
                /* Non-collision records are hints, so resolve the name in the chapter. */
                bool found;

                if (request->requeued && request->virtual_chapter != record.virtual_chapter)
                        set_request_location(request, UDS_LOCATION_UNKNOWN);

                request->virtual_chapter = record.virtual_chapter;
                result = get_record_from_zone(zone, request, &found);
                if (result != UDS_SUCCESS)
                        return result;

                if (!found) {
                        /* There is no record to remove. */
                        return UDS_SUCCESS;
                }
        }

        set_chapter_location(request, zone, record.virtual_chapter);

        /*
         * Delete the volume index entry for the named record only. Note that a later search
         * might return stale advice if there is a colliding name in the same chapter, but it's
         * a very rare case (1 in 2^21).
         */
        result = uds_remove_volume_index_record(&record);
        if (result != UDS_SUCCESS)
                return result;

        /*
         * If the record is in the open chapter, we must remove it or mark it deleted to avoid
         * trouble if the record is added again later.
         */
        if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
                uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);

        return UDS_SUCCESS;
}

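/* Route a request to the zone operation appropriate for its type. */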
static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
{
        int result;
        struct index_zone *zone = index->zones[request->zone_number];

        if (!request->requeued) {
                result = simulate_index_zone_barrier_message(zone, request);
                if (result != UDS_SUCCESS)
                        return result;
        }

        switch (request->type) {
        case UDS_POST:
        case UDS_UPDATE:
        case UDS_QUERY:
        case UDS_QUERY_NO_UPDATE:
                result = search_index_zone(zone, request);
                break;

        case UDS_DELETE:
                result = remove_from_index_zone(zone, request);
                break;

        default:
                result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
                                                  "invalid request type: %d",
                                                  request->type);
                break;
        }

        return result;
}

/* This is the request processing function invoked by each zone's thread. */
static void execute_zone_request(struct uds_request *request)
{
        int result;
        struct uds_index *index = request->index;

        if (request->zone_message.type != UDS_MESSAGE_NONE) {
                result = dispatch_index_zone_control_request(request);
                if (result != UDS_SUCCESS) {
                        vdo_log_error_strerror(result, "error executing message: %d",
                                               request->zone_message.type);
                }

                /* Once the message is processed it can be freed. */
                vdo_free(vdo_forget(request));
                return;
        }

        index->need_to_save = true;
        if (request->requeued && (request->status != UDS_SUCCESS)) {
                set_request_location(request, UDS_LOCATION_UNAVAILABLE);
                index->callback(request);
                return;
        }

        result = dispatch_index_request(index, request);
        if (result == UDS_QUEUED) {
                /* The request has been requeued so don't let it complete. */
                return;
        }

        if (!request->found)
                set_request_location(request, UDS_LOCATION_UNAVAILABLE);

        request->status = result;
        index->callback(request);
}

static int initialize_index_queues(struct uds_index *index,
                                   const struct index_geometry *geometry)
{
        int result;
        unsigned int i;

        for (i = 0; i < index->zone_count; i++) {
                result = uds_make_request_queue("indexW", &execute_zone_request,
                                                &index->zone_queues[i]);
                if (result != UDS_SUCCESS)
                        return result;
        }

        /* The triage queue is only needed for sparse multi-zone indexes. */
        if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
                result = uds_make_request_queue("triageW", &triage_request,
                                                &index->triage_queue);
                if (result != UDS_SUCCESS)
                        return result;
        }

        return UDS_SUCCESS;
}

/* This is the driver function for the chapter writer thread. */
static void close_chapters(void *arg)
{
        int result;
        struct chapter_writer *writer = arg;
        struct uds_index *index = writer->index;

        vdo_log_debug("chapter writer starting");
        mutex_lock(&writer->mutex);
        for (;;) {
                while (writer->zones_to_write < index->zone_count) {
                        if (writer->stop && (writer->zones_to_write == 0)) {
                                /*
                                 * We've been told to stop, and all of the zones are in the same
                                 * open chapter, so we can exit now.
                                 */
                                mutex_unlock(&writer->mutex);
                                vdo_log_debug("chapter writer stopping");
                                return;
                        }

                        uds_wait_cond(&writer->cond, &writer->mutex);
                }

                /*
                 * Release the lock while closing a chapter. We probably don't need to do this,
                 * but it seems safer in principle. It's OK to access the chapter and
                 * chapter_number fields without the lock since those aren't allowed to change
                 * until we're done.
                 */
                mutex_unlock(&writer->mutex);

                if (index->has_saved_open_chapter) {
                        /*
                         * Remove the saved open chapter the first time we close an open chapter
                         * after loading from a clean shutdown, or after doing a clean save. The
                         * lack of the saved open chapter will indicate that a recovery is
                         * necessary.
                         */
                        index->has_saved_open_chapter = false;
                        result = uds_discard_open_chapter(index->layout);
                        if (result == UDS_SUCCESS)
                                vdo_log_debug("Discarding saved open chapter");
                }

                result = uds_close_open_chapter(writer->chapters, index->zone_count,
                                                index->volume,
                                                writer->open_chapter_index,
                                                writer->collated_records,
                                                index->newest_virtual_chapter);

                mutex_lock(&writer->mutex);
                index->newest_virtual_chapter++;
                index->oldest_virtual_chapter +=
                        uds_chapters_to_expire(index->volume->geometry,
                                               index->newest_virtual_chapter);
                writer->result = result;
                writer->zones_to_write = 0;
                uds_broadcast_cond(&writer->cond);
        }
}

static void stop_chapter_writer(struct chapter_writer *writer)
{
        struct thread *writer_thread = NULL;

        mutex_lock(&writer->mutex);
        if (writer->thread != NULL) {
                writer_thread = writer->thread;
                writer->thread = NULL;
                writer->stop = true;
                uds_broadcast_cond(&writer->cond);
        }
        mutex_unlock(&writer->mutex);

        if (writer_thread != NULL)
                vdo_join_threads(writer_thread);
}

static void free_chapter_writer(struct chapter_writer *writer)
{
        if (writer == NULL)
                return;

        stop_chapter_writer(writer);
        uds_free_open_chapter_index(writer->open_chapter_index);
        vdo_free(writer->collated_records);
        vdo_free(writer);
}

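/* Allocate the chapter writer and start its thread. */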
static int make_chapter_writer(struct uds_index *index,
                               struct chapter_writer **writer_ptr)
{
        int result;
        struct chapter_writer *writer;
        size_t collated_records_size =
                (sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);

        result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
                                       struct open_chapter_zone *, "Chapter Writer",
                                       &writer);
        if (result != VDO_SUCCESS)
                return result;

        writer->index = index;
        mutex_init(&writer->mutex);
        uds_init_cond(&writer->cond);

        result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
                                            &writer->collated_records);
        if (result != VDO_SUCCESS) {
                free_chapter_writer(writer);
                return result;
        }

        result = uds_make_open_chapter_index(&writer->open_chapter_index,
                                             index->volume->geometry,
                                             index->volume->nonce);
        if (result != UDS_SUCCESS) {
                free_chapter_writer(writer);
                return result;
        }

        writer->memory_size = (sizeof(struct chapter_writer) +
                               index->zone_count * sizeof(struct open_chapter_zone *) +
                               collated_records_size +
                               writer->open_chapter_index->memory_size);

        result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
        if (result != VDO_SUCCESS) {
                free_chapter_writer(writer);
                return result;
        }

        *writer_ptr = writer;
        return UDS_SUCCESS;
}

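/* Attempt to load a cleanly saved index state from the layout. */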
static int load_index(struct uds_index *index)
{
        int result;
        u64 last_save_chapter;

        result = uds_load_index_state(index->layout, index);
        if (result != UDS_SUCCESS)
                return UDS_INDEX_NOT_SAVED_CLEANLY;

        last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);

        vdo_log_info("loaded index from chapter %llu through chapter %llu",
                     (unsigned long long) index->oldest_virtual_chapter,
                     (unsigned long long) last_save_chapter);

        return UDS_SUCCESS;
}

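/* Rebuild the index page map for one chapter by reading each of its index pages. */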
static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
{
        int result;
        struct delta_index_page *chapter_index_page;
        struct index_geometry *geometry = index->volume->geometry;
        u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
        u32 expected_list_number = 0;
        u32 index_page_number;
        u32 lowest_delta_list;
        u32 highest_delta_list;

        for (index_page_number = 0;
             index_page_number < geometry->index_pages_per_chapter;
             index_page_number++) {
                result = uds_get_volume_index_page(index->volume, chapter,
                                                   index_page_number,
                                                   &chapter_index_page);
                if (result != UDS_SUCCESS) {
                        return vdo_log_error_strerror(result,
                                                      "failed to read index page %u in chapter %u",
                                                      index_page_number, chapter);
                }

                lowest_delta_list = chapter_index_page->lowest_list_number;
                highest_delta_list = chapter_index_page->highest_list_number;
                if (lowest_delta_list != expected_list_number) {
                        return vdo_log_error_strerror(UDS_CORRUPT_DATA,
                                                      "chapter %u index page %u is corrupt",
                                                      chapter, index_page_number);
                }

                uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
                                          index_page_number, highest_delta_list);
                expected_list_number = highest_delta_list + 1;
        }

        return UDS_SUCCESS;
}

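/* Add a single record back into the volume index during a rebuild. */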
static int replay_record(struct uds_index *index, const struct uds_record_name *name,
                         u64 virtual_chapter, bool will_be_sparse_chapter)
{
        int result;
        struct volume_index_record record;
        bool update_record;

        if (will_be_sparse_chapter &&
            !uds_is_volume_index_sample(index->volume_index, name)) {
                /*
                 * This entry will be in a sparse chapter after the rebuild completes, and it is
                 * not a sample, so just skip over it.
                 */
                return UDS_SUCCESS;
        }

        result = uds_get_volume_index_record(index->volume_index, name, &record);
        if (result != UDS_SUCCESS)
                return result;

        if (record.is_found) {
                if (record.is_collision) {
                        if (record.virtual_chapter == virtual_chapter) {
                                /* The record is already correct. */
                                return UDS_SUCCESS;
                        }

                        update_record = true;
                } else if (record.virtual_chapter == virtual_chapter) {
                        /*
                         * There is a volume index entry pointing to the current chapter, but we
                         * don't know if it is for the same name as the one we are currently
                         * working on or not. For now, we're just going to assume that it isn't.
                         * This will create one extra collision record if there was a deleted
                         * record in the current chapter.
                         */
                        update_record = false;
                } else {
                        /*
                         * If we're rebuilding, we don't normally want to go to disk to see if the
                         * record exists, since we will likely have just read the record from disk
                         * (i.e. we know it's there). The exception to this is when we find an
                         * entry in the volume index that has a different chapter. In this case, we
                         * need to search that chapter to determine if the volume index entry was
                         * for the same record or a different one.
                         */
                        result = uds_search_volume_page_cache_for_rebuild(index->volume,
                                                                          name,
                                                                          record.virtual_chapter,
                                                                          &update_record);
                        if (result != UDS_SUCCESS)
                                return result;
                }
        } else {
                update_record = false;
        }

        if (update_record) {
                /*
                 * Update the volume index to reference the new chapter for the block. If the
                 * record had been deleted or dropped from the chapter index, it will be back.
                 */
                result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
        } else {
                /*
                 * Add a new entry to the volume index referencing the open chapter. This should be
                 * done regardless of whether we are a brand new record or a sparse record, i.e.
                 * one that doesn't exist in the index but does on disk, since for a sparse record,
                 * we would want to un-sparsify if it did exist.
                 */
                result = uds_put_volume_index_record(&record, virtual_chapter);
        }

        if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
                /* The rebuilt index will lose these records. */
                return UDS_SUCCESS;
        }

        return result;
}

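/* Check whether a suspend has been requested, waiting here until the index is resumed or freed. */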
static bool check_for_suspend(struct uds_index *index)
{
        bool closing;

        if (index->load_context == NULL)
                return false;

        mutex_lock(&index->load_context->mutex);
        if (index->load_context->status != INDEX_SUSPENDING) {
                mutex_unlock(&index->load_context->mutex);
                return false;
        }

        /* Notify that we are suspended and wait for the resume. */
        index->load_context->status = INDEX_SUSPENDED;
        uds_broadcast_cond(&index->load_context->cond);

        while ((index->load_context->status != INDEX_OPENING) &&
               (index->load_context->status != INDEX_FREEING))
                uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);

        closing = (index->load_context->status == INDEX_FREEING);
        mutex_unlock(&index->load_context->mutex);
        return closing;
}

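/* Replay one chapter from the volume, adding each of its records back to the volume index. */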
static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
{
        int result;
        u32 i;
        u32 j;
        const struct index_geometry *geometry;
        u32 physical_chapter;

        if (check_for_suspend(index)) {
                vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
                             (unsigned long long) virtual);
                return -EBUSY;
        }

        geometry = index->volume->geometry;
        physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
        uds_prefetch_volume_chapter(index->volume, physical_chapter);
        uds_set_volume_index_open_chapter(index->volume_index, virtual);

        result = rebuild_index_page_map(index, virtual);
        if (result != UDS_SUCCESS) {
                return vdo_log_error_strerror(result,
                                              "could not rebuild index page map for chapter %u",
                                              physical_chapter);
        }

        for (i = 0; i < geometry->record_pages_per_chapter; i++) {
                u8 *record_page;
                u32 record_page_number;

                record_page_number = geometry->index_pages_per_chapter + i;
                result = uds_get_volume_record_page(index->volume, physical_chapter,
                                                    record_page_number, &record_page);
                if (result != UDS_SUCCESS) {
                        return vdo_log_error_strerror(result, "could not get page %d",
                                                      record_page_number);
                }

                for (j = 0; j < geometry->records_per_page; j++) {
                        const u8 *name_bytes;
                        struct uds_record_name name;

                        name_bytes = record_page + (j * BYTES_PER_RECORD);
                        memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
                        result = replay_record(index, &name, virtual, sparse);
                        if (result != UDS_SUCCESS)
                                return result;
                }
        }

        return UDS_SUCCESS;
}

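/* Replay every committed chapter to reconstruct the volume index and the index page map. */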
static int replay_volume(struct uds_index *index)
{
        int result;
        u64 old_map_update;
        u64 new_map_update;
        u64 virtual;
        u64 from_virtual = index->oldest_virtual_chapter;
        u64 upto_virtual = index->newest_virtual_chapter;
        bool will_be_sparse;

        vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
                     (unsigned long long) from_virtual,
                     (unsigned long long) upto_virtual);

        /*
         * The index failed to load, so the volume index is empty. Add records to the volume index
         * in order, skipping non-hooks in chapters which will be sparse to save time.
         *
         * Go through each record page of each chapter and add the records back to the volume
         * index. This should not cause anything to be written to either the open chapter or the
         * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
         * would have already been purged from the volume index when the chapter was opened.
         *
         * Also, go through each index page for each chapter and rebuild the index page map.
         */
        old_map_update = index->volume->index_page_map->last_update;
        for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
                will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
                                                       from_virtual, upto_virtual,
                                                       virtual);
                result = replay_chapter(index, virtual, will_be_sparse);
                if (result != UDS_SUCCESS)
                        return result;
        }

        /* Also reap the chapter being replaced by the open chapter. */
        uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);

        new_map_update = index->volume->index_page_map->last_update;
        if (new_map_update != old_map_update) {
                vdo_log_info("replay changed index page map update from %llu to %llu",
                             (unsigned long long) old_map_update,
                             (unsigned long long) new_map_update);
        }

        return UDS_SUCCESS;
}

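/* Rebuild the in-memory index state from the chapters found on the volume. */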
static int rebuild_index(struct uds_index *index)
{
        int result;
        u64 lowest;
        u64 highest;
        bool is_empty = false;
        u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;

        index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
        result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
                                                    &is_empty);
        if (result != UDS_SUCCESS) {
                return vdo_log_fatal_strerror(result,
                                              "cannot rebuild index: unknown volume chapter boundaries");
        }

        if (is_empty) {
                index->newest_virtual_chapter = 0;
                index->oldest_virtual_chapter = 0;
                index->volume->lookup_mode = LOOKUP_NORMAL;
                return UDS_SUCCESS;
        }

        index->newest_virtual_chapter = highest + 1;
        index->oldest_virtual_chapter = lowest;
        if (index->newest_virtual_chapter ==
            (index->oldest_virtual_chapter + chapters_per_volume)) {
                /* Skip the chapter shadowed by the open chapter. */
                index->oldest_virtual_chapter++;
        }

        result = replay_volume(index);
        if (result != UDS_SUCCESS)
                return result;

        index->volume->lookup_mode = LOOKUP_NORMAL;
        return UDS_SUCCESS;
}

static void free_index_zone(struct index_zone *zone)
{
        if (zone == NULL)
                return;

        uds_free_open_chapter(zone->open_chapter);
        uds_free_open_chapter(zone->writing_chapter);
        vdo_free(zone);
}

static int make_index_zone(struct uds_index *index, unsigned int zone_number)
{
        int result;
        struct index_zone *zone;

        result = vdo_allocate(1, struct index_zone, "index zone", &zone);
        if (result != VDO_SUCCESS)
                return result;

        result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
                                       &zone->open_chapter);
        if (result != UDS_SUCCESS) {
                free_index_zone(zone);
                return result;
        }

        result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
                                       &zone->writing_chapter);
        if (result != UDS_SUCCESS) {
                free_index_zone(zone);
                return result;
        }

        zone->index = index;
        zone->id = zone_number;
        index->zones[zone_number] = zone;

        return UDS_SUCCESS;
}

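/*
 * Allocate an index and initialize it, loading or rebuilding any saved state as the open type
 * requires.
 */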
int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
                   struct index_load_context *load_context, index_callback_fn callback,
                   struct uds_index **new_index)
{
        int result;
        bool loaded = false;
        bool new = (open_type == UDS_CREATE);
        struct uds_index *index = NULL;
        struct index_zone *zone;
        u64 nonce;
        unsigned int z;

        result = vdo_allocate_extended(struct uds_index, config->zone_count,
                                       struct uds_request_queue *, "index", &index);
        if (result != VDO_SUCCESS)
                return result;

        index->zone_count = config->zone_count;

        result = uds_make_index_layout(config, new, &index->layout);
        if (result != UDS_SUCCESS) {
                uds_free_index(index);
                return result;
        }

        result = vdo_allocate(index->zone_count, struct index_zone *, "zones",
                              &index->zones);
        if (result != VDO_SUCCESS) {
                uds_free_index(index);
                return result;
        }

        result = uds_make_volume(config, index->layout, &index->volume);
        if (result != UDS_SUCCESS) {
                uds_free_index(index);
                return result;
        }

        index->volume->lookup_mode = LOOKUP_NORMAL;
        for (z = 0; z < index->zone_count; z++) {
                result = make_index_zone(index, z);
                if (result != UDS_SUCCESS) {
                        uds_free_index(index);
                        return vdo_log_error_strerror(result,
                                                      "Could not create index zone");
                }
        }

        nonce = uds_get_volume_nonce(index->layout);
        result = uds_make_volume_index(config, nonce, &index->volume_index);
        if (result != UDS_SUCCESS) {
                uds_free_index(index);
                return vdo_log_error_strerror(result, "could not make volume index");
        }

        index->load_context = load_context;
        index->callback = callback;

        result = initialize_index_queues(index, config->geometry);
        if (result != UDS_SUCCESS) {
                uds_free_index(index);
                return result;
        }

        result = make_chapter_writer(index, &index->chapter_writer);
        if (result != UDS_SUCCESS) {
                uds_free_index(index);
                return result;
        }

        if (!new) {
                result = load_index(index);
                switch (result) {
                case UDS_SUCCESS:
                        loaded = true;
                        break;
                case -ENOMEM:
                        /* We should not try a rebuild for this error. */
                        vdo_log_error_strerror(result, "index could not be loaded");
                        break;
                default:
                        vdo_log_error_strerror(result, "index could not be loaded");
                        if (open_type == UDS_LOAD) {
                                result = rebuild_index(index);
                                if (result != UDS_SUCCESS) {
                                        vdo_log_error_strerror(result,
                                                               "index could not be rebuilt");
                                }
                        }
                        break;
                }
        }

        if (result != UDS_SUCCESS) {
                uds_free_index(index);
                return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
        }

        for (z = 0; z < index->zone_count; z++) {
                zone = index->zones[z];
                zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
                zone->newest_virtual_chapter = index->newest_virtual_chapter;
        }

        if (index->load_context != NULL) {
                mutex_lock(&index->load_context->mutex);
                index->load_context->status = INDEX_READY;
                /*
                 * If we get here, suspend is meaningless, but notify any thread trying to
                 * suspend us so it doesn't hang.
                 */
                uds_broadcast_cond(&index->load_context->cond);
                mutex_unlock(&index->load_context->mutex);
        }

        index->has_saved_open_chapter = loaded;
        index->need_to_save = !loaded;
        *new_index = index;
        return UDS_SUCCESS;
}

void uds_free_index(struct uds_index *index)
{
        unsigned int i;

        if (index == NULL)
                return;

        uds_request_queue_finish(index->triage_queue);
        for (i = 0; i < index->zone_count; i++)
                uds_request_queue_finish(index->zone_queues[i]);

        free_chapter_writer(index->chapter_writer);

        uds_free_volume_index(index->volume_index);
        if (index->zones != NULL) {
                for (i = 0; i < index->zone_count; i++)
                        free_index_zone(index->zones[i]);
                vdo_free(index->zones);
        }

        uds_free_volume(index->volume);
        uds_free_index_layout(vdo_forget(index->layout));
        vdo_free(index);
}

/* Wait for the chapter writer to complete any outstanding writes. */
void uds_wait_for_idle_index(struct uds_index *index)
{
        struct chapter_writer *writer = index->chapter_writer;

        mutex_lock(&writer->mutex);
        while (writer->zones_to_write > 0)
                uds_wait_cond(&writer->cond, &writer->mutex);
        mutex_unlock(&writer->mutex);
}

/* This function assumes that all requests have been drained. */
int uds_save_index(struct uds_index *index)
{
        int result;

        if (!index->need_to_save)
                return UDS_SUCCESS;

        uds_wait_for_idle_index(index);
        index->prev_save = index->last_save;
        index->last_save = ((index->newest_virtual_chapter == 0) ?
                            NO_LAST_SAVE : index->newest_virtual_chapter - 1);
        vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);

        result = uds_save_index_state(index->layout, index);
        if (result != UDS_SUCCESS) {
                vdo_log_info("save index failed");
                index->last_save = index->prev_save;
        } else {
                index->has_saved_open_chapter = true;
                index->need_to_save = false;
                vdo_log_info("finished save (vcn %llu)",
                             (unsigned long long) index->last_save);
        }

        return result;
}

int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
{
        return uds_replace_volume_storage(index->volume, index->layout, bdev);
}

/* Accessing statistics should be safe from any thread. */
void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
{
        struct volume_index_stats stats;

        uds_get_volume_index_stats(index->volume_index, &stats);
        counters->entries_indexed = stats.record_count;
        counters->collisions = stats.collision_count;
        counters->entries_discarded = stats.discard_count;

        counters->memory_used = (index->volume_index->memory_size +
                                 index->volume->cache_size +
                                 index->chapter_writer->memory_size);
}

void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
{
        struct uds_index *index = request->index;
        struct uds_request_queue *queue;

        switch (stage) {
        case STAGE_TRIAGE:
                if (index->triage_queue != NULL) {
                        queue = index->triage_queue;
                        break;
                }

                fallthrough;

        case STAGE_INDEX:
                request->zone_number =
                        uds_get_volume_index_zone(index->volume_index, &request->record_name);
                fallthrough;

        case STAGE_MESSAGE:
                queue = index->zone_queues[request->zone_number];
                break;

        default:
                VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
                return;
        }

        uds_request_queue_enqueue(queue, request);
}