// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "block-map.h"

#include <linux/bio.h>
#include <linux/ratelimit.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "physical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

/*
 * The block map era, or maximum age, is used as follows:
 *
 * Each block map page, when dirty, records the earliest recovery journal block sequence number of
 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
 * according to the sequence number they record.
 *
 * In the current (newest) era, block map pages are not written unless there is cache pressure. In
 * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in
 * this era are issued for write. In all older eras, pages are issued for write immediately.
 */

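/*
 * Illustrative sketch (not part of the driver): with @maximum_age = 4, a dirty page whose earliest
 * recorded journal sequence number is 10 belongs to era 10 / 4 = 2. While the journal is still in
 * era 2 the page is written only under cache pressure; while the journal is in era 3 the page is
 * written out incrementally along with the rest of its era; once the journal reaches era 4 or
 * later the page is written immediately.
 */
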
struct page_descriptor {
	root_count_t root_index;
	height_t height;
	page_number_t page_index;
	slot_number_t slot;
} __packed;

union page_key {
	struct page_descriptor descriptor;
	u64 key;
};

struct write_if_not_dirtied_context {
	struct block_map_zone *zone;
	u8 generation;
};

struct block_map_tree_segment {
	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
};

struct block_map_tree {
	struct block_map_tree_segment *segments;
};

struct forest {
	struct block_map *map;
	size_t segments;
	struct boundary *boundaries;
	struct tree_page **pages;
	struct block_map_tree trees[];
};

struct cursor_level {
	page_number_t page_index;
	slot_number_t slot;
};

struct cursor {
	struct vdo_waiter waiter;
	struct block_map_tree *tree;
	height_t height;
	struct cursors *parent;
	struct boundary boundary;
	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
	struct pooled_vio *vio;
};

struct cursors {
	struct block_map_zone *zone;
	struct vio_pool *pool;
	vdo_entry_callback_fn entry_callback;
	struct vdo_completion *completion;
	root_count_t active_roots;
	struct cursor cursors[];
};

static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;

/* Used to indicate that the page holding the location of a tree root has been "loaded". */
static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;

const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
	.pbn_high_nibble = 0,
	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
};

#define LOG_INTERVAL 4000
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
 * Prevents any compiler shenanigans from affecting other threads reading those stats.
 */
#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))

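/*
 * Illustrative sketch (not part of the driver): ADD_ONCE(stats->dirty_pages, 1) expands to
 * WRITE_ONCE(stats->dirty_pages, stats->dirty_pages + 1). The WRITE_ONCE() store keeps the
 * compiler from tearing or reordering the update, so a thread sampling the statistic with
 * READ_ONCE() sees either the old or the new value, never a partial one.
 */
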
static inline bool is_dirty(const struct page_info *info)
{
	return info->state == PS_DIRTY;
}

static inline bool is_present(const struct page_info *info)
{
	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
}

static inline bool is_in_flight(const struct page_info *info)
{
	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
}

static inline bool is_incoming(const struct page_info *info)
{
	return info->state == PS_INCOMING;
}

static inline bool is_outgoing(const struct page_info *info)
{
	return info->state == PS_OUTGOING;
}

static inline bool is_valid(const struct page_info *info)
{
	return is_present(info) || is_outgoing(info);
}

static char *get_page_buffer(struct page_info *info)
{
	struct vdo_page_cache *cache = info->cache;

	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
}

static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
{
	struct vdo_page_completion *completion;

	completion = container_of(waiter, struct vdo_page_completion, waiter);
	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
	return completion;
}

/**
 * initialize_info() - Initialize all page info structures and put them on the free list.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_info(struct vdo_page_cache *cache)
{
	struct page_info *info;

	INIT_LIST_HEAD(&cache->free_list);
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result;

		info->cache = cache;
		info->state = PS_FREE;
		info->pbn = NO_PAGE;

		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
					     VIO_PRIORITY_METADATA, info,
					     get_page_buffer(info), &info->vio);
		if (result != VDO_SUCCESS)
			return result;

		/* The thread ID should never change. */
		info->vio->completion.callback_thread_id = cache->zone->thread_id;

		INIT_LIST_HEAD(&info->state_entry);
		list_add_tail(&info->state_entry, &cache->free_list);
		INIT_LIST_HEAD(&info->lru_entry);
	}

	return VDO_SUCCESS;
}

/**
 * allocate_cache_components() - Allocate components of the cache which require their own
 *                               allocation.
 *
 * The caller is responsible for all clean up on errors.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
{
	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
	int result;

	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
			      &cache->infos);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_int_map_create(cache->page_count, &cache->page_map);
	if (result != VDO_SUCCESS)
		return result;

	return initialize_info(cache);
}

/**
 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
 *                            thread.
 */
static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
					  const char *function_name)
{
	thread_id_t thread_id = vdo_get_callback_thread_id();

	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
			    "%s() must only be called on cache thread %d, not thread %d",
			    function_name, cache->zone->thread_id, thread_id);
}

/** assert_io_allowed() - Assert that a page cache may issue I/O. */
static inline void assert_io_allowed(struct vdo_page_cache *cache)
{
	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
			    "VDO page cache may issue I/O");
}

/** report_cache_pressure() - Log and, if enabled, report cache pressure. */
static void report_cache_pressure(struct vdo_page_cache *cache)
{
	ADD_ONCE(cache->stats.cache_pressure, 1);
	if (cache->waiter_count > cache->page_count) {
		if ((cache->pressure_report % LOG_INTERVAL) == 0)
			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);

		if (++cache->pressure_report >= DISPLAY_INTERVAL)
			cache->pressure_report = 0;
	}
}

/**
 * get_page_state_name() - Return the name of a page state.
 *
 * If the page state is invalid, a static string is returned and the invalid state is logged.
 *
 * Return: A pointer to a static page state name.
 */
static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
{
	int result;
	static const char * const state_names[] = {
		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
	};

	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);

	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
			    "Unknown page_state value %d", state);
	if (result != VDO_SUCCESS)
		return "[UNKNOWN PAGE STATE]";

	return state_names[state];
}

/**
 * update_counter() - Update the counter associated with a given state.
 * @info: The page info to count.
 * @delta: The delta to apply to the counter.
 */
static void update_counter(struct page_info *info, s32 delta)
{
	struct block_map_statistics *stats = &info->cache->stats;

	switch (info->state) {
	case PS_FREE:
		ADD_ONCE(stats->free_pages, delta);
		return;

	case PS_INCOMING:
		ADD_ONCE(stats->incoming_pages, delta);
		return;

	case PS_OUTGOING:
		ADD_ONCE(stats->outgoing_pages, delta);
		return;

	case PS_FAILED:
		ADD_ONCE(stats->failed_pages, delta);
		return;

	case PS_RESIDENT:
		ADD_ONCE(stats->clean_pages, delta);
		return;

	case PS_DIRTY:
		ADD_ONCE(stats->dirty_pages, delta);
		return;

	default:
		return;
	}
}

/** update_lru() - Update the LRU information for an active page. */
static void update_lru(struct page_info *info)
{
	if (info->cache->lru_list.prev != &info->lru_entry)
		list_move_tail(&info->lru_entry, &info->cache->lru_list);
}

/**
 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
 *                    counters.
 */
static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
{
	if (new_state == info->state)
		return;

	update_counter(info, -1);
	info->state = new_state;
	update_counter(info, 1);

	switch (info->state) {
	case PS_FREE:
	case PS_FAILED:
		list_move_tail(&info->state_entry, &info->cache->free_list);
		return;

	case PS_OUTGOING:
		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
		return;

	case PS_DIRTY:
		return;

	default:
		list_del_init(&info->state_entry);
	}
}

/** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
{
	struct vdo_page_cache *cache = info->cache;

	/* Either the new or the old page number must be NO_PAGE. */
	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
				"Must free a page before reusing it.");
	if (result != VDO_SUCCESS)
		return result;

	if (info->pbn != NO_PAGE)
		vdo_int_map_remove(cache->page_map, info->pbn);

	info->pbn = pbn;

	if (pbn != NO_PAGE) {
		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}

/** reset_page_info() - Reset page info to represent an unallocated page. */
static int reset_page_info(struct page_info *info)
{
	int result;

	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
			    "VDO Page must not have waiters");
	if (result != VDO_SUCCESS)
		return result;

	result = set_info_pbn(info, NO_PAGE);
	set_info_state(info, PS_FREE);
	list_del_init(&info->lru_entry);
	return result;
}

/**
 * find_free_page() - Find a free page.
 *
 * Return: A pointer to the page info structure (if found), NULL otherwise.
 */
static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	info = list_first_entry_or_null(&cache->free_list, struct page_info,
					state_entry);
	if (info != NULL)
		list_del_init(&info->state_entry);

	return info;
}

/**
 * find_page() - Find the page info (if any) associated with a given pbn.
 * @pbn: The absolute physical block number of the page.
 *
 * Return: The page info for the page if available, or NULL if not.
 */
static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
						 physical_block_number_t pbn)
{
	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
		return cache->last_found;

	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
	return cache->last_found;
}

/**
 * select_lru_page() - Determine which page is least recently used.
 *
 * Picks the least recently used page from among the non-busy entries at the front of the LRU
 * ring. Since whenever we mark a page busy we also put it at the end of the ring, it is unlikely
 * that the entries at the front are busy unless the queue is very short, but not impossible.
 *
 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
 *         found. The page can be dirty or resident.
 */
static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	list_for_each_entry(info, &cache->lru_list, lru_entry)
		if ((info->busy == 0) && !is_in_flight(info))
			return info;

	return NULL;
}

/* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */

/**
 * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
 * @info: The page info representing the result page.
 * @vdo_page_comp: The VDO page completion to complete.
 */
static void complete_with_page(struct page_info *info,
			       struct vdo_page_completion *vdo_page_comp)
{
	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);

	if (!available) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Requested cache page %llu in state %s is not %s",
				       (unsigned long long) info->pbn,
				       get_page_state_name(info->state),
				       vdo_page_comp->writable ? "present" : "valid");
		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
		return;
	}

	vdo_page_comp->info = info;
	vdo_page_comp->ready = true;
	vdo_finish_completion(&vdo_page_comp->completion);
}

/**
 * complete_waiter_with_error() - Complete a page completion with an error code.
 * @waiter: The page completion, as a waiter.
 * @result_ptr: A pointer to the error code.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
{
	int *result = result_ptr;

	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
}

/**
 * complete_waiter_with_page() - Complete a page completion with a page.
 * @waiter: The page completion, as a waiter.
 * @page_info: The page info to complete with.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
{
	complete_with_page(page_info, page_completion_from_waiter(waiter));
}

/**
 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
 *
 * Upon completion the waitq will be empty.
 *
 * Return: The number of pages distributed.
 */
static unsigned int distribute_page_over_waitq(struct page_info *info,
					       struct vdo_wait_queue *waitq)
{
	size_t num_pages;

	num_pages = vdo_waitq_num_waiters(waitq);

	/*
	 * Increment the busy count once for each pending completion so that this page does not
	 * stop being busy until all completions have been processed.
	 */
	info->busy += num_pages;

	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
	return num_pages;
}

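/*
 * Illustrative sketch (not part of the driver): if three completions are waiting on a page,
 * info->busy is raised by three before they are notified, and each later
 * vdo_release_page_completion() call lowers it by one, so the page stays busy until the last
 * requester has released it.
 */
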
/**
 * set_persistent_error() - Set a persistent error which all requests will receive in the future.
 * @context: A string describing what triggered the error.
 *
 * Once triggered, all enqueued completions will get this error. Any future requests will result in
 * this error as well.
 */
static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
				 int result)
{
	struct page_info *info;
	/* If we're already read-only, there's no need to log. */
	struct vdo *vdo = cache->vdo;

	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
				       context);
		vdo_enter_read_only_mode(vdo, result);
	}

	assert_on_cache_thread(cache, __func__);

	vdo_waitq_notify_all_waiters(&cache->free_waiters,
				     complete_waiter_with_error, &result);
	cache->waiter_count = 0;

	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @writable: Whether a writable page is required.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise an error.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
						bool writable)
{
	int result;

	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info != NULL,
			    "VDO Page Completion must be complete");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
			    "VDO Page Completion pbn must be consistent");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(is_valid(completion->info),
			    "VDO Page Completion page must be valid");
	if (result != VDO_SUCCESS)
		return result;

	if (writable) {
		result = VDO_ASSERT(completion->writable,
				    "VDO Page Completion must be writable");
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}

static void check_for_drain_complete(struct block_map_zone *zone)
{
	if (vdo_is_state_draining(&zone->state) &&
	    (zone->active_lookups == 0) &&
	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
	    !is_vio_pool_busy(zone->vio_pool) &&
	    (zone->page_cache.outstanding_reads == 0) &&
	    (zone->page_cache.outstanding_writes == 0)) {
		vdo_finish_draining_with_result(&zone->state,
						(vdo_is_read_only(zone->block_map->vdo) ?
						 VDO_READ_ONLY : VDO_SUCCESS));
	}
}

static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
	vdo_enter_read_only_mode(zone->block_map->vdo, result);

	/*
	 * We are in read-only mode, so we won't ever write any page out.
	 * Just take all waiters off the waitq so the zone can drain.
	 */
	vdo_waitq_init(&zone->flush_waiters);
	check_for_drain_complete(zone);
}

static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
						bool writable)
{
	int result = validate_completed_page(completion, writable);

	if (result == VDO_SUCCESS)
		return true;

	enter_zone_read_only_mode(completion->info->cache->zone, result);
	return false;
}

/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio.
 */
static void handle_load_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);
	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
	ADD_ONCE(cache->stats.failed_reads, 1);
	set_info_state(info, PS_FAILED);
	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
	reset_page_info(info);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}

/**
 * page_is_loaded() - Callback used when a page has been loaded.
 * @completion: The vio which has loaded the page. Its parent is the page_info.
 */
static void page_is_loaded(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	nonce_t nonce = info->cache->zone->block_map->nonce;
	struct block_map_page *page;
	enum block_map_page_validity validity;

	assert_on_cache_thread(cache, __func__);

	page = (struct block_map_page *) get_page_buffer(info);
	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
						    "Expected page %llu but got page %llu instead",
						    (unsigned long long) info->pbn,
						    (unsigned long long) pbn);

		vdo_continue_completion(completion, result);
		return;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
		vdo_format_block_map_page(page, nonce, info->pbn, false);

	info->recovery_lock = 0;
	set_info_state(info, PS_RESIDENT);
	distribute_page_over_waitq(info, &info->waiting);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}

/**
 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
 * @completion: The page load completion.
 */
static void handle_rebuild_read_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	/*
	 * We are doing a read-only rebuild, so treat this as a successful read
	 * of an uninitialized page.
	 */
	vio_record_metadata_io_error(as_vio(completion));
	ADD_ONCE(cache->stats.failed_reads, 1);
	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
	vdo_reset_completion(completion);
	page_is_loaded(completion);
}

static void load_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
}

/**
 * launch_page_load() - Begin the process of loading a page.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check launch_page_load(struct page_info *info,
					 physical_block_number_t pbn)
{
	int result;
	vdo_action_fn callback;
	struct vdo_page_cache *cache = info->cache;

	assert_io_allowed(cache);

	result = set_info_pbn(info, pbn);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
	if (result != VDO_SUCCESS)
		return result;

	set_info_state(info, PS_INCOMING);
	cache->outstanding_reads++;
	ADD_ONCE(cache->stats.pages_loaded, 1);
	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
				callback, REQ_OP_READ | REQ_PRIO);
	return VDO_SUCCESS;
}

static void write_pages(struct vdo_completion *completion);

/** handle_flush_error() - Handle errors flushing the layer. */
static void handle_flush_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;

	vio_record_metadata_io_error(as_vio(completion));
	set_persistent_error(info->cache, "flush failed", completion->result);
	write_pages(completion);
}

static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
}

/** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
static void save_pages(struct vdo_page_cache *cache)
{
	struct page_info *info;
	struct vio *vio;

	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
		return;

	assert_io_allowed(cache);

	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);

	cache->pages_in_flush = cache->pages_to_flush;
	cache->pages_to_flush = 0;
	ADD_ONCE(cache->stats.flush_count, 1);

	vio = info->vio;

	/*
	 * We must make sure that the recovery journal entries that changed these pages were
	 * successfully persisted, and thus must issue a flush before each batch of pages is
	 * written to ensure this.
	 */
	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
}

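/*
 * Illustrative ordering sketch (not part of the driver): because the flush completes before any
 * page in the batch is written, a recovery journal block that covers changes in these pages is
 * durable before the block map pages that reflect it, so a crash can never leave a block map
 * update on disk whose journal entry was lost.
 */
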
/**
 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
 *
 * Once in the list, a page may not be used until it has been written out.
 */
static void schedule_page_save(struct page_info *info)
{
	if (info->busy > 0) {
		info->write_status = WRITE_STATUS_DEFERRED;
		return;
	}

	info->cache->pages_to_flush++;
	info->cache->outstanding_writes++;
	set_info_state(info, PS_OUTGOING);
}

/**
 * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
 *                      pages if another save is not in progress.
 */
static void launch_page_save(struct page_info *info)
{
	schedule_page_save(info);
	save_pages(info->cache);
}

/**
 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
 *                           requesting a given page number.
 * @context: A pointer to the pbn of the desired page.
 *
 * Implements waiter_match_fn.
 *
 * Return: true if the page completion is for the desired page number.
 */
static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
{
	physical_block_number_t *pbn = context;

	return (page_completion_from_waiter(waiter)->pbn == *pbn);
}

/**
 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
 *                        any other completions that match it in page number.
 */
static void allocate_free_page(struct page_info *info)
{
	int result;
	struct vdo_waiter *oldest_waiter;
	physical_block_number_t pbn;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
		if (cache->stats.cache_pressure > 0) {
			vdo_log_info("page cache pressure relieved");
			WRITE_ONCE(cache->stats.cache_pressure, 0);
		}

		return;
	}

	result = reset_page_info(info);
	if (result != VDO_SUCCESS) {
		set_persistent_error(cache, "cannot reset page info", result);
		return;
	}

	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
	pbn = page_completion_from_waiter(oldest_waiter)->pbn;

	/*
	 * Remove all entries which match the page number in question and push them onto the page
	 * info's wait queue.
	 */
	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
					   &pbn, &info->waiting);
	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);

	result = launch_page_load(info, pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * discard_a_page() - Begin the process of discarding a page.
 *
 * If no page is discardable, increments a count of deferred frees so that the next release of a
 * page which is no longer busy will kick off another discard cycle. This is an indication that
 * the cache is not big enough.
 *
 * If the selected page is not dirty, immediately allocates the page to the oldest completion
 * waiting for a free page.
 */
static void discard_a_page(struct vdo_page_cache *cache)
{
	struct page_info *info = select_lru_page(cache);

	if (info == NULL) {
		report_cache_pressure(cache);
		return;
	}

	if (!is_dirty(info)) {
		allocate_free_page(info);
		return;
	}

	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
			    "page selected for discard is not in flight");

	cache->discard_count++;
	info->write_status = WRITE_STATUS_DISCARD;
	launch_page_save(info);
}

/**
 * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get
 *                                 a different page.
 */
static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
{
	struct vdo_page_cache *cache = vdo_page_comp->cache;

	cache->waiter_count++;
	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
	discard_a_page(cache);
}

/**
 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
 *                            page.
 * @cache: The page cache.
 */
static void discard_page_if_needed(struct vdo_page_cache *cache)
{
	if (cache->waiter_count > cache->discard_count)
		discard_a_page(cache);
}

/**
 * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
 * @info: The info structure for the page whose write just completed.
 *
 * Return: true if the page write was a discard.
 */
static bool write_has_finished(struct page_info *info)
{
	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);

	assert_on_cache_thread(info->cache, __func__);
	info->cache->outstanding_writes--;

	info->write_status = WRITE_STATUS_NORMAL;
	return was_discard;
}

/**
 * handle_page_write_error() - Handler for page write errors.
 * @completion: The page write vio.
 */
static void handle_page_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	vio_record_metadata_io_error(as_vio(completion));

	/* If we're already read-only, write failures are to be expected. */
	if (result != VDO_READ_ONLY) {
		vdo_log_ratelimit(vdo_log_error,
				  "failed to write block map page %llu",
				  (unsigned long long) info->pbn);
	}

	set_info_state(info, PS_DIRTY);
	ADD_ONCE(cache->stats.failed_writes, 1);
	set_persistent_error(cache, "cannot write page", result);

	if (!write_has_finished(info))
		discard_page_if_needed(cache);

	check_for_drain_complete(cache->zone);
}

static void page_is_written_out(struct vdo_completion *completion);

static void write_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
}

/**
 * page_is_written_out() - Callback used when a page has been written out.
 * @completion: The vio which wrote the page. Its parent is a page_info.
 */
static void page_is_written_out(struct vdo_completion *completion)
{
	bool was_discard, reclaimed;
	u32 reclamations;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);

	if (!page->header.initialized) {
		page->header.initialized = true;
		vdo_submit_metadata_vio(info->vio, info->pbn,
					write_cache_page_endio,
					handle_page_write_error,
					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
		return;
	}

	/* Handle journal updates and torn write protection. */
	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
						     info->recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     cache->zone->zone_number);
	info->recovery_lock = 0;
	was_discard = write_has_finished(info);
	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));

	set_info_state(info, PS_RESIDENT);

	reclamations = distribute_page_over_waitq(info, &info->waiting);
	ADD_ONCE(cache->stats.reclaimed, reclamations);

	if (was_discard)
		cache->discard_count--;

	if (reclaimed)
		discard_page_if_needed(cache);
	else
		allocate_free_page(info);

	check_for_drain_complete(cache->zone);
}

/**
 * write_pages() - Write the batch of pages which were covered by the layer flush which just
 *                 completed.
 * @flush_completion: The flush vio.
 *
 * This callback is registered in save_pages().
 */
static void write_pages(struct vdo_completion *flush_completion)
{
	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;

	/*
	 * We need to cache these two values on the stack since it is possible for the last
	 * page info to cause the page cache to get freed. Hence once we launch the last page,
	 * it may be unsafe to dereference the cache.
	 */
	bool has_unflushed_pages = (cache->pages_to_flush > 0);
	page_count_t pages_in_flush = cache->pages_in_flush;

	cache->pages_in_flush = 0;
	while (pages_in_flush-- > 0) {
		struct page_info *info =
			list_first_entry(&cache->outgoing_list, struct page_info,
					 state_entry);

		list_del_init(&info->state_entry);
		if (vdo_is_read_only(info->cache->vdo)) {
			struct vdo_completion *completion = &info->vio->completion;

			vdo_reset_completion(completion);
			completion->callback = page_is_written_out;
			completion->error_handler = handle_page_write_error;
			vdo_fail_completion(completion, VDO_READ_ONLY);
			continue;
		}

		ADD_ONCE(info->cache->stats.pages_saved, 1);
		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
	}

	if (has_unflushed_pages) {
		/*
		 * If there are unflushed pages, the cache can't have been freed, so this call is
		 * safe.
		 */
		save_pages(cache);
	}
}

/**
 * vdo_release_page_completion() - Release a VDO Page Completion.
 *
 * The page referenced by this completion (if any) will no longer be held busy by this completion.
 * If a page becomes discardable and there are completions awaiting free pages then a new round of
 * page discarding is started.
 */
void vdo_release_page_completion(struct vdo_completion *completion)
{
	struct page_info *discard_info = NULL;
	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
	struct vdo_page_cache *cache;

	if (completion->result == VDO_SUCCESS) {
		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
			return;

		if (--page_completion->info->busy == 0)
			discard_info = page_completion->info;
	}

	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "Page being released after leaving all queues");

	page_completion->info = NULL;
	cache = page_completion->cache;
	assert_on_cache_thread(cache, __func__);

	if (discard_info != NULL) {
		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
			discard_info->write_status = WRITE_STATUS_NORMAL;
			launch_page_save(discard_info);
		}

		/*
		 * If there are excess requests for pages (that have not already started discards),
		 * we need to discard some page (which may be this one).
		 */
		discard_page_if_needed(cache);
	}
}

/**
 * load_page_for_completion() - Helper function to load a page as described by a VDO Page
 *                              Completion.
 */
static void load_page_for_completion(struct page_info *info,
				     struct vdo_page_completion *vdo_page_comp)
{
	int result;

	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
	result = launch_page_load(info, vdo_page_comp->pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * vdo_get_page() - Initialize a page completion and get a block map page.
 * @page_completion: The vdo_page_completion to initialize.
 * @zone: The block map zone of the desired page.
 * @pbn: The absolute physical block of the desired page.
 * @writable: Whether the page can be modified.
 * @parent: The object to notify when the fetch is complete.
 * @callback: The notification callback.
 * @error_handler: The handler for fetch errors.
 * @requeue: Whether we must requeue when notifying the parent.
 *
 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
 * by the completion to be loaded from disk. When the callback is invoked, the page will be
 * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
 * when they are done with the page to clear the busy mark.
 */
void vdo_get_page(struct vdo_page_completion *page_completion,
		  struct block_map_zone *zone, physical_block_number_t pbn,
		  bool writable, void *parent, vdo_action_fn callback,
		  vdo_action_fn error_handler, bool requeue)
{
	struct vdo_page_cache *cache = &zone->page_cache;
	struct vdo_completion *completion = &page_completion->completion;
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);
	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "New page completion was not already on a wait queue");

	*page_completion = (struct vdo_page_completion) {
		.pbn = pbn,
		.writable = writable,
		.cache = cache,
	};

	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
	vdo_prepare_completion(completion, callback, error_handler,
			       cache->zone->thread_id, parent);
	completion->requeue = requeue;

	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
		vdo_fail_completion(completion, VDO_READ_ONLY);
		return;
	}

	if (page_completion->writable)
		ADD_ONCE(cache->stats.write_count, 1);
	else
		ADD_ONCE(cache->stats.read_count, 1);

	info = find_page(cache, page_completion->pbn);
	if (info != NULL) {
		/* The page is in the cache already. */
		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
		    is_incoming(info) ||
		    (is_outgoing(info) && page_completion->writable)) {
			/* The page is unusable until it has finished I/O. */
			ADD_ONCE(cache->stats.wait_for_page, 1);
			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
			return;
		}

		if (is_valid(info)) {
			/* The page is usable. */
			ADD_ONCE(cache->stats.found_in_cache, 1);
			if (!is_present(info))
				ADD_ONCE(cache->stats.read_outgoing, 1);
			update_lru(info);
			info->busy++;
			complete_with_page(info, page_completion);
			return;
		}

		/* Something horrible has gone wrong. */
		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
	}

	/* The page must be fetched. */
	info = find_free_page(cache);
	if (info != NULL) {
		ADD_ONCE(cache->stats.fetch_required, 1);
		load_page_for_completion(info, page_completion);
		return;
	}

	/* The page must wait for a page to be discarded. */
	ADD_ONCE(cache->stats.discard_required, 1);
	discard_page_for_completion(page_completion);
}

/**
 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
 * @completion: The vdo_page_completion containing the page.
 */
void vdo_request_page_write(struct vdo_completion *completion)
{
	struct page_info *info;
	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);

	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
		return;

	info = vdo_page_comp->info;
	set_info_state(info, PS_DIRTY);
	launch_page_save(info);
}

/**
 * vdo_get_cached_page() - Get the block map page from a page completion.
 * @completion: A vdo page completion whose callback has been called.
 * @page_ptr: A pointer to hold the page.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_get_cached_page(struct vdo_completion *completion,
			struct block_map_page **page_ptr)
{
	int result;
	struct vdo_page_completion *vpc;

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result == VDO_SUCCESS)
		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);

	return result;
}

/**
 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
 *
 * There must not be any dirty pages in the cache.
 *
 * Return: A success or error code.
 */
int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
{
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);

	/* Make sure we don't throw away any dirty pages. */
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");

		if (result != VDO_SUCCESS)
			return result;
	}

	/* Reset the page map by re-allocating it. */
	vdo_int_map_free(vdo_forget(cache->page_map));
	return vdo_int_map_create(cache->page_count, &cache->page_map);
}

/**
 * get_tree_page_by_index() - Get the tree page for a given height and page index.
 *
 * Return: The requested page.
 */
static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
							      root_count_t root_index,
							      height_t height,
							      page_number_t page_index)
{
	page_number_t offset = 0;
	size_t segment;

	for (segment = 0; segment < forest->segments; segment++) {
		page_number_t border = forest->boundaries[segment].levels[height - 1];

		if (page_index < border) {
			struct block_map_tree *tree = &forest->trees[root_index];

			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
		}

		offset = border;
	}

	return NULL;
}

/* Get the page referred to by the lock's tree slot at its current height. */
static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
					      const struct tree_lock *lock)
{
	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
				      lock->height,
				      lock->tree_slots[lock->height].page_index);
}

/** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
			 physical_block_number_t pbn,
			 struct block_map_page *page)
{
	struct block_map_page *loaded = (struct block_map_page *) buffer;
	enum block_map_page_validity validity =
		vdo_validate_block_map_page(loaded, nonce, pbn);

	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
		memcpy(page, loaded, VDO_BLOCK_SIZE);
		return true;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Expected page %llu but got page %llu instead",
				       (unsigned long long) pbn,
				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
	}

	return false;
}

/**
 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
 *                     a cyclic range of values from 0 to (modulus - 1).
 * @lower: The lowest value to accept.
 * @value: The value to check.
 * @upper: The highest value to accept.
 * @modulus: The size of the cyclic space, no more than 2^15.
 *
 * The value and both bounds must be smaller than the modulus.
 *
 * Return: true if the value is in range.
 */
static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
{
	if (value < lower)
		value += modulus;
	if (upper < lower)
		upper += modulus;

	return (value <= upper);
}

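/*
 * Illustrative sketch (not part of the driver): with modulus 8, the cyclic range [6, 2] contains
 * 6, 7, 0, 1 and 2. For value 1, both the value and the upper bound are shifted up by the modulus
 * (to 9 and 10), so the ordinary comparison 9 <= 10 correctly reports that 1 is in range, while
 * value 4 becomes 12 and is correctly rejected.
 */
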
/**
 * is_not_older() - Check whether a generation is strictly older than some other generation in the
 *                  context of a zone's current generation range.
 * @zone: The zone in which to do the comparison.
 * @a: The generation in question.
 * @b: The generation to compare to.
 *
 * Return: true if generation @a is not strictly older than generation @b in the context of @zone.
 */
static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
{
	int result;

	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
			    "generation(s) %u, %u are out of range [%u, %u]",
			    a, b, zone->oldest_generation, zone->generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return true;
	}

	return in_cyclic_range(b, a, zone->generation, 1 << 8);
}

static void release_generation(struct block_map_zone *zone, u8 generation)
{
	int result;

	result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
			    "dirty page count underflow for generation %u", generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	zone->dirty_page_counts[generation]--;
	while ((zone->dirty_page_counts[zone->oldest_generation] == 0) &&
	       (zone->oldest_generation != zone->generation))
		zone->oldest_generation++;
}

static void set_generation(struct block_map_zone *zone, struct tree_page *page,
			   u8 new_generation)
{
	u32 new_count;
	int result;
	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
	u8 old_generation = page->generation;

	if (decrement_old && (old_generation == new_generation))
		return;

	page->generation = new_generation;
	new_count = ++zone->dirty_page_counts[new_generation];
	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
			    new_generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	if (decrement_old)
		release_generation(zone, old_generation);
}

static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);

/* Implements waiter_callback_fn */
static void write_page_callback(struct vdo_waiter *waiter, void *context)
{
	write_page(container_of(waiter, struct tree_page, waiter), context);
}

static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
{
	waiter->callback = write_page_callback;
	acquire_vio_from_pool(zone->vio_pool, waiter);
}

/* Return: true if all possible generations were not already active */
static bool attempt_increment(struct block_map_zone *zone)
{
	u8 generation = zone->generation + 1;

	if (zone->oldest_generation == generation)
		return false;

	zone->generation = generation;
	return true;
}

/* Launches a flush if one is not already in progress. */
static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
{
	if ((zone->flusher == NULL) && attempt_increment(zone)) {
		zone->flusher = page;
		acquire_vio(&page->waiter, zone);
		return;
	}

	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
}

static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
{
	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
	struct write_if_not_dirtied_context *write_context = context;

	if (page->generation == write_context->generation) {
		acquire_vio(waiter, write_context->zone);
		return;
	}

	enqueue_page(page, write_context->zone);
}

static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
{
	return_vio_to_pool(zone->vio_pool, vio);
	check_for_drain_complete(zone);
}

/* This callback is registered in write_initialized_page(). */
static void finish_page_write(struct vdo_completion *completion)
{
	bool dirty;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct tree_page *page = completion->parent;
	struct block_map_zone *zone = pooled->context;

	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
						     page->writing_recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     zone->zone_number);

	dirty = (page->writing_generation != page->generation);
	release_generation(zone, page->writing_generation);
	page->writing = false;

	if (zone->flusher == page) {
		struct write_if_not_dirtied_context context = {
			.zone = zone,
			.generation = page->writing_generation,
		};

		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
					     write_page_if_not_dirtied, &context);
		if (dirty && attempt_increment(zone)) {
			write_page(page, pooled);
			return;
		}

		zone->flusher = NULL;
	}

	if (dirty) {
		enqueue_page(page, zone);
	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
		   attempt_increment(zone)) {
		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
					     struct tree_page, waiter);
		write_page(zone->flusher, pooled);
		return;
	}

	return_to_pool(zone, pooled);
}

static void handle_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;

	vio_record_metadata_io_error(vio);
	enter_zone_read_only_mode(zone, result);
	return_to_pool(zone, pooled);
}

static void write_page_endio(struct bio *bio);

static void write_initialized_page(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;
	struct tree_page *tree_page = completion->parent;
	struct block_map_page *page = (struct block_map_page *) vio->data;
	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;

	/*
	 * Now that we know the page has been written at least once, mark the copy we are writing
	 * as initialized.
	 */
	page->header.initialized = true;

	if (zone->flusher == tree_page)
		operation |= REQ_PREFLUSH;

	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				operation);
}

static void write_page_endio(struct bio *bio)
{
	struct pooled_vio *vio = bio->bi_private;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = (struct block_map_page *) vio->vio.data;

	continue_vio_after_io(&vio->vio,
			      (page->header.initialized ?
			       finish_page_write : write_initialized_page),
			      zone->thread_id);
}

static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
{
	struct vdo_completion *completion = &vio->vio.completion;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = vdo_as_block_map_page(tree_page);

	if ((zone->flusher != tree_page) &&
	    is_not_older(zone, tree_page->generation, zone->generation)) {
		/*
		 * This page was re-dirtied after the last flush was issued, hence we need to do
		 * another flush.
		 */
		enqueue_page(tree_page, zone);
		return_to_pool(zone, vio);
		return;
	}

	completion->parent = tree_page;
	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
	completion->callback_thread_id = zone->thread_id;

	tree_page->writing = true;
	tree_page->writing_generation = tree_page->generation;
	tree_page->writing_recovery_lock = tree_page->recovery_lock;

	/* Clear this now so that we know this page is not on any dirty list. */
	tree_page->recovery_lock = 0;

	/*
	 * We've already copied the page into the vio which will write it, so if it was not yet
	 * initialized, the first write will indicate that (for torn write protection). It is now
	 * safe to mark it as initialized in memory since if the write fails, the in memory state
	 * will become irrelevant.
	 */
	if (page->header.initialized) {
		write_initialized_page(completion);
		return;
	}

	page->header.initialized = true;
	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				REQ_OP_WRITE | REQ_PRIO);
}

/* Release a lock on a page which was being loaded or allocated. */
static void release_page_lock(struct data_vio *data_vio, char *what)
{
	struct block_map_zone *zone;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;

	VDO_ASSERT_LOG_ONLY(lock->locked,
			    "release of unlocked block map page %s for key %llu in tree %u",
			    what, (unsigned long long) lock->key, lock->root_index);

	zone = data_vio->logical.zone->block_map_zone;
	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
			    "block map page %s mismatch for key %llu in tree %u",
			    what, (unsigned long long) lock->key, lock->root_index);
	lock->locked = false;
}

static void finish_lookup(struct data_vio *data_vio, int result)
{
	data_vio->tree_lock.height = 0;

	--data_vio->logical.zone->block_map_zone->active_lookups;

	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
	data_vio->vio.completion.error_handler = handle_data_vio_error;
	continue_data_vio_with_error(data_vio, result);
}

static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	int result = *((int *) context);

	if (!data_vio->write) {
		if (result == VDO_NO_SPACE)
			result = VDO_SUCCESS;
	} else if (result != VDO_NO_SPACE) {
		result = VDO_READ_ONLY;
	}

	finish_lookup(data_vio, result);
}

static void abort_lookup(struct data_vio *data_vio, int result, char *what)
{
	if (result != VDO_NO_SPACE)
		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);

	if (data_vio->tree_lock.locked) {
		release_page_lock(data_vio, what);
		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
					     abort_lookup_for_waiter,
					     &result);
	}

	finish_lookup(data_vio, result);
}

static void abort_load(struct data_vio *data_vio, int result)
{
	abort_lookup(data_vio, result, "load");
}

static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
					       const struct data_location *mapping,
					       height_t height)
{
	if (!vdo_is_valid_location(mapping) ||
	    vdo_is_state_compressed(mapping->state) ||
	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
		return true;

	/* Roots aren't physical data blocks, so we can't check their PBNs. */
	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
		return false;

	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
}

static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
static void allocate_block_map_page(struct block_map_zone *zone,
				    struct data_vio *data_vio);

static void continue_with_loaded_page(struct data_vio *data_vio,
				      struct block_map_page *page)
{
	struct tree_lock *lock = &data_vio->tree_lock;
	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
	struct data_location mapping =
		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);

	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
		vdo_log_error_strerror(VDO_BAD_MAPPING,
				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
				       (unsigned long long) mapping.pbn, mapping.state,
				       lock->tree_slots[lock->height - 1].page_index,
				       lock->height - 1);
		abort_load(data_vio, VDO_BAD_MAPPING);
		return;
	}

	if (!vdo_is_mapped_location(&mapping)) {
		/* The page we need is unallocated */
		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
					data_vio);
		return;
	}

	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
	if (lock->height == 1) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* We know what page we need to load next */
	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}

static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);

	data_vio->tree_lock.height--;
	continue_with_loaded_page(data_vio, context);
}

static void finish_block_map_page_load(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	nonce_t nonce;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct data_vio *data_vio = completion->parent;
	struct block_map_zone *zone = pooled->context;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	tree_lock->height--;
	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
	tree_page = get_tree_page(zone, tree_lock);
	page = (struct block_map_page *) tree_page->page_buffer;
	nonce = zone->block_map->nonce;

	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
		vdo_format_block_map_page(page, nonce, pbn, false);
	return_vio_to_pool(zone->vio_pool, pooled);

	/* Release our claim to the load and wake any waiters */
	release_page_lock(data_vio, "load");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
	continue_with_loaded_page(data_vio, page);
}

static void handle_io_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct data_vio *data_vio = completion->parent;
	struct block_map_zone *zone = pooled->context;

	vio_record_metadata_io_error(vio);
	return_vio_to_pool(zone->vio_pool, pooled);
	abort_load(data_vio, result);
}

static void load_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct data_vio *data_vio = vio->completion.parent;

	continue_vio_after_io(vio, finish_block_map_page_load,
			      data_vio->logical.zone->thread_id);
}

static void load_page(struct vdo_waiter *waiter, void *context)
{
	struct pooled_vio *pooled = context;
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct tree_lock *lock = &data_vio->tree_lock;
	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;

	pooled->vio.completion.parent = data_vio;
	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
				handle_io_error, REQ_OP_READ | REQ_PRIO);
}

/*
 * If the page is already locked, queue up to wait for the lock to be released. If the lock is
 * acquired, @data_vio->tree_lock.locked will be true.
 */
static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;
	union page_key key;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;
	height_t height = lock->height;
	struct block_map_tree_slot tree_slot = lock->tree_slots[height];

	key.descriptor = (struct page_descriptor) {
		.root_index = lock->root_index,
		.height = height,
		.page_index = tree_slot.page_index,
		.slot = tree_slot.block_map_slot.slot,
	};
	lock->key = key.key;

	result = vdo_int_map_put(zone->loading_pages, lock->key,
				 lock, false, (void **) &lock_holder);
	if (result != VDO_SUCCESS)
		return result;

	if (lock_holder == NULL) {
		/* We got the lock */
		data_vio->tree_lock.locked = true;
		return VDO_SUCCESS;
	}

	/* Someone else is loading or allocating the page we need */
	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
	return VDO_SUCCESS;
}

/* Load a block map tree page from disk, for the next level in the data vio tree lock. */
static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;

	result = attempt_page_lock(zone, data_vio);
	if (result != VDO_SUCCESS) {
		abort_load(data_vio, result);
		return;
	}

	if (data_vio->tree_lock.locked) {
		data_vio->waiter.callback = load_page;
		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
	}
}

static void allocation_failure(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	if (vdo_requeue_completion_if_needed(completion,
					     data_vio->logical.zone->thread_id))
		return;

	abort_lookup(data_vio, completion->result, "allocation");
}

static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	physical_block_number_t pbn = *((physical_block_number_t *) context);

	tree_lock->height--;
	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;

	if (tree_lock->height == 0) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}

1964 /** expire_oldest_list() - Expire the oldest list. */
1965 static void expire_oldest_list(struct dirty_lists
*dirty_lists
)
1967 block_count_t i
= dirty_lists
->offset
++;
1969 dirty_lists
->oldest_period
++;
1970 if (!list_empty(&dirty_lists
->eras
[i
][VDO_TREE_PAGE
])) {
1971 list_splice_tail_init(&dirty_lists
->eras
[i
][VDO_TREE_PAGE
],
1972 &dirty_lists
->expired
[VDO_TREE_PAGE
]);
1975 if (!list_empty(&dirty_lists
->eras
[i
][VDO_CACHE_PAGE
])) {
1976 list_splice_tail_init(&dirty_lists
->eras
[i
][VDO_CACHE_PAGE
],
1977 &dirty_lists
->expired
[VDO_CACHE_PAGE
]);
1980 if (dirty_lists
->offset
== dirty_lists
->maximum_age
)
1981 dirty_lists
->offset
= 0;
1985 /** update_period() - Update the dirty_lists period if necessary. */
1986 static void update_period(struct dirty_lists
*dirty
, sequence_number_t period
)
1988 while (dirty
->next_period
<= period
) {
1989 if ((dirty
->next_period
- dirty
->oldest_period
) == dirty
->maximum_age
)
1990 expire_oldest_list(dirty
);
1991 dirty
->next_period
++;
1995 /** write_expired_elements() - Write out the expired list. */
1996 static void write_expired_elements(struct block_map_zone
*zone
)
1998 struct tree_page
*page
, *ttmp
;
1999 struct page_info
*info
, *ptmp
;
2000 struct list_head
*expired
;
2001 u8 generation
= zone
->generation
;
2003 expired
= &zone
->dirty_lists
->expired
[VDO_TREE_PAGE
];
2004 list_for_each_entry_safe(page
, ttmp
, expired
, entry
) {
2007 list_del_init(&page
->entry
);
2009 result
= VDO_ASSERT(!vdo_waiter_is_waiting(&page
->waiter
),
2010 "Newly expired page not already waiting to write");
2011 if (result
!= VDO_SUCCESS
) {
2012 enter_zone_read_only_mode(zone
, result
);
2016 set_generation(zone
, page
, generation
);
2018 enqueue_page(page
, zone
);
2021 expired
= &zone
->dirty_lists
->expired
[VDO_CACHE_PAGE
];
2022 list_for_each_entry_safe(info
, ptmp
, expired
, state_entry
) {
2023 list_del_init(&info
->state_entry
);
2024 schedule_page_save(info
);
2027 save_pages(&zone
->page_cache
);
2031 * add_to_dirty_lists() - Add an element to the dirty lists.
2032 * @zone: The zone in which we are operating.
2033 * @entry: The list entry of the element to add.
2034 * @type: The type of page.
2035 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
2036 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
2039 static void add_to_dirty_lists(struct block_map_zone
*zone
,
2040 struct list_head
*entry
,
2041 enum block_map_page_type type
,
2042 sequence_number_t old_period
,
2043 sequence_number_t new_period
)
2045 struct dirty_lists
*dirty_lists
= zone
->dirty_lists
;
2047 if ((old_period
== new_period
) || ((old_period
!= 0) && (old_period
< new_period
)))
2050 if (new_period
< dirty_lists
->oldest_period
) {
2051 list_move_tail(entry
, &dirty_lists
->expired
[type
]);
2053 update_period(dirty_lists
, new_period
);
2054 list_move_tail(entry
,
2055 &dirty_lists
->eras
[new_period
% dirty_lists
->maximum_age
][type
]);
2058 write_expired_elements(zone
);
2062 * Record the allocation in the tree and wake any waiters now that the write lock has been
2065 static void finish_block_map_allocation(struct vdo_completion
*completion
)
2067 physical_block_number_t pbn
;
2068 struct tree_page
*tree_page
;
2069 struct block_map_page
*page
;
2070 sequence_number_t old_lock
;
2071 struct data_vio
*data_vio
= as_data_vio(completion
);
2072 struct block_map_zone
*zone
= data_vio
->logical
.zone
->block_map_zone
;
2073 struct tree_lock
*tree_lock
= &data_vio
->tree_lock
;
2074 height_t height
= tree_lock
->height
;
2076 assert_data_vio_in_logical_zone(data_vio
);
2078 tree_page
= get_tree_page(zone
, tree_lock
);
2079 pbn
= tree_lock
->tree_slots
[height
- 1].block_map_slot
.pbn
;
2081 /* Record the allocation. */
2082 page
= (struct block_map_page
*) tree_page
->page_buffer
;
2083 old_lock
= tree_page
->recovery_lock
;
2084 vdo_update_block_map_page(page
, data_vio
, pbn
,
2085 VDO_MAPPING_STATE_UNCOMPRESSED
,
2086 &tree_page
->recovery_lock
);
2088 if (vdo_waiter_is_waiting(&tree_page
->waiter
)) {
2089 /* This page is waiting to be written out. */
2090 if (zone
->flusher
!= tree_page
) {
2092 * The outstanding flush won't cover the update we just made,
2093 * so mark the page as needing another flush.
2095 set_generation(zone
, tree_page
, zone
->generation
);
2098 /* Put the page on a dirty list */
2100 INIT_LIST_HEAD(&tree_page
->entry
);
2101 add_to_dirty_lists(zone
, &tree_page
->entry
, VDO_TREE_PAGE
,
2102 old_lock
, tree_page
->recovery_lock
);
2105 tree_lock
->height
--;
2107 /* Format the interior node we just allocated (in memory). */
2108 tree_page
= get_tree_page(zone
, tree_lock
);
2109 vdo_format_block_map_page(tree_page
->page_buffer
,
2110 zone
->block_map
->nonce
,
2114 /* Release our claim to the allocation and wake any waiters */
2115 release_page_lock(data_vio
, "allocation");
2116 vdo_waitq_notify_all_waiters(&tree_lock
->waiters
,
2117 continue_allocation_for_waiter
, &pbn
);
2118 if (tree_lock
->height
== 0) {
2119 finish_lookup(data_vio
, VDO_SUCCESS
);
2123 allocate_block_map_page(zone
, data_vio
);
2126 static void release_block_map_write_lock(struct vdo_completion
*completion
)
2128 struct data_vio
*data_vio
= as_data_vio(completion
);
2130 assert_data_vio_in_allocated_zone(data_vio
);
2132 release_data_vio_allocation_lock(data_vio
, true);
2133 launch_data_vio_logical_callback(data_vio
, finish_block_map_allocation
);
2137 * Newly allocated block map pages are set to have to MAXIMUM_REFERENCES after they are journaled,
2138 * to prevent deduplication against the block after we release the write lock on it, but before we
2139 * write out the page.
2141 static void set_block_map_page_reference_count(struct vdo_completion
*completion
)
2143 struct data_vio
*data_vio
= as_data_vio(completion
);
2145 assert_data_vio_in_allocated_zone(data_vio
);
2147 completion
->callback
= release_block_map_write_lock
;
2148 vdo_modify_reference_count(completion
, &data_vio
->increment_updater
);
2151 static void journal_block_map_allocation(struct vdo_completion
*completion
)
2153 struct data_vio
*data_vio
= as_data_vio(completion
);
2155 assert_data_vio_in_journal_zone(data_vio
);
2157 set_data_vio_allocated_zone_callback(data_vio
,
2158 set_block_map_page_reference_count
);
2159 vdo_add_recovery_journal_entry(completion
->vdo
->recovery_journal
, data_vio
);
2162 static void allocate_block(struct vdo_completion
*completion
)
2164 struct data_vio
*data_vio
= as_data_vio(completion
);
2165 struct tree_lock
*lock
= &data_vio
->tree_lock
;
2166 physical_block_number_t pbn
;
2168 assert_data_vio_in_allocated_zone(data_vio
);
2170 if (!vdo_allocate_block_in_zone(data_vio
))
2173 pbn
= data_vio
->allocation
.pbn
;
2174 lock
->tree_slots
[lock
->height
- 1].block_map_slot
.pbn
= pbn
;
2175 data_vio
->increment_updater
= (struct reference_updater
) {
2176 .operation
= VDO_JOURNAL_BLOCK_MAP_REMAPPING
,
2180 .state
= VDO_MAPPING_STATE_UNCOMPRESSED
,
2182 .lock
= data_vio
->allocation
.lock
,
2185 launch_data_vio_journal_callback(data_vio
, journal_block_map_allocation
);
2188 static void allocate_block_map_page(struct block_map_zone
*zone
,
2189 struct data_vio
*data_vio
)
2193 if (!data_vio
->write
|| data_vio
->is_discard
) {
2194 /* This is a pure read or a discard, so there's nothing left to do here. */
2195 finish_lookup(data_vio
, VDO_SUCCESS
);
2199 result
= attempt_page_lock(zone
, data_vio
);
2200 if (result
!= VDO_SUCCESS
) {
2201 abort_lookup(data_vio
, result
, "allocation");
2205 if (!data_vio
->tree_lock
.locked
)
2208 data_vio_allocate_data_block(data_vio
, VIO_BLOCK_MAP_WRITE_LOCK
,
2209 allocate_block
, allocation_failure
);
2213 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio
2214 * resides and cache that result in the data_vio.
2216 * All ancestors in the tree will be allocated or loaded, as needed.
2218 void vdo_find_block_map_slot(struct data_vio
*data_vio
)
2220 page_number_t page_index
;
2221 struct block_map_tree_slot tree_slot
;
2222 struct data_location mapping
;
2223 struct block_map_page
*page
= NULL
;
2224 struct tree_lock
*lock
= &data_vio
->tree_lock
;
2225 struct block_map_zone
*zone
= data_vio
->logical
.zone
->block_map_zone
;
2227 zone
->active_lookups
++;
2228 if (vdo_is_state_draining(&zone
->state
)) {
2229 finish_lookup(data_vio
, VDO_SHUTTING_DOWN
);
2233 lock
->tree_slots
[0].block_map_slot
.slot
=
2234 data_vio
->logical
.lbn
% VDO_BLOCK_MAP_ENTRIES_PER_PAGE
;
2235 page_index
= (lock
->tree_slots
[0].page_index
/ zone
->block_map
->root_count
);
2236 tree_slot
= (struct block_map_tree_slot
) {
2237 .page_index
= page_index
/ VDO_BLOCK_MAP_ENTRIES_PER_PAGE
,
2240 .slot
= page_index
% VDO_BLOCK_MAP_ENTRIES_PER_PAGE
,
2244 for (lock
->height
= 1; lock
->height
<= VDO_BLOCK_MAP_TREE_HEIGHT
; lock
->height
++) {
2245 physical_block_number_t pbn
;
2247 lock
->tree_slots
[lock
->height
] = tree_slot
;
2248 page
= (struct block_map_page
*) (get_tree_page(zone
, lock
)->page_buffer
);
2249 pbn
= vdo_get_block_map_page_pbn(page
);
2250 if (pbn
!= VDO_ZERO_BLOCK
) {
2251 lock
->tree_slots
[lock
->height
].block_map_slot
.pbn
= pbn
;
2255 /* Calculate the index and slot for the next level. */
2256 tree_slot
.block_map_slot
.slot
=
2257 tree_slot
.page_index
% VDO_BLOCK_MAP_ENTRIES_PER_PAGE
;
2258 tree_slot
.page_index
= tree_slot
.page_index
/ VDO_BLOCK_MAP_ENTRIES_PER_PAGE
;
2261 /* The page at this height has been allocated and loaded. */
2262 mapping
= vdo_unpack_block_map_entry(&page
->entries
[tree_slot
.block_map_slot
.slot
]);
2263 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio
), &mapping
, lock
->height
)) {
2264 vdo_log_error_strerror(VDO_BAD_MAPPING
,
2265 "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
2266 (unsigned long long) mapping
.pbn
, mapping
.state
,
2267 lock
->tree_slots
[lock
->height
- 1].page_index
,
2269 abort_load(data_vio
, VDO_BAD_MAPPING
);
2273 if (!vdo_is_mapped_location(&mapping
)) {
2274 /* The page we want one level down has not been allocated, so allocate it. */
2275 allocate_block_map_page(zone
, data_vio
);
2279 lock
->tree_slots
[lock
->height
- 1].block_map_slot
.pbn
= mapping
.pbn
;
2280 if (lock
->height
== 1) {
2281 /* This is the ultimate block map page, so we're done */
2282 finish_lookup(data_vio
, VDO_SUCCESS
);
2286 /* We know what page we need to load. */
2287 load_block_map_page(zone
, data_vio
);
2291 * Find the PBN of a leaf block map page. This method may only be used after all allocated tree
2292 * pages have been loaded, otherwise, it may give the wrong answer (0).
2294 physical_block_number_t
vdo_find_block_map_page_pbn(struct block_map
*map
,
2295 page_number_t page_number
)
2297 struct data_location mapping
;
2298 struct tree_page
*tree_page
;
2299 struct block_map_page
*page
;
2300 root_count_t root_index
= page_number
% map
->root_count
;
2301 page_number_t page_index
= page_number
/ map
->root_count
;
2302 slot_number_t slot
= page_index
% VDO_BLOCK_MAP_ENTRIES_PER_PAGE
;
2304 page_index
/= VDO_BLOCK_MAP_ENTRIES_PER_PAGE
;
2306 tree_page
= get_tree_page_by_index(map
->forest
, root_index
, 1, page_index
);
2307 page
= (struct block_map_page
*) tree_page
->page_buffer
;
2308 if (!page
->header
.initialized
)
2309 return VDO_ZERO_BLOCK
;
2311 mapping
= vdo_unpack_block_map_entry(&page
->entries
[slot
]);
2312 if (!vdo_is_valid_location(&mapping
) || vdo_is_state_compressed(mapping
.state
))
2313 return VDO_ZERO_BLOCK
;
2318 * Write a tree page or indicate that it has been re-dirtied if it is already being written. This
2319 * method is used when correcting errors in the tree during read-only rebuild.
2321 void vdo_write_tree_page(struct tree_page
*page
, struct block_map_zone
*zone
)
2323 bool waiting
= vdo_waiter_is_waiting(&page
->waiter
);
2325 if (waiting
&& (zone
->flusher
== page
))
2328 set_generation(zone
, page
, zone
->generation
);
2329 if (waiting
|| page
->writing
)
2332 enqueue_page(page
, zone
);
2335 static int make_segment(struct forest
*old_forest
, block_count_t new_pages
,
2336 struct boundary
*new_boundary
, struct forest
*forest
)
2338 size_t index
= (old_forest
== NULL
) ? 0 : old_forest
->segments
;
2339 struct tree_page
*page_ptr
;
2340 page_count_t segment_sizes
[VDO_BLOCK_MAP_TREE_HEIGHT
];
2345 forest
->segments
= index
+ 1;
2347 result
= vdo_allocate(forest
->segments
, struct boundary
,
2348 "forest boundary array", &forest
->boundaries
);
2349 if (result
!= VDO_SUCCESS
)
2352 result
= vdo_allocate(forest
->segments
, struct tree_page
*,
2353 "forest page pointers", &forest
->pages
);
2354 if (result
!= VDO_SUCCESS
)
2357 result
= vdo_allocate(new_pages
, struct tree_page
,
2358 "new forest pages", &forest
->pages
[index
]);
2359 if (result
!= VDO_SUCCESS
)
2363 memcpy(forest
->boundaries
, old_forest
->boundaries
,
2364 index
* sizeof(struct boundary
));
2365 memcpy(forest
->pages
, old_forest
->pages
,
2366 index
* sizeof(struct tree_page
*));
2369 memcpy(&(forest
->boundaries
[index
]), new_boundary
, sizeof(struct boundary
));
2371 for (height
= 0; height
< VDO_BLOCK_MAP_TREE_HEIGHT
; height
++) {
2372 segment_sizes
[height
] = new_boundary
->levels
[height
];
2374 segment_sizes
[height
] -= old_forest
->boundaries
[index
- 1].levels
[height
];
2377 page_ptr
= forest
->pages
[index
];
2378 for (root
= 0; root
< forest
->map
->root_count
; root
++) {
2379 struct block_map_tree_segment
*segment
;
2380 struct block_map_tree
*tree
= &(forest
->trees
[root
]);
2383 int result
= vdo_allocate(forest
->segments
,
2384 struct block_map_tree_segment
,
2385 "tree root segments", &tree
->segments
);
2386 if (result
!= VDO_SUCCESS
)
2390 memcpy(tree
->segments
, old_forest
->trees
[root
].segments
,
2391 index
* sizeof(struct block_map_tree_segment
));
2394 segment
= &(tree
->segments
[index
]);
2395 for (height
= 0; height
< VDO_BLOCK_MAP_TREE_HEIGHT
; height
++) {
2396 if (segment_sizes
[height
] == 0)
2399 segment
->levels
[height
] = page_ptr
;
2400 if (height
== (VDO_BLOCK_MAP_TREE_HEIGHT
- 1)) {
2401 /* Record the root. */
2402 struct block_map_page
*page
=
2403 vdo_format_block_map_page(page_ptr
->page_buffer
,
2405 VDO_INVALID_PBN
, true);
2407 vdo_pack_block_map_entry(forest
->map
->root_origin
+ root
,
2408 VDO_MAPPING_STATE_UNCOMPRESSED
);
2410 page_ptr
+= segment_sizes
[height
];
2417 static void deforest(struct forest
*forest
, size_t first_page_segment
)
2421 if (forest
->pages
!= NULL
) {
2424 for (segment
= first_page_segment
; segment
< forest
->segments
; segment
++)
2425 vdo_free(forest
->pages
[segment
]);
2426 vdo_free(forest
->pages
);
2429 for (root
= 0; root
< forest
->map
->root_count
; root
++)
2430 vdo_free(forest
->trees
[root
].segments
);
2432 vdo_free(forest
->boundaries
);
2437 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if
2439 * @entries: The number of entries the block map will hold.
2441 * Return: VDO_SUCCESS or an error.
2443 static int make_forest(struct block_map
*map
, block_count_t entries
)
2445 struct forest
*forest
, *old_forest
= map
->forest
;
2446 struct boundary new_boundary
, *old_boundary
= NULL
;
2447 block_count_t new_pages
;
2450 if (old_forest
!= NULL
)
2451 old_boundary
= &(old_forest
->boundaries
[old_forest
->segments
- 1]);
2453 new_pages
= vdo_compute_new_forest_pages(map
->root_count
, old_boundary
,
2454 entries
, &new_boundary
);
2455 if (new_pages
== 0) {
2456 map
->next_entry_count
= entries
;
2460 result
= vdo_allocate_extended(struct forest
, map
->root_count
,
2461 struct block_map_tree
, __func__
,
2463 if (result
!= VDO_SUCCESS
)
2467 result
= make_segment(old_forest
, new_pages
, &new_boundary
, forest
);
2468 if (result
!= VDO_SUCCESS
) {
2469 deforest(forest
, forest
->segments
- 1);
2473 map
->next_forest
= forest
;
2474 map
->next_entry_count
= entries
;
2479 * replace_forest() - Replace a block_map's forest with the already-prepared larger forest.
2481 static void replace_forest(struct block_map
*map
)
2483 if (map
->next_forest
!= NULL
) {
2484 if (map
->forest
!= NULL
)
2485 deforest(map
->forest
, map
->forest
->segments
);
2486 map
->forest
= vdo_forget(map
->next_forest
);
2489 map
->entry_count
= map
->next_entry_count
;
2490 map
->next_entry_count
= 0;
2494 * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the
2497 static void finish_cursor(struct cursor
*cursor
)
2499 struct cursors
*cursors
= cursor
->parent
;
2500 struct vdo_completion
*completion
= cursors
->completion
;
2502 return_vio_to_pool(cursors
->pool
, vdo_forget(cursor
->vio
));
2503 if (--cursors
->active_roots
> 0)
2508 vdo_finish_completion(completion
);
2511 static void traverse(struct cursor
*cursor
);
2514 * continue_traversal() - Continue traversing a block map tree.
2515 * @completion: The VIO doing a read or write.
2517 static void continue_traversal(struct vdo_completion
*completion
)
2519 vio_record_metadata_io_error(as_vio(completion
));
2520 traverse(completion
->parent
);
2524 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded.
2525 * @completion: The VIO doing the read.
2527 static void finish_traversal_load(struct vdo_completion
*completion
)
2529 struct cursor
*cursor
= completion
->parent
;
2530 height_t height
= cursor
->height
;
2531 struct cursor_level
*level
= &cursor
->levels
[height
];
2532 struct tree_page
*tree_page
=
2533 &(cursor
->tree
->segments
[0].levels
[height
][level
->page_index
]);
2534 struct block_map_page
*page
= (struct block_map_page
*) tree_page
->page_buffer
;
2536 vdo_copy_valid_page(cursor
->vio
->vio
.data
,
2537 cursor
->parent
->zone
->block_map
->nonce
,
2538 pbn_from_vio_bio(cursor
->vio
->vio
.bio
), page
);
2542 static void traversal_endio(struct bio
*bio
)
2544 struct vio
*vio
= bio
->bi_private
;
2545 struct cursor
*cursor
= vio
->completion
.parent
;
2547 continue_vio_after_io(vio
, finish_traversal_load
,
2548 cursor
->parent
->zone
->thread_id
);
2552 * traverse() - Traverse a single block map tree.
2554 * This is the recursive heart of the traversal process.
2556 static void traverse(struct cursor
*cursor
)
2558 for (; cursor
->height
< VDO_BLOCK_MAP_TREE_HEIGHT
; cursor
->height
++) {
2559 height_t height
= cursor
->height
;
2560 struct cursor_level
*level
= &cursor
->levels
[height
];
2561 struct tree_page
*tree_page
=
2562 &(cursor
->tree
->segments
[0].levels
[height
][level
->page_index
]);
2563 struct block_map_page
*page
= (struct block_map_page
*) tree_page
->page_buffer
;
2565 if (!page
->header
.initialized
)
2568 for (; level
->slot
< VDO_BLOCK_MAP_ENTRIES_PER_PAGE
; level
->slot
++) {
2569 struct cursor_level
*next_level
;
2570 page_number_t entry_index
=
2571 (VDO_BLOCK_MAP_ENTRIES_PER_PAGE
* level
->page_index
) + level
->slot
;
2572 struct data_location location
=
2573 vdo_unpack_block_map_entry(&page
->entries
[level
->slot
]);
2575 if (!vdo_is_valid_location(&location
)) {
2576 /* This entry is invalid, so remove it from the page. */
2577 page
->entries
[level
->slot
] = UNMAPPED_BLOCK_MAP_ENTRY
;
2578 vdo_write_tree_page(tree_page
, cursor
->parent
->zone
);
2582 if (!vdo_is_mapped_location(&location
))
2585 /* Erase mapped entries past the end of the logical space. */
2586 if (entry_index
>= cursor
->boundary
.levels
[height
]) {
2587 page
->entries
[level
->slot
] = UNMAPPED_BLOCK_MAP_ENTRY
;
2588 vdo_write_tree_page(tree_page
, cursor
->parent
->zone
);
2592 if (cursor
->height
< VDO_BLOCK_MAP_TREE_HEIGHT
- 1) {
2593 int result
= cursor
->parent
->entry_callback(location
.pbn
,
2594 cursor
->parent
->completion
);
2595 if (result
!= VDO_SUCCESS
) {
2596 page
->entries
[level
->slot
] = UNMAPPED_BLOCK_MAP_ENTRY
;
2597 vdo_write_tree_page(tree_page
, cursor
->parent
->zone
);
2602 if (cursor
->height
== 0)
2606 next_level
= &cursor
->levels
[cursor
->height
];
2607 next_level
->page_index
= entry_index
;
2608 next_level
->slot
= 0;
2610 vdo_submit_metadata_vio(&cursor
->vio
->vio
, location
.pbn
,
2611 traversal_endio
, continue_traversal
,
2612 REQ_OP_READ
| REQ_PRIO
);
2617 finish_cursor(cursor
);
2621 * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with
2622 * which to load pages.
2623 * @context: The pooled_vio just acquired.
2625 * Implements waiter_callback_fn.
2627 static void launch_cursor(struct vdo_waiter
*waiter
, void *context
)
2629 struct cursor
*cursor
= container_of(waiter
, struct cursor
, waiter
);
2630 struct pooled_vio
*pooled
= context
;
2632 cursor
->vio
= pooled
;
2633 pooled
->vio
.completion
.parent
= cursor
;
2634 pooled
->vio
.completion
.callback_thread_id
= cursor
->parent
->zone
->thread_id
;
2639 * compute_boundary() - Compute the number of pages used at each level of the given root's tree.
2641 * Return: The list of page counts as a boundary structure.
2643 static struct boundary
compute_boundary(struct block_map
*map
, root_count_t root_index
)
2645 struct boundary boundary
;
2647 page_count_t leaf_pages
= vdo_compute_block_map_page_count(map
->entry_count
);
2649 * Compute the leaf pages for this root. If the number of leaf pages does not distribute
2650 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to
2651 * roots starting from tree 0.
2653 page_count_t last_tree_root
= (leaf_pages
- 1) % map
->root_count
;
2654 page_count_t level_pages
= leaf_pages
/ map
->root_count
;
2656 if (root_index
<= last_tree_root
)
2659 for (height
= 0; height
< VDO_BLOCK_MAP_TREE_HEIGHT
- 1; height
++) {
2660 boundary
.levels
[height
] = level_pages
;
2661 level_pages
= DIV_ROUND_UP(level_pages
, VDO_BLOCK_MAP_ENTRIES_PER_PAGE
);
2664 /* The root node always exists, even if the root is otherwise unused. */
2665 boundary
.levels
[VDO_BLOCK_MAP_TREE_HEIGHT
- 1] = 1;
2671 * vdo_traverse_forest() - Walk the entire forest of a block map.
2672 * @callback: A function to call with the pbn of each allocated node in the forest.
2673 * @completion: The completion to notify on each traversed PBN, and when traversal completes.
2675 void vdo_traverse_forest(struct block_map
*map
, vdo_entry_callback_fn callback
,
2676 struct vdo_completion
*completion
)
2679 struct cursors
*cursors
;
2682 result
= vdo_allocate_extended(struct cursors
, map
->root_count
,
2683 struct cursor
, __func__
, &cursors
);
2684 if (result
!= VDO_SUCCESS
) {
2685 vdo_fail_completion(completion
, result
);
2689 cursors
->zone
= &map
->zones
[0];
2690 cursors
->pool
= cursors
->zone
->vio_pool
;
2691 cursors
->entry_callback
= callback
;
2692 cursors
->completion
= completion
;
2693 cursors
->active_roots
= map
->root_count
;
2694 for (root
= 0; root
< map
->root_count
; root
++) {
2695 struct cursor
*cursor
= &cursors
->cursors
[root
];
2697 *cursor
= (struct cursor
) {
2698 .tree
= &map
->forest
->trees
[root
],
2699 .height
= VDO_BLOCK_MAP_TREE_HEIGHT
- 1,
2701 .boundary
= compute_boundary(map
, root
),
2704 cursor
->waiter
.callback
= launch_cursor
;
2705 acquire_vio_from_pool(cursors
->pool
, &cursor
->waiter
);
2710 * initialize_block_map_zone() - Initialize the per-zone portions of the block map.
2711 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
2714 static int __must_check
initialize_block_map_zone(struct block_map
*map
,
2715 zone_count_t zone_number
,
2716 page_count_t cache_size
,
2717 block_count_t maximum_age
)
2721 struct vdo
*vdo
= map
->vdo
;
2722 struct block_map_zone
*zone
= &map
->zones
[zone_number
];
2724 BUILD_BUG_ON(sizeof(struct page_descriptor
) != sizeof(u64
));
2726 zone
->zone_number
= zone_number
;
2727 zone
->thread_id
= vdo
->thread_config
.logical_threads
[zone_number
];
2728 zone
->block_map
= map
;
2730 result
= vdo_allocate_extended(struct dirty_lists
, maximum_age
,
2731 dirty_era_t
, __func__
,
2732 &zone
->dirty_lists
);
2733 if (result
!= VDO_SUCCESS
)
2736 zone
->dirty_lists
->maximum_age
= maximum_age
;
2737 INIT_LIST_HEAD(&zone
->dirty_lists
->expired
[VDO_TREE_PAGE
]);
2738 INIT_LIST_HEAD(&zone
->dirty_lists
->expired
[VDO_CACHE_PAGE
]);
2740 for (i
= 0; i
< maximum_age
; i
++) {
2741 INIT_LIST_HEAD(&zone
->dirty_lists
->eras
[i
][VDO_TREE_PAGE
]);
2742 INIT_LIST_HEAD(&zone
->dirty_lists
->eras
[i
][VDO_CACHE_PAGE
]);
2745 result
= vdo_int_map_create(VDO_LOCK_MAP_CAPACITY
, &zone
->loading_pages
);
2746 if (result
!= VDO_SUCCESS
)
2749 result
= make_vio_pool(vdo
, BLOCK_MAP_VIO_POOL_SIZE
,
2750 zone
->thread_id
, VIO_TYPE_BLOCK_MAP_INTERIOR
,
2751 VIO_PRIORITY_METADATA
, zone
, &zone
->vio_pool
);
2752 if (result
!= VDO_SUCCESS
)
2755 vdo_set_admin_state_code(&zone
->state
, VDO_ADMIN_STATE_NORMAL_OPERATION
);
2757 zone
->page_cache
.zone
= zone
;
2758 zone
->page_cache
.vdo
= vdo
;
2759 zone
->page_cache
.page_count
= cache_size
/ map
->zone_count
;
2760 zone
->page_cache
.stats
.free_pages
= zone
->page_cache
.page_count
;
2762 result
= allocate_cache_components(&zone
->page_cache
);
2763 if (result
!= VDO_SUCCESS
)
2766 /* initialize empty circular queues */
2767 INIT_LIST_HEAD(&zone
->page_cache
.lru_list
);
2768 INIT_LIST_HEAD(&zone
->page_cache
.outgoing_list
);
2773 /* Implements vdo_zone_thread_getter_fn */
2774 static thread_id_t
get_block_map_zone_thread_id(void *context
, zone_count_t zone_number
)
2776 struct block_map
*map
= context
;
2778 return map
->zones
[zone_number
].thread_id
;
2781 /* Implements vdo_action_preamble_fn */
2782 static void prepare_for_era_advance(void *context
, struct vdo_completion
*parent
)
2784 struct block_map
*map
= context
;
2786 map
->current_era_point
= map
->pending_era_point
;
2787 vdo_finish_completion(parent
);
2790 /* Implements vdo_zone_action_fn */
2791 static void advance_block_map_zone_era(void *context
, zone_count_t zone_number
,
2792 struct vdo_completion
*parent
)
2794 struct block_map
*map
= context
;
2795 struct block_map_zone
*zone
= &map
->zones
[zone_number
];
2797 update_period(zone
->dirty_lists
, map
->current_era_point
);
2798 write_expired_elements(zone
);
2799 vdo_finish_completion(parent
);
2803 * Schedule an era advance if necessary. This method should not be called directly. Rather, call
2804 * vdo_schedule_default_action() on the block map's action manager.
2806 * Implements vdo_action_scheduler_fn.
2808 static bool schedule_era_advance(void *context
)
2810 struct block_map
*map
= context
;
2812 if (map
->current_era_point
== map
->pending_era_point
)
2815 return vdo_schedule_action(map
->action_manager
, prepare_for_era_advance
,
2816 advance_block_map_zone_era
, NULL
, NULL
);
2819 static void uninitialize_block_map_zone(struct block_map_zone
*zone
)
2821 struct vdo_page_cache
*cache
= &zone
->page_cache
;
2823 vdo_free(vdo_forget(zone
->dirty_lists
));
2824 free_vio_pool(vdo_forget(zone
->vio_pool
));
2825 vdo_int_map_free(vdo_forget(zone
->loading_pages
));
2826 if (cache
->infos
!= NULL
) {
2827 struct page_info
*info
;
2829 for (info
= cache
->infos
; info
< cache
->infos
+ cache
->page_count
; info
++)
2830 free_vio(vdo_forget(info
->vio
));
2833 vdo_int_map_free(vdo_forget(cache
->page_map
));
2834 vdo_free(vdo_forget(cache
->infos
));
2835 vdo_free(vdo_forget(cache
->pages
));
2838 void vdo_free_block_map(struct block_map
*map
)
2845 for (zone
= 0; zone
< map
->zone_count
; zone
++)
2846 uninitialize_block_map_zone(&map
->zones
[zone
]);
2848 vdo_abandon_block_map_growth(map
);
2849 if (map
->forest
!= NULL
)
2850 deforest(vdo_forget(map
->forest
), 0);
2851 vdo_free(vdo_forget(map
->action_manager
));
2855 /* @journal may be NULL. */
2856 int vdo_decode_block_map(struct block_map_state_2_0 state
, block_count_t logical_blocks
,
2857 struct vdo
*vdo
, struct recovery_journal
*journal
,
2858 nonce_t nonce
, page_count_t cache_size
, block_count_t maximum_age
,
2859 struct block_map
**map_ptr
)
2861 struct block_map
*map
;
2863 zone_count_t zone
= 0;
2865 BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE
!=
2866 ((VDO_BLOCK_SIZE
- sizeof(struct block_map_page
)) /
2867 sizeof(struct block_map_entry
)));
2868 result
= VDO_ASSERT(cache_size
> 0, "block map cache size is specified");
2869 if (result
!= VDO_SUCCESS
)
2872 result
= vdo_allocate_extended(struct block_map
,
2873 vdo
->thread_config
.logical_zone_count
,
2874 struct block_map_zone
, __func__
, &map
);
2875 if (result
!= VDO_SUCCESS
)
2879 map
->root_origin
= state
.root_origin
;
2880 map
->root_count
= state
.root_count
;
2881 map
->entry_count
= logical_blocks
;
2882 map
->journal
= journal
;
2885 result
= make_forest(map
, map
->entry_count
);
2886 if (result
!= VDO_SUCCESS
) {
2887 vdo_free_block_map(map
);
2891 replace_forest(map
);
2893 map
->zone_count
= vdo
->thread_config
.logical_zone_count
;
2894 for (zone
= 0; zone
< map
->zone_count
; zone
++) {
2895 result
= initialize_block_map_zone(map
, zone
, cache_size
, maximum_age
);
2896 if (result
!= VDO_SUCCESS
) {
2897 vdo_free_block_map(map
);
2902 result
= vdo_make_action_manager(map
->zone_count
, get_block_map_zone_thread_id
,
2903 vdo_get_recovery_journal_thread_id(journal
),
2904 map
, schedule_era_advance
, vdo
,
2905 &map
->action_manager
);
2906 if (result
!= VDO_SUCCESS
) {
2907 vdo_free_block_map(map
);
2915 struct block_map_state_2_0
vdo_record_block_map(const struct block_map
*map
)
2917 return (struct block_map_state_2_0
) {
2918 .flat_page_origin
= VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN
,
2919 /* This is the flat page count, which has turned out to always be 0. */
2920 .flat_page_count
= 0,
2921 .root_origin
= map
->root_origin
,
2922 .root_count
= map
->root_count
,
2926 /* The block map needs to know the journals' sequence number to initialize the eras. */
2927 void vdo_initialize_block_map_from_journal(struct block_map
*map
,
2928 struct recovery_journal
*journal
)
2932 map
->current_era_point
= vdo_get_recovery_journal_current_sequence_number(journal
);
2933 map
->pending_era_point
= map
->current_era_point
;
2935 for (z
= 0; z
< map
->zone_count
; z
++) {
2936 struct dirty_lists
*dirty_lists
= map
->zones
[z
].dirty_lists
;
2938 VDO_ASSERT_LOG_ONLY(dirty_lists
->next_period
== 0, "current period not set");
2939 dirty_lists
->oldest_period
= map
->current_era_point
;
2940 dirty_lists
->next_period
= map
->current_era_point
+ 1;
2941 dirty_lists
->offset
= map
->current_era_point
% dirty_lists
->maximum_age
;
2945 /* Compute the logical zone for the LBN of a data vio. */
2946 zone_count_t
vdo_compute_logical_zone(struct data_vio
*data_vio
)
2948 struct block_map
*map
= vdo_from_data_vio(data_vio
)->block_map
;
2949 struct tree_lock
*tree_lock
= &data_vio
->tree_lock
;
2950 page_number_t page_number
= data_vio
->logical
.lbn
/ VDO_BLOCK_MAP_ENTRIES_PER_PAGE
;
2952 tree_lock
->tree_slots
[0].page_index
= page_number
;
2953 tree_lock
->root_index
= page_number
% map
->root_count
;
2954 return (tree_lock
->root_index
% map
->zone_count
);
2957 void vdo_advance_block_map_era(struct block_map
*map
,
2958 sequence_number_t recovery_block_number
)
2963 map
->pending_era_point
= recovery_block_number
;
2964 vdo_schedule_default_action(map
->action_manager
);
2967 /* Implements vdo_admin_initiator_fn */
2968 static void initiate_drain(struct admin_state
*state
)
2970 struct block_map_zone
*zone
= container_of(state
, struct block_map_zone
, state
);
2972 VDO_ASSERT_LOG_ONLY((zone
->active_lookups
== 0),
2973 "%s() called with no active lookups", __func__
);
2975 if (!vdo_is_state_suspending(state
)) {
2976 while (zone
->dirty_lists
->oldest_period
< zone
->dirty_lists
->next_period
)
2977 expire_oldest_list(zone
->dirty_lists
);
2978 write_expired_elements(zone
);
2981 check_for_drain_complete(zone
);
2984 /* Implements vdo_zone_action_fn. */
2985 static void drain_zone(void *context
, zone_count_t zone_number
,
2986 struct vdo_completion
*parent
)
2988 struct block_map
*map
= context
;
2989 struct block_map_zone
*zone
= &map
->zones
[zone_number
];
2991 vdo_start_draining(&zone
->state
,
2992 vdo_get_current_manager_operation(map
->action_manager
),
2993 parent
, initiate_drain
);
2996 void vdo_drain_block_map(struct block_map
*map
, const struct admin_state_code
*operation
,
2997 struct vdo_completion
*parent
)
2999 vdo_schedule_operation(map
->action_manager
, operation
, NULL
, drain_zone
, NULL
,
3003 /* Implements vdo_zone_action_fn. */
3004 static void resume_block_map_zone(void *context
, zone_count_t zone_number
,
3005 struct vdo_completion
*parent
)
3007 struct block_map
*map
= context
;
3008 struct block_map_zone
*zone
= &map
->zones
[zone_number
];
3010 vdo_fail_completion(parent
, vdo_resume_if_quiescent(&zone
->state
));
3013 void vdo_resume_block_map(struct block_map
*map
, struct vdo_completion
*parent
)
3015 vdo_schedule_operation(map
->action_manager
, VDO_ADMIN_STATE_RESUMING
,
3016 NULL
, resume_block_map_zone
, NULL
, parent
);
3019 /* Allocate an expanded collection of trees, for a future growth. */
3020 int vdo_prepare_to_grow_block_map(struct block_map
*map
,
3021 block_count_t new_logical_blocks
)
3023 if (map
->next_entry_count
== new_logical_blocks
)
3026 if (map
->next_entry_count
> 0)
3027 vdo_abandon_block_map_growth(map
);
3029 if (new_logical_blocks
< map
->entry_count
) {
3030 map
->next_entry_count
= map
->entry_count
;
3034 return make_forest(map
, new_logical_blocks
);
3037 /* Implements vdo_action_preamble_fn */
3038 static void grow_forest(void *context
, struct vdo_completion
*completion
)
3040 replace_forest(context
);
3041 vdo_finish_completion(completion
);
3044 /* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
3045 void vdo_grow_block_map(struct block_map
*map
, struct vdo_completion
*parent
)
3047 vdo_schedule_operation(map
->action_manager
,
3048 VDO_ADMIN_STATE_SUSPENDED_OPERATION
,
3049 grow_forest
, NULL
, NULL
, parent
);
3052 void vdo_abandon_block_map_growth(struct block_map
*map
)
3054 struct forest
*forest
= vdo_forget(map
->next_forest
);
3057 deforest(forest
, forest
->segments
- 1);
3059 map
->next_entry_count
= 0;
3062 /* Release the page completion and then continue the requester. */
3063 static inline void finish_processing_page(struct vdo_completion
*completion
, int result
)
3065 struct vdo_completion
*parent
= completion
->parent
;
3067 vdo_release_page_completion(completion
);
3068 vdo_continue_completion(parent
, result
);
3071 static void handle_page_error(struct vdo_completion
*completion
)
3073 finish_processing_page(completion
, completion
->result
);
3076 /* Fetch the mapping page for a block map update, and call the provided handler when fetched. */
3077 static void fetch_mapping_page(struct data_vio
*data_vio
, bool modifiable
,
3078 vdo_action_fn action
)
3080 struct block_map_zone
*zone
= data_vio
->logical
.zone
->block_map_zone
;
3082 if (vdo_is_state_draining(&zone
->state
)) {
3083 continue_data_vio_with_error(data_vio
, VDO_SHUTTING_DOWN
);
3087 vdo_get_page(&data_vio
->page_completion
, zone
,
3088 data_vio
->tree_lock
.tree_slots
[0].block_map_slot
.pbn
,
3089 modifiable
, &data_vio
->vio
.completion
,
3090 action
, handle_page_error
, false);
3094 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
3096 * This indicates the block map entry for the logical block is either unmapped or corrupted.
3098 static void clear_mapped_location(struct data_vio
*data_vio
)
3100 data_vio
->mapped
= (struct zoned_pbn
) {
3101 .state
= VDO_MAPPING_STATE_UNMAPPED
,
3106 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
3109 * Return: VDO_SUCCESS or VDO_BAD_MAPPING if the map entry is invalid or an error code for any
3112 static int __must_check
set_mapped_location(struct data_vio
*data_vio
,
3113 const struct block_map_entry
*entry
)
3115 /* Unpack the PBN for logging purposes even if the entry is invalid. */
3116 struct data_location mapped
= vdo_unpack_block_map_entry(entry
);
3118 if (vdo_is_valid_location(&mapped
)) {
3121 result
= vdo_get_physical_zone(vdo_from_data_vio(data_vio
),
3122 mapped
.pbn
, &data_vio
->mapped
.zone
);
3123 if (result
== VDO_SUCCESS
) {
3124 data_vio
->mapped
.pbn
= mapped
.pbn
;
3125 data_vio
->mapped
.state
= mapped
.state
;
3130 * Return all errors not specifically known to be errors from validating the
3133 if ((result
!= VDO_OUT_OF_RANGE
) && (result
!= VDO_BAD_MAPPING
))
3138 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
3139 * to VDO_BAD_MAPPING.
3141 vdo_log_error_strerror(VDO_BAD_MAPPING
,
3142 "PBN %llu with state %u read from the block map was invalid",
3143 (unsigned long long) mapped
.pbn
, mapped
.state
);
3146 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
3149 if (!data_vio
->write
)
3150 return VDO_BAD_MAPPING
;
3153 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
3154 * entry rather than fail the write.
3156 clear_mapped_location(data_vio
);
3160 /* This callback is registered in vdo_get_mapped_block(). */
3161 static void get_mapping_from_fetched_page(struct vdo_completion
*completion
)
3164 struct vdo_page_completion
*vpc
= as_vdo_page_completion(completion
);
3165 const struct block_map_page
*page
;
3166 const struct block_map_entry
*entry
;
3167 struct data_vio
*data_vio
= as_data_vio(completion
->parent
);
3168 struct block_map_tree_slot
*tree_slot
;
3170 if (completion
->result
!= VDO_SUCCESS
) {
3171 finish_processing_page(completion
, completion
->result
);
3175 result
= validate_completed_page(vpc
, false);
3176 if (result
!= VDO_SUCCESS
) {
3177 finish_processing_page(completion
, result
);
3181 page
= (const struct block_map_page
*) get_page_buffer(vpc
->info
);
3182 tree_slot
= &data_vio
->tree_lock
.tree_slots
[0];
3183 entry
= &page
->entries
[tree_slot
->block_map_slot
.slot
];
3185 result
= set_mapped_location(data_vio
, entry
);
3186 finish_processing_page(completion
, result
);
3189 void vdo_update_block_map_page(struct block_map_page
*page
, struct data_vio
*data_vio
,
3190 physical_block_number_t pbn
,
3191 enum block_mapping_state mapping_state
,
3192 sequence_number_t
*recovery_lock
)
3194 struct block_map_zone
*zone
= data_vio
->logical
.zone
->block_map_zone
;
3195 struct block_map
*block_map
= zone
->block_map
;
3196 struct recovery_journal
*journal
= block_map
->journal
;
3197 sequence_number_t old_locked
, new_locked
;
3198 struct tree_lock
*tree_lock
= &data_vio
->tree_lock
;
3200 /* Encode the new mapping. */
3201 page
->entries
[tree_lock
->tree_slots
[tree_lock
->height
].block_map_slot
.slot
] =
3202 vdo_pack_block_map_entry(pbn
, mapping_state
);
3204 /* Adjust references on the recovery journal blocks. */
3205 old_locked
= *recovery_lock
;
3206 new_locked
= data_vio
->recovery_sequence_number
;
3208 if ((old_locked
== 0) || (old_locked
> new_locked
)) {
3209 vdo_acquire_recovery_journal_block_reference(journal
, new_locked
,
3210 VDO_ZONE_TYPE_LOGICAL
,
3213 if (old_locked
> 0) {
3214 vdo_release_recovery_journal_block_reference(journal
, old_locked
,
3215 VDO_ZONE_TYPE_LOGICAL
,
3219 *recovery_lock
= new_locked
;
3223 * FIXME: explain this more
3224 * Release the transferred lock from the data_vio.
3226 vdo_release_journal_entry_lock(journal
, new_locked
);
3227 data_vio
->recovery_sequence_number
= 0;
3230 static void put_mapping_in_fetched_page(struct vdo_completion
*completion
)
3232 struct data_vio
*data_vio
= as_data_vio(completion
->parent
);
3233 sequence_number_t old_lock
;
3234 struct vdo_page_completion
*vpc
;
3235 struct page_info
*info
;
3238 if (completion
->result
!= VDO_SUCCESS
) {
3239 finish_processing_page(completion
, completion
->result
);
3243 vpc
= as_vdo_page_completion(completion
);
3244 result
= validate_completed_page(vpc
, true);
3245 if (result
!= VDO_SUCCESS
) {
3246 finish_processing_page(completion
, result
);
3251 old_lock
= info
->recovery_lock
;
3252 vdo_update_block_map_page((struct block_map_page
*) get_page_buffer(info
),
3253 data_vio
, data_vio
->new_mapped
.pbn
,
3254 data_vio
->new_mapped
.state
, &info
->recovery_lock
);
3255 set_info_state(info
, PS_DIRTY
);
3256 add_to_dirty_lists(info
->cache
->zone
, &info
->state_entry
,
3257 VDO_CACHE_PAGE
, old_lock
, info
->recovery_lock
);
3258 finish_processing_page(completion
, VDO_SUCCESS
);
3261 /* Read a stored block mapping into a data_vio. */
3262 void vdo_get_mapped_block(struct data_vio
*data_vio
)
3264 if (data_vio
->tree_lock
.tree_slots
[0].block_map_slot
.pbn
== VDO_ZERO_BLOCK
) {
3266 * We know that the block map page for this LBN has not been allocated, so the
3267 * block must be unmapped.
3269 clear_mapped_location(data_vio
);
3270 continue_data_vio(data_vio
);
3274 fetch_mapping_page(data_vio
, false, get_mapping_from_fetched_page
);
3277 /* Update a stored block mapping to reflect a data_vio's new mapping. */
3278 void vdo_put_mapped_block(struct data_vio
*data_vio
)
3280 fetch_mapping_page(data_vio
, true, put_mapping_in_fetched_page
);
3283 struct block_map_statistics
vdo_get_block_map_statistics(struct block_map
*map
)
3285 zone_count_t zone
= 0;
3286 struct block_map_statistics totals
;
3288 memset(&totals
, 0, sizeof(struct block_map_statistics
));
3289 for (zone
= 0; zone
< map
->zone_count
; zone
++) {
3290 const struct block_map_statistics
*stats
=
3291 &(map
->zones
[zone
].page_cache
.stats
);
3293 totals
.dirty_pages
+= READ_ONCE(stats
->dirty_pages
);
3294 totals
.clean_pages
+= READ_ONCE(stats
->clean_pages
);
3295 totals
.free_pages
+= READ_ONCE(stats
->free_pages
);
3296 totals
.failed_pages
+= READ_ONCE(stats
->failed_pages
);
3297 totals
.incoming_pages
+= READ_ONCE(stats
->incoming_pages
);
3298 totals
.outgoing_pages
+= READ_ONCE(stats
->outgoing_pages
);
3299 totals
.cache_pressure
+= READ_ONCE(stats
->cache_pressure
);
3300 totals
.read_count
+= READ_ONCE(stats
->read_count
);
3301 totals
.write_count
+= READ_ONCE(stats
->write_count
);
3302 totals
.failed_reads
+= READ_ONCE(stats
->failed_reads
);
3303 totals
.failed_writes
+= READ_ONCE(stats
->failed_writes
);
3304 totals
.reclaimed
+= READ_ONCE(stats
->reclaimed
);
3305 totals
.read_outgoing
+= READ_ONCE(stats
->read_outgoing
);
3306 totals
.found_in_cache
+= READ_ONCE(stats
->found_in_cache
);
3307 totals
.discard_required
+= READ_ONCE(stats
->discard_required
);
3308 totals
.wait_for_page
+= READ_ONCE(stats
->wait_for_page
);
3309 totals
.fetch_required
+= READ_ONCE(stats
->fetch_required
);
3310 totals
.pages_loaded
+= READ_ONCE(stats
->pages_loaded
);
3311 totals
.pages_saved
+= READ_ONCE(stats
->pages_saved
);
3312 totals
.flush_count
+= READ_ONCE(stats
->flush_count
);