// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "io-submitter.h"

#include <linux/bio.h>
#include <linux/kernel.h>
#include <linux/mutex.h>

#include "memory-alloc.h"
#include "permassert.h"

#include "data-vio.h"
#include "logger.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"

/*
 * Submission of bio operations to the underlying storage device will go through a separate work
 * queue thread (or more than one) to prevent blocking in other threads if the storage device has a
 * full queue. The plug structure allows that thread to do better batching of requests to make the
 * I/O more efficient.
 *
 * When multiple worker threads are used, a thread is chosen for an I/O operation submission based
 * on the PBN, so a given PBN will consistently wind up on the same thread. Flush operations are
 * assigned round-robin.
 *
 * The map (protected by the mutex) collects pending I/O operations so that the worker thread can
 * reorder them to try to encourage I/O request merging in the request queue underneath.
 */
struct bio_queue_data {
	struct vdo_work_queue *queue;
	struct blk_plug plug;
	struct int_map *map;
	struct mutex lock;
	unsigned int queue_number;
};

struct io_submitter {
	unsigned int num_bio_queues_used;
	unsigned int bio_queue_rotation_interval;
	struct bio_queue_data bio_queue_data[];
};

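/* Work queue "start" hook: open a blk_plug so this thread's bio submissions can be batched. */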
static void start_bio_queue(void *ptr)
{
	struct bio_queue_data *bio_queue_data = ptr;

	blk_start_plug(&bio_queue_data->plug);
}

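/* Work queue "finish" hook: close the blk_plug, flushing any batched bios. */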
static void finish_bio_queue(void *ptr)
{
	struct bio_queue_data *bio_queue_data = ptr;

	blk_finish_plug(&bio_queue_data->plug);
}

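/* The work queue type used for the bio submission threads. */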
static const struct vdo_work_queue_type bio_queue_type = {
	.start = start_bio_queue,
	.finish = finish_bio_queue,
	.max_priority = BIO_Q_MAX_PRIORITY,
	.default_priority = BIO_Q_DATA_PRIORITY,
};

/**
 * count_all_bios() - Determine which bio counter to use.
 * @vio: The vio associated with the bio.
 * @bio: The bio to count.
 */
static void count_all_bios(struct vio *vio, struct bio *bio)
{
	struct atomic_statistics *stats = &vio->completion.vdo->stats;

	if (is_data_vio(vio)) {
		vdo_count_bios(&stats->bios_out, bio);
		return;
	}

	vdo_count_bios(&stats->bios_meta, bio);
	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
		vdo_count_bios(&stats->bios_journal, bio);
	else if (vio->type == VIO_TYPE_BLOCK_MAP)
		vdo_count_bios(&stats->bios_page_cache, bio);
}

/**
 * assert_in_bio_zone() - Assert that a vio is in the correct bio zone and not in interrupt
 *                        context.
 * @vio: The vio to check.
 */
static void assert_in_bio_zone(struct vio *vio)
{
	VDO_ASSERT_LOG_ONLY(!in_interrupt(), "not in interrupt context");
	assert_vio_in_bio_zone(vio);
}

/**
 * send_bio_to_device() - Update stats and tracing info, then submit the supplied bio to the OS for
 *                        processing.
 * @vio: The vio associated with the bio.
 * @bio: The bio to submit to the OS.
 */
static void send_bio_to_device(struct vio *vio, struct bio *bio)
{
	struct vdo *vdo = vio->completion.vdo;

	assert_in_bio_zone(vio);
	atomic64_inc(&vdo->stats.bios_submitted);
	count_all_bios(vio, bio);
	bio_set_dev(bio, vdo_get_backing_device(vdo));
	submit_bio_noacct(bio);
}

/**
 * vdo_submit_vio() - Submit a vio's bio to the underlying block device. May block if the device
 *                    is busy. This callback should be used by vios which did not attempt to merge.
 * @completion: The vio to submit.
 */
void vdo_submit_vio(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);

	send_bio_to_device(vio, vio->bio);
}

/**
 * get_bio_list() - Extract the list of bios to submit from a vio.
 * @vio: The vio submitting I/O.
 *
 * The list will always contain at least one entry (the bio for the vio on which it is called), but
 * other bios may have been merged with it as well.
 *
 * Return: The head of the bio list to submit.
 */
static struct bio *get_bio_list(struct vio *vio)
{
	struct bio *bio;
	struct io_submitter *submitter = vio->completion.vdo->io_submitter;
	struct bio_queue_data *bio_queue_data = &(submitter->bio_queue_data[vio->bio_zone]);

	assert_in_bio_zone(vio);

	mutex_lock(&bio_queue_data->lock);
	vdo_int_map_remove(bio_queue_data->map,
			   vio->bios_merged.head->bi_iter.bi_sector);
	vdo_int_map_remove(bio_queue_data->map,
			   vio->bios_merged.tail->bi_iter.bi_sector);
	bio = vio->bios_merged.head;
	bio_list_init(&vio->bios_merged);
	mutex_unlock(&bio_queue_data->lock);

	return bio;
}

/**
 * submit_data_vio() - Submit a data_vio's bio to the storage below along with any bios that have
 *                     been merged with it.
 * @completion: The vio to submit.
 *
 * Context: This call may block and so should only be called from a bio thread.
 */
static void submit_data_vio(struct vdo_completion *completion)
{
	struct bio *bio, *next;
	struct vio *vio = as_vio(completion);

	assert_in_bio_zone(vio);
	for (bio = get_bio_list(vio); bio != NULL; bio = next) {
		next = bio->bi_next;
		bio->bi_next = NULL;
		send_bio_to_device((struct vio *) bio->bi_private, bio);
	}
}

/**
 * get_mergeable_locked() - Attempt to find an already queued bio that the current bio can be
 *                          merged with.
 * @map: The bio map to use for merging.
 * @vio: The vio we want to merge.
 * @back_merge: Set to true for a back merge, false for a front merge.
 *
 * There are two types of merging possible, forward and backward, which are distinguished by a flag
 * that uses kernel elevator terminology.
 *
 * Return: The vio to merge to, NULL if no merging is possible.
 */
static struct vio *get_mergeable_locked(struct int_map *map, struct vio *vio,
					bool back_merge)
{
	struct bio *bio = vio->bio;
	sector_t merge_sector = bio->bi_iter.bi_sector;
	struct vio *vio_merge;

	if (back_merge)
		merge_sector -= VDO_SECTORS_PER_BLOCK;
	else
		merge_sector += VDO_SECTORS_PER_BLOCK;

	vio_merge = vdo_int_map_get(map, merge_sector);

	if (vio_merge == NULL)
		return NULL;

	if (vio->completion.priority != vio_merge->completion.priority)
		return NULL;

	if (bio_data_dir(bio) != bio_data_dir(vio_merge->bio))
		return NULL;

	if (bio_list_empty(&vio_merge->bios_merged))
		return NULL;

	if (back_merge) {
		return (vio_merge->bios_merged.tail->bi_iter.bi_sector == merge_sector ?
			vio_merge : NULL);
	}

	return (vio_merge->bios_merged.head->bi_iter.bi_sector == merge_sector ?
		vio_merge : NULL);
}

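/**
 * map_merged_vio() - Record a vio's merged bio list in the bio map.
 * @bio_map: The bio map to update.
 * @vio: The vio whose merged bio list is to be recorded.
 *
 * Both the first and last sectors of the merged list are mapped so that later bios can merge at
 * either end of it.
 *
 * Return: VDO_SUCCESS or an error.
 */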
static int map_merged_vio(struct int_map *bio_map, struct vio *vio)
{
	int result;
	sector_t bio_sector;

	bio_sector = vio->bios_merged.head->bi_iter.bi_sector;
	result = vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
	if (result != VDO_SUCCESS)
		return result;

	bio_sector = vio->bios_merged.tail->bi_iter.bi_sector;
	return vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
}

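/* Append a vio's merged bios to the tail of the preceding vio's list and update the bio map. */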
static int merge_to_prev_tail(struct int_map *bio_map, struct vio *vio,
			      struct vio *prev_vio)
{
	vdo_int_map_remove(bio_map, prev_vio->bios_merged.tail->bi_iter.bi_sector);
	bio_list_merge(&prev_vio->bios_merged, &vio->bios_merged);
	return map_merged_vio(bio_map, prev_vio);
}

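/* Prepend a vio's merged bios to the head of the following vio's list and update the bio map. */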
static int merge_to_next_head(struct int_map *bio_map, struct vio *vio,
			      struct vio *next_vio)
{
	/*
	 * Handle "next merge" and "gap fill" cases the same way so as to reorder bios in a way
	 * that's compatible with using funnel queues in work queues. This avoids removing an
	 * existing completion.
	 */
	vdo_int_map_remove(bio_map, next_vio->bios_merged.head->bi_iter.bi_sector);
	bio_list_merge_head(&next_vio->bios_merged, &vio->bios_merged);
	return map_merged_vio(bio_map, next_vio);
}

/**
 * try_bio_map_merge() - Attempt to merge a vio's bio with other pending I/Os.
 * @vio: The vio to merge.
 *
 * Currently this is only used for data_vios, but is broken out for future use with metadata vios.
 *
 * Return: Whether or not the vio was merged.
 */
static bool try_bio_map_merge(struct vio *vio)
{
	int result;
	bool merged = true;
	struct bio *bio = vio->bio;
	struct vio *prev_vio, *next_vio;
	struct vdo *vdo = vio->completion.vdo;
	struct bio_queue_data *bio_queue_data =
		&vdo->io_submitter->bio_queue_data[vio->bio_zone];

	bio->bi_next = NULL;
	bio_list_init(&vio->bios_merged);
	bio_list_add(&vio->bios_merged, bio);

	mutex_lock(&bio_queue_data->lock);
	prev_vio = get_mergeable_locked(bio_queue_data->map, vio, true);
	next_vio = get_mergeable_locked(bio_queue_data->map, vio, false);
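	/* If both lookups returned the same vio (or nothing), drop the front merge candidate. */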
	if (prev_vio == next_vio)
		next_vio = NULL;

	if ((prev_vio == NULL) && (next_vio == NULL)) {
		/* No merge. Just record the bio in the map. */
		merged = false;
		result = vdo_int_map_put(bio_queue_data->map,
					 bio->bi_iter.bi_sector,
					 vio, true, NULL);
	} else if (next_vio == NULL) {
		/* Only prev. Merge to prev's tail. */
		result = merge_to_prev_tail(bio_queue_data->map, vio, prev_vio);
	} else {
		/* Only next. Merge to next's head. */
		result = merge_to_next_head(bio_queue_data->map, vio, next_vio);
	}
	mutex_unlock(&bio_queue_data->lock);

	/* We don't care about failure of int_map_put in this case. */
	VDO_ASSERT_LOG_ONLY(result == VDO_SUCCESS, "bio map insertion succeeds");
	return merged;
}

/**
 * vdo_submit_data_vio() - Submit I/O for a data_vio.
 * @data_vio: The data_vio for which to issue I/O.
 *
 * If possible, this I/O will be merged with other pending I/Os. Otherwise, the data_vio will be
 * sent to the appropriate bio zone directly.
 */
void vdo_submit_data_vio(struct data_vio *data_vio)
{
	if (try_bio_map_merge(&data_vio->vio))
		return;

	launch_data_vio_bio_zone_callback(data_vio, submit_data_vio);
}

/**
 * __submit_metadata_vio() - Submit I/O for a metadata vio.
 * @vio: The vio for which to issue I/O.
 * @physical: The physical block number to read or write.
 * @callback: The bio endio function which will be called after the I/O completes.
 * @error_handler: The handler for submission or I/O errors; may be NULL.
 * @operation: The type of I/O to perform.
 * @data: The buffer to read or write; may be NULL.
 * @size: The I/O amount in bytes.
 *
 * The vio is enqueued on a vdo bio queue so that bio submission (which may block) does not block
 * other vdo threads.
 *
 * The error handler is guaranteed to run on the correct thread only as long as the thread calling
 * this function and the thread set in the endio callback are the same, and no error can occur on
 * the bio queue itself. Currently this is true for all callers, but additional care will be needed
 * if this ever changes.
 */
void __submit_metadata_vio(struct vio *vio, physical_block_number_t physical,
			   bio_end_io_t callback, vdo_action_fn error_handler,
			   blk_opf_t operation, char *data, int size)
{
	int result;
	struct vdo_completion *completion = &vio->completion;
	const struct admin_state_code *code = vdo_get_admin_state(completion->vdo);

	VDO_ASSERT_LOG_ONLY(!code->quiescent, "I/O not allowed in state %s", code->name);

	vdo_reset_completion(completion);
	completion->error_handler = error_handler;
	result = vio_reset_bio_with_size(vio, data, size, callback, operation | REQ_META,
					 physical);
	if (result != VDO_SUCCESS) {
		continue_vio(vio, result);
		return;
	}

	vdo_set_completion_callback(completion, vdo_submit_vio,
				    get_vio_bio_zone_thread_id(vio));
	vdo_launch_completion_with_priority(completion, get_metadata_priority(vio));
}

/**
 * vdo_submit_metadata_vio_wait() - Submit I/O for a metadata vio and wait for completion.
 * @vio: The vio for which to issue I/O.
 * @physical: The physical block number to read or write.
 * @operation: The type of I/O to perform.
 *
 * This function operates like __submit_metadata_vio() except that it blocks until the I/O is
 * done. It can be used to do I/O before the work queues and thread completions are set up.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_submit_metadata_vio_wait(struct vio *vio,
				 physical_block_number_t physical,
				 blk_opf_t operation)
{
	int result;

	result = vio_reset_bio(vio, vio->data, NULL, operation | REQ_META, physical);
	if (result != VDO_SUCCESS)
		return result;

	bio_set_dev(vio->bio, vdo_get_backing_device(vio->completion.vdo));
	submit_bio_wait(vio->bio);
	return blk_status_to_errno(vio->bio->bi_status);
}

/**
 * vdo_make_io_submitter() - Create an io_submitter structure.
 * @thread_count: Number of bio-submission threads to set up.
 * @rotation_interval: Interval to use when rotating between bio-submission threads when enqueuing
 *                     completions.
 * @max_requests_active: Number of bios for merge tracking.
 * @vdo: The vdo which will use this submitter.
 * @io_submitter_ptr: Pointer to the new data structure.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make_io_submitter(unsigned int thread_count, unsigned int rotation_interval,
			  unsigned int max_requests_active, struct vdo *vdo,
			  struct io_submitter **io_submitter_ptr)
{
	unsigned int i;
	struct io_submitter *io_submitter;
	int result;

	result = vdo_allocate_extended(struct io_submitter, thread_count,
				       struct bio_queue_data, "bio submission data",
				       &io_submitter);
	if (result != VDO_SUCCESS)
		return result;

	io_submitter->bio_queue_rotation_interval = rotation_interval;

	/* Setup for each bio-submission work queue */
	for (i = 0; i < thread_count; i++) {
		struct bio_queue_data *bio_queue_data = &io_submitter->bio_queue_data[i];

		mutex_init(&bio_queue_data->lock);
		/*
		 * One I/O operation per request, but both first & last sector numbers.
		 *
		 * If requests are assigned to threads round-robin, they should be distributed
		 * quite evenly. But if they're assigned based on PBN, things can sometimes be very
		 * uneven. So for now, we'll assume that all requests *may* wind up on one thread,
		 * and thus all in the same map.
		 */
		result = vdo_int_map_create(max_requests_active * 2,
					    &bio_queue_data->map);
		if (result != VDO_SUCCESS) {
			/*
			 * Clean up the partially initialized bio-queue entirely and indicate that
			 * initialization failed.
			 */
			vdo_log_error("bio map initialization failed %d", result);
			vdo_cleanup_io_submitter(io_submitter);
			vdo_free_io_submitter(io_submitter);
			return result;
		}

		bio_queue_data->queue_number = i;
		result = vdo_make_thread(vdo, vdo->thread_config.bio_threads[i],
					 &bio_queue_type, 1, (void **) &bio_queue_data);
		if (result != VDO_SUCCESS) {
			/*
			 * Clean up the partially initialized bio-queue entirely and indicate that
			 * initialization failed.
			 */
			vdo_int_map_free(vdo_forget(bio_queue_data->map));
			vdo_log_error("bio queue initialization failed %d", result);
			vdo_cleanup_io_submitter(io_submitter);
			vdo_free_io_submitter(io_submitter);
			return result;
		}

		bio_queue_data->queue = vdo->threads[vdo->thread_config.bio_threads[i]].queue;
		io_submitter->num_bio_queues_used++;
	}

	*io_submitter_ptr = io_submitter;

	return VDO_SUCCESS;
}

469
470/**
471 * vdo_cleanup_io_submitter() - Tear down the io_submitter fields as needed for a physical layer.
472 * @io_submitter: The I/O submitter data to tear down; may be NULL.
473 */
474void vdo_cleanup_io_submitter(struct io_submitter *io_submitter)
475{
476 int i;
477
478 if (io_submitter == NULL)
479 return;
480
481 for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--)
482 vdo_finish_work_queue(io_submitter->bio_queue_data[i].queue);
483}
484
/**
 * vdo_free_io_submitter() - Free the io_submitter fields and structure as needed.
 * @io_submitter: The I/O submitter data to destroy.
 *
 * This must be called after vdo_cleanup_io_submitter(). It is used to release resources late in
 * the shutdown process to avoid or reduce the chance of race conditions.
 */
void vdo_free_io_submitter(struct io_submitter *io_submitter)
{
	int i;

	if (io_submitter == NULL)
		return;

	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--) {
		io_submitter->num_bio_queues_used--;
		/* vdo_destroy() will free the work queue, so just give up our reference to it. */
		vdo_forget(io_submitter->bio_queue_data[i].queue);
		vdo_int_map_free(vdo_forget(io_submitter->bio_queue_data[i].map));
	}
	vdo_free(io_submitter);
}