// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "volume.h"

#include <linux/atomic.h>
#include <linux/dm-bufio.h>
#include <linux/err.h>

#include "errors.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"
#include "string-utils.h"
#include "thread-utils.h"

#include "chapter-index.h"
#include "config.h"
#include "geometry.h"
#include "hash-utils.h"
#include "index.h"
#include "sparse-cache.h"

/*
 * The first block of the volume layout is reserved for the volume header, which is no longer used.
 * The remainder of the volume is divided into chapters consisting of several pages of records, and
 * several pages of static index to use to find those records. The index pages are recorded first,
 * followed by the record pages. The chapters are written in order as they are filled, so the
 * volume storage acts as a circular log of the most recent chapters, with each new chapter
 * overwriting the oldest saved one.
 *
 * When a new chapter is filled and closed, the records from that chapter are sorted and
 * interleaved in approximate temporal order, and assigned to record pages. Then a static delta
 * index is generated to store which record page contains each record. The in-memory index page map
 * is also updated to indicate which delta lists fall on each chapter index page. This means that
 * when a record is read, the volume only has to load a single index page and a single record page,
 * rather than search the entire chapter. These index and record pages are written to storage, and
 * the index pages are transferred to the page cache under the theory that the most recently
 * written chapter is likely to be accessed again soon.
 *
 * When reading a record, the volume index will indicate which chapter should contain it. The
 * volume uses the index page map to determine which chapter index page needs to be loaded, and
 * then reads the relevant record page number from the chapter index. Both index and record pages
 * are stored in a page cache when read for the common case that subsequent records need the same
 * pages. The page cache evicts the least recently accessed entries when caching new pages. In
 * addition, the volume uses dm-bufio to manage access to the storage, which may allow for
 * additional caching depending on available system resources.
 *
 * Record requests are handled from cached pages when possible. If a page needs to be read, it is
 * placed on a queue along with the request that wants to read it. Any requests for the same page
 * that arrive while the read is pending are added to the queue entry. A separate reader thread
 * handles the queued reads, adding the page to the cache and updating any requests queued with it
 * so they can continue processing. This allows the index zone threads to continue processing new
 * requests rather than wait for the storage reads.
 *
 * When an index rebuild is necessary, the volume reads each stored chapter to determine which
 * range of chapters contains valid records, so that those records can be used to reconstruct the
 * in-memory volume index.
 */

/* The maximum allowable number of contiguous bad chapters */
#define MAX_BAD_CHAPTERS 100
#define VOLUME_CACHE_MAX_ENTRIES (U16_MAX >> 1)
#define VOLUME_CACHE_QUEUED_FLAG (1 << 15)
#define VOLUME_CACHE_MAX_QUEUED_READS 4096

static const u64 BAD_CHAPTER = U64_MAX;

/*
 * The invalidate counter is two 32-bit fields stored together atomically. The low order 32 bits
 * are the physical page number of the cached page being read. The high order 32 bits are a
 * sequence number. This value is written when the zone that owns it begins or completes a cache
 * search. Any other thread will only read the counter in wait_for_pending_searches() while waiting
 * to update the cache contents.
 */
union invalidate_counter {
        u64 value;
        struct {
                u32 page;
                u32 counter;
        };
};

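/*
 * Map between physical page numbers and chapter/page coordinates. Physical page zero is the
 * unused volume header, so chapter pages start at page one; within a chapter, the index pages
 * come before the record pages.
 */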
static inline u32 map_to_page_number(struct index_geometry *geometry, u32 physical_page)
{
        return (physical_page - HEADER_PAGES_PER_VOLUME) % geometry->pages_per_chapter;
}

static inline u32 map_to_chapter_number(struct index_geometry *geometry, u32 physical_page)
{
        return (physical_page - HEADER_PAGES_PER_VOLUME) / geometry->pages_per_chapter;
}

static inline bool is_record_page(struct index_geometry *geometry, u32 physical_page)
{
        return map_to_page_number(geometry, physical_page) >= geometry->index_pages_per_chapter;
}

static u32 map_to_physical_page(const struct index_geometry *geometry, u32 chapter, u32 page)
{
        /* Page zero is the header page, so the first chapter index page is page one. */
        return HEADER_PAGES_PER_VOLUME + (geometry->pages_per_chapter * chapter) + page;
}

static inline union invalidate_counter get_invalidate_counter(struct page_cache *cache,
                                                              unsigned int zone_number)
{
        return (union invalidate_counter) {
                .value = READ_ONCE(cache->search_pending_counters[zone_number].atomic_value),
        };
}

static inline void set_invalidate_counter(struct page_cache *cache,
                                          unsigned int zone_number,
                                          union invalidate_counter invalidate_counter)
{
        WRITE_ONCE(cache->search_pending_counters[zone_number].atomic_value,
                   invalidate_counter.value);
}

static inline bool search_pending(union invalidate_counter invalidate_counter)
{
        return (invalidate_counter.counter & 1) != 0;
}

/* Lock the cache for a zone in order to search for a page. */
static void begin_pending_search(struct page_cache *cache, u32 physical_page,
                                 unsigned int zone_number)
{
        union invalidate_counter invalidate_counter =
                get_invalidate_counter(cache, zone_number);

        invalidate_counter.page = physical_page;
        invalidate_counter.counter++;
        set_invalidate_counter(cache, zone_number, invalidate_counter);
        VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
                            "Search is pending for zone %u", zone_number);
        /*
         * This memory barrier ensures that the write to the invalidate counter is seen by other
         * threads before this thread accesses the cached page. The corresponding read memory
         * barrier is in wait_for_pending_searches().
         */
        smp_mb();
}

/* Unlock the cache for a zone by clearing its invalidate counter. */
static void end_pending_search(struct page_cache *cache, unsigned int zone_number)
{
        union invalidate_counter invalidate_counter;

        /*
         * This memory barrier ensures that this thread completes reads of the
         * cached page before other threads see the write to the invalidate
         * counter.
         */
        smp_mb();

        invalidate_counter = get_invalidate_counter(cache, zone_number);
        VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
                            "Search is pending for zone %u", zone_number);
        invalidate_counter.counter++;
        set_invalidate_counter(cache, zone_number, invalidate_counter);
}

static void wait_for_pending_searches(struct page_cache *cache, u32 physical_page)
{
        union invalidate_counter initial_counters[MAX_ZONES];
        unsigned int i;

        /*
         * We hold the read_threads_mutex. We are waiting for threads that do not hold the
         * read_threads_mutex. Those threads have "locked" their targeted page by setting the
         * search_pending_counter. The corresponding write memory barrier is in
         * begin_pending_search().
         */
        smp_mb();

        for (i = 0; i < cache->zone_count; i++)
                initial_counters[i] = get_invalidate_counter(cache, i);
        for (i = 0; i < cache->zone_count; i++) {
                if (search_pending(initial_counters[i]) &&
                    (initial_counters[i].page == physical_page)) {
                        /*
                         * There is an active search using the physical page. We need to wait for
                         * the search to finish.
                         *
                         * FIXME: Investigate using wait_event() to wait for the search to finish.
                         */
                        while (initial_counters[i].value ==
                               get_invalidate_counter(cache, i).value)
                                cond_resched();
                }
        }
}

static void release_page_buffer(struct cached_page *page)
{
        if (page->buffer != NULL)
                dm_bufio_release(vdo_forget(page->buffer));
}

static void clear_cache_page(struct page_cache *cache, struct cached_page *page)
{
        /* Do not clear read_pending because the read queue relies on it. */
        release_page_buffer(page);
        page->physical_page = cache->indexable_pages;
        WRITE_ONCE(page->last_used, 0);
}

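/* Update the LRU clock and stamp the page as the most recently used cache entry. */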
static void make_page_most_recent(struct page_cache *cache, struct cached_page *page)
{
        /*
         * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
         * thread holding the read_threads_mutex.
         */
        if (atomic64_read(&cache->clock) != READ_ONCE(page->last_used))
                WRITE_ONCE(page->last_used, atomic64_inc_return(&cache->clock));
}

/* Select a page to remove from the cache to make space for a new entry. */
static struct cached_page *select_victim_in_cache(struct page_cache *cache)
{
        struct cached_page *page;
        int oldest_index = 0;
        s64 oldest_time = S64_MAX;
        s64 last_used;
        u16 i;

        /* Find the oldest unclaimed page. We hold the read_threads_mutex. */
        for (i = 0; i < cache->cache_slots; i++) {
                /* A page with a pending read must not be replaced. */
                if (cache->cache[i].read_pending)
                        continue;

                last_used = READ_ONCE(cache->cache[i].last_used);
                if (last_used <= oldest_time) {
                        oldest_time = last_used;
                        oldest_index = i;
                }
        }

        page = &cache->cache[oldest_index];
        if (page->physical_page != cache->indexable_pages) {
                WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
                wait_for_pending_searches(cache, page->physical_page);
        }

        page->read_pending = true;
        clear_cache_page(cache, page);
        return page;
}

/* Make a newly filled cache entry available to other threads. */
static int put_page_in_cache(struct page_cache *cache, u32 physical_page,
                             struct cached_page *page)
{
        int result;

        /* We hold the read_threads_mutex. */
        result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
        if (result != VDO_SUCCESS)
                return result;

        page->physical_page = physical_page;
        make_page_most_recent(cache, page);
        page->read_pending = false;

        /*
         * We hold the read_threads_mutex, but we must have a write memory barrier before making
         * the cached_page available to the readers that do not hold the mutex. The corresponding
         * read memory barrier is in get_page_and_index().
         */
        smp_wmb();

        /* This assignment also clears the queued flag. */
        WRITE_ONCE(cache->index[physical_page], page - cache->cache);
        return UDS_SUCCESS;
}

static void cancel_page_in_cache(struct page_cache *cache, u32 physical_page,
                                 struct cached_page *page)
{
        int result;

        /* We hold the read_threads_mutex. */
        result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
        if (result != VDO_SUCCESS)
                return;

        clear_cache_page(cache, page);
        page->read_pending = false;

        /* Clear the mapping and the queued flag for the new page. */
        WRITE_ONCE(cache->index[physical_page], cache->cache_slots);
}

static inline u16 next_queue_position(u16 position)
{
        return (position + 1) % VOLUME_CACHE_MAX_QUEUED_READS;
}

static inline void advance_queue_position(u16 *position)
{
        *position = next_queue_position(*position);
}

static inline bool read_queue_is_full(struct page_cache *cache)
{
        return cache->read_queue_first == next_queue_position(cache->read_queue_last);
}

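/*
 * Add a request to the read queue for a page, creating a new queue entry if the page is not
 * already queued. Returns false if the queue has no room for a new entry.
 */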
static bool enqueue_read(struct page_cache *cache, struct uds_request *request,
                         u32 physical_page)
{
        struct queued_read *queue_entry;
        u16 last = cache->read_queue_last;
        u16 read_queue_index;

        /* We hold the read_threads_mutex. */
        if ((cache->index[physical_page] & VOLUME_CACHE_QUEUED_FLAG) == 0) {
                /* This page has no existing entry in the queue. */
                if (read_queue_is_full(cache))
                        return false;

                /* Fill in the read queue entry. */
                cache->read_queue[last].physical_page = physical_page;
                cache->read_queue[last].invalid = false;
                cache->read_queue[last].first_request = NULL;
                cache->read_queue[last].last_request = NULL;

                /* Point the cache index to the read queue entry. */
                read_queue_index = last;
                WRITE_ONCE(cache->index[physical_page],
                           read_queue_index | VOLUME_CACHE_QUEUED_FLAG);

                advance_queue_position(&cache->read_queue_last);
        } else {
                /* It's already queued, so add this request to the existing entry. */
                read_queue_index = cache->index[physical_page] & ~VOLUME_CACHE_QUEUED_FLAG;
        }

        request->next_request = NULL;
        queue_entry = &cache->read_queue[read_queue_index];
        if (queue_entry->first_request == NULL)
                queue_entry->first_request = request;
        else
                queue_entry->last_request->next_request = request;
        queue_entry->last_request = request;

        return true;
}

static void enqueue_page_read(struct volume *volume, struct uds_request *request,
                              u32 physical_page)
{
        /* Mark the page as queued, so that chapter invalidation knows to cancel a read. */
        while (!enqueue_read(&volume->page_cache, request, physical_page)) {
                vdo_log_debug("Read queue full, waiting for reads to finish");
                uds_wait_cond(&volume->read_threads_read_done_cond,
                              &volume->read_threads_mutex);
        }

        uds_signal_cond(&volume->read_threads_cond);
}

/*
 * Reserve the next read queue entry for processing, but do not actually remove it from the queue.
 * Must be followed by release_queued_requests().
 */
static struct queued_read *reserve_read_queue_entry(struct page_cache *cache)
{
        /* We hold the read_threads_mutex. */
        struct queued_read *entry;
        u16 index_value;
        bool queued;

        /* No items to dequeue */
        if (cache->read_queue_next_read == cache->read_queue_last)
                return NULL;

        entry = &cache->read_queue[cache->read_queue_next_read];
        index_value = cache->index[entry->physical_page];
        queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
        /* Check to see if it's still queued before resetting. */
        if (entry->invalid && queued)
                WRITE_ONCE(cache->index[entry->physical_page], cache->cache_slots);

        /*
         * If a synchronous read has taken this page, set invalid to true so it doesn't get
         * overwritten. Requests will just be requeued.
         */
        if (!queued)
                entry->invalid = true;

        entry->reserved = true;
        advance_queue_position(&cache->read_queue_next_read);
        return entry;
}

static inline struct queued_read *wait_to_reserve_read_queue_entry(struct volume *volume)
{
        struct queued_read *queue_entry = NULL;

        while (!volume->read_threads_exiting) {
                queue_entry = reserve_read_queue_entry(&volume->page_cache);
                if (queue_entry != NULL)
                        break;

                uds_wait_cond(&volume->read_threads_cond, &volume->read_threads_mutex);
        }

        return queue_entry;
}

static int init_chapter_index_page(const struct volume *volume, u8 *index_page,
                                   u32 chapter, u32 index_page_number,
                                   struct delta_index_page *chapter_index_page)
{
        u64 ci_virtual;
        u32 ci_chapter;
        u32 lowest_list;
        u32 highest_list;
        struct index_geometry *geometry = volume->geometry;
        int result;

        result = uds_initialize_chapter_index_page(chapter_index_page, geometry,
                                                   index_page, volume->nonce);
        if (volume->lookup_mode == LOOKUP_FOR_REBUILD)
                return result;

        if (result != UDS_SUCCESS) {
                return vdo_log_error_strerror(result,
                                              "Reading chapter index page for chapter %u page %u",
                                              chapter, index_page_number);
        }

        uds_get_list_number_bounds(volume->index_page_map, chapter, index_page_number,
                                   &lowest_list, &highest_list);
        ci_virtual = chapter_index_page->virtual_chapter_number;
        ci_chapter = uds_map_to_physical_chapter(geometry, ci_virtual);
        if ((chapter == ci_chapter) &&
            (lowest_list == chapter_index_page->lowest_list_number) &&
            (highest_list == chapter_index_page->highest_list_number))
                return UDS_SUCCESS;

        vdo_log_warning("Index page map updated to %llu",
                        (unsigned long long) volume->index_page_map->last_update);
        vdo_log_warning("Page map expects that chapter %u page %u has range %u to %u, but chapter index page has chapter %llu with range %u to %u",
                        chapter, index_page_number, lowest_list, highest_list,
                        (unsigned long long) ci_virtual,
                        chapter_index_page->lowest_list_number,
                        chapter_index_page->highest_list_number);
        return vdo_log_error_strerror(UDS_CORRUPT_DATA,
                                      "index page map mismatch with chapter index");
}

static int initialize_index_page(const struct volume *volume, u32 physical_page,
                                 struct cached_page *page)
{
        u32 chapter = map_to_chapter_number(volume->geometry, physical_page);
        u32 index_page_number = map_to_page_number(volume->geometry, physical_page);

        return init_chapter_index_page(volume, dm_bufio_get_block_data(page->buffer),
                                       chapter, index_page_number, &page->index_page);
}

static bool search_record_page(const u8 record_page[],
                               const struct uds_record_name *name,
                               const struct index_geometry *geometry,
                               struct uds_record_data *metadata)
{
        /*
         * The array of records is sorted by name and stored as a binary tree in heap order, so the
         * root of the tree is the first array element.
         */
        u32 node = 0;
        const struct uds_volume_record *records = (const struct uds_volume_record *) record_page;

        while (node < geometry->records_per_page) {
                int result;
                const struct uds_volume_record *record = &records[node];

                result = memcmp(name, &record->name, UDS_RECORD_NAME_SIZE);
                if (result == 0) {
                        if (metadata != NULL)
                                *metadata = record->data;
                        return true;
                }

                /* The children of node N are at indexes 2N+1 and 2N+2. */
                node = ((2 * node) + ((result < 0) ? 1 : 2));
        }

        return false;
}

/*
 * If we've read in a record page, we're going to do an immediate search, to speed up processing by
 * avoiding get_record_from_zone(), and to ensure that requests make progress even when queued. If
 * we've read in an index page, we save the record page number so we don't have to resolve the
 * index page again. We use the location, virtual_chapter, and old_metadata fields in the request
 * to allow the index code to know where to begin processing the request again.
 */
static int search_page(struct cached_page *page, const struct volume *volume,
                       struct uds_request *request, u32 physical_page)
{
        int result;
        enum uds_index_region location;
        u16 record_page_number;

        if (is_record_page(volume->geometry, physical_page)) {
                if (search_record_page(dm_bufio_get_block_data(page->buffer),
                                       &request->record_name, volume->geometry,
                                       &request->old_metadata))
                        location = UDS_LOCATION_RECORD_PAGE_LOOKUP;
                else
                        location = UDS_LOCATION_UNAVAILABLE;
        } else {
                result = uds_search_chapter_index_page(&page->index_page,
                                                       volume->geometry,
                                                       &request->record_name,
                                                       &record_page_number);
                if (result != UDS_SUCCESS)
                        return result;

                if (record_page_number == NO_CHAPTER_INDEX_ENTRY) {
                        location = UDS_LOCATION_UNAVAILABLE;
                } else {
                        location = UDS_LOCATION_INDEX_PAGE_LOOKUP;
                        *((u16 *) &request->old_metadata) = record_page_number;
                }
        }

        request->location = location;
        request->found = false;
        return UDS_SUCCESS;
}

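/*
 * Read a queued page from storage, install it in the cache, and search it on behalf of every
 * request waiting on it. The read_threads_mutex is dropped around the storage read itself.
 */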
static int process_entry(struct volume *volume, struct queued_read *entry)
{
        u32 page_number = entry->physical_page;
        struct uds_request *request;
        struct cached_page *page = NULL;
        u8 *page_data;
        int result;

        if (entry->invalid) {
                vdo_log_debug("Requeuing requests for invalid page");
                return UDS_SUCCESS;
        }

        page = select_victim_in_cache(&volume->page_cache);

        mutex_unlock(&volume->read_threads_mutex);
        page_data = dm_bufio_read(volume->client, page_number, &page->buffer);
        mutex_lock(&volume->read_threads_mutex);
        if (IS_ERR(page_data)) {
                result = -PTR_ERR(page_data);
                vdo_log_warning_strerror(result,
                                         "error reading physical page %u from volume",
                                         page_number);
                cancel_page_in_cache(&volume->page_cache, page_number, page);
                return result;
        }

        if (entry->invalid) {
                vdo_log_warning("Page %u invalidated after read", page_number);
                cancel_page_in_cache(&volume->page_cache, page_number, page);
                return UDS_SUCCESS;
        }

        if (!is_record_page(volume->geometry, page_number)) {
                result = initialize_index_page(volume, page_number, page);
                if (result != UDS_SUCCESS) {
                        vdo_log_warning("Error initializing chapter index page");
                        cancel_page_in_cache(&volume->page_cache, page_number, page);
                        return result;
                }
        }

        result = put_page_in_cache(&volume->page_cache, page_number, page);
        if (result != UDS_SUCCESS) {
                vdo_log_warning("Error putting page %u in cache", page_number);
                cancel_page_in_cache(&volume->page_cache, page_number, page);
                return result;
        }

        request = entry->first_request;
        while ((request != NULL) && (result == UDS_SUCCESS)) {
                result = search_page(page, volume, request, page_number);
                request = request->next_request;
        }

        return result;
}

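/* Send the requests attached to a completed queue entry back to the index and retire the entry. */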
static void release_queued_requests(struct volume *volume, struct queued_read *entry,
                                    int result)
{
        struct page_cache *cache = &volume->page_cache;
        u16 next_read = cache->read_queue_next_read;
        struct uds_request *request;
        struct uds_request *next;

        for (request = entry->first_request; request != NULL; request = next) {
                next = request->next_request;
                request->status = result;
                request->requeued = true;
                uds_enqueue_request(request, STAGE_INDEX);
        }

        entry->reserved = false;

        /* Move the read_queue_first pointer as far as we can. */
        while ((cache->read_queue_first != next_read) &&
               (!cache->read_queue[cache->read_queue_first].reserved))
                advance_queue_position(&cache->read_queue_first);
        uds_broadcast_cond(&volume->read_threads_read_done_cond);
}

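/* The reader thread loop: reserve a queue entry, process it, and release its requests. */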
static void read_thread_function(void *arg)
{
        struct volume *volume = arg;

        vdo_log_debug("reader starting");
        mutex_lock(&volume->read_threads_mutex);
        while (true) {
                struct queued_read *queue_entry;
                int result;

                queue_entry = wait_to_reserve_read_queue_entry(volume);
                if (volume->read_threads_exiting)
                        break;

                result = process_entry(volume, queue_entry);
                release_queued_requests(volume, queue_entry, result);
        }
        mutex_unlock(&volume->read_threads_mutex);
        vdo_log_debug("reader done");
}

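/*
 * Look up a physical page in the cache index, returning the cached page if it is resident and
 * the read queue index if a read for it is pending.
 */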
static void get_page_and_index(struct page_cache *cache, u32 physical_page,
                               int *queue_index, struct cached_page **page_ptr)
{
        u16 index_value;
        u16 index;
        bool queued;

        /*
         * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
         * thread holding the read_threads_mutex.
         *
         * Holding only a search_pending_counter is the most frequent case.
         */
        /*
         * It would be unlikely for the compiler to turn the usage of index_value into two reads of
         * cache->index, but it would be possible and very bad if those reads did not return the
         * same value.
         */
        index_value = READ_ONCE(cache->index[physical_page]);
        queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
        index = index_value & ~VOLUME_CACHE_QUEUED_FLAG;

        if (!queued && (index < cache->cache_slots)) {
                *page_ptr = &cache->cache[index];
                /*
                 * We have acquired access to the cached page, but unless we hold the
                 * read_threads_mutex, we need a read memory barrier now. The corresponding write
                 * memory barrier is in put_page_in_cache().
                 */
                smp_rmb();
        } else {
                *page_ptr = NULL;
        }

        *queue_index = queued ? index : -1;
}

static void get_page_from_cache(struct page_cache *cache, u32 physical_page,
                                struct cached_page **page)
{
        /*
         * ASSERTION: We are in a zone thread.
         * ASSERTION: We are holding a search_pending_counter or the read_threads_mutex.
         */
        int queue_index = -1;

        get_page_and_index(cache, physical_page, &queue_index, page);
}

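/*
 * Read a page from storage into a victim cache slot. The caller must hold the
 * read_threads_mutex.
 */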
static int read_page_locked(struct volume *volume, u32 physical_page,
                            struct cached_page **page_ptr)
{
        int result = UDS_SUCCESS;
        struct cached_page *page = NULL;
        u8 *page_data;

        page = select_victim_in_cache(&volume->page_cache);
        page_data = dm_bufio_read(volume->client, physical_page, &page->buffer);
        if (IS_ERR(page_data)) {
                result = -PTR_ERR(page_data);
                vdo_log_warning_strerror(result,
                                         "error reading physical page %u from volume",
                                         physical_page);
                cancel_page_in_cache(&volume->page_cache, physical_page, page);
                return result;
        }

        if (!is_record_page(volume->geometry, physical_page)) {
                result = initialize_index_page(volume, physical_page, page);
                if (result != UDS_SUCCESS) {
                        if (volume->lookup_mode != LOOKUP_FOR_REBUILD)
                                vdo_log_warning("Corrupt index page %u", physical_page);
                        cancel_page_in_cache(&volume->page_cache, physical_page, page);
                        return result;
                }
        }

        result = put_page_in_cache(&volume->page_cache, physical_page, page);
        if (result != UDS_SUCCESS) {
                vdo_log_warning("Error putting page %u in cache", physical_page);
                cancel_page_in_cache(&volume->page_cache, physical_page, page);
                return result;
        }

        *page_ptr = page;
        return UDS_SUCCESS;
}

/* Retrieve a page from the cache while holding the read threads mutex. */
static int get_volume_page_locked(struct volume *volume, u32 physical_page,
                                  struct cached_page **page_ptr)
{
        int result;
        struct cached_page *page = NULL;

        get_page_from_cache(&volume->page_cache, physical_page, &page);
        if (page == NULL) {
                result = read_page_locked(volume, physical_page, &page);
                if (result != UDS_SUCCESS)
                        return result;
        } else {
                make_page_most_recent(&volume->page_cache, page);
        }

        *page_ptr = page;
        return UDS_SUCCESS;
}

/* Retrieve a page from the cache while holding a search_pending lock. */
static int get_volume_page_protected(struct volume *volume, struct uds_request *request,
                                     u32 physical_page, struct cached_page **page_ptr)
{
        struct cached_page *page;

        get_page_from_cache(&volume->page_cache, physical_page, &page);
        if (page != NULL) {
                if (request->zone_number == 0) {
                        /* Only one zone is allowed to update the LRU. */
                        make_page_most_recent(&volume->page_cache, page);
                }

                *page_ptr = page;
                return UDS_SUCCESS;
        }

        /* Prepare to enqueue a read for the page. */
        end_pending_search(&volume->page_cache, request->zone_number);
        mutex_lock(&volume->read_threads_mutex);

        /*
         * Do the lookup again while holding the read mutex (no longer the fast case so this should
         * be fine to repeat). We need to do this because a page may have been added to the cache
         * by a reader thread between the time we searched above and the time we went to actually
         * try to enqueue it below. This could result in us enqueuing another read for a page which
         * is already in the cache, which would mean we end up with two entries in the cache for
         * the same page.
         */
        get_page_from_cache(&volume->page_cache, physical_page, &page);
        if (page == NULL) {
                enqueue_page_read(volume, request, physical_page);
                /*
                 * The performance gain from unlocking first, while "search pending" mode is off,
                 * turns out to be significant in some cases. The page is not available yet so
                 * the order does not matter for correctness as it does below.
                 */
                mutex_unlock(&volume->read_threads_mutex);
                begin_pending_search(&volume->page_cache, physical_page,
                                     request->zone_number);
                return UDS_QUEUED;
        }

        /*
         * Now that the page is loaded, the volume needs to switch to "reader thread unlocked" and
         * "search pending" state in careful order so no other thread can mess with the data before
         * the caller gets to look at it.
         */
        begin_pending_search(&volume->page_cache, physical_page, request->zone_number);
        mutex_unlock(&volume->read_threads_mutex);
        *page_ptr = page;
        return UDS_SUCCESS;
}

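/* Look up a chapter page, reading it from storage if necessary, under the read_threads_mutex. */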
static int get_volume_page(struct volume *volume, u32 chapter, u32 page_number,
                           struct cached_page **page_ptr)
{
        int result;
        u32 physical_page = map_to_physical_page(volume->geometry, chapter, page_number);

        mutex_lock(&volume->read_threads_mutex);
        result = get_volume_page_locked(volume, physical_page, page_ptr);
        mutex_unlock(&volume->read_threads_mutex);
        return result;
}

int uds_get_volume_record_page(struct volume *volume, u32 chapter, u32 page_number,
                               u8 **data_ptr)
{
        int result;
        struct cached_page *page = NULL;

        result = get_volume_page(volume, chapter, page_number, &page);
        if (result == UDS_SUCCESS)
                *data_ptr = dm_bufio_get_block_data(page->buffer);

        return result;
}

int uds_get_volume_index_page(struct volume *volume, u32 chapter, u32 page_number,
                              struct delta_index_page **index_page_ptr)
{
        int result;
        struct cached_page *page = NULL;

        result = get_volume_page(volume, chapter, page_number, &page);
        if (result == UDS_SUCCESS)
                *index_page_ptr = &page->index_page;

        return result;
}

/*
 * Find the record page associated with a name in a given index page. This will return UDS_QUEUED
 * if the page in question must be read from storage.
 */
static int search_cached_index_page(struct volume *volume, struct uds_request *request,
                                    u32 chapter, u32 index_page_number,
                                    u16 *record_page_number)
{
        int result;
        struct cached_page *page = NULL;
        u32 physical_page = map_to_physical_page(volume->geometry, chapter,
                                                 index_page_number);

        /*
         * Make sure the invalidate counter is updated before we try and read the mapping. This
         * prevents this thread from reading a page in the cache which has already been marked for
         * invalidation by the reader thread, before the reader thread has noticed that the
         * invalidate_counter has been incremented.
         */
        begin_pending_search(&volume->page_cache, physical_page, request->zone_number);

        result = get_volume_page_protected(volume, request, physical_page, &page);
        if (result != UDS_SUCCESS) {
                end_pending_search(&volume->page_cache, request->zone_number);
                return result;
        }

        result = uds_search_chapter_index_page(&page->index_page, volume->geometry,
                                               &request->record_name,
                                               record_page_number);
        end_pending_search(&volume->page_cache, request->zone_number);
        return result;
}

/*
 * Find the metadata associated with a name in a given record page. This will return UDS_QUEUED if
 * the page in question must be read from storage.
 */
int uds_search_cached_record_page(struct volume *volume, struct uds_request *request,
                                  u32 chapter, u16 record_page_number, bool *found)
{
        struct cached_page *record_page;
        struct index_geometry *geometry = volume->geometry;
        int result;
        u32 physical_page, page_number;

        *found = false;
        if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
                return UDS_SUCCESS;

        result = VDO_ASSERT(record_page_number < geometry->record_pages_per_chapter,
                            "0 <= %d < %u", record_page_number,
                            geometry->record_pages_per_chapter);
        if (result != VDO_SUCCESS)
                return result;

        page_number = geometry->index_pages_per_chapter + record_page_number;

        physical_page = map_to_physical_page(volume->geometry, chapter, page_number);

        /*
         * Make sure the invalidate counter is updated before we try and read the mapping. This
         * prevents this thread from reading a page in the cache which has already been marked for
         * invalidation by the reader thread, before the reader thread has noticed that the
         * invalidate_counter has been incremented.
         */
        begin_pending_search(&volume->page_cache, physical_page, request->zone_number);

        result = get_volume_page_protected(volume, request, physical_page, &record_page);
        if (result != UDS_SUCCESS) {
                end_pending_search(&volume->page_cache, request->zone_number);
                return result;
        }

        if (search_record_page(dm_bufio_get_block_data(record_page->buffer),
                               &request->record_name, geometry, &request->old_metadata))
                *found = true;

        end_pending_search(&volume->page_cache, request->zone_number);
        return UDS_SUCCESS;
}

void uds_prefetch_volume_chapter(const struct volume *volume, u32 chapter)
{
        const struct index_geometry *geometry = volume->geometry;
        u32 physical_page = map_to_physical_page(geometry, chapter, 0);

        dm_bufio_prefetch(volume->client, physical_page, geometry->pages_per_chapter);
}

int uds_read_chapter_index_from_volume(const struct volume *volume, u64 virtual_chapter,
                                       struct dm_buffer *volume_buffers[],
                                       struct delta_index_page index_pages[])
{
        int result;
        u32 i;
        const struct index_geometry *geometry = volume->geometry;
        u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
        u32 physical_page = map_to_physical_page(geometry, physical_chapter, 0);

        dm_bufio_prefetch(volume->client, physical_page, geometry->index_pages_per_chapter);
        for (i = 0; i < geometry->index_pages_per_chapter; i++) {
                u8 *index_page;

                index_page = dm_bufio_read(volume->client, physical_page + i,
                                           &volume_buffers[i]);
                if (IS_ERR(index_page)) {
                        result = -PTR_ERR(index_page);
                        vdo_log_warning_strerror(result,
                                                 "error reading physical page %u",
                                                 physical_page + i);
                        return result;
                }

                result = init_chapter_index_page(volume, index_page, physical_chapter, i,
                                                 &index_pages[i]);
                if (result != UDS_SUCCESS)
                        return result;
        }

        return UDS_SUCCESS;
}

int uds_search_volume_page_cache(struct volume *volume, struct uds_request *request,
                                 bool *found)
{
        int result;
        u32 physical_chapter =
                uds_map_to_physical_chapter(volume->geometry, request->virtual_chapter);
        u32 index_page_number;
        u16 record_page_number;

        index_page_number = uds_find_index_page_number(volume->index_page_map,
                                                       &request->record_name,
                                                       physical_chapter);

        if (request->location == UDS_LOCATION_INDEX_PAGE_LOOKUP) {
                record_page_number = *((u16 *) &request->old_metadata);
        } else {
                result = search_cached_index_page(volume, request, physical_chapter,
                                                  index_page_number,
                                                  &record_page_number);
                if (result != UDS_SUCCESS)
                        return result;
        }

        return uds_search_cached_record_page(volume, request, physical_chapter,
                                             record_page_number, found);
}

int uds_search_volume_page_cache_for_rebuild(struct volume *volume,
                                             const struct uds_record_name *name,
                                             u64 virtual_chapter, bool *found)
{
        int result;
        struct index_geometry *geometry = volume->geometry;
        struct cached_page *page;
        u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
        u32 index_page_number;
        u16 record_page_number;
        u32 page_number;

        *found = false;
        index_page_number =
                uds_find_index_page_number(volume->index_page_map, name,
                                           physical_chapter);
        result = get_volume_page(volume, physical_chapter, index_page_number, &page);
        if (result != UDS_SUCCESS)
                return result;

        result = uds_search_chapter_index_page(&page->index_page, geometry, name,
                                               &record_page_number);
        if (result != UDS_SUCCESS)
                return result;

        if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
                return UDS_SUCCESS;

        page_number = geometry->index_pages_per_chapter + record_page_number;
        result = get_volume_page(volume, physical_chapter, page_number, &page);
        if (result != UDS_SUCCESS)
                return result;

        *found = search_record_page(dm_bufio_get_block_data(page->buffer), name,
                                    geometry, NULL);
        return UDS_SUCCESS;
}

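/* Evict a cached page, or mark any pending read of it invalid so the reader thread discards it. */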
static void invalidate_page(struct page_cache *cache, u32 physical_page)
{
        struct cached_page *page;
        int queue_index = -1;

        /* We hold the read_threads_mutex. */
        get_page_and_index(cache, physical_page, &queue_index, &page);
        if (page != NULL) {
                WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
                wait_for_pending_searches(cache, page->physical_page);
                clear_cache_page(cache, page);
        } else if (queue_index > -1) {
                vdo_log_debug("setting pending read to invalid");
                cache->read_queue[queue_index].invalid = true;
        }
}

void uds_forget_chapter(struct volume *volume, u64 virtual_chapter)
{
        u32 physical_chapter =
                uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
        u32 first_page = map_to_physical_page(volume->geometry, physical_chapter, 0);
        u32 i;

        vdo_log_debug("forgetting chapter %llu", (unsigned long long) virtual_chapter);
        mutex_lock(&volume->read_threads_mutex);
        for (i = 0; i < volume->geometry->pages_per_chapter; i++)
                invalidate_page(&volume->page_cache, first_page + i);
        mutex_unlock(&volume->read_threads_mutex);
}

/*
 * Donate an index page from a newly written chapter to the page cache since it is likely to be
 * used again soon. The caller must already hold the reader thread mutex.
 */
static int donate_index_page_locked(struct volume *volume, u32 physical_chapter,
                                    u32 index_page_number, struct dm_buffer *page_buffer)
{
        int result;
        struct cached_page *page = NULL;
        u32 physical_page =
                map_to_physical_page(volume->geometry, physical_chapter,
                                     index_page_number);

        page = select_victim_in_cache(&volume->page_cache);
        page->buffer = page_buffer;
        result = init_chapter_index_page(volume, dm_bufio_get_block_data(page_buffer),
                                         physical_chapter, index_page_number,
                                         &page->index_page);
        if (result != UDS_SUCCESS) {
                vdo_log_warning("Error initializing chapter index page");
                cancel_page_in_cache(&volume->page_cache, physical_page, page);
                return result;
        }

        result = put_page_in_cache(&volume->page_cache, physical_page, page);
        if (result != UDS_SUCCESS) {
                vdo_log_warning("Error putting page %u in cache", physical_page);
                cancel_page_in_cache(&volume->page_cache, physical_page, page);
                return result;
        }

        return UDS_SUCCESS;
}

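/* Write the chapter index pages for a closed chapter and donate them to the page cache. */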
static int write_index_pages(struct volume *volume, u32 physical_chapter_number,
                             struct open_chapter_index *chapter_index)
{
        struct index_geometry *geometry = volume->geometry;
        struct dm_buffer *page_buffer;
        u32 first_index_page = map_to_physical_page(geometry, physical_chapter_number, 0);
        u32 delta_list_number = 0;
        u32 index_page_number;

        for (index_page_number = 0;
             index_page_number < geometry->index_pages_per_chapter;
             index_page_number++) {
                u8 *page_data;
                u32 physical_page = first_index_page + index_page_number;
                u32 lists_packed;
                bool last_page;
                int result;

                page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
                if (IS_ERR(page_data)) {
                        return vdo_log_warning_strerror(-PTR_ERR(page_data),
                                                        "failed to prepare index page");
                }

                last_page = ((index_page_number + 1) == geometry->index_pages_per_chapter);
                result = uds_pack_open_chapter_index_page(chapter_index, page_data,
                                                          delta_list_number, last_page,
                                                          &lists_packed);
                if (result != UDS_SUCCESS) {
                        dm_bufio_release(page_buffer);
                        return vdo_log_warning_strerror(result,
                                                        "failed to pack index page");
                }

                dm_bufio_mark_buffer_dirty(page_buffer);

                if (lists_packed == 0) {
                        vdo_log_debug("no delta lists packed on chapter %u page %u",
                                      physical_chapter_number, index_page_number);
                } else {
                        delta_list_number += lists_packed;
                }

                uds_update_index_page_map(volume->index_page_map,
                                          chapter_index->virtual_chapter_number,
                                          physical_chapter_number, index_page_number,
                                          delta_list_number - 1);

                mutex_lock(&volume->read_threads_mutex);
                result = donate_index_page_locked(volume, physical_chapter_number,
                                                  index_page_number, page_buffer);
                mutex_unlock(&volume->read_threads_mutex);
                if (result != UDS_SUCCESS) {
                        dm_bufio_release(page_buffer);
                        return result;
                }
        }

        return UDS_SUCCESS;
}

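/*
 * Lay out sorted records as an implicit binary tree in heap order, so that an in-order traversal
 * of the tree visits the records in sorted order.
 */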
static u32 encode_tree(u8 record_page[],
                       const struct uds_volume_record *sorted_pointers[],
                       u32 next_record, u32 node, u32 node_count)
{
        if (node < node_count) {
                u32 child = (2 * node) + 1;

                next_record = encode_tree(record_page, sorted_pointers, next_record,
                                          child, node_count);

                /*
                 * In-order traversal: copy the contents of the next record into the page at the
                 * node offset.
                 */
                memcpy(&record_page[node * BYTES_PER_RECORD],
                       sorted_pointers[next_record++], BYTES_PER_RECORD);

                next_record = encode_tree(record_page, sorted_pointers, next_record,
                                          child + 1, node_count);
        }

        return next_record;
}

static int encode_record_page(const struct volume *volume,
                              const struct uds_volume_record records[], u8 record_page[])
{
        int result;
        u32 i;
        u32 records_per_page = volume->geometry->records_per_page;
        const struct uds_volume_record **record_pointers = volume->record_pointers;

        for (i = 0; i < records_per_page; i++)
                record_pointers[i] = &records[i];

        /*
         * Sort the record pointers by using just the names in the records, which is less work than
         * sorting the entire record values.
         */
        BUILD_BUG_ON(offsetof(struct uds_volume_record, name) != 0);
        result = uds_radix_sort(volume->radix_sorter, (const u8 **) record_pointers,
                                records_per_page, UDS_RECORD_NAME_SIZE);
        if (result != UDS_SUCCESS)
                return result;

        encode_tree(record_page, record_pointers, 0, 0, records_per_page);
        return UDS_SUCCESS;
}

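/* Sort and write the record pages for a closed chapter. */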
static int write_record_pages(struct volume *volume, u32 physical_chapter_number,
                              const struct uds_volume_record *records)
{
        u32 record_page_number;
        struct index_geometry *geometry = volume->geometry;
        struct dm_buffer *page_buffer;
        const struct uds_volume_record *next_record = records;
        u32 first_record_page = map_to_physical_page(geometry, physical_chapter_number,
                                                     geometry->index_pages_per_chapter);

        for (record_page_number = 0;
             record_page_number < geometry->record_pages_per_chapter;
             record_page_number++) {
                u8 *page_data;
                u32 physical_page = first_record_page + record_page_number;
                int result;

                page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
                if (IS_ERR(page_data)) {
                        return vdo_log_warning_strerror(-PTR_ERR(page_data),
                                                        "failed to prepare record page");
                }

                result = encode_record_page(volume, next_record, page_data);
                if (result != UDS_SUCCESS) {
                        dm_bufio_release(page_buffer);
                        return vdo_log_warning_strerror(result,
                                                        "failed to encode record page %u",
                                                        record_page_number);
                }

                next_record += geometry->records_per_page;
                dm_bufio_mark_buffer_dirty(page_buffer);
                dm_bufio_release(page_buffer);
        }

        return UDS_SUCCESS;
}

int uds_write_chapter(struct volume *volume, struct open_chapter_index *chapter_index,
                      const struct uds_volume_record *records)
{
        int result;
        u32 physical_chapter_number =
                uds_map_to_physical_chapter(volume->geometry,
                                            chapter_index->virtual_chapter_number);

        result = write_index_pages(volume, physical_chapter_number, chapter_index);
        if (result != UDS_SUCCESS)
                return result;

        result = write_record_pages(volume, physical_chapter_number, records);
        if (result != UDS_SUCCESS)
                return result;

        result = -dm_bufio_write_dirty_buffers(volume->client);
        if (result != UDS_SUCCESS)
                vdo_log_error_strerror(result, "cannot sync chapter to volume");

        return result;
}

static void probe_chapter(struct volume *volume, u32 chapter_number,
                          u64 *virtual_chapter_number)
{
        const struct index_geometry *geometry = volume->geometry;
        u32 expected_list_number = 0;
        u32 i;
        u64 vcn = BAD_CHAPTER;

        *virtual_chapter_number = BAD_CHAPTER;
        dm_bufio_prefetch(volume->client,
                          map_to_physical_page(geometry, chapter_number, 0),
                          geometry->index_pages_per_chapter);

        for (i = 0; i < geometry->index_pages_per_chapter; i++) {
                struct delta_index_page *page;
                int result;

                result = uds_get_volume_index_page(volume, chapter_number, i, &page);
                if (result != UDS_SUCCESS)
                        return;

                if (page->virtual_chapter_number == BAD_CHAPTER) {
                        vdo_log_error("corrupt index page in chapter %u",
                                      chapter_number);
                        return;
                }

                if (vcn == BAD_CHAPTER) {
                        vcn = page->virtual_chapter_number;
                } else if (page->virtual_chapter_number != vcn) {
                        vdo_log_error("inconsistent chapter %u index page %u: expected vcn %llu, got vcn %llu",
                                      chapter_number, i, (unsigned long long) vcn,
                                      (unsigned long long) page->virtual_chapter_number);
                        return;
                }

                if (expected_list_number != page->lowest_list_number) {
                        vdo_log_error("inconsistent chapter %u index page %u: expected list number %u, got list number %u",
                                      chapter_number, i, expected_list_number,
                                      page->lowest_list_number);
                        return;
                }
                expected_list_number = page->highest_list_number + 1;

                result = uds_validate_chapter_index_page(page, geometry);
                if (result != UDS_SUCCESS)
                        return;
        }

        if (chapter_number != uds_map_to_physical_chapter(geometry, vcn)) {
                vdo_log_error("chapter %u vcn %llu is out of phase (%u)", chapter_number,
                              (unsigned long long) vcn, geometry->chapters_per_volume);
                return;
        }

        *virtual_chapter_number = vcn;
}

/* Find the last valid physical chapter in the volume. */
static void find_real_end_of_volume(struct volume *volume, u32 limit, u32 *limit_ptr)
{
        u32 span = 1;
        u32 tries = 0;

        while (limit > 0) {
                u32 chapter = (span > limit) ? 0 : limit - span;
                u64 vcn = 0;

                probe_chapter(volume, chapter, &vcn);
                if (vcn == BAD_CHAPTER) {
                        limit = chapter;
                        if (++tries > 1)
                                span *= 2;
                } else {
                        if (span == 1)
                                break;
                        span /= 2;
                        tries = 0;
                }
        }

        *limit_ptr = limit;
}

static int find_chapter_limits(struct volume *volume, u32 chapter_limit, u64 *lowest_vcn,
                               u64 *highest_vcn)
{
        struct index_geometry *geometry = volume->geometry;
        u64 zero_vcn;
        u64 lowest = BAD_CHAPTER;
        u64 highest = BAD_CHAPTER;
        u64 moved_chapter = BAD_CHAPTER;
        u32 left_chapter = 0;
        u32 right_chapter = 0;
        u32 bad_chapters = 0;

        /*
         * This method assumes there is at most one run of contiguous bad chapters caused by
         * unflushed writes. Either the bad spot is at the beginning and end, or somewhere in the
         * middle. Wherever it is, the highest and lowest VCNs are adjacent to it. Otherwise the
         * volume is cleanly saved and somewhere in the middle of it the highest VCN immediately
         * precedes the lowest one.
         */

        /* It doesn't matter if this results in a bad spot (BAD_CHAPTER). */
        probe_chapter(volume, 0, &zero_vcn);

        /*
         * Binary search for end of the discontinuity in the monotonically increasing virtual
         * chapter numbers; bad spots are treated as a span of BAD_CHAPTER values. In effect we're
         * searching for the index of the smallest value less than zero_vcn. In the case we go off
         * the end it means that chapter 0 has the lowest vcn.
         *
         * If a virtual chapter is out-of-order, it will be the one moved by conversion. Always
         * skip over the moved chapter when searching, adding it to the range at the end if
         * necessary.
         */
        if (geometry->remapped_physical > 0) {
                u64 remapped_vcn;

                probe_chapter(volume, geometry->remapped_physical, &remapped_vcn);
                if (remapped_vcn == geometry->remapped_virtual)
                        moved_chapter = geometry->remapped_physical;
        }

        right_chapter = chapter_limit;

        while (left_chapter < right_chapter) {
                u64 probe_vcn;
                u32 chapter = (left_chapter + right_chapter) / 2;

                if (chapter == moved_chapter)
                        chapter--;

                probe_chapter(volume, chapter, &probe_vcn);
                if (zero_vcn <= probe_vcn) {
                        left_chapter = chapter + 1;
                        if (left_chapter == moved_chapter)
                                left_chapter++;
                } else {
                        right_chapter = chapter;
                }
        }

        /* If left_chapter goes off the end, chapter 0 has the lowest virtual chapter number.*/
        if (left_chapter >= chapter_limit)
                left_chapter = 0;

        /* At this point, left_chapter is the chapter with the lowest virtual chapter number. */
        probe_chapter(volume, left_chapter, &lowest);

        /* The moved chapter might be the lowest in the range. */
        if ((moved_chapter != BAD_CHAPTER) && (lowest == geometry->remapped_virtual + 1))
                lowest = geometry->remapped_virtual;

        /*
         * Circularly scan backwards, moving over any bad chapters until encountering a good one,
         * which is the chapter with the highest vcn.
         */
        while (highest == BAD_CHAPTER) {
                right_chapter = (right_chapter + chapter_limit - 1) % chapter_limit;
                if (right_chapter == moved_chapter)
                        continue;

                probe_chapter(volume, right_chapter, &highest);
                if (bad_chapters++ >= MAX_BAD_CHAPTERS) {
                        vdo_log_error("too many bad chapters in volume: %u",
                                      bad_chapters);
                        return UDS_CORRUPT_DATA;
                }
        }

        *lowest_vcn = lowest;
        *highest_vcn = highest;
        return UDS_SUCCESS;
}

/*
 * Find the highest and lowest contiguous chapters present in the volume and determine their
 * virtual chapter numbers. This is used by rebuild.
 */
int uds_find_volume_chapter_boundaries(struct volume *volume, u64 *lowest_vcn,
                                       u64 *highest_vcn, bool *is_empty)
{
        u32 chapter_limit = volume->geometry->chapters_per_volume;

        find_real_end_of_volume(volume, chapter_limit, &chapter_limit);
        if (chapter_limit == 0) {
                *lowest_vcn = 0;
                *highest_vcn = 0;
                *is_empty = true;
                return UDS_SUCCESS;
        }

        *is_empty = false;
        return find_chapter_limits(volume, chapter_limit, lowest_vcn, highest_vcn);
}

int __must_check uds_replace_volume_storage(struct volume *volume,
                                            struct index_layout *layout,
                                            struct block_device *bdev)
{
        int result;
        u32 i;

        result = uds_replace_index_layout_storage(layout, bdev);
        if (result != UDS_SUCCESS)
                return result;

        /* Release all outstanding dm_bufio objects */
        for (i = 0; i < volume->page_cache.indexable_pages; i++)
                volume->page_cache.index[i] = volume->page_cache.cache_slots;
        for (i = 0; i < volume->page_cache.cache_slots; i++)
                clear_cache_page(&volume->page_cache, &volume->page_cache.cache[i]);
        if (volume->sparse_cache != NULL)
                uds_invalidate_sparse_cache(volume->sparse_cache);
        if (volume->client != NULL)
                dm_bufio_client_destroy(vdo_forget(volume->client));

        return uds_open_volume_bufio(layout, volume->geometry->bytes_per_page,
                                     volume->reserved_buffers, &volume->client);
}

static int __must_check initialize_page_cache(struct page_cache *cache,
                                              const struct index_geometry *geometry,
                                              u32 chapters_in_cache,
                                              unsigned int zone_count)
{
        int result;
        u32 i;

        cache->indexable_pages = geometry->pages_per_volume + 1;
        cache->cache_slots = chapters_in_cache * geometry->record_pages_per_chapter;
        cache->zone_count = zone_count;
        atomic64_set(&cache->clock, 1);

        result = VDO_ASSERT((cache->cache_slots <= VOLUME_CACHE_MAX_ENTRIES),
                            "requested cache size, %u, within limit %u",
                            cache->cache_slots, VOLUME_CACHE_MAX_ENTRIES);
        if (result != VDO_SUCCESS)
                return result;

        result = vdo_allocate(VOLUME_CACHE_MAX_QUEUED_READS, struct queued_read,
                              "volume read queue", &cache->read_queue);
        if (result != VDO_SUCCESS)
                return result;

        result = vdo_allocate(cache->zone_count, struct search_pending_counter,
                              "Volume Cache Zones", &cache->search_pending_counters);
        if (result != VDO_SUCCESS)
                return result;

        result = vdo_allocate(cache->indexable_pages, u16, "page cache index",
                              &cache->index);
        if (result != VDO_SUCCESS)
                return result;

        result = vdo_allocate(cache->cache_slots, struct cached_page, "page cache cache",
                              &cache->cache);
        if (result != VDO_SUCCESS)
                return result;

        /* Initialize index values to invalid values. */
        for (i = 0; i < cache->indexable_pages; i++)
                cache->index[i] = cache->cache_slots;

        for (i = 0; i < cache->cache_slots; i++)
                clear_cache_page(cache, &cache->cache[i]);

        return UDS_SUCCESS;
}

int uds_make_volume(const struct uds_configuration *config, struct index_layout *layout,
                    struct volume **new_volume)
{
        unsigned int i;
        struct volume *volume = NULL;
        struct index_geometry *geometry;
        unsigned int reserved_buffers;
        int result;

        result = vdo_allocate(1, struct volume, "volume", &volume);
        if (result != VDO_SUCCESS)
                return result;

        volume->nonce = uds_get_volume_nonce(layout);

        result = uds_copy_index_geometry(config->geometry, &volume->geometry);
        if (result != UDS_SUCCESS) {
                uds_free_volume(volume);
                return vdo_log_warning_strerror(result,
                                                "failed to allocate geometry: error");
        }
        geometry = volume->geometry;

        /*
         * Reserve a buffer for each entry in the page cache, one for the chapter writer, and one
         * for each entry in the sparse cache.
         */
        reserved_buffers = config->cache_chapters * geometry->record_pages_per_chapter;
        reserved_buffers += 1;
        if (uds_is_sparse_index_geometry(geometry))
                reserved_buffers += (config->cache_chapters * geometry->index_pages_per_chapter);
        volume->reserved_buffers = reserved_buffers;
        result = uds_open_volume_bufio(layout, geometry->bytes_per_page,
                                       volume->reserved_buffers, &volume->client);
        if (result != UDS_SUCCESS) {
                uds_free_volume(volume);
                return result;
        }

        result = uds_make_radix_sorter(geometry->records_per_page,
                                       &volume->radix_sorter);
        if (result != UDS_SUCCESS) {
                uds_free_volume(volume);
                return result;
        }

        result = vdo_allocate(geometry->records_per_page,
                              const struct uds_volume_record *, "record pointers",
                              &volume->record_pointers);
        if (result != VDO_SUCCESS) {
                uds_free_volume(volume);
                return result;
        }

        if (uds_is_sparse_index_geometry(geometry)) {
                size_t page_size = sizeof(struct delta_index_page) + geometry->bytes_per_page;

                result = uds_make_sparse_cache(geometry, config->cache_chapters,
                                               config->zone_count,
                                               &volume->sparse_cache);
                if (result != UDS_SUCCESS) {
                        uds_free_volume(volume);
                        return result;
                }

                volume->cache_size =
                        page_size * geometry->index_pages_per_chapter * config->cache_chapters;
        }

        result = initialize_page_cache(&volume->page_cache, geometry,
                                       config->cache_chapters, config->zone_count);
        if (result != UDS_SUCCESS) {
                uds_free_volume(volume);
                return result;
        }

        volume->cache_size += volume->page_cache.cache_slots * sizeof(struct delta_index_page);
        result = uds_make_index_page_map(geometry, &volume->index_page_map);
        if (result != UDS_SUCCESS) {
                uds_free_volume(volume);
                return result;
        }

        mutex_init(&volume->read_threads_mutex);
        uds_init_cond(&volume->read_threads_read_done_cond);
        uds_init_cond(&volume->read_threads_cond);

        result = vdo_allocate(config->read_threads, struct thread *, "reader threads",
                              &volume->reader_threads);
        if (result != VDO_SUCCESS) {
                uds_free_volume(volume);
                return result;
        }

        for (i = 0; i < config->read_threads; i++) {
                result = vdo_create_thread(read_thread_function, (void *) volume,
                                           "reader", &volume->reader_threads[i]);
                if (result != VDO_SUCCESS) {
                        uds_free_volume(volume);
                        return result;
                }

                volume->read_thread_count = i + 1;
        }

        *new_volume = volume;
        return UDS_SUCCESS;
}

static void uninitialize_page_cache(struct page_cache *cache)
{
        u16 i;

        if (cache->cache != NULL) {
                for (i = 0; i < cache->cache_slots; i++)
                        release_page_buffer(&cache->cache[i]);
        }
        vdo_free(cache->index);
        vdo_free(cache->cache);
        vdo_free(cache->search_pending_counters);
        vdo_free(cache->read_queue);
}

void uds_free_volume(struct volume *volume)
{
        if (volume == NULL)
                return;

        if (volume->reader_threads != NULL) {
                unsigned int i;

                /* This works even if some threads weren't started. */
                mutex_lock(&volume->read_threads_mutex);
                volume->read_threads_exiting = true;
                uds_broadcast_cond(&volume->read_threads_cond);
                mutex_unlock(&volume->read_threads_mutex);
                for (i = 0; i < volume->read_thread_count; i++)
                        vdo_join_threads(volume->reader_threads[i]);
                vdo_free(volume->reader_threads);
                volume->reader_threads = NULL;
        }

        /* Must destroy the client AFTER freeing the cached pages. */
        uninitialize_page_cache(&volume->page_cache);
        uds_free_sparse_cache(volume->sparse_cache);
        if (volume->client != NULL)
                dm_bufio_client_destroy(vdo_forget(volume->client));

        uds_free_index_page_map(volume->index_page_map);
        uds_free_radix_sorter(volume->radix_sorter);
        vdo_free(volume->geometry);
        vdo_free(volume->record_pointers);
        vdo_free(volume);
}