Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
io-submitter.c at master (506 lines, 16 kB)
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "io-submitter.h"

#include <linux/bio.h>
#include <linux/kernel.h>
#include <linux/mutex.h>

#include "memory-alloc.h"
#include "permassert.h"

#include "data-vio.h"
#include "logger.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"

/*
 * Submission of bio operations to the underlying storage device will go through a separate work
 * queue thread (or more than one) to prevent blocking in other threads if the storage device has a
 * full queue. The plug structure allows that thread to do better batching of requests to make the
 * I/O more efficient.
 *
 * When multiple worker threads are used, a thread is chosen for an I/O operation submission based
 * on the PBN, so a given PBN will consistently wind up on the same thread. Flush operations are
 * assigned round-robin.
 *
 * The map (protected by the mutex) collects pending I/O operations so that the worker thread can
 * reorder them to try to encourage I/O request merging in the request queue underneath.
 */
struct bio_queue_data {
	struct vdo_work_queue *queue;
	struct blk_plug plug;
	struct int_map *map;
	struct mutex lock;
	unsigned int queue_number;
};

struct io_submitter {
	unsigned int num_bio_queues_used;
	unsigned int bio_queue_rotation_interval;
	struct bio_queue_data bio_queue_data[];
};

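/*
 * Work queue start/finish hooks (registered in bio_queue_type below): they open and close the
 * per-queue blk_plug so that bios submitted from a bio thread can be batched by the block layer.
 */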
static void start_bio_queue(void *ptr)
{
	struct bio_queue_data *bio_queue_data = ptr;

	blk_start_plug(&bio_queue_data->plug);
}

static void finish_bio_queue(void *ptr)
{
	struct bio_queue_data *bio_queue_data = ptr;

	blk_finish_plug(&bio_queue_data->plug);
}

static const struct vdo_work_queue_type bio_queue_type = {
	.start = start_bio_queue,
	.finish = finish_bio_queue,
	.max_priority = BIO_Q_MAX_PRIORITY,
	.default_priority = BIO_Q_DATA_PRIORITY,
};

/**
 * count_all_bios() - Determine which bio counter to use.
 * @vio: The vio associated with the bio.
 * @bio: The bio to count.
 */
static void count_all_bios(struct vio *vio, struct bio *bio)
{
	struct atomic_statistics *stats = &vio->completion.vdo->stats;

	if (is_data_vio(vio)) {
		vdo_count_bios(&stats->bios_out, bio);
		return;
	}

	vdo_count_bios(&stats->bios_meta, bio);
	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
		vdo_count_bios(&stats->bios_journal, bio);
	else if (vio->type == VIO_TYPE_BLOCK_MAP)
		vdo_count_bios(&stats->bios_page_cache, bio);
}

/**
 * assert_in_bio_zone() - Assert that a vio is in the correct bio zone and not in interrupt
 *			  context.
 * @vio: The vio to check.
 */
static void assert_in_bio_zone(struct vio *vio)
{
	VDO_ASSERT_LOG_ONLY(!in_interrupt(), "not in interrupt context");
	assert_vio_in_bio_zone(vio);
}

/**
 * send_bio_to_device() - Update stats and tracing info, then submit the supplied bio to the OS for
 *			  processing.
 * @vio: The vio associated with the bio.
 * @bio: The bio to submit to the OS.
 */
static void send_bio_to_device(struct vio *vio, struct bio *bio)
{
	struct vdo *vdo = vio->completion.vdo;

	assert_in_bio_zone(vio);
	atomic64_inc(&vdo->stats.bios_submitted);
	count_all_bios(vio, bio);
	bio_set_dev(bio, vdo_get_backing_device(vdo));
	submit_bio_noacct(bio);
}

/**
 * vdo_submit_vio() - Submits a vio's bio to the underlying block device. May block if the device
 *		      is busy. This callback should be used by vios which did not attempt to merge.
 * @completion: The vio to submit.
 */
void vdo_submit_vio(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);

	send_bio_to_device(vio, vio->bio);
}

/**
 * get_bio_list() - Extract the list of bios to submit from a vio.
 * @vio: The vio submitting I/O.
 *
 * The list will always contain at least one entry (the bio for the vio on which it is called), but
 * other bios may have been merged with it as well.
 *
 * Return: The head of the bio list to submit.
 */
static struct bio *get_bio_list(struct vio *vio)
{
	struct bio *bio;
	struct io_submitter *submitter = vio->completion.vdo->io_submitter;
	struct bio_queue_data *bio_queue_data = &(submitter->bio_queue_data[vio->bio_zone]);

	assert_in_bio_zone(vio);

	mutex_lock(&bio_queue_data->lock);
	vdo_int_map_remove(bio_queue_data->map,
			   vio->bios_merged.head->bi_iter.bi_sector);
	vdo_int_map_remove(bio_queue_data->map,
			   vio->bios_merged.tail->bi_iter.bi_sector);
	bio = vio->bios_merged.head;
	bio_list_init(&vio->bios_merged);
	mutex_unlock(&bio_queue_data->lock);

	return bio;
}

/**
 * submit_data_vio() - Submit a data_vio's bio to the storage below along with
 *		       any bios that have been merged with it.
 * @completion: The vio to submit.
 *
 * Context: This call may block and so should only be called from a bio thread.
 */
static void submit_data_vio(struct vdo_completion *completion)
{
	struct bio *bio, *next;
	struct vio *vio = as_vio(completion);

	assert_in_bio_zone(vio);
	for (bio = get_bio_list(vio); bio != NULL; bio = next) {
		next = bio->bi_next;
		bio->bi_next = NULL;
		send_bio_to_device((struct vio *) bio->bi_private, bio);
	}
}

/**
 * get_mergeable_locked() - Attempt to find an already queued bio that the current bio can be
 *			    merged with.
 * @map: The bio map to use for merging.
 * @vio: The vio we want to merge.
 * @back_merge: Set to true for a back merge, false for a front merge.
 *
 * There are two types of merging possible, forward and backward, which are distinguished by a flag
 * that uses kernel elevator terminology.
 *
 * Return: The vio to merge to, NULL if no merging is possible.
 */
static struct vio *get_mergeable_locked(struct int_map *map, struct vio *vio,
					bool back_merge)
{
	struct bio *bio = vio->bio;
	sector_t merge_sector = bio->bi_iter.bi_sector;
	struct vio *vio_merge;

	if (back_merge)
		merge_sector -= VDO_SECTORS_PER_BLOCK;
	else
		merge_sector += VDO_SECTORS_PER_BLOCK;

	vio_merge = vdo_int_map_get(map, merge_sector);

	if (vio_merge == NULL)
		return NULL;

	if (vio->completion.priority != vio_merge->completion.priority)
		return NULL;

	if (bio_data_dir(bio) != bio_data_dir(vio_merge->bio))
		return NULL;

	if (bio_list_empty(&vio_merge->bios_merged))
		return NULL;

	if (back_merge) {
		return (vio_merge->bios_merged.tail->bi_iter.bi_sector == merge_sector ?
			vio_merge : NULL);
	}

	return (vio_merge->bios_merged.head->bi_iter.bi_sector == merge_sector ?
		vio_merge : NULL);
}

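/*
 * Record a vio in the bio map under both the first and last sectors of its merged bio list, so
 * that later bios adjacent to either end of the list can find it.
 */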
static int map_merged_vio(struct int_map *bio_map, struct vio *vio)
{
	int result;
	sector_t bio_sector;

	bio_sector = vio->bios_merged.head->bi_iter.bi_sector;
	result = vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
	if (result != VDO_SUCCESS)
		return result;

	bio_sector = vio->bios_merged.tail->bi_iter.bi_sector;
	return vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
}

/* Merge a vio's pending bios onto the tail of the preceding vio's list, then remap it. */
static int merge_to_prev_tail(struct int_map *bio_map, struct vio *vio,
			      struct vio *prev_vio)
{
	vdo_int_map_remove(bio_map, prev_vio->bios_merged.tail->bi_iter.bi_sector);
	bio_list_merge(&prev_vio->bios_merged, &vio->bios_merged);
	return map_merged_vio(bio_map, prev_vio);
}

/* Merge a vio's pending bios onto the head of the following vio's list, then remap it. */
static int merge_to_next_head(struct int_map *bio_map, struct vio *vio,
			      struct vio *next_vio)
{
	/*
	 * Handle "next merge" and "gap fill" cases the same way so as to reorder bios in a way
	 * that's compatible with using funnel queues in work queues. This avoids removing an
	 * existing completion.
	 */
	vdo_int_map_remove(bio_map, next_vio->bios_merged.head->bi_iter.bi_sector);
	bio_list_merge_head(&next_vio->bios_merged, &vio->bios_merged);
	return map_merged_vio(bio_map, next_vio);
}

/**
 * try_bio_map_merge() - Attempt to merge a vio's bio with other pending I/Os.
 * @vio: The vio to merge.
 *
 * Currently this is only used for data_vios, but is broken out for future use with metadata vios.
 *
 * Return: Whether or not the vio was merged.
 */
static bool try_bio_map_merge(struct vio *vio)
{
	int result;
	bool merged = true;
	struct bio *bio = vio->bio;
	struct vio *prev_vio, *next_vio;
	struct vdo *vdo = vio->completion.vdo;
	struct bio_queue_data *bio_queue_data =
		&vdo->io_submitter->bio_queue_data[vio->bio_zone];

	bio->bi_next = NULL;
	bio_list_init(&vio->bios_merged);
	bio_list_add(&vio->bios_merged, bio);

	mutex_lock(&bio_queue_data->lock);
	prev_vio = get_mergeable_locked(bio_queue_data->map, vio, true);
	next_vio = get_mergeable_locked(bio_queue_data->map, vio, false);
	if (prev_vio == next_vio)
		next_vio = NULL;

	if ((prev_vio == NULL) && (next_vio == NULL)) {
		/* no merge. just add to bio_queue */
		merged = false;
		result = vdo_int_map_put(bio_queue_data->map,
					 bio->bi_iter.bi_sector,
					 vio, true, NULL);
	} else if (next_vio == NULL) {
		/* Only prev. merge to prev's tail */
		result = merge_to_prev_tail(bio_queue_data->map, vio, prev_vio);
	} else {
		/* Only next. merge to next's head */
		result = merge_to_next_head(bio_queue_data->map, vio, next_vio);
	}
	mutex_unlock(&bio_queue_data->lock);

	/* We don't care about failure of int_map_put in this case. */
	VDO_ASSERT_LOG_ONLY(result == VDO_SUCCESS, "bio map insertion succeeds");
	return merged;
}

/**
 * vdo_submit_data_vio() - Submit I/O for a data_vio.
 * @data_vio: The data_vio for which to issue I/O.
 *
 * If possible, this I/O will be merged with other pending I/Os. Otherwise, the data_vio will be
 * sent to the appropriate bio zone directly.
 */
void vdo_submit_data_vio(struct data_vio *data_vio)
{
	if (try_bio_map_merge(&data_vio->vio))
		return;

	launch_data_vio_bio_zone_callback(data_vio, submit_data_vio);
}

/**
 * __submit_metadata_vio() - Submit I/O for a metadata vio.
 * @vio: The vio for which to issue I/O.
 * @physical: The physical block number to read or write.
 * @callback: The bio endio function which will be called after the I/O completes.
 * @error_handler: The handler for submission or I/O errors; may be NULL.
 * @operation: The type of I/O to perform.
 * @data: The buffer to read or write; may be NULL.
 * @size: The I/O amount in bytes.
 *
 * The vio is enqueued on a vdo bio queue so that bio submission (which may block) does not block
 * other vdo threads.
 *
 * The error handler will run on the correct thread only so long as the thread calling this
 * function and the thread set in the endio callback are the same, and no error can occur on the
 * bio queue. Currently this is true for all callers, but additional care will be needed if this
 * ever changes.
 */
void __submit_metadata_vio(struct vio *vio, physical_block_number_t physical,
			   bio_end_io_t callback, vdo_action_fn error_handler,
			   blk_opf_t operation, char *data, int size)
{
	int result;
	struct vdo_completion *completion = &vio->completion;
	const struct admin_state_code *code = vdo_get_admin_state(completion->vdo);

	VDO_ASSERT_LOG_ONLY(!code->quiescent, "I/O not allowed in state %s", code->name);

	vdo_reset_completion(completion);
	completion->error_handler = error_handler;
	result = vio_reset_bio_with_size(vio, data, size, callback, operation | REQ_META,
					 physical);
	if (result != VDO_SUCCESS) {
		continue_vio(vio, result);
		return;
	}

	vdo_set_completion_callback(completion, vdo_submit_vio,
				    get_vio_bio_zone_thread_id(vio));
	vdo_launch_completion_with_priority(completion, get_metadata_priority(vio));
}

/**
 * vdo_submit_metadata_vio_wait() - Submit I/O for a metadata vio and wait for completion.
 * @vio: The vio for which to issue I/O.
 * @physical: The physical block number to read or write.
 * @operation: The type of I/O to perform.
 *
 * This function operates like __submit_metadata_vio() except that it blocks until the work is
 * done. It can be used to do I/O before work queues and thread completions are set up.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_submit_metadata_vio_wait(struct vio *vio,
				 physical_block_number_t physical,
				 blk_opf_t operation)
{
	int result;

	result = vio_reset_bio(vio, vio->data, NULL, operation | REQ_META, physical);
	if (result != VDO_SUCCESS)
		return result;

	bio_set_dev(vio->bio, vdo_get_backing_device(vio->completion.vdo));
	submit_bio_wait(vio->bio);
	return blk_status_to_errno(vio->bio->bi_status);
}

/**
 * vdo_make_io_submitter() - Create an io_submitter structure.
 * @thread_count: Number of bio-submission threads to set up.
 * @rotation_interval: Interval to use when rotating between bio-submission threads when enqueuing
 *		       completions.
 * @max_requests_active: Number of bios for merge tracking.
 * @vdo: The vdo which will use this submitter.
 * @io_submitter_ptr: Pointer to the new data structure.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make_io_submitter(unsigned int thread_count, unsigned int rotation_interval,
			  unsigned int max_requests_active, struct vdo *vdo,
			  struct io_submitter **io_submitter_ptr)
{
	unsigned int i;
	struct io_submitter *io_submitter;
	int result;

	result = vdo_allocate_extended(struct io_submitter, thread_count,
				       struct bio_queue_data, "bio submission data",
				       &io_submitter);
	if (result != VDO_SUCCESS)
		return result;

	io_submitter->bio_queue_rotation_interval = rotation_interval;

	/* Setup for each bio-submission work queue */
	for (i = 0; i < thread_count; i++) {
		struct bio_queue_data *bio_queue_data = &io_submitter->bio_queue_data[i];

		mutex_init(&bio_queue_data->lock);
		/*
		 * One I/O operation per request, but both first & last sector numbers.
		 *
		 * If requests are assigned to threads round-robin, they should be distributed
		 * quite evenly. But if they're assigned based on PBN, things can sometimes be very
		 * uneven. So for now, we'll assume that all requests *may* wind up on one thread,
		 * and thus all in the same map.
		 */
		result = vdo_int_map_create(max_requests_active * 2,
					    &bio_queue_data->map);
		if (result != VDO_SUCCESS) {
			/*
			 * Clean up the partially initialized bio-queue entirely and indicate that
			 * initialization failed.
			 */
			vdo_log_error("bio map initialization failed %d", result);
			vdo_cleanup_io_submitter(io_submitter);
			vdo_free_io_submitter(io_submitter);
			return result;
		}

		bio_queue_data->queue_number = i;
		result = vdo_make_thread(vdo, vdo->thread_config.bio_threads[i],
					 &bio_queue_type, 1, (void **) &bio_queue_data);
		if (result != VDO_SUCCESS) {
			/*
			 * Clean up the partially initialized bio-queue entirely and indicate that
			 * initialization failed.
			 */
			vdo_int_map_free(vdo_forget(bio_queue_data->map));
			vdo_log_error("bio queue initialization failed %d", result);
			vdo_cleanup_io_submitter(io_submitter);
			vdo_free_io_submitter(io_submitter);
			return result;
		}

		bio_queue_data->queue = vdo->threads[vdo->thread_config.bio_threads[i]].queue;
		io_submitter->num_bio_queues_used++;
	}

	*io_submitter_ptr = io_submitter;

	return VDO_SUCCESS;
}

/**
 * vdo_cleanup_io_submitter() - Tear down the io_submitter fields as needed for a physical layer.
 * @io_submitter: The I/O submitter data to tear down; may be NULL.
 */
void vdo_cleanup_io_submitter(struct io_submitter *io_submitter)
{
	int i;

	if (io_submitter == NULL)
		return;

	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--)
		vdo_finish_work_queue(io_submitter->bio_queue_data[i].queue);
}

/**
 * vdo_free_io_submitter() - Free the io_submitter fields and structure as needed.
 * @io_submitter: The I/O submitter data to destroy.
 *
 * This must be called after vdo_cleanup_io_submitter(). It is used to release resources late in
 * the shutdown process to avoid or reduce the chance of race conditions.
 */
void vdo_free_io_submitter(struct io_submitter *io_submitter)
{
	int i;

	if (io_submitter == NULL)
		return;

	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--) {
		io_submitter->num_bio_queues_used--;
		/* vdo_destroy() will free the work queue, so just give up our reference to it. */
		vdo_forget(io_submitter->bio_queue_data[i].queue);
		vdo_int_map_free(vdo_forget(io_submitter->bio_queue_data[i].map));
	}
	vdo_free(io_submitter);
}