// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "io-submitter.h"

#include <linux/bio.h>
#include <linux/kernel.h>
#include <linux/mutex.h>

#include "memory-alloc.h"
#include "permassert.h"

#include "data-vio.h"
#include "logger.h"
#include "vdo.h"
#include "vio.h"

/*
 * Submission of bio operations to the underlying storage device will go through a separate work
 * queue thread (or more than one) to prevent blocking in other threads if the storage device has a
 * full queue. The plug structure allows that thread to do better batching of requests to make the
 * I/O more efficient.
 *
 * When multiple worker threads are used, a thread is chosen for an I/O operation submission based
 * on the PBN, so a given PBN will consistently wind up on the same thread. Flush operations are
 * assigned round-robin.
 *
 * The map (protected by the mutex) collects pending I/O operations so that the worker thread can
 * reorder them to try to encourage I/O request merging in the request queue underneath.
 */

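/*
 * Illustrative example (assuming zone selection groups consecutive PBNs by the rotation
 * interval; the vio's bio_zone is assigned outside this file): with four bio threads and
 * a rotation interval of 64, PBNs 0-63 would land on thread 0, PBNs 64-127 on thread 1,
 * and so on, wrapping back to thread 0, so a given PBN always reaches the same thread.
 */
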
struct bio_queue_data {
	struct vdo_work_queue *queue;
	struct blk_plug plug;
	struct int_map *map;
	struct mutex lock;
	unsigned int queue_number;
};

struct io_submitter {
	unsigned int num_bio_queues_used;
	unsigned int bio_queue_rotation_interval;
	struct bio_queue_data bio_queue_data[];
};

static void start_bio_queue(void *ptr)
{
	struct bio_queue_data *bio_queue_data = ptr;

	blk_start_plug(&bio_queue_data->plug);
}

static void finish_bio_queue(void *ptr)
{
	struct bio_queue_data *bio_queue_data = ptr;

	blk_finish_plug(&bio_queue_data->plug);
}

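/*
 * Work queue hooks for the bio submission threads: the start and finish callbacks open
 * and close a blk_plug for the worker thread, letting the block layer batch the bios it
 * submits (see the plug note in the header comment above).
 */
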
static const struct vdo_work_queue_type bio_queue_type = {
	.start = start_bio_queue,
	.finish = finish_bio_queue,
	.max_priority = BIO_Q_MAX_PRIORITY,
	.default_priority = BIO_Q_DATA_PRIORITY,
};

/**
 * count_all_bios() - Determine which bio counter to use.
 * @vio: The vio associated with the bio.
 * @bio: The bio to count.
 */
static void count_all_bios(struct vio *vio, struct bio *bio)
{
	struct atomic_statistics *stats = &vio->completion.vdo->stats;

	if (is_data_vio(vio)) {
		vdo_count_bios(&stats->bios_out, bio);
		return;
	}

	vdo_count_bios(&stats->bios_meta, bio);
	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
		vdo_count_bios(&stats->bios_journal, bio);
	else if (vio->type == VIO_TYPE_BLOCK_MAP)
		vdo_count_bios(&stats->bios_page_cache, bio);
}

/**
 * assert_in_bio_zone() - Assert that a vio is in the correct bio zone and not in interrupt
 *                        context.
 * @vio: The vio to check.
 */
static void assert_in_bio_zone(struct vio *vio)
{
	VDO_ASSERT_LOG_ONLY(!in_interrupt(), "not in interrupt context");
	assert_vio_in_bio_zone(vio);
}

/**
 * send_bio_to_device() - Update stats and tracing info, then submit the supplied bio to the OS for
 *                        processing.
 * @vio: The vio associated with the bio.
 * @bio: The bio to submit to the OS.
 */
static void send_bio_to_device(struct vio *vio, struct bio *bio)
{
	struct vdo *vdo = vio->completion.vdo;

	assert_in_bio_zone(vio);
	atomic64_inc(&vdo->stats.bios_submitted);
	count_all_bios(vio, bio);
	bio_set_dev(bio, vdo_get_backing_device(vdo));
	submit_bio_noacct(bio);
}

/**
 * vdo_submit_vio() - Submits a vio's bio to the underlying block device. May block if the device
 *                    is busy. This callback should be used by vios which did not attempt to merge.
 */
void vdo_submit_vio(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);

	send_bio_to_device(vio, vio->bio);
}

/**
 * get_bio_list() - Extract the list of bios to submit from a vio.
 * @vio: The vio submitting I/O.
 *
 * The list will always contain at least one entry (the bio for the vio on which it is called), but
 * other bios may have been merged with it as well.
 *
 * Return: bio  The head of the bio list to submit.
 */
static struct bio *get_bio_list(struct vio *vio)
{
	struct bio *bio;
	struct io_submitter *submitter = vio->completion.vdo->io_submitter;
	struct bio_queue_data *bio_queue_data = &(submitter->bio_queue_data[vio->bio_zone]);

	assert_in_bio_zone(vio);

	mutex_lock(&bio_queue_data->lock);
	vdo_int_map_remove(bio_queue_data->map,
			   vio->bios_merged.head->bi_iter.bi_sector);
	vdo_int_map_remove(bio_queue_data->map,
			   vio->bios_merged.tail->bi_iter.bi_sector);
	bio = vio->bios_merged.head;
	bio_list_init(&vio->bios_merged);
	mutex_unlock(&bio_queue_data->lock);

	return bio;
}

/**
 * submit_data_vio() - Submit a data_vio's bio to the storage below along with
 *                     any bios that have been merged with it.
 *
 * Context: This call may block and so should only be called from a bio thread.
 */
static void submit_data_vio(struct vdo_completion *completion)
{
	struct bio *bio, *next;
	struct vio *vio = as_vio(completion);

	assert_in_bio_zone(vio);
	for (bio = get_bio_list(vio); bio != NULL; bio = next) {
		/* Detach each bio from the chain; bi_private points back at the vio owning it. */
		next = bio->bi_next;
		bio->bi_next = NULL;
		send_bio_to_device((struct vio *) bio->bi_private, bio);
	}
}

/**
 * get_mergeable_locked() - Attempt to find an already queued bio that the current bio can be
 *                          merged with.
 * @map: The bio map to use for merging.
 * @vio: The vio we want to merge.
 * @back_merge: Set to true for a back merge, false for a front merge.
 *
 * There are two types of merging possible, forward and backward, which are distinguished by a flag
 * that uses kernel elevator terminology.
 *
 * Return: the vio to merge to, NULL if no merging is possible.
 */
static struct vio *get_mergeable_locked(struct int_map *map, struct vio *vio,
					bool back_merge)
{
	struct bio *bio = vio->bio;
	sector_t merge_sector = bio->bi_iter.bi_sector;
	struct vio *vio_merge;

	if (back_merge)
		merge_sector -= VDO_SECTORS_PER_BLOCK;
	else
		merge_sector += VDO_SECTORS_PER_BLOCK;

	vio_merge = vdo_int_map_get(map, merge_sector);

	if (vio_merge == NULL)
		return NULL;

	if (vio->completion.priority != vio_merge->completion.priority)
		return NULL;

	if (bio_data_dir(bio) != bio_data_dir(vio_merge->bio))
		return NULL;

	if (bio_list_empty(&vio_merge->bios_merged))
		return NULL;

	if (back_merge) {
		return (vio_merge->bios_merged.tail->bi_iter.bi_sector == merge_sector ?
			vio_merge : NULL);
	}

	return (vio_merge->bios_merged.head->bi_iter.bi_sector == merge_sector ?
		vio_merge : NULL);
}

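/*
 * Worked example (assuming the usual 4K VDO block on 512-byte sectors, i.e.
 * VDO_SECTORS_PER_BLOCK == 8): a bio starting at sector 1024 back-merges with a queued
 * list whose tail bio starts at sector 1016 (ending just before it), and front-merges
 * with a queued list whose head bio starts at sector 1032 (beginning just after it).
 */
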
static int map_merged_vio(struct int_map *bio_map, struct vio *vio)
{
	int result;
	sector_t bio_sector;

	bio_sector = vio->bios_merged.head->bi_iter.bi_sector;
	result = vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
	if (result != VDO_SUCCESS)
		return result;

	bio_sector = vio->bios_merged.tail->bi_iter.bi_sector;
	return vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
}

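/*
 * Each pending merged list is indexed in the map under both its head and tail starting
 * sectors, which lets get_mergeable_locked() find it from either direction and is why the
 * map is sized for two entries per request in vdo_make_io_submitter().
 */
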
static int merge_to_prev_tail(struct int_map *bio_map, struct vio *vio,
			      struct vio *prev_vio)
{
	vdo_int_map_remove(bio_map, prev_vio->bios_merged.tail->bi_iter.bi_sector);
	bio_list_merge(&prev_vio->bios_merged, &vio->bios_merged);
	return map_merged_vio(bio_map, prev_vio);
}

static int merge_to_next_head(struct int_map *bio_map, struct vio *vio,
			      struct vio *next_vio)
{
	/*
	 * Handle "next merge" and "gap fill" cases the same way so as to reorder bios in a way
	 * that's compatible with using funnel queues in work queues. This avoids removing an
	 * existing completion.
	 */
	vdo_int_map_remove(bio_map, next_vio->bios_merged.head->bi_iter.bi_sector);
	bio_list_merge_head(&next_vio->bios_merged, &vio->bios_merged);
	return map_merged_vio(bio_map, next_vio);
}

/**
 * try_bio_map_merge() - Attempt to merge a vio's bio with other pending I/Os.
 * @vio: The vio to merge.
 *
 * Currently this is only used for data_vios, but is broken out for future use with metadata vios.
 *
 * Return: whether or not the vio was merged.
 */
static bool try_bio_map_merge(struct vio *vio)
{
	int result;
	bool merged = true;
	struct bio *bio = vio->bio;
	struct vio *prev_vio, *next_vio;
	struct vdo *vdo = vio->completion.vdo;
	struct bio_queue_data *bio_queue_data =
		&vdo->io_submitter->bio_queue_data[vio->bio_zone];

	bio->bi_next = NULL;
	bio_list_init(&vio->bios_merged);
	bio_list_add(&vio->bios_merged, bio);

	mutex_lock(&bio_queue_data->lock);
	prev_vio = get_mergeable_locked(bio_queue_data->map, vio, true);
	next_vio = get_mergeable_locked(bio_queue_data->map, vio, false);
	if (prev_vio == next_vio)
		next_vio = NULL;

	if ((prev_vio == NULL) && (next_vio == NULL)) {
		/* no merge. just add to bio_queue */
		merged = false;
		result = vdo_int_map_put(bio_queue_data->map,
					 bio->bi_iter.bi_sector,
					 vio, true, NULL);
	} else if (next_vio == NULL) {
		/* Only prev. merge to prev's tail */
		result = merge_to_prev_tail(bio_queue_data->map, vio, prev_vio);
	} else {
		/* Only next. merge to next's head */
		result = merge_to_next_head(bio_queue_data->map, vio, next_vio);
	}
	mutex_unlock(&bio_queue_data->lock);

	/* We don't care about failure of int_map_put in this case. */
	VDO_ASSERT_LOG_ONLY(result == VDO_SUCCESS, "bio map insertion succeeds");
	return merged;
}

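/*
 * A vio whose bio was merged above is not enqueued separately; its bio rides along in the
 * other vio's bios_merged list and is submitted when that vio reaches submit_data_vio()
 * and get_bio_list() drains the list.
 */
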
/**
 * vdo_submit_data_vio() - Submit I/O for a data_vio.
 * @data_vio: the data_vio for which to issue I/O.
 *
 * If possible, this I/O will be merged with other pending I/Os. Otherwise, the data_vio will be
 * sent to the appropriate bio zone directly.
 */
void vdo_submit_data_vio(struct data_vio *data_vio)
{
	if (try_bio_map_merge(&data_vio->vio))
		return;

	launch_data_vio_bio_zone_callback(data_vio, submit_data_vio);
}

/**
 * __submit_metadata_vio() - Submit I/O for a metadata vio.
 * @vio: the vio for which to issue I/O
 * @physical: the physical block number to read or write
 * @callback: the bio endio function which will be called after the I/O completes
 * @error_handler: the handler for submission or I/O errors (may be NULL)
 * @operation: the type of I/O to perform
 * @data: the buffer to read or write (may be NULL)
 *
 * The vio is enqueued on a vdo bio queue so that bio submission (which may block) does not block
 * other vdo threads.
 *
 * The error handler will run on the correct thread only so long as the thread calling this
 * function and the thread set in the endio callback are the same, and no error can occur on the
 * bio queue. Currently this is true for all callers, but additional care will be needed if this
 * ever changes.
 */
void __submit_metadata_vio(struct vio *vio, physical_block_number_t physical,
			   bio_end_io_t callback, vdo_action_fn error_handler,
			   blk_opf_t operation, char *data)
{
	int result;
	struct vdo_completion *completion = &vio->completion;
	const struct admin_state_code *code = vdo_get_admin_state(completion->vdo);

	VDO_ASSERT_LOG_ONLY(!code->quiescent, "I/O not allowed in state %s", code->name);

	vdo_reset_completion(completion);
	completion->error_handler = error_handler;
	result = vio_reset_bio(vio, data, callback, operation | REQ_META, physical);
	if (result != VDO_SUCCESS) {
		continue_vio(vio, result);
		return;
	}

	vdo_set_completion_callback(completion, vdo_submit_vio,
				    get_vio_bio_zone_thread_id(vio));
	vdo_launch_completion_with_priority(completion, get_metadata_priority(vio));
}

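/*
 * Callers are expected to go through the convenience wrappers declared in io-submitter.h
 * (e.g. vdo_submit_metadata_vio()) rather than invoking this double-underscore form
 * directly.
 */
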
/**
 * vdo_make_io_submitter() - Create an io_submitter structure.
 * @thread_count: Number of bio-submission threads to set up.
 * @rotation_interval: Interval to use when rotating between bio-submission threads when enqueuing
 *                     completions.
 * @max_requests_active: Number of bios for merge tracking.
 * @vdo: The vdo which will use this submitter.
 * @io_submitter_ptr: Pointer to the new data structure.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make_io_submitter(unsigned int thread_count, unsigned int rotation_interval,
			  unsigned int max_requests_active, struct vdo *vdo,
			  struct io_submitter **io_submitter_ptr)
{
	unsigned int i;
	struct io_submitter *io_submitter;
	int result;

	result = vdo_allocate_extended(struct io_submitter, thread_count,
				       struct bio_queue_data, "bio submission data",
				       &io_submitter);
	if (result != VDO_SUCCESS)
		return result;

	io_submitter->bio_queue_rotation_interval = rotation_interval;

	/* Setup for each bio-submission work queue */
	for (i = 0; i < thread_count; i++) {
		struct bio_queue_data *bio_queue_data = &io_submitter->bio_queue_data[i];

		mutex_init(&bio_queue_data->lock);
		/*
		 * One I/O operation per request, but both first & last sector numbers.
		 *
		 * If requests are assigned to threads round-robin, they should be distributed
		 * quite evenly. But if they're assigned based on PBN, things can sometimes be very
		 * uneven. So for now, we'll assume that all requests *may* wind up on one thread,
		 * and thus all in the same map.
		 */
		result = vdo_int_map_create(max_requests_active * 2,
					    &bio_queue_data->map);
		if (result != VDO_SUCCESS) {
			/*
			 * Clean up the partially initialized bio-queue entirely and indicate that
			 * initialization failed.
			 */
			vdo_log_error("bio map initialization failed %d", result);
			vdo_cleanup_io_submitter(io_submitter);
			vdo_free_io_submitter(io_submitter);
			return result;
		}

		bio_queue_data->queue_number = i;
		result = vdo_make_thread(vdo, vdo->thread_config.bio_threads[i],
					 &bio_queue_type, 1, (void **) &bio_queue_data);
		if (result != VDO_SUCCESS) {
			/*
			 * Clean up the partially initialized bio-queue entirely and indicate that
			 * initialization failed.
			 */
			vdo_int_map_free(vdo_forget(bio_queue_data->map));
			vdo_log_error("bio queue initialization failed %d", result);
			vdo_cleanup_io_submitter(io_submitter);
			vdo_free_io_submitter(io_submitter);
			return result;
		}

		bio_queue_data->queue = vdo->threads[vdo->thread_config.bio_threads[i]].queue;
		io_submitter->num_bio_queues_used++;
	}

	*io_submitter_ptr = io_submitter;

	return VDO_SUCCESS;
}

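/*
 * Typical lifecycle (sketch): vdo_make_io_submitter() at device construction, then
 * vdo_cleanup_io_submitter() followed by vdo_free_io_submitter() during shutdown, as
 * documented on vdo_free_io_submitter() below.
 */
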
/**
 * vdo_cleanup_io_submitter() - Tear down the io_submitter fields as needed for a physical layer.
 * @io_submitter: The I/O submitter data to tear down (may be NULL).
 */
void vdo_cleanup_io_submitter(struct io_submitter *io_submitter)
{
	int i;

	if (io_submitter == NULL)
		return;

	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--)
		vdo_finish_work_queue(io_submitter->bio_queue_data[i].queue);
}

/**
 * vdo_free_io_submitter() - Free the io_submitter fields and structure as needed.
 * @io_submitter: The I/O submitter data to destroy.
 *
 * This must be called after vdo_cleanup_io_submitter(). It is used to release resources late in
 * the shutdown process to avoid or reduce the chance of race conditions.
 */
void vdo_free_io_submitter(struct io_submitter *io_submitter)
{
	int i;

	if (io_submitter == NULL)
		return;

	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--) {
		io_submitter->num_bio_queues_used--;
		/* vdo_destroy() will free the work queue, so just give up our reference to it. */
		vdo_forget(io_submitter->bio_queue_data[i].queue);
		vdo_int_map_free(vdo_forget(io_submitter->bio_queue_data[i].map));
	}

	vdo_free(io_submitter);
}