// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "vio.h"

#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/kernel.h>
#include <linux/ratelimit.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "constants.h"
#include "io-submitter.h"
#include "vdo.h"

/* A vio_pool is a collection of preallocated vios. */
struct vio_pool {
	/* The number of objects managed by the pool */
	size_t size;
	/* The list of objects which are available */
	struct list_head available;
	/* The queue of requestors waiting for objects from the pool */
	struct vdo_wait_queue waiting;
	/* The number of objects currently in use */
	size_t busy_count;
	/* The list of objects which are in use */
	struct list_head busy;
	/* The ID of the thread on which this pool may be used */
	thread_id_t thread_id;
	/* The buffer backing the pool's vios */
	char *buffer;
	/* The pool entries */
	struct pooled_vio vios[];
};
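
/*
 * Typical pool lifecycle, as an illustrative sketch only (on_vio_available is
 * a hypothetical waiter callback, not something defined in this file):
 *
 *	struct vio_pool *pool;
 *	struct vdo_waiter waiter = { .callback = on_vio_available };
 *
 *	result = make_vio_pool(vdo, pool_size, thread_id, vio_type, priority,
 *			       context, &pool);
 *	acquire_vio_from_pool(pool, &waiter);
 *	(on_vio_available() is called with the waiter and a pooled_vio as soon
 *	 as an entry is free)
 *	return_vio_to_pool(pool, pooled);
 *	free_vio_pool(pool);
 */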

physical_block_number_t pbn_from_vio_bio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;
	physical_block_number_t pbn = bio->bi_iter.bi_sector / VDO_SECTORS_PER_BLOCK;

	return ((pbn == VDO_GEOMETRY_BLOCK_LOCATION) ? pbn : pbn + vdo->geometry.bio_offset);
}
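
/*
 * For example, with 512-byte sectors and 4K VDO blocks (VDO_SECTORS_PER_BLOCK == 8),
 * a bio whose bi_sector is 80 maps to pbn 10, which is then shifted by
 * geometry.bio_offset; only the geometry block's location is left untranslated.
 */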

static int create_multi_block_bio(block_count_t size, struct bio **bio_ptr)
{
	struct bio *bio = NULL;
	int result;

	result = vdo_allocate_extended(struct bio, size + 1, struct bio_vec,
				       "bio", &bio);
	if (result != VDO_SUCCESS)
		return result;

	*bio_ptr = bio;
	return VDO_SUCCESS;
}

int vdo_create_bio(struct bio **bio_ptr)
{
	return create_multi_block_bio(1, bio_ptr);
}

void vdo_free_bio(struct bio *bio)
{
	if (bio == NULL)
		return;

	bio_uninit(bio);
	vdo_free(vdo_forget(bio));
}

int allocate_vio_components(struct vdo *vdo, enum vio_type vio_type,
			    enum vio_priority priority, void *parent,
			    unsigned int block_count, char *data, struct vio *vio)
{
	struct bio *bio;
	int result;

	result = VDO_ASSERT(block_count <= MAX_BLOCKS_PER_VIO,
			    "block count %u does not exceed maximum %u", block_count,
			    MAX_BLOCKS_PER_VIO);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(((vio_type != VIO_TYPE_UNINITIALIZED) && (vio_type != VIO_TYPE_DATA)),
			    "%d is a metadata type", vio_type);
	if (result != VDO_SUCCESS)
		return result;

	result = create_multi_block_bio(block_count, &bio);
	if (result != VDO_SUCCESS)
		return result;

	initialize_vio(vio, bio, block_count, vio_type, priority, vdo);
	vio->completion.parent = parent;
	vio->data = data;
	return VDO_SUCCESS;
}

/**
 * create_multi_block_metadata_vio() - Create a vio.
 * @vdo: The vdo on which the vio will operate.
 * @vio_type: The type of vio to create.
 * @priority: The relative priority to assign to the vio.
 * @parent: The parent of the vio.
 * @block_count: The size of the vio in blocks.
 * @data: The buffer backing the vio.
 * @vio_ptr: A pointer to hold the new vio.
 *
 * Return: VDO_SUCCESS or an error.
 */
int create_multi_block_metadata_vio(struct vdo *vdo, enum vio_type vio_type,
				    enum vio_priority priority, void *parent,
				    unsigned int block_count, char *data,
				    struct vio **vio_ptr)
{
	struct vio *vio;
	int result;

	/* Keep struct vio small; several callers embed it in larger structures. */
	BUILD_BUG_ON(sizeof(struct vio) > 256);

	/*
	 * Metadata vios should use direct allocation and not use the buffer pool, which is
	 * reserved for submissions from the linux block layer.
	 */
	result = vdo_allocate(1, struct vio, __func__, &vio);
	if (result != VDO_SUCCESS) {
		vdo_log_error("metadata vio allocation failure %d", result);
		return result;
	}

	result = allocate_vio_components(vdo, vio_type, priority, parent, block_count,
					 data, vio);
	if (result != VDO_SUCCESS) {
		vdo_free(vio);
		return result;
	}

	*vio_ptr = vio;
	return VDO_SUCCESS;
}
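
/*
 * Illustrative call (a sketch only; VIO_PRIORITY_METADATA is assumed here and
 * callers pass whatever type and priority actually apply):
 *
 *	struct vio *vio;
 *	int result = create_multi_block_metadata_vio(vdo, VIO_TYPE_BLOCK_MAP,
 *						     VIO_PRIORITY_METADATA, parent,
 *						     1, buffer, &vio);
 *	...
 *	free_vio(vdo_forget(vio));
 */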

/**
 * free_vio_components() - Free the components of a vio embedded in a larger structure.
 * @vio: The vio to destroy
 */
void free_vio_components(struct vio *vio)
{
	if (vio == NULL)
		return;

	BUG_ON(is_data_vio(vio));
	vdo_free_bio(vdo_forget(vio->bio));
}

/**
 * free_vio() - Destroy a vio.
 * @vio: The vio to destroy.
 */
void free_vio(struct vio *vio)
{
	free_vio_components(vio);
	vdo_free(vio);
}

/* Set bio properties for a VDO read or write. */
void vdo_set_bio_properties(struct bio *bio, struct vio *vio, bio_end_io_t callback,
			    blk_opf_t bi_opf, physical_block_number_t pbn)
{
	struct vdo *vdo = vio->completion.vdo;
	struct device_config *config = vdo->device_config;

	pbn -= vdo->geometry.bio_offset;
	vio->bio_zone = ((pbn / config->thread_counts.bio_rotation_interval) %
			 config->thread_counts.bio_threads);

	bio->bi_private = vio;
	bio->bi_end_io = callback;
	bio->bi_opf = bi_opf;
	bio->bi_iter.bi_sector = pbn * VDO_SECTORS_PER_BLOCK;
}

/*
 * Prepares the bio to perform IO with the specified buffer. May only be used on a VDO-allocated
 * bio, as it assumes the bio wraps a 4k buffer that is 4k aligned, but there does not have to be a
 * vio associated with the bio.
 */
int vio_reset_bio(struct vio *vio, char *data, bio_end_io_t callback,
		  blk_opf_t bi_opf, physical_block_number_t pbn)
{
	int bvec_count, offset, len, i;
	struct bio *bio = vio->bio;

	bio_reset(bio, bio->bi_bdev, bi_opf);
	vdo_set_bio_properties(bio, vio, callback, bi_opf, pbn);
	if (data == NULL)
		return VDO_SUCCESS;

	bio->bi_io_vec = bio->bi_inline_vecs;
	bio->bi_max_vecs = vio->block_count + 1;
	len = VDO_BLOCK_SIZE * vio->block_count;
	offset = offset_in_page(data);
	bvec_count = DIV_ROUND_UP(offset + len, PAGE_SIZE);

	/*
	 * If we knew that data was always on one page, or contiguous pages, we wouldn't need the
	 * loop. But if we're using vmalloc, it's not impossible that the data is in different
	 * pages that can't be merged in bio_add_page...
	 */
	for (i = 0; (i < bvec_count) && (len > 0); i++) {
		struct page *page;
		int bytes_added;
		int bytes = PAGE_SIZE - offset;

		if (bytes > len)
			bytes = len;

		page = is_vmalloc_addr(data) ? vmalloc_to_page(data) : virt_to_page(data);
		bytes_added = bio_add_page(bio, page, bytes, offset);

		if (bytes_added != bytes) {
			return vdo_log_error_strerror(VDO_BIO_CREATION_FAILED,
						      "Could only add %i bytes to bio",
						      bytes_added);
		}

		data += bytes;
		len -= bytes;
		offset = 0;
	}

	return VDO_SUCCESS;
}
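
/*
 * A worked example of the bvec math above, assuming 4K pages: an 8K vio
 * (block_count == 2) whose buffer starts 1K into a page has offset == 1024 and
 * len == 8192, so bvec_count == DIV_ROUND_UP(9216, 4096) == 3, and the loop
 * adds 3K from the first page, 4K from the second, and 1K from the third.
 */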

/**
 * update_vio_error_stats() - Update per-vio error stats and log the error.
 * @vio: The vio which got an error.
 * @format: The format of the message to log (a printf style format).
 */
void update_vio_error_stats(struct vio *vio, const char *format, ...)
{
	static DEFINE_RATELIMIT_STATE(error_limiter, DEFAULT_RATELIMIT_INTERVAL,
				      DEFAULT_RATELIMIT_BURST);
	va_list args;
	int priority;
	struct vdo *vdo = vio->completion.vdo;

	switch (vio->completion.result) {
	case VDO_READ_ONLY:
		atomic64_inc(&vdo->stats.read_only_error_count);
		return;

	case VDO_NO_SPACE:
		atomic64_inc(&vdo->stats.no_space_error_count);
		priority = VDO_LOG_DEBUG;
		break;

	default:
		priority = VDO_LOG_ERR;
	}

	if (!__ratelimit(&error_limiter))
		return;

	va_start(args, format);
	vdo_vlog_strerror(priority, vio->completion.result, VDO_LOGGING_MODULE_NAME,
			  format, args);
	va_end(args);
}

void vio_record_metadata_io_error(struct vio *vio)
{
	const char *description;
	physical_block_number_t pbn = pbn_from_vio_bio(vio->bio);

	if (bio_op(vio->bio) == REQ_OP_READ) {
		description = "read";
	} else if ((vio->bio->bi_opf & REQ_PREFLUSH) == REQ_PREFLUSH) {
		description = (((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) ?
			       "write+preflush+fua" :
			       "write+preflush");
	} else if ((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) {
		description = "write+fua";
	} else {
		description = "write";
	}

	update_vio_error_stats(vio,
			       "Completing %s vio of type %u for physical block %llu with error",
			       description, vio->type, (unsigned long long) pbn);
}

/**
 * make_vio_pool() - Create a new vio pool.
 * @vdo: The vdo.
 * @pool_size: The number of vios in the pool.
 * @thread_id: The ID of the thread using this pool.
 * @vio_type: The type of vios in the pool.
 * @priority: The priority with which vios from the pool should be enqueued.
 * @context: The context that each entry will have.
 * @pool_ptr: The resulting pool.
 *
 * Return: A success or error code.
 */
int make_vio_pool(struct vdo *vdo, size_t pool_size, thread_id_t thread_id,
		  enum vio_type vio_type, enum vio_priority priority, void *context,
		  struct vio_pool **pool_ptr)
{
	struct vio_pool *pool;
	char *ptr;
	int result;

	result = vdo_allocate_extended(struct vio_pool, pool_size, struct pooled_vio,
				       __func__, &pool);
	if (result != VDO_SUCCESS)
		return result;

	pool->thread_id = thread_id;
	INIT_LIST_HEAD(&pool->available);
	INIT_LIST_HEAD(&pool->busy);

	result = vdo_allocate(pool_size * VDO_BLOCK_SIZE, char,
			      "VIO pool buffer", &pool->buffer);
	if (result != VDO_SUCCESS) {
		free_vio_pool(pool);
		return result;
	}

	ptr = pool->buffer;
	for (pool->size = 0; pool->size < pool_size; pool->size++, ptr += VDO_BLOCK_SIZE) {
		struct pooled_vio *pooled = &pool->vios[pool->size];

		result = allocate_vio_components(vdo, vio_type, priority, NULL, 1, ptr,
						 &pooled->vio);
		if (result != VDO_SUCCESS) {
			free_vio_pool(pool);
			return result;
		}

		pooled->context = context;
		list_add_tail(&pooled->pool_entry, &pool->available);
	}

	*pool_ptr = pool;
	return VDO_SUCCESS;
}

/**
 * free_vio_pool() - Destroy a vio pool.
 * @pool: The pool to free.
 */
void free_vio_pool(struct vio_pool *pool)
{
	struct pooled_vio *pooled, *tmp;

	if (pool == NULL)
		return;

	/* Remove all available vios from the object pool. */
	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&pool->waiting),
			    "VIO pool must not have any waiters when being freed");
	VDO_ASSERT_LOG_ONLY((pool->busy_count == 0),
			    "VIO pool must not have %zu busy entries when being freed",
			    pool->busy_count);
	VDO_ASSERT_LOG_ONLY(list_empty(&pool->busy),
			    "VIO pool must not have busy entries when being freed");

	list_for_each_entry_safe(pooled, tmp, &pool->available, pool_entry) {
		list_del(&pooled->pool_entry);
		free_vio_components(&pooled->vio);
		pool->size--;
	}

	VDO_ASSERT_LOG_ONLY(pool->size == 0,
			    "VIO pool must not have missing entries when being freed");

	vdo_free(vdo_forget(pool->buffer));
	vdo_free(pool);
}

/**
 * is_vio_pool_busy() - Check whether a vio pool has outstanding entries.
 * @pool: The pool to check.
 *
 * Return: true if the pool is busy.
 */
bool is_vio_pool_busy(struct vio_pool *pool)
{
	return (pool->busy_count != 0);
}

/**
 * acquire_vio_from_pool() - Acquire a vio and buffer from the pool (asynchronous).
 * @pool: The vio pool.
 * @waiter: Object that is requesting a vio.
 */
void acquire_vio_from_pool(struct vio_pool *pool, struct vdo_waiter *waiter)
{
	struct pooled_vio *pooled;

	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "acquire from active vio_pool called from correct thread");

	if (list_empty(&pool->available)) {
		vdo_waitq_enqueue_waiter(&pool->waiting, waiter);
		return;
	}

	pooled = list_first_entry(&pool->available, struct pooled_vio, pool_entry);
	pool->busy_count++;
	list_move_tail(&pooled->pool_entry, &pool->busy);
	(*waiter->callback)(waiter, pooled);
}

/**
 * return_vio_to_pool() - Return a vio to the pool.
 * @pool: The vio pool.
 * @vio: The pooled vio to return.
 */
void return_vio_to_pool(struct vio_pool *pool, struct pooled_vio *vio)
{
	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "vio pool entry returned on same thread as it was acquired");

	vio->vio.completion.error_handler = NULL;
	vio->vio.completion.parent = NULL;
	if (vdo_waitq_has_waiters(&pool->waiting)) {
		vdo_waitq_notify_next_waiter(&pool->waiting, NULL, vio);
		return;
	}

	list_move_tail(&vio->pool_entry, &pool->available);
	pool->busy_count--;
}
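
/*
 * Note that a vio returned while requestors are waiting is handed directly to
 * the next waiter and never passes through the available list, so busy_count
 * is only decremented when no one is waiting.
 */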

/*
 * Various counting functions for statistics.
 * These are used for bios coming into VDO, as well as bios generated by VDO.
 */
void vdo_count_bios(struct atomic_bio_stats *bio_stats, struct bio *bio)
{
	if (((bio->bi_opf & REQ_PREFLUSH) != 0) && (bio->bi_iter.bi_size == 0)) {
		atomic64_inc(&bio_stats->empty_flush);
		atomic64_inc(&bio_stats->flush);
		return;
	}

	switch (bio_op(bio)) {
	case REQ_OP_WRITE:
		atomic64_inc(&bio_stats->write);
		break;
	case REQ_OP_READ:
		atomic64_inc(&bio_stats->read);
		break;
	case REQ_OP_DISCARD:
		atomic64_inc(&bio_stats->discard);
		break;
	/*
	 * All other operations are filtered out in dmvdo.c, or not created by VDO, so
	 * they should never be seen here.
	 */
	default:
		VDO_ASSERT_LOG_ONLY(0, "Bio operation %d not a write, read, discard, or empty flush",
				    bio_op(bio));
	}

	if ((bio->bi_opf & REQ_PREFLUSH) != 0)
		atomic64_inc(&bio_stats->flush);
	if (bio->bi_opf & REQ_FUA)
		atomic64_inc(&bio_stats->fua);
}
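
/*
 * For example, a non-empty write bio carrying both REQ_PREFLUSH and REQ_FUA
 * increments the write, flush, and fua counters; only a zero-length flush is
 * counted as an empty_flush.
 */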

static void count_all_bios_completed(struct vio *vio, struct bio *bio)
{
	struct atomic_statistics *stats = &vio->completion.vdo->stats;

	if (is_data_vio(vio)) {
		vdo_count_bios(&stats->bios_out_completed, bio);
		return;
	}

	vdo_count_bios(&stats->bios_meta_completed, bio);
	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
		vdo_count_bios(&stats->bios_journal_completed, bio);
	else if (vio->type == VIO_TYPE_BLOCK_MAP)
		vdo_count_bios(&stats->bios_page_cache_completed, bio);
}

void vdo_count_completed_bios(struct bio *bio)
{
	struct vio *vio = (struct vio *) bio->bi_private;

	atomic64_inc(&vio->completion.vdo->stats.bios_completed);
	count_all_bios_completed(vio, bio);
}