// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/list.h>
#include <linux/lz4.h>
#include <linux/minmax.h>
#include <linux/sched.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

#include "memory-alloc.h"
#include "murmurhash3.h"
#include "permassert.h"

#include "block-map.h"
#include "encodings.h"
#include "io-submitter.h"
#include "logical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "wait-queue.h"

/*
 * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
 * flags on our own bio(s) for that request may help underlying layers better fulfill the user
 * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
 * flags, as they convey incorrect information.
 *
 * These flags are always irrelevant if we have already finished the user bio as they are only
 * hints on IO importance. If VDO has finished the user bio, any remaining IO done doesn't care how
 * important finishing the finished bio was.
 *
 * Note that bio.c contains the complete list of flags we believe may be set; the following list
 * explains the action taken with each of those flags VDO could receive:
 *
 * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
 *   completion is required for further work to be done by the issuer.
 * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
 *   treats it as more urgent, similar to REQ_SYNC.
 * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
 *   important.
 * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
 * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
 *   match incoming IO, so this flag is incorrect for it.
 * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
 * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
 * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
 *   ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
 *   prioritization.
 */

static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);

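/*
 * A sketch of how PASSTHROUGH_FLAGS is consumed (see read_block() below): while the user bio is
 * still unacknowledged, its flags are masked and combined with the operation VDO actually issues,
 * e.g.:
 *
 *         blk_opf_t opf = (data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ;
 *
 * Once the user bio has been acknowledged, these hints no longer apply and are not propagated.
 */
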
/*
 * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
 * correctness, and in order to avoid potentially expensive or blocking memory allocations during
 * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
 * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for discards.
 * The data_vio_pool is responsible for enforcing these limits. Threads submitting bios for which a
 * data_vio or discard permit are not available will block until the necessary resources are
 * available. The pool is also responsible for distributing resources to blocked threads and waking
 * them. Finally, the pool attempts to batch the work of recycling data_vios by performing the work
 * of actually assigning resources to blocked threads or placing data_vios back into the pool on a
 * single cpu at a time.
 *
 * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
 * permits. The limiters also provide safe cross-thread access to pool statistics without the need
 * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
 * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
 * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
 * launched. However, if either of these are unavailable, the arrival time of the bio is recorded
 * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
 * limiter and the submitting thread will then put itself to sleep. (Note that this mechanism will
 * break if jiffies are only 32 bits.)
 *
 * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
 * will be called on it. This function will add the data_vio to a funnel queue, and then check the
 * state of the pool. If the pool is not currently processing released data_vios, the pool's
 * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
 * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
 * threads.
 *
 * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
 * processes a batch of returned data_vios (currently at most DATA_VIO_RELEASE_BATCH_SIZE) from the
 * pool's funnel queue. For each data_vio, it first checks whether that data_vio was processing a
 * discard. If so, and there is a blocked bio waiting for a discard permit, that permit is
 * notionally transferred to the eldest discard waiter, and that waiter is moved to the end of the
 * list of discard bios waiting for a data_vio. If there are no discard waiters, the discard permit
 * is returned to the pool. Next, the data_vio is assigned to the oldest blocked bio which either
 * has a discard permit, or doesn't need one, and relaunched. If neither of these exist, the
 * data_vio is returned to the pool. Finally, if any waiting bios were launched, the threads which
 * blocked trying to submit them are awakened.
 */

#define DATA_VIO_RELEASE_BATCH_SIZE 128

static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
static const u32 COMPRESSION_STATUS_MASK = 0xff;
static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;

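/*
 * The compression status of a data_vio is kept in a single atomic u32: the low byte
 * (COMPRESSION_STATUS_MASK) holds the current stage, and the top bit (MAY_NOT_COMPRESS_MASK)
 * records that compression has been permanently disallowed for this data_vio. See
 * get_data_vio_compression_status() and set_data_vio_compression_status() below.
 */
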
typedef void (*assigner_fn)(struct limiter *limiter);

/* Bookkeeping structure for a single type of resource. */
struct limiter {
        /* The data_vio_pool to which this limiter belongs */
        struct data_vio_pool *pool;
        /* The maximum number of data_vios available */
        data_vio_count_t limit;
        /* The number of resources in use */
        data_vio_count_t busy;
        /* The maximum number of resources ever simultaneously in use */
        data_vio_count_t max_busy;
        /* The number of resources to release */
        data_vio_count_t release_count;
        /* The number of waiters to wake */
        data_vio_count_t wake_count;
        /* The list of waiting bios which are known to process_release_callback() */
        struct bio_list waiters;
        /* The list of waiting bios which are not yet known to process_release_callback() */
        struct bio_list new_waiters;
        /* The list of waiters which have their permits */
        struct bio_list *permitted_waiters;
        /* The function for assigning a resource to a waiter */
        assigner_fn assigner;
        /* The queue of blocked threads */
        wait_queue_head_t blocked_threads;
        /* The arrival time of the eldest waiter */
        u64 arrival;
};

/*
 * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
 * and are released in batches.
 */
struct data_vio_pool {
        /* Completion for scheduling releases */
        struct vdo_completion completion;
        /* The administrative state of the pool */
        struct admin_state state;
        /* Lock protecting the pool */
        spinlock_t lock;
        /* The main limiter controlling the total data_vios in the pool. */
        struct limiter limiter;
        /* The limiter controlling data_vios for discard */
        struct limiter discard_limiter;
        /* The list of bios which have discard permits but still need a data_vio */
        struct bio_list permitted_discards;
        /* The list of available data_vios */
        struct list_head available;
        /* The queue of data_vios waiting to be returned to the pool */
        struct funnel_queue *queue;
        /* Whether the pool is processing, or scheduled to process releases */
        atomic_t processing;
        /* The data vios in the pool */
        struct data_vio data_vios[];
};

static const char * const ASYNC_OPERATION_NAMES[] = {
        "attempt_logical_block_lock",
        "lock_duplicate_pbn",
        "check_for_duplication",
        "find_block_map_slot",
        "get_mapped_block_for_read",
        "get_mapped_block_for_write",
        "vdo_attempt_packing",
        "update_dedupe_index",
        "update_reference_counts",
        "verify_duplication",
};

/* The steps taken cleaning up a VIO, in the order they are performed. */
enum data_vio_cleanup_stage {
        VIO_CLEANUP_START = 0,
        VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
        VIO_RELEASE_ALLOCATED,
        VIO_RELEASE_RECOVERY_LOCKS,
        VIO_RELEASE_LOGICAL,
        VIO_CLEANUP_DONE,
};

static inline struct data_vio_pool * __must_check
as_data_vio_pool(struct vdo_completion *completion)
{
        vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
        return container_of(completion, struct data_vio_pool, completion);
}

static inline u64 get_arrival_time(struct bio *bio)
{
        return (u64) bio->bi_private;
}

/*
 * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
 *                                     or waiters while holding the pool's lock.
 */
static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
{
        if (pool->limiter.busy > 0)
                return false;

        VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
                            "no outstanding discard permits");

        return (bio_list_empty(&pool->limiter.new_waiters) &&
                bio_list_empty(&pool->discard_limiter.new_waiters));
}

static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
{
        struct vdo *vdo = vdo_from_data_vio(data_vio);
        zone_count_t zone_number;
        struct lbn_lock *lock = &data_vio->logical;

        lock->lbn = lbn;
        lock->locked = false;
        vdo_waitq_init(&lock->waiters);
        zone_number = vdo_compute_logical_zone(data_vio);
        lock->zone = &vdo->logical_zones->zones[zone_number];
}

static void launch_locked_request(struct data_vio *data_vio)
{
        data_vio->logical.locked = true;
        if (data_vio->write) {
                struct vdo *vdo = vdo_from_data_vio(data_vio);

                if (vdo_is_read_only(vdo)) {
                        continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
                        return;
                }
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
        vdo_find_block_map_slot(data_vio);
}

static void acknowledge_data_vio(struct data_vio *data_vio)
{
        struct vdo *vdo = vdo_from_data_vio(data_vio);
        struct bio *bio = data_vio->user_bio;
        int error = vdo_status_to_errno(data_vio->vio.completion.result);

        if (bio == NULL)
                return;

        VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
                             (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
                            "data_vio to acknowledge is not an incomplete discard");

        data_vio->user_bio = NULL;
        vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
        if (data_vio->is_partial)
                vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);

        bio->bi_status = errno_to_blk_status(error);
        bio_endio(bio);
}

static void copy_to_bio(struct bio *bio, char *data_ptr)
{
        struct bio_vec biovec;
        struct bvec_iter iter;

        bio_for_each_segment(biovec, bio, iter) {
                memcpy_to_bvec(&biovec, data_ptr);
                data_ptr += biovec.bv_len;
        }
}

struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
{
        u32 packed = atomic_read(&data_vio->compression.status);

        /* pairs with cmpxchg in set_data_vio_compression_status */
        smp_rmb();
        return (struct data_vio_compression_status) {
                .stage = packed & COMPRESSION_STATUS_MASK,
                .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
        };
}

/**
 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
 *                 atomically.
 * @status: The state to convert.
 *
 * Return: The compression state packed into a u32.
 */
static u32 __must_check pack_status(struct data_vio_compression_status status)
{
        return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
}

/**
 * set_data_vio_compression_status() - Set the compression status of a data_vio.
 * @data_vio: The data_vio to change.
 * @status: The expected current status of the data_vio.
 * @new_status: The status to set.
 *
 * Return: true if the new status was set, false if the data_vio's compression status did not
 *         match the expected state, and so was left unchanged.
 */
static bool __must_check
set_data_vio_compression_status(struct data_vio *data_vio,
                                struct data_vio_compression_status status,
                                struct data_vio_compression_status new_status)
{
        u32 actual;
        u32 expected = pack_status(status);
        u32 replacement = pack_status(new_status);

        /*
         * Extra barriers because this was originally developed using a CAS operation that
         * implicitly had them.
         */
        smp_mb__before_atomic();
        actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
        /* same as before_atomic */
        smp_mb__after_atomic();
        return (expected == actual);
}

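/*
 * Both advance_data_vio_compression_stage() and cancel_data_vio_compression() below use
 * set_data_vio_compression_status() in a retry loop: they re-read the packed status and attempt
 * the compare-and-swap again whenever another thread has changed the status in the meantime.
 */
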
struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
{
        for (;;) {
                struct data_vio_compression_status status =
                        get_data_vio_compression_status(data_vio);
                struct data_vio_compression_status new_status = status;

                if (status.stage == DATA_VIO_POST_PACKER) {
                        /* We're already in the last stage. */
                        return status;
                }

                if (status.may_not_compress) {
                        /*
                         * Compression has been dis-allowed for this VIO, so skip the rest of the
                         * path and go to the end.
                         */
                        new_status.stage = DATA_VIO_POST_PACKER;
                } else {
                        /* Go to the next state. */
                        new_status.stage++;
                }

                if (set_data_vio_compression_status(data_vio, status, new_status))
                        return new_status;

                /* Another thread changed the status out from under us so try again. */
        }
}

/**
 * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
 *
 * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
 */
bool cancel_data_vio_compression(struct data_vio *data_vio)
{
        struct data_vio_compression_status status, new_status;

        for (;;) {
                status = get_data_vio_compression_status(data_vio);
                if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
                        /* This data_vio is already set up to not block in the packer. */
                        break;
                }

                new_status.stage = status.stage;
                new_status.may_not_compress = true;

                if (set_data_vio_compression_status(data_vio, status, new_status))
                        break;
        }

        return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
}

/**
 * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
 * @completion: The data_vio for an external data request as a completion.
 *
 * This is the start of the path for all external requests. It is registered in launch_data_vio().
 */
static void attempt_logical_block_lock(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);
        struct lbn_lock *lock = &data_vio->logical;
        struct vdo *vdo = vdo_from_data_vio(data_vio);
        struct data_vio *lock_holder;
        int result;

        assert_data_vio_in_logical_zone(data_vio);

        if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
                continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
                return;
        }

        result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
                                 data_vio, false, (void **) &lock_holder);
        if (result != VDO_SUCCESS) {
                continue_data_vio_with_error(data_vio, result);
                return;
        }

        if (lock_holder == NULL) {
                /* We got the lock */
                launch_locked_request(data_vio);
                return;
        }

        result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
        if (result != VDO_SUCCESS) {
                continue_data_vio_with_error(data_vio, result);
                return;
        }

        /*
         * If the new request is a pure read request (not read-modify-write) and the lock_holder is
         * writing and has received an allocation, service the read request immediately by copying
         * data from the lock_holder to avoid having to flush the write out of the packer just to
         * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
         * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
         * order to prevent returning data that may not have actually been written.
         */
        if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
                copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
                acknowledge_data_vio(data_vio);
                complete_data_vio(completion);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK;
        vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter);

        /*
         * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
         * packer.
         */
        if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
                data_vio->compression.lock_holder = lock_holder;
                launch_data_vio_packer_callback(data_vio,
                                                vdo_remove_lock_holder_from_packer);
        }
}

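/*
 * To summarize the outcomes of attempt_logical_block_lock() above: either the map put succeeds and
 * the request is launched as the lock holder, or a pure read is serviced immediately from the
 * in-flight writer's data, or the request is queued as a waiter on the current lock holder (after
 * ensuring that a writing holder cannot block indefinitely in the packer).
 */
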
/**
 * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
 *                     same parent and other state and send it on its way.
 */
static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
{
        struct vdo_completion *completion = &data_vio->vio.completion;

        /*
         * Clearing the tree lock must happen before initializing the LBN lock, which also adds
         * information to the tree lock.
         */
        memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
        initialize_lbn_lock(data_vio, lbn);
        INIT_LIST_HEAD(&data_vio->hash_lock_entry);
        INIT_LIST_HEAD(&data_vio->write_entry);

        memset(&data_vio->allocation, 0, sizeof(data_vio->allocation));

        data_vio->is_duplicate = false;

        memset(&data_vio->record_name, 0, sizeof(data_vio->record_name));
        memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate));
        vdo_reset_completion(&data_vio->decrement_completion);
        vdo_reset_completion(completion);
        completion->error_handler = handle_data_vio_error;
        set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
        vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
}

static bool is_zero_block(char *block)
{
        int i;

        for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
                if (*((u64 *) &block[i]))
                        return false;
        }

        return true;
}

static void copy_from_bio(struct bio *bio, char *data_ptr)
{
        struct bio_vec biovec;
        struct bvec_iter iter;

        bio_for_each_segment(biovec, bio, iter) {
                memcpy_from_bvec(data_ptr, &biovec);
                data_ptr += biovec.bv_len;
        }
}

static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
{
        logical_block_number_t lbn;

        /*
         * Zero out the fields which don't need to be preserved (i.e. which are not pointers to
         * separately allocated objects).
         */
        memset(data_vio, 0, offsetof(struct data_vio, vio));
        memset(&data_vio->compression, 0, offsetof(struct compression_state, block));

        data_vio->user_bio = bio;
        data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK);
        data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0);

        /*
         * Discards behave very differently than other requests when coming in from device-mapper.
         * We have to be able to handle any size discards and various sector offsets within a
         * block.
         */
        if (bio_op(bio) == REQ_OP_DISCARD) {
                data_vio->remaining_discard = bio->bi_iter.bi_size;
                data_vio->write = true;
                data_vio->is_discard = true;
                if (data_vio->is_partial) {
                        vdo_count_bios(&vdo->stats.bios_in_partial, bio);
                        data_vio->read = true;
                }
        } else if (data_vio->is_partial) {
                vdo_count_bios(&vdo->stats.bios_in_partial, bio);
                data_vio->read = true;
                if (bio_data_dir(bio) == WRITE)
                        data_vio->write = true;
        } else if (bio_data_dir(bio) == READ) {
                data_vio->read = true;
        } else {
                /*
                 * Copy the bio data to a char array so that we can continue to use the data after
                 * we acknowledge the bio.
                 */
                copy_from_bio(bio, data_vio->vio.data);
                data_vio->is_zero = is_zero_block(data_vio->vio.data);
                data_vio->write = true;
        }

        if (data_vio->user_bio->bi_opf & REQ_FUA)
                data_vio->fua = true;

        lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK;
        launch_data_vio(data_vio, lbn);
}

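/*
 * launch_bio() classifies the incoming bio as follows: discards are writes (and also reads when
 * partial), partial non-discards always read and may also write, full-block reads only read, and
 * full-block writes copy the bio data into the data_vio so the user bio can be acknowledged
 * before the write to storage completes.
 */
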
static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio)
{
        struct bio *bio = bio_list_pop(limiter->permitted_waiters);

        launch_bio(limiter->pool->completion.vdo, data_vio, bio);
        limiter->wake_count++;

        bio = bio_list_peek(limiter->permitted_waiters);
        limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio));
}

static void assign_discard_permit(struct limiter *limiter)
{
        struct bio *bio = bio_list_pop(&limiter->waiters);

        if (limiter->arrival == U64_MAX)
                limiter->arrival = get_arrival_time(bio);

        bio_list_add(limiter->permitted_waiters, bio);
}

static void get_waiters(struct limiter *limiter)
{
        bio_list_merge_init(&limiter->waiters, &limiter->new_waiters);
}

static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool)
{
        struct data_vio *data_vio =
                list_first_entry(&pool->available, struct data_vio, pool_entry);

        list_del_init(&data_vio->pool_entry);
        return data_vio;
}

static void assign_data_vio_to_waiter(struct limiter *limiter)
{
        assign_data_vio(limiter, get_available_data_vio(limiter->pool));
}

static void update_limiter(struct limiter *limiter)
{
        struct bio_list *waiters = &limiter->waiters;
        data_vio_count_t available = limiter->limit - limiter->busy;

        VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy),
                            "Release count %u is not more than busy count %u",
                            limiter->release_count, limiter->busy);

        get_waiters(limiter);
        for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
                limiter->assigner(limiter);

        if (limiter->release_count > 0) {
                WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count);
                limiter->release_count = 0;
                return;
        }

        for (; (available > 0) && !bio_list_empty(waiters); available--)
                limiter->assigner(limiter);

        WRITE_ONCE(limiter->busy, limiter->limit - available);
        if (limiter->max_busy < limiter->busy)
                WRITE_ONCE(limiter->max_busy, limiter->busy);
}

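/*
 * update_limiter() hands out resources in two passes: first it consumes release_count to satisfy
 * waiters directly from resources being returned, then it hands any remaining headroom
 * (limit - busy) to further waiters before recomputing the busy count.
 */
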
/**
 * schedule_releases() - Ensure that release processing is scheduled.
 *
 * If this call switches the state to processing, enqueue. Otherwise, some other thread has already
 * done so.
 */
static void schedule_releases(struct data_vio_pool *pool)
{
        /* Pairs with the barrier in process_release_callback(). */
        smp_mb__before_atomic();
        if (atomic_cmpxchg(&pool->processing, false, true))
                return;

        pool->completion.requeue = true;
        vdo_launch_completion_with_priority(&pool->completion,
                                            CPU_Q_COMPLETE_VIO_PRIORITY);
}

static void reuse_or_release_resources(struct data_vio_pool *pool,
                                       struct data_vio *data_vio,
                                       struct list_head *returned)
{
        if (data_vio->remaining_discard > 0) {
                if (bio_list_empty(&pool->discard_limiter.waiters)) {
                        /* Return the data_vio's discard permit. */
                        pool->discard_limiter.release_count++;
                } else {
                        assign_discard_permit(&pool->discard_limiter);
                }
        }

        if (pool->limiter.arrival < pool->discard_limiter.arrival) {
                assign_data_vio(&pool->limiter, data_vio);
        } else if (pool->discard_limiter.arrival < U64_MAX) {
                assign_data_vio(&pool->discard_limiter, data_vio);
        } else {
                list_add(&data_vio->pool_entry, returned);
                pool->limiter.release_count++;
        }
}

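/*
 * Note the fairness rule above: a returned data_vio goes to whichever limiter has the earlier
 * arrival time, so reads and writes are not starved by discards and vice versa; only if neither
 * limiter has a waiter is the data_vio placed on the returned list for the pool.
 */
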
/**
 * process_release_callback() - Process a batch of data_vio releases.
 * @completion: The pool with data_vios to release.
 */
static void process_release_callback(struct vdo_completion *completion)
{
        struct data_vio_pool *pool = as_data_vio_pool(completion);
        bool reschedule;
        bool drained;
        data_vio_count_t processed;
        data_vio_count_t to_wake;
        data_vio_count_t discards_to_wake;
        LIST_HEAD(returned);

        spin_lock(&pool->lock);
        get_waiters(&pool->discard_limiter);
        get_waiters(&pool->limiter);
        spin_unlock(&pool->lock);

        if (pool->limiter.arrival == U64_MAX) {
                struct bio *bio = bio_list_peek(&pool->limiter.waiters);

                if (bio != NULL)
                        pool->limiter.arrival = get_arrival_time(bio);
        }

        for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) {
                struct data_vio *data_vio;
                struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue);

                if (entry == NULL)
                        break;

                data_vio = as_data_vio(container_of(entry, struct vdo_completion,
                                                    work_queue_entry_link));
                acknowledge_data_vio(data_vio);
                reuse_or_release_resources(pool, data_vio, &returned);
        }

        spin_lock(&pool->lock);
        /*
         * There is a race where waiters could be added while we are in the unlocked section above.
         * Those waiters could not see the resources we are now about to release, so we assign
         * those resources now as we have no guarantee of being rescheduled. This is handled in
         * update_limiter().
         */
        update_limiter(&pool->discard_limiter);
        list_splice(&returned, &pool->available);
        update_limiter(&pool->limiter);
        to_wake = pool->limiter.wake_count;
        pool->limiter.wake_count = 0;
        discards_to_wake = pool->discard_limiter.wake_count;
        pool->discard_limiter.wake_count = 0;

        atomic_set(&pool->processing, false);
        /* Pairs with the barrier in schedule_releases(). */
        smp_mb__after_atomic();

        reschedule = !vdo_is_funnel_queue_empty(pool->queue);
        drained = (!reschedule &&
                   vdo_is_state_draining(&pool->state) &&
                   check_for_drain_complete_locked(pool));
        spin_unlock(&pool->lock);

        if (to_wake > 0)
                wake_up_nr(&pool->limiter.blocked_threads, to_wake);

        if (discards_to_wake > 0)
                wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake);

        if (reschedule)
                schedule_releases(pool);
        else if (drained)
                vdo_finish_draining(&pool->state);
}

static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool,
                               assigner_fn assigner, data_vio_count_t limit)
{
        limiter->pool = pool;
        limiter->assigner = assigner;
        limiter->limit = limit;
        limiter->arrival = U64_MAX;
        init_waitqueue_head(&limiter->blocked_threads);
}

/**
 * initialize_data_vio() - Allocate the components of a data_vio.
 *
 * The caller is responsible for cleaning up the data_vio on error.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
{
        struct bio *bio;
        int result;

        BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
        result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
                                     &data_vio->vio.data);
        if (result != VDO_SUCCESS)
                return vdo_log_error_strerror(result,
                                              "data_vio data allocation failure");

        result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
                                     &data_vio->compression.block);
        if (result != VDO_SUCCESS) {
                return vdo_log_error_strerror(result,
                                              "data_vio compressed block allocation failure");
        }

        result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
                                     &data_vio->scratch_block);
        if (result != VDO_SUCCESS)
                return vdo_log_error_strerror(result,
                                              "data_vio scratch allocation failure");

        result = vdo_create_bio(&bio);
        if (result != VDO_SUCCESS)
                return vdo_log_error_strerror(result,
                                              "data_vio data bio allocation failure");

        vdo_initialize_completion(&data_vio->decrement_completion, vdo,
                                  VDO_DECREMENT_COMPLETION);
        initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo);

        return VDO_SUCCESS;
}

static void destroy_data_vio(struct data_vio *data_vio)
{
        if (data_vio == NULL)
                return;

        vdo_free_bio(vdo_forget(data_vio->vio.bio));
        vdo_free(vdo_forget(data_vio->vio.data));
        vdo_free(vdo_forget(data_vio->compression.block));
        vdo_free(vdo_forget(data_vio->scratch_block));
}

/**
 * make_data_vio_pool() - Initialize a data_vio pool.
 * @vdo: The vdo to which the pool will belong.
 * @pool_size: The number of data_vios in the pool.
 * @discard_limit: The maximum number of data_vios which may be used for discards.
 * @pool_ptr: A pointer to hold the newly allocated pool.
 */
int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
                       data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
{
        int result;
        struct data_vio_pool *pool;
        data_vio_count_t i;

        result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
                                       __func__, &pool);
        if (result != VDO_SUCCESS)
                return result;

        VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size),
                            "discard limit does not exceed pool size");
        initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
                           discard_limit);
        pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
        initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
        pool->limiter.permitted_waiters = &pool->limiter.waiters;
        INIT_LIST_HEAD(&pool->available);
        spin_lock_init(&pool->lock);
        vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
        vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
        vdo_prepare_completion(&pool->completion, process_release_callback,
                               process_release_callback, vdo->thread_config.cpu_thread,
                               NULL);

        result = vdo_make_funnel_queue(&pool->queue);
        if (result != VDO_SUCCESS) {
                free_data_vio_pool(vdo_forget(pool));
                return result;
        }

        for (i = 0; i < pool_size; i++) {
                struct data_vio *data_vio = &pool->data_vios[i];

                result = initialize_data_vio(data_vio, vdo);
                if (result != VDO_SUCCESS) {
                        destroy_data_vio(data_vio);
                        free_data_vio_pool(pool);
                        return result;
                }

                list_add(&data_vio->pool_entry, &pool->available);
        }

        *pool_ptr = pool;
        return VDO_SUCCESS;
}

/**
 * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
 *
 * All data_vios must be returned to the pool before calling this function.
 */
void free_data_vio_pool(struct data_vio_pool *pool)
{
        struct data_vio *data_vio, *tmp;

        if (pool == NULL)
                return;

        /*
         * Pairs with the barrier in process_release_callback(). Possibly not needed since it
         * caters to an enqueue vs. free race.
         */
        smp_mb();
        BUG_ON(atomic_read(&pool->processing));

        spin_lock(&pool->lock);
        VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0),
                            "data_vio pool must not have %u busy entries when being freed",
                            pool->limiter.busy);
        VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
                             bio_list_empty(&pool->limiter.new_waiters)),
                            "data_vio pool must not have threads waiting to read or write when being freed");
        VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
                             bio_list_empty(&pool->discard_limiter.new_waiters)),
                            "data_vio pool must not have threads waiting to discard when being freed");
        spin_unlock(&pool->lock);

        list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) {
                list_del_init(&data_vio->pool_entry);
                destroy_data_vio(data_vio);
        }

        vdo_free_funnel_queue(vdo_forget(pool->queue));
        vdo_free(pool);
}

static bool acquire_permit(struct limiter *limiter)
{
        if (limiter->busy >= limiter->limit)
                return false;

        WRITE_ONCE(limiter->busy, limiter->busy + 1);
        if (limiter->max_busy < limiter->busy)
                WRITE_ONCE(limiter->max_busy, limiter->busy);
        return true;
}

static void wait_permit(struct limiter *limiter, struct bio *bio)
        __releases(&limiter->pool->lock)
{
        DEFINE_WAIT(wait);

        bio_list_add(&limiter->new_waiters, bio);
        prepare_to_wait_exclusive(&limiter->blocked_threads, &wait,
                                  TASK_UNINTERRUPTIBLE);
        spin_unlock(&limiter->pool->lock);
        schedule();
        finish_wait(&limiter->blocked_threads, &wait);
}

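/*
 * wait_permit() is entered holding the pool lock (hence the __releases annotation): the bio is
 * queued on new_waiters and the submitting thread sleeps uninterruptibly until
 * process_release_callback() assigns a resource to the bio and wakes the thread via wake_up_nr().
 */
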
/**
 * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
 *
 * This will block if data_vios or discard permits are not available.
 */
void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
{
        struct data_vio *data_vio;

        VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state),
                            "data_vio_pool not quiescent on acquire");

        bio->bi_private = (void *) jiffies;
        spin_lock(&pool->lock);
        if ((bio_op(bio) == REQ_OP_DISCARD) &&
            !acquire_permit(&pool->discard_limiter)) {
                wait_permit(&pool->discard_limiter, bio);
                return;
        }

        if (!acquire_permit(&pool->limiter)) {
                wait_permit(&pool->limiter, bio);
                return;
        }

        data_vio = get_available_data_vio(pool);
        spin_unlock(&pool->lock);
        launch_bio(pool->completion.vdo, data_vio, bio);
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
{
        bool drained;
        struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state);

        spin_lock(&pool->lock);
        drained = check_for_drain_complete_locked(pool);
        spin_unlock(&pool->lock);

        if (drained)
                vdo_finish_draining(state);
}

static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name)
{
        VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread),
                            "%s called on cpu thread", name);
}

/**
 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
 * @completion: The completion to notify when the pool has drained.
 */
void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
{
        assert_on_vdo_cpu_thread(completion->vdo, __func__);
        vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
                           initiate_drain);
}

/**
 * resume_data_vio_pool() - Resume a data_vio pool.
 * @completion: The completion to notify when the pool has resumed.
 */
void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
{
        assert_on_vdo_cpu_thread(completion->vdo, __func__);
        vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
}

static void dump_limiter(const char *name, struct limiter *limiter)
{
        vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy,
                     limiter->limit, limiter->max_busy,
                     ((bio_list_empty(&limiter->waiters) &&
                       bio_list_empty(&limiter->new_waiters)) ?
                      "no waiters" : "has waiters"));
}

/**
 * dump_data_vio_pool() - Dump a data_vio pool to the log.
 * @dump_vios: Whether to dump the details of each busy data_vio as well.
 */
void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
{
        /*
         * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the
         * second clock tick). These numbers were picked based on experiments with lab machines.
         */
        static const int ELEMENTS_PER_BATCH = 35;
        static const int SLEEP_FOR_SYSLOG = 4000;

        if (pool == NULL)
                return;

        spin_lock(&pool->lock);
        dump_limiter("data_vios", &pool->limiter);
        dump_limiter("discard permits", &pool->discard_limiter);
        if (dump_vios) {
                int i;
                int dumped = 0;

                for (i = 0; i < pool->limiter.limit; i++) {
                        struct data_vio *data_vio = &pool->data_vios[i];

                        if (!list_empty(&data_vio->pool_entry))
                                continue;

                        dump_data_vio(data_vio);
                        if (++dumped >= ELEMENTS_PER_BATCH) {
                                spin_unlock(&pool->lock);
                                dumped = 0;
                                fsleep(SLEEP_FOR_SYSLOG);
                                spin_lock(&pool->lock);
                        }
                }
        }

        spin_unlock(&pool->lock);
}

data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool)
{
        return READ_ONCE(pool->limiter.busy);
}

data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool)
{
        return READ_ONCE(pool->limiter.limit);
}

data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool)
{
        return READ_ONCE(pool->limiter.max_busy);
}

static void update_data_vio_error_stats(struct data_vio *data_vio)
{
        u8 index = 0;
        static const char * const operations[] = {
                [0] = "empty",
                [1] = "read",
                [2] = "write",
                [3] = "read-modify-write",
                [5] = "read+fua",
                [6] = "write+fua",
                [7] = "read-modify-write+fua",
        };

        if (data_vio->read)
                index = 1;

        if (data_vio->write)
                index += 2;

        if (data_vio->fua)
                index += 4;

        update_vio_error_stats(&data_vio->vio,
                               "Completing %s vio for LBN %llu with error after %s",
                               operations[index],
                               (unsigned long long) data_vio->logical.lbn,
                               get_data_vio_operation_name(data_vio));
}

static void perform_cleanup_stage(struct data_vio *data_vio,
                                  enum data_vio_cleanup_stage stage);

/**
 * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
 *                            the end of processing a data_vio.
 */
static void release_allocated_lock(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_allocated_zone(data_vio);
        release_data_vio_allocation_lock(data_vio, false);
        perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS);
}

/** release_lock() - Release an uncontended LBN lock. */
static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock)
{
        struct int_map *lock_map = lock->zone->lbn_operations;
        struct data_vio *lock_holder;

        if (!lock->locked) {
                /* The lock is not locked, so it had better not be registered in the lock map. */
                struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);

                VDO_ASSERT_LOG_ONLY((data_vio != lock_holder),
                                    "no logical block lock held for block %llu",
                                    (unsigned long long) lock->lbn);
                return;
        }

        /* Release the lock by removing the lock from the map. */
        lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
        VDO_ASSERT_LOG_ONLY((data_vio == lock_holder),
                            "logical block lock mismatch for block %llu",
                            (unsigned long long) lock->lbn);
        lock->locked = false;
}

/** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
{
        struct data_vio *lock_holder, *next_lock_holder;
        int result;

        VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");

        /* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
        next_lock_holder =
                vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));

        /* Transfer the remaining lock waiters to the next lock holder. */
        vdo_waitq_transfer_all_waiters(&lock->waiters,
                                       &next_lock_holder->logical.waiters);

        result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
                                 next_lock_holder, true, (void **) &lock_holder);
        if (result != VDO_SUCCESS) {
                continue_data_vio_with_error(next_lock_holder, result);
                return;
        }

        VDO_ASSERT_LOG_ONLY((lock_holder == data_vio),
                            "logical block lock mismatch for block %llu",
                            (unsigned long long) lock->lbn);
        lock->locked = false;

        /*
         * If there are still waiters, other data_vios must be trying to get the lock we just
         * transferred. We must ensure that the new lock holder doesn't block in the packer.
         */
        if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
                cancel_data_vio_compression(next_lock_holder);

        /*
         * Avoid stack overflow on lock transfer.
         * FIXME: this is only an issue in the 1 thread config.
         */
        next_lock_holder->vio.completion.requeue = true;
        launch_locked_request(next_lock_holder);
}

/**
 * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
 *                          processing a data_vio.
 */
static void release_logical_lock(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);
        struct lbn_lock *lock = &data_vio->logical;

        assert_data_vio_in_logical_zone(data_vio);

        if (vdo_waitq_has_waiters(&lock->waiters))
                transfer_lock(data_vio, lock);
        else
                release_lock(data_vio, lock);

        vdo_release_flush_generation_lock(data_vio);
        perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE);
}

/** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
static void clean_hash_lock(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_hash_zone(data_vio);
        if (completion->result != VDO_SUCCESS) {
                vdo_clean_failed_hash_lock(data_vio);
                return;
        }

        vdo_release_hash_lock(data_vio);
        perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL);
}

/**
 * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
 *
 * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the
 * pool.
 */
static void finish_cleanup(struct data_vio *data_vio)
{
        struct vdo_completion *completion = &data_vio->vio.completion;
        u32 discard_size = min_t(u32, data_vio->remaining_discard,
                                 VDO_BLOCK_SIZE - data_vio->offset);

        VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL,
                            "complete data_vio has no allocation lock");
        VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL,
                            "complete data_vio has no hash lock");
        if ((data_vio->remaining_discard <= discard_size) ||
            (completion->result != VDO_SUCCESS)) {
                struct data_vio_pool *pool = completion->vdo->data_vio_pool;

                vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
                schedule_releases(pool);
                return;
        }

        data_vio->remaining_discard -= discard_size;
        data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE);
        data_vio->read = data_vio->is_partial;
        data_vio->offset = 0;
        completion->requeue = true;
        data_vio->first_reference_operation_complete = false;
        launch_data_vio(data_vio, data_vio->logical.lbn + 1);
}

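/*
 * For multi-block discards, finish_cleanup() above does not return the data_vio to the pool;
 * instead it advances to the next logical block (lbn + 1) and relaunches the same data_vio until
 * the remaining discard length fits within a single block or an error occurs.
 */
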
/** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */
static void perform_cleanup_stage(struct data_vio *data_vio,
                                  enum data_vio_cleanup_stage stage)
{
        struct vdo *vdo = vdo_from_data_vio(data_vio);

        switch (stage) {
        case VIO_RELEASE_HASH_LOCK:
                if (data_vio->hash_lock != NULL) {
                        launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock);
                        return;
                }
                fallthrough;

        case VIO_RELEASE_ALLOCATED:
                if (data_vio_has_allocation(data_vio)) {
                        launch_data_vio_allocated_zone_callback(data_vio,
                                                                release_allocated_lock);
                        return;
                }
                fallthrough;

        case VIO_RELEASE_RECOVERY_LOCKS:
                if ((data_vio->recovery_sequence_number > 0) &&
                    (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
                    (data_vio->vio.completion.result != VDO_READ_ONLY))
                        vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
                fallthrough;

        case VIO_RELEASE_LOGICAL:
                launch_data_vio_logical_callback(data_vio, release_logical_lock);
                return;

        default:
                finish_cleanup(data_vio);
        }
}

void complete_data_vio(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        completion->error_handler = NULL;
        data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP;
        perform_cleanup_stage(data_vio,
                              (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));
}

static void enter_read_only_mode(struct vdo_completion *completion)
{
        if (vdo_is_read_only(completion->vdo))
                return;

        if (completion->result != VDO_READ_ONLY) {
                struct data_vio *data_vio = as_data_vio(completion);

                vdo_log_error_strerror(completion->result,
                                       "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
                                       (unsigned long long) data_vio->logical.lbn,
                                       (unsigned long long) data_vio->new_mapped.pbn,
                                       (unsigned long long) data_vio->mapped.pbn,
                                       (unsigned long long) data_vio->allocation.pbn,
                                       get_data_vio_operation_name(data_vio));
        }

        vdo_enter_read_only_mode(completion->vdo, completion->result);
}

void handle_data_vio_error(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL))
                enter_read_only_mode(completion);

        update_data_vio_error_stats(data_vio);
        complete_data_vio(completion);
}

/**
 * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
 *                                 data_vio.
 */
const char *get_data_vio_operation_name(struct data_vio *data_vio)
{
        BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
                     ARRAY_SIZE(ASYNC_OPERATION_NAMES));

        return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ?
                ASYNC_OPERATION_NAMES[data_vio->last_async_operation] :
                "unknown async operation");
}

/**
 * data_vio_allocate_data_block() - Allocate a data block.
 * @write_lock_type: The type of write lock to obtain on the block.
 * @callback: The callback which will attempt an allocation in the current zone and continue if it
 *            succeeds.
 * @error_handler: The handler for errors while allocating.
 */
void data_vio_allocate_data_block(struct data_vio *data_vio,
                                  enum pbn_lock_type write_lock_type,
                                  vdo_action_fn callback, vdo_action_fn error_handler)
{
        struct allocation *allocation = &data_vio->allocation;

        VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK),
                            "data_vio does not have an allocation");
        allocation->write_lock_type = write_lock_type;
        allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
        allocation->first_allocation_zone = allocation->zone->zone_number;

        data_vio->vio.completion.error_handler = error_handler;
        launch_data_vio_allocated_zone_callback(data_vio, callback);
}

/**
 * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
 * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten).
 *
 * If the reference to the locked block is still provisional, it will be released as well.
 */
void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
{
        struct allocation *allocation = &data_vio->allocation;
        physical_block_number_t locked_pbn = allocation->pbn;

        assert_data_vio_in_allocated_zone(data_vio);

        if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
                allocation->pbn = VDO_ZERO_BLOCK;

        vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn,
                                           vdo_forget(allocation->lock));
}

/**
 * uncompress_data_vio() - Uncompress the data a data_vio has just read.
 * @mapping_state: The mapping state indicating which fragment to decompress.
 * @buffer: The buffer to receive the uncompressed data.
 */
int uncompress_data_vio(struct data_vio *data_vio,
                        enum block_mapping_state mapping_state, char *buffer)
{
        int size;
        u16 fragment_offset, fragment_size;
        struct compressed_block *block = data_vio->compression.block;
        int result = vdo_get_compressed_block_fragment(mapping_state, block,
                                                       &fragment_offset, &fragment_size);

        if (result != VDO_SUCCESS) {
                vdo_log_debug("%s: compressed fragment error %d", __func__, result);
                return result;
        }

        size = LZ4_decompress_safe((block->data + fragment_offset), buffer,
                                   fragment_size, VDO_BLOCK_SIZE);
        if (size != VDO_BLOCK_SIZE) {
                vdo_log_debug("%s: lz4 error", __func__);
                return VDO_INVALID_FRAGMENT;
        }

        return VDO_SUCCESS;
}

/**
 * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
 * @completion: The data_vio which has just finished its read.
 *
 * This callback is registered in read_block().
 */
static void modify_for_partial_write(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);
        char *data = data_vio->vio.data;
        struct bio *bio = data_vio->user_bio;

        assert_data_vio_on_cpu_thread(data_vio);

        if (bio_op(bio) == REQ_OP_DISCARD) {
                memset(data + data_vio->offset, '\0', min_t(u32,
                                                            data_vio->remaining_discard,
                                                            VDO_BLOCK_SIZE - data_vio->offset));
        } else {
                copy_from_bio(bio, data + data_vio->offset);
        }

        data_vio->is_zero = is_zero_block(data);
        data_vio->read = false;
        launch_data_vio_logical_callback(data_vio,
                                         continue_data_vio_with_block_map_slot);
}

static void complete_read(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);
        char *data = data_vio->vio.data;
        bool compressed = vdo_is_state_compressed(data_vio->mapped.state);

        assert_data_vio_on_cpu_thread(data_vio);

        if (compressed) {
                int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data);

                if (result != VDO_SUCCESS) {
                        continue_data_vio_with_error(data_vio, result);
                        return;
                }
        }

        if (data_vio->write) {
                modify_for_partial_write(completion);
                return;
        }

        if (compressed || data_vio->is_partial)
                copy_to_bio(data_vio->user_bio, data + data_vio->offset);

        acknowledge_data_vio(data_vio);
        complete_data_vio(completion);
}

static void read_endio(struct bio *bio)
{
        struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
        int result = blk_status_to_errno(bio->bi_status);

        vdo_count_completed_bios(bio);
        if (result != VDO_SUCCESS) {
                continue_data_vio_with_error(data_vio, result);
                return;
        }

        launch_data_vio_cpu_callback(data_vio, complete_read,
                                     CPU_Q_COMPLETE_READ_PRIORITY);
}

static void complete_zero_read(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_on_cpu_thread(data_vio);

        if (data_vio->is_partial) {
                memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE);
                if (data_vio->write) {
                        modify_for_partial_write(completion);
                        return;
                }
        } else {
                zero_fill_bio(data_vio->user_bio);
        }

        complete_read(completion);
}

/**
 * read_block() - Read a block asynchronously.
 *
 * This is the callback registered in read_block_mapping().
 */
static void read_block(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);
        struct vio *vio = as_vio(completion);
        int result = VDO_SUCCESS;

        if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
                launch_data_vio_cpu_callback(data_vio, complete_zero_read,
                                             CPU_Q_COMPLETE_VIO_PRIORITY);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO;
        if (vdo_is_state_compressed(data_vio->mapped.state)) {
                result = vio_reset_bio(vio, (char *) data_vio->compression.block,
                                       read_endio, REQ_OP_READ, data_vio->mapped.pbn);
        } else {
                blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ);

                if (data_vio->is_partial) {
                        result = vio_reset_bio(vio, vio->data, read_endio, opf,
                                               data_vio->mapped.pbn);
                } else {
                        /* A full 4k read. Use the incoming bio to avoid having to copy the data */
                        bio_reset(vio->bio, vio->bio->bi_bdev, opf);
                        bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
                                       data_vio->user_bio, GFP_KERNEL);

                        /* Copy over the original bio iovec and opflags. */
                        vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
                                               data_vio->mapped.pbn);
                }
        }

        if (result != VDO_SUCCESS) {
                continue_data_vio_with_error(data_vio, result);
                return;
        }

        vdo_submit_data_vio(data_vio);
}

static inline struct data_vio *
reference_count_update_completion_as_data_vio(struct vdo_completion *completion)
{
        if (completion->type == VIO_COMPLETION)
                return as_data_vio(completion);

        return container_of(completion, struct data_vio, decrement_completion);
}

/**
 * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
 *                      made its reference updates. Handle any error from either, or proceed
 *                      to updating the block map.
 * @completion: The completion of the write in progress.
 */
static void update_block_map(struct vdo_completion *completion)
{
        struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);

        assert_data_vio_in_logical_zone(data_vio);

        if (!data_vio->first_reference_operation_complete) {
                /* Rendezvous, we're first */
                data_vio->first_reference_operation_complete = true;
                return;
        }

        completion = &data_vio->vio.completion;
        vdo_set_completion_result(completion, data_vio->decrement_completion.result);
        if (completion->result != VDO_SUCCESS) {
                handle_data_vio_error(completion);
                return;
        }

        completion->error_handler = handle_data_vio_error;
        if (data_vio->hash_lock != NULL)
                set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock);
        else
                completion->callback = complete_data_vio;

        data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK;
        vdo_put_mapped_block(data_vio);
}

static void decrement_reference_count(struct vdo_completion *completion)
{
        struct data_vio *data_vio = container_of(completion, struct data_vio,
                                                 decrement_completion);

        assert_data_vio_in_mapped_zone(data_vio);

        vdo_set_completion_callback(completion, update_block_map,
                                    data_vio->logical.zone->thread_id);
        completion->error_handler = update_block_map;
        vdo_modify_reference_count(completion, &data_vio->decrement_updater);
}

static void increment_reference_count(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_new_mapped_zone(data_vio);

        if (data_vio->downgrade_allocation_lock) {
                /*
                 * Now that the data has been written, it's safe to deduplicate against the
                 * block. Downgrade the allocation lock to a read lock so it can be used later by
                 * the hash lock. This is done here since it needs to happen sometime before we
                 * return to the hash zone, and we are currently on the correct thread. For
                 * compressed blocks, the downgrade will have already been done.
                 */
                vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
        }

        set_data_vio_logical_callback(data_vio, update_block_map);
        completion->error_handler = update_block_map;
        vdo_modify_reference_count(completion, &data_vio->increment_updater);
}

/** journal_remapping() - Add a recovery journal entry for a data remapping. */
static void journal_remapping(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_journal_zone(data_vio);

        data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
        data_vio->decrement_updater.zpbn = data_vio->mapped;
        if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
                data_vio->first_reference_operation_complete = true;
                if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
                        set_data_vio_logical_callback(data_vio, update_block_map);
        } else {
                set_data_vio_new_mapped_zone_callback(data_vio,
                                                      increment_reference_count);
        }

        if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
                data_vio->first_reference_operation_complete = true;
        } else {
                vdo_set_completion_callback(&data_vio->decrement_completion,
                                            decrement_reference_count,
                                            data_vio->mapped.zone->thread_id);
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
        vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
}

/**
 * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
 *
 * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
 * journal entry referencing the removal of this LBN->PBN mapping.
 */
static void read_old_block_mapping(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_logical_zone(data_vio);

        data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
        set_data_vio_journal_callback(data_vio, journal_remapping);
        vdo_get_mapped_block(data_vio);
}

void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
{
        data_vio->increment_updater = (struct reference_updater) {
                .operation = VDO_JOURNAL_DATA_REMAPPING,
                .increment = true,
                .zpbn = data_vio->new_mapped,
                .lock = lock,
        };

        launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
}

/**
 * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
 *
 * This is the callback registered in launch_compress_data_vio().
 */
static void pack_compressed_data(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_packer_zone(data_vio);

        if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
            get_data_vio_compression_status(data_vio).may_not_compress) {
                write_data_vio(data_vio);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
        vdo_attempt_packing(data_vio);
}

/**
 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
 *
 * This callback is registered in launch_compress_data_vio().
 */
static void compress_data_vio(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	int size;

	assert_data_vio_on_cpu_thread(data_vio);

	/*
	 * By putting the compressed data at the start of the compressed block data field, we won't
	 * need to copy it if this data_vio becomes a compressed write agent.
	 */
	size = LZ4_compress_default(data_vio->vio.data,
				    data_vio->compression.block->data, VDO_BLOCK_SIZE,
				    VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
				    (char *) vdo_get_work_queue_private_data());
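	/*
	 * Only continue to the packer if compression produced a fragment small enough to share a
	 * compressed block with other fragments; otherwise write the data uncompressed.
	 */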
	if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
		data_vio->compression.size = size;
		launch_data_vio_packer_callback(data_vio, pack_compressed_data);
		return;
	}

	write_data_vio(data_vio);
}
/**
 * launch_compress_data_vio() - Continue a write by attempting to compress the data.
 *
 * This is a re-entry point to vio_write used by hash locks.
 */
void launch_compress_data_vio(struct data_vio *data_vio)
{
	VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
	VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
			    "data_vio to compress has a hash_lock");
	VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
			    "data_vio to compress has an allocation");

	/*
	 * There are 4 reasons why a data_vio which has reached this point will not be eligible for
	 * compression:
	 *
	 * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the
	 * write request also requests FUA.
	 *
	 * 2) A data_vio should not be compressed when compression is disabled for the vdo.
	 *
	 * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not
	 * yet been acknowledged and hence blocking in the packer would be bad.
	 *
	 * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
	 * packer would also be bad.
	 */
	if (data_vio->fua ||
	    !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
	    ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
	    (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
		write_data_vio(data_vio);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
	launch_data_vio_cpu_callback(data_vio, compress_data_vio,
				     CPU_Q_COMPRESS_BLOCK_PRIORITY);
}
/**
 * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
 *                   name as set).
 *
 * This callback is registered in prepare_for_dedupe().
 */
static void hash_data_vio(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_on_cpu_thread(data_vio);
	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");

	murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
			&data_vio->record_name);

	data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
						   &data_vio->record_name);
	data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
	launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
}
/** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
static void prepare_for_dedupe(struct data_vio *data_vio)
{
	/* We don't care what thread we are on. */
	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");

	/*
	 * Before we can dedupe, we need to know the record name, so the first
	 * step is to hash the block data.
	 */
	data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
	launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
}
/**
 * write_bio_finished() - This is the bio_end_io function registered in write_data_vio() to be
 *                        called when a data_vio's write to the underlying storage has completed.
 */
static void write_bio_finished(struct bio *bio)
{
	struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);

	vdo_count_completed_bios(bio);
	vdo_set_completion_result(&data_vio->vio.completion,
				  blk_status_to_errno(bio->bi_status));
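	/*
	 * The data is now on storage, so record that the allocation's PBN write lock may be
	 * downgraded to a read lock once the new reference has been made.
	 */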
	data_vio->downgrade_allocation_lock = true;
	update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
}
/** write_data_vio() - Write a data block to storage without compression. */
void write_data_vio(struct data_vio *data_vio)
{
	struct data_vio_compression_status status, new_status;
	int result;

	if (!data_vio_has_allocation(data_vio)) {
		/*
		 * There was no space to write this block and we failed to deduplicate or compress
		 * it.
		 */
		continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
		return;
	}

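	/*
	 * Force the compression status to post-packer so the packer will not try to use this
	 * data_vio while its write is in flight, unless some other thread has already advanced
	 * it past the packer.
	 */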
	new_status = (struct data_vio_compression_status) {
		.stage = DATA_VIO_POST_PACKER,
		.may_not_compress = true,
	};

	do {
		status = get_data_vio_compression_status(data_vio);
	} while ((status.stage != DATA_VIO_POST_PACKER) &&
		 !set_data_vio_compression_status(data_vio, status, new_status));

	/* Write the data from the data block buffer. */
	result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
			       write_bio_finished, REQ_OP_WRITE,
			       data_vio->allocation.pbn);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
	vdo_submit_data_vio(data_vio);
}
/**
 * acknowledge_write_callback() - Acknowledge a write to the requestor.
 *
 * This callback is registered in allocate_block() and continue_data_vio_with_block_map_slot().
 */
static void acknowledge_write_callback(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct vdo *vdo = completion->vdo;

	VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
			     (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
			    "%s() called on bio ack queue", __func__);
	VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
			    "write VIO to be acknowledged has a flush generation lock");
	acknowledge_data_vio(data_vio);
	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
		/* This is a zero write or discard */
		update_metadata_for_data_vio_write(data_vio, NULL);
		return;
	}

	prepare_for_dedupe(data_vio);
}
/**
 * allocate_block() - Attempt to allocate a block in the current allocation zone.
 *
 * This callback is registered in continue_data_vio_with_block_map_slot().
 */
static void allocate_block(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_allocated_zone(data_vio);

	if (!vdo_allocate_block_in_zone(data_vio))
		return;

	completion->error_handler = handle_data_vio_error;
	WRITE_ONCE(data_vio->allocation_succeeded, true);
	data_vio->new_mapped = (struct zoned_pbn) {
		.zone = data_vio->allocation.zone,
		.pbn = data_vio->allocation.pbn,
		.state = VDO_MAPPING_STATE_UNCOMPRESSED,
	};

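	/*
	 * FUA writes, and writes which are not the final block of a larger discard, cannot be
	 * acknowledged yet, so skip the early acknowledgment and go straight to preparing for
	 * dedupe.
	 */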
	if (data_vio->fua ||
	    data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
		prepare_for_dedupe(data_vio);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
}
/**
 * handle_allocation_error() - Handle an error attempting to allocate a block.
 *
 * This error handler is registered in continue_data_vio_with_block_map_slot().
 */
static void handle_allocation_error(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	if (completion->result == VDO_NO_SPACE) {
		/* We failed to get an allocation, but we can try to dedupe. */
		vdo_reset_completion(completion);
		completion->error_handler = handle_data_vio_error;
		prepare_for_dedupe(data_vio);
		return;
	}

	/* We got a "real" error, not just a failure to allocate, so fail the request. */
	handle_data_vio_error(completion);
}
static int assert_is_discard(struct data_vio *data_vio)
{
	int result = VDO_ASSERT(data_vio->is_discard,
				"data_vio with no block map page is a discard");

	return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
}
/**
 * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
 *
 * This callback is registered in launch_read_data_vio().
 */
void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_logical_zone(data_vio);
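	/* For a read, just look up the current mapping and continue in read_block(). */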
	if (data_vio->read) {
		set_data_vio_logical_callback(data_vio, read_block);
		data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
		vdo_get_mapped_block(data_vio);
		return;
	}

	vdo_acquire_flush_generation_lock(data_vio);

	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
		/*
		 * This is a discard for a block on a block map page which has not been allocated,
		 * so there's nothing more we need to do.
		 */
		completion->callback = complete_data_vio;
		continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
		return;
	}

	/*
	 * We need an allocation if this is neither a full-block discard nor a
	 * full-block zero write.
	 */
	if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
		data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
					     handle_allocation_error);
		return;
	}

	/*
	 * We don't need to write any data, so skip allocation and just update the block map and
	 * reference counts (via the journal).
	 */
	data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
	if (data_vio->is_zero)
		data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;

	if (data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
		/* This is not the final block of a discard so we can't acknowledge it yet. */
		update_metadata_for_data_vio_write(data_vio, NULL);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
}