Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge tag 'for-5.4/io_uring-2019-09-15' of git://git.kernel.dk/linux-block

Pull io_uring updates from Jens Axboe:

- Allocate the SQ and CQ rings together, which is more efficient. Expose
this through a feature flag as well, so we can reduce the number of
mmaps by 1 (Hristo and me)

- Fix for sequence logic with SQ thread (Jackie).

- Add support for links with drain commands (Jackie).

- Improved async merging (me)

- Improved buffered async write performance (me)

- Support SQ poll wakeup + event get in a single io_uring_enter() call (me)

- Support a larger SQ ring size. For epoll conversions, the 4k-entry limit
was too small for some production workloads (Daniel).

- Convert put_page() calls to put_user_page*() (John)

* tag 'for-5.4/io_uring-2019-09-15' of git://git.kernel.dk/linux-block:
io_uring: increase IORING_MAX_ENTRIES to 32K
io_uring: make sqpoll wakeup possible with getevents
io_uring: extend async work merging
io_uring: limit parallelism of buffered writes
io_uring: add io_queue_async_work() helper
io_uring: optimize submit_and_wait API
io_uring: add support for link with drain
io_uring: fix wrong sequence setting logic
io_uring: expose single mmap capability
io_uring: allocate the two rings together
fs/io_uring.c: convert put_page() to put_user_page*()

+355 -184
+348 -183
fs/io_uring.c
··· 75 75 76 76 #include "internal.h" 77 77 78 - #define IORING_MAX_ENTRIES 4096 78 + #define IORING_MAX_ENTRIES 32768 79 79 #define IORING_MAX_FIXED_FILES 1024 80 80 81 81 struct io_uring { ··· 84 84 }; 85 85 86 86 /* 87 - * This data is shared with the application through the mmap at offset 88 - * IORING_OFF_SQ_RING. 87 + * This data is shared with the application through the mmap at offsets 88 + * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING. 89 89 * 90 90 * The offsets to the member fields are published through struct 91 91 * io_sqring_offsets when calling io_uring_setup. 92 92 */ 93 - struct io_sq_ring { 93 + struct io_rings { 94 94 /* 95 95 * Head and tail offsets into the ring; the offsets need to be 96 96 * masked to get valid indices. 97 97 * 98 - * The kernel controls head and the application controls tail. 98 + * The kernel controls head of the sq ring and the tail of the cq ring, 99 + * and the application controls tail of the sq ring and the head of the 100 + * cq ring. 99 101 */ 100 - struct io_uring r; 102 + struct io_uring sq, cq; 101 103 /* 102 - * Bitmask to apply to head and tail offsets (constant, equals 104 + * Bitmasks to apply to head and tail offsets (constant, equals 103 105 * ring_entries - 1) 104 106 */ 105 - u32 ring_mask; 106 - /* Ring size (constant, power of 2) */ 107 - u32 ring_entries; 107 + u32 sq_ring_mask, cq_ring_mask; 108 + /* Ring sizes (constant, power of 2) */ 109 + u32 sq_ring_entries, cq_ring_entries; 108 110 /* 109 111 * Number of invalid entries dropped by the kernel due to 110 112 * invalid index stored in array ··· 119 117 * counter includes all submissions that were dropped reaching 120 118 * the new SQ head (and possibly more). 121 119 */ 122 - u32 dropped; 120 + u32 sq_dropped; 123 121 /* 124 122 * Runtime flags 125 123 * ··· 129 127 * The application needs a full memory barrier before checking 130 128 * for IORING_SQ_NEED_WAKEUP after updating the sq tail. 
131 129 */ 132 - u32 flags; 133 - /* 134 - * Ring buffer of indices into array of io_uring_sqe, which is 135 - * mmapped by the application using the IORING_OFF_SQES offset. 136 - * 137 - * This indirection could e.g. be used to assign fixed 138 - * io_uring_sqe entries to operations and only submit them to 139 - * the queue when needed. 140 - * 141 - * The kernel modifies neither the indices array nor the entries 142 - * array. 143 - */ 144 - u32 array[]; 145 - }; 146 - 147 - /* 148 - * This data is shared with the application through the mmap at offset 149 - * IORING_OFF_CQ_RING. 150 - * 151 - * The offsets to the member fields are published through struct 152 - * io_cqring_offsets when calling io_uring_setup. 153 - */ 154 - struct io_cq_ring { 155 - /* 156 - * Head and tail offsets into the ring; the offsets need to be 157 - * masked to get valid indices. 158 - * 159 - * The application controls head and the kernel tail. 160 - */ 161 - struct io_uring r; 162 - /* 163 - * Bitmask to apply to head and tail offsets (constant, equals 164 - * ring_entries - 1) 165 - */ 166 - u32 ring_mask; 167 - /* Ring size (constant, power of 2) */ 168 - u32 ring_entries; 130 + u32 sq_flags; 169 131 /* 170 132 * Number of completion events lost because the queue was full; 171 133 * this should be avoided by the application by making sure ··· 143 177 * As completion events come in out of order this counter is not 144 178 * ordered with any other data. 145 179 */ 146 - u32 overflow; 180 + u32 cq_overflow; 147 181 /* 148 182 * Ring buffer of completion events. 149 183 * ··· 151 185 * produced, so the application is allowed to modify pending 152 186 * entries. 
153 187 */ 154 - struct io_uring_cqe cqes[]; 188 + struct io_uring_cqe cqes[] ____cacheline_aligned_in_smp; 155 189 }; 156 190 157 191 struct io_mapped_ubuf { ··· 167 201 struct list_head list; 168 202 169 203 struct file *file; 170 - off_t io_end; 204 + off_t io_start; 171 205 size_t io_len; 172 206 }; 173 207 ··· 181 215 bool compat; 182 216 bool account_mem; 183 217 184 - /* SQ ring */ 185 - struct io_sq_ring *sq_ring; 218 + /* 219 + * Ring buffer of indices into array of io_uring_sqe, which is 220 + * mmapped by the application using the IORING_OFF_SQES offset. 221 + * 222 + * This indirection could e.g. be used to assign fixed 223 + * io_uring_sqe entries to operations and only submit them to 224 + * the queue when needed. 225 + * 226 + * The kernel modifies neither the indices array nor the entries 227 + * array. 228 + */ 229 + u32 *sq_array; 186 230 unsigned cached_sq_head; 187 231 unsigned sq_entries; 188 232 unsigned sq_mask; ··· 203 227 } ____cacheline_aligned_in_smp; 204 228 205 229 /* IO offload */ 206 - struct workqueue_struct *sqo_wq; 230 + struct workqueue_struct *sqo_wq[2]; 207 231 struct task_struct *sqo_thread; /* if using sq thread polling */ 208 232 struct mm_struct *sqo_mm; 209 233 wait_queue_head_t sqo_wait; 210 234 struct completion sqo_thread_started; 211 235 212 236 struct { 213 - /* CQ ring */ 214 - struct io_cq_ring *cq_ring; 215 237 unsigned cached_cq_tail; 216 238 unsigned cq_entries; 217 239 unsigned cq_mask; ··· 217 243 struct fasync_struct *cq_fasync; 218 244 struct eventfd_ctx *cq_ev_fd; 219 245 } ____cacheline_aligned_in_smp; 246 + 247 + struct io_rings *rings; 220 248 221 249 /* 222 250 * If used, fixed file set. 
Writers must ensure that ->refs is dead, ··· 264 288 struct sqe_submit { 265 289 const struct io_uring_sqe *sqe; 266 290 unsigned short index; 291 + u32 sequence; 267 292 bool has_user; 268 293 bool needs_lock; 269 294 bool needs_fixed_file; ··· 312 335 #define REQ_F_LINK 64 /* linked sqes */ 313 336 #define REQ_F_LINK_DONE 128 /* linked sqes done */ 314 337 #define REQ_F_FAIL_LINK 256 /* fail rest of links */ 338 + #define REQ_F_SHADOW_DRAIN 512 /* link-drain shadow req */ 315 339 u64 user_data; 316 340 u32 result; 317 341 u32 sequence; ··· 344 366 }; 345 367 346 368 static void io_sq_wq_submit_work(struct work_struct *work); 369 + static void __io_free_req(struct io_kiocb *req); 347 370 348 371 static struct kmem_cache *req_cachep; 349 372 ··· 409 430 if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN) 410 431 return false; 411 432 412 - return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped; 433 + return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped; 413 434 } 414 435 415 436 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) ··· 430 451 431 452 static void __io_commit_cqring(struct io_ring_ctx *ctx) 432 453 { 433 - struct io_cq_ring *ring = ctx->cq_ring; 454 + struct io_rings *rings = ctx->rings; 434 455 435 - if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) { 456 + if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) { 436 457 /* order cqe stores with ring update */ 437 - smp_store_release(&ring->r.tail, ctx->cached_cq_tail); 458 + smp_store_release(&rings->cq.tail, ctx->cached_cq_tail); 438 459 439 460 if (wq_has_sleeper(&ctx->cq_wait)) { 440 461 wake_up_interruptible(&ctx->cq_wait); 441 462 kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN); 442 463 } 443 464 } 465 + } 466 + 467 + static inline void io_queue_async_work(struct io_ring_ctx *ctx, 468 + struct io_kiocb *req) 469 + { 470 + int rw; 471 + 472 + switch (req->submit.sqe->opcode) { 473 + case IORING_OP_WRITEV: 474 + case 
IORING_OP_WRITE_FIXED: 475 + rw = !(req->rw.ki_flags & IOCB_DIRECT); 476 + break; 477 + default: 478 + rw = 0; 479 + break; 480 + } 481 + 482 + queue_work(ctx->sqo_wq[rw], &req->work); 444 483 } 445 484 446 485 static void io_commit_cqring(struct io_ring_ctx *ctx) ··· 468 471 __io_commit_cqring(ctx); 469 472 470 473 while ((req = io_get_deferred_req(ctx)) != NULL) { 474 + if (req->flags & REQ_F_SHADOW_DRAIN) { 475 + /* Just for drain, free it. */ 476 + __io_free_req(req); 477 + continue; 478 + } 471 479 req->flags |= REQ_F_IO_DRAINED; 472 - queue_work(ctx->sqo_wq, &req->work); 480 + io_queue_async_work(ctx, req); 473 481 } 474 482 } 475 483 476 484 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) 477 485 { 478 - struct io_cq_ring *ring = ctx->cq_ring; 486 + struct io_rings *rings = ctx->rings; 479 487 unsigned tail; 480 488 481 489 tail = ctx->cached_cq_tail; ··· 489 487 * control dependency is enough as we're using WRITE_ONCE to 490 488 * fill the cq entry 491 489 */ 492 - if (tail - READ_ONCE(ring->r.head) == ring->ring_entries) 490 + if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries) 493 491 return NULL; 494 492 495 493 ctx->cached_cq_tail++; 496 - return &ring->cqes[tail & ctx->cq_mask]; 494 + return &rings->cqes[tail & ctx->cq_mask]; 497 495 } 498 496 499 497 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, ··· 512 510 WRITE_ONCE(cqe->res, res); 513 511 WRITE_ONCE(cqe->flags, 0); 514 512 } else { 515 - unsigned overflow = READ_ONCE(ctx->cq_ring->overflow); 513 + unsigned overflow = READ_ONCE(ctx->rings->cq_overflow); 516 514 517 - WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1); 515 + WRITE_ONCE(ctx->rings->cq_overflow, overflow + 1); 518 516 } 519 517 } 520 518 ··· 637 635 638 636 nxt->flags |= REQ_F_LINK_DONE; 639 637 INIT_WORK(&nxt->work, io_sq_wq_submit_work); 640 - queue_work(req->ctx->sqo_wq, &nxt->work); 638 + io_queue_async_work(req->ctx, nxt); 641 639 } 642 640 } 643 641 ··· 681 679 
io_free_req(req); 682 680 } 683 681 684 - static unsigned io_cqring_events(struct io_cq_ring *ring) 682 + static unsigned io_cqring_events(struct io_rings *rings) 685 683 { 686 684 /* See comment at the top of this file */ 687 685 smp_rmb(); 688 - return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head); 686 + return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head); 689 687 } 690 688 691 689 /* ··· 838 836 * If we do, we can potentially be spinning for commands that 839 837 * already triggered a CQE (eg in error). 840 838 */ 841 - if (io_cqring_events(ctx->cq_ring)) 839 + if (io_cqring_events(ctx->rings)) 842 840 break; 843 841 844 842 /* ··· 1189 1187 return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); 1190 1188 } 1191 1189 1190 + static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb) 1191 + { 1192 + if (al->file == kiocb->ki_filp) { 1193 + off_t start, end; 1194 + 1195 + /* 1196 + * Allow merging if we're anywhere in the range of the same 1197 + * page. Generally this happens for sub-page reads or writes, 1198 + * and it's beneficial to allow the first worker to bring the 1199 + * page in and the piggy backed work can then work on the 1200 + * cached page. 1201 + */ 1202 + start = al->io_start & PAGE_MASK; 1203 + end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK; 1204 + if (kiocb->ki_pos >= start && kiocb->ki_pos <= end) 1205 + return true; 1206 + } 1207 + 1208 + al->file = NULL; 1209 + return false; 1210 + } 1211 + 1192 1212 /* 1193 1213 * Make a note of the last file/offset/direction we punted to async 1194 1214 * context. 
We'll use this information to see if we can piggy back a ··· 1222 1198 struct async_list *async_list = &req->ctx->pending_async[rw]; 1223 1199 struct kiocb *kiocb = &req->rw; 1224 1200 struct file *filp = kiocb->ki_filp; 1225 - off_t io_end = kiocb->ki_pos + len; 1226 1201 1227 - if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) { 1202 + if (io_should_merge(async_list, kiocb)) { 1228 1203 unsigned long max_bytes; 1229 1204 1230 1205 /* Use 8x RA size as a decent limiter for both reads/writes */ ··· 1236 1213 req->flags |= REQ_F_SEQ_PREV; 1237 1214 async_list->io_len += len; 1238 1215 } else { 1239 - io_end = 0; 1240 - async_list->io_len = 0; 1216 + async_list->file = NULL; 1241 1217 } 1242 1218 } 1243 1219 1244 1220 /* New file? Reset state. */ 1245 1221 if (async_list->file != filp) { 1246 - async_list->io_len = 0; 1222 + async_list->io_start = kiocb->ki_pos; 1223 + async_list->io_len = len; 1247 1224 async_list->file = filp; 1248 1225 } 1249 - async_list->io_end = io_end; 1250 1226 } 1251 1227 1252 1228 static int io_read(struct io_kiocb *req, const struct sqe_submit *s, ··· 1557 1535 WRITE_ONCE(poll->canceled, true); 1558 1536 if (!list_empty(&poll->wait.entry)) { 1559 1537 list_del_init(&poll->wait.entry); 1560 - queue_work(req->ctx->sqo_wq, &req->work); 1538 + io_queue_async_work(req->ctx, req); 1561 1539 } 1562 1540 spin_unlock(&poll->head->lock); 1563 1541 ··· 1671 1649 io_cqring_ev_posted(ctx); 1672 1650 io_put_req(req); 1673 1651 } else { 1674 - queue_work(ctx->sqo_wq, &req->work); 1652 + io_queue_async_work(ctx, req); 1675 1653 } 1676 1654 1677 1655 return 1; ··· 2014 1992 */ 2015 1993 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req) 2016 1994 { 2017 - bool ret = false; 1995 + bool ret; 2018 1996 2019 1997 if (!list) 2020 1998 return false; ··· 2060 2038 flags = READ_ONCE(s->sqe->flags); 2061 2039 fd = READ_ONCE(s->sqe->fd); 2062 2040 2063 - if (flags & IOSQE_IO_DRAIN) { 2041 + if (flags & IOSQE_IO_DRAIN) 
2064 2042 req->flags |= REQ_F_IO_DRAIN; 2065 - req->sequence = ctx->cached_sq_head - 1; 2066 - } 2043 + /* 2044 + * All io need record the previous position, if LINK vs DARIN, 2045 + * it can be used to mark the position of the first IO in the 2046 + * link list. 2047 + */ 2048 + req->sequence = s->sequence; 2067 2049 2068 2050 if (!io_op_needs_file(s->sqe)) 2069 2051 return 0; ··· 2089 2063 return 0; 2090 2064 } 2091 2065 2092 - static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 2093 - struct sqe_submit *s) 2066 + static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 2067 + struct sqe_submit *s, bool force_nonblock) 2094 2068 { 2095 2069 int ret; 2096 2070 2097 - ret = io_req_defer(ctx, req, s->sqe); 2098 - if (ret) { 2099 - if (ret != -EIOCBQUEUED) { 2100 - io_free_req(req); 2101 - io_cqring_add_event(ctx, s->sqe->user_data, ret); 2102 - } 2103 - return 0; 2104 - } 2105 - 2106 - ret = __io_submit_sqe(ctx, req, s, true); 2071 + ret = __io_submit_sqe(ctx, req, s, force_nonblock); 2107 2072 if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) { 2108 2073 struct io_uring_sqe *sqe_copy; 2109 2074 ··· 2111 2094 if (list) 2112 2095 atomic_inc(&list->cnt); 2113 2096 INIT_WORK(&req->work, io_sq_wq_submit_work); 2114 - queue_work(ctx->sqo_wq, &req->work); 2097 + io_queue_async_work(ctx, req); 2115 2098 } 2116 2099 2117 2100 /* ··· 2136 2119 return ret; 2137 2120 } 2138 2121 2122 + static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 2123 + struct sqe_submit *s, bool force_nonblock) 2124 + { 2125 + int ret; 2126 + 2127 + ret = io_req_defer(ctx, req, s->sqe); 2128 + if (ret) { 2129 + if (ret != -EIOCBQUEUED) { 2130 + io_free_req(req); 2131 + io_cqring_add_event(ctx, s->sqe->user_data, ret); 2132 + } 2133 + return 0; 2134 + } 2135 + 2136 + return __io_queue_sqe(ctx, req, s, force_nonblock); 2137 + } 2138 + 2139 + static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, 2140 + struct sqe_submit *s, 
struct io_kiocb *shadow, 2141 + bool force_nonblock) 2142 + { 2143 + int ret; 2144 + int need_submit = false; 2145 + 2146 + if (!shadow) 2147 + return io_queue_sqe(ctx, req, s, force_nonblock); 2148 + 2149 + /* 2150 + * Mark the first IO in link list as DRAIN, let all the following 2151 + * IOs enter the defer list. all IO needs to be completed before link 2152 + * list. 2153 + */ 2154 + req->flags |= REQ_F_IO_DRAIN; 2155 + ret = io_req_defer(ctx, req, s->sqe); 2156 + if (ret) { 2157 + if (ret != -EIOCBQUEUED) { 2158 + io_free_req(req); 2159 + io_cqring_add_event(ctx, s->sqe->user_data, ret); 2160 + return 0; 2161 + } 2162 + } else { 2163 + /* 2164 + * If ret == 0 means that all IOs in front of link io are 2165 + * running done. let's queue link head. 2166 + */ 2167 + need_submit = true; 2168 + } 2169 + 2170 + /* Insert shadow req to defer_list, blocking next IOs */ 2171 + spin_lock_irq(&ctx->completion_lock); 2172 + list_add_tail(&shadow->list, &ctx->defer_list); 2173 + spin_unlock_irq(&ctx->completion_lock); 2174 + 2175 + if (need_submit) 2176 + return __io_queue_sqe(ctx, req, s, force_nonblock); 2177 + 2178 + return 0; 2179 + } 2180 + 2139 2181 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK) 2140 2182 2141 2183 static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, 2142 - struct io_submit_state *state, struct io_kiocb **link) 2184 + struct io_submit_state *state, struct io_kiocb **link, 2185 + bool force_nonblock) 2143 2186 { 2144 2187 struct io_uring_sqe *sqe_copy; 2145 2188 struct io_kiocb *req; ··· 2252 2175 INIT_LIST_HEAD(&req->link_list); 2253 2176 *link = req; 2254 2177 } else { 2255 - io_queue_sqe(ctx, req, s); 2178 + io_queue_sqe(ctx, req, s, force_nonblock); 2256 2179 } 2257 2180 } 2258 2181 ··· 2282 2205 2283 2206 static void io_commit_sqring(struct io_ring_ctx *ctx) 2284 2207 { 2285 - struct io_sq_ring *ring = ctx->sq_ring; 2208 + struct io_rings *rings = ctx->rings; 2286 2209 2287 - if (ctx->cached_sq_head != 
READ_ONCE(ring->r.head)) { 2210 + if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) { 2288 2211 /* 2289 2212 * Ensure any loads from the SQEs are done at this point, 2290 2213 * since once we write the new head, the application could 2291 2214 * write new data to them. 2292 2215 */ 2293 - smp_store_release(&ring->r.head, ctx->cached_sq_head); 2216 + smp_store_release(&rings->sq.head, ctx->cached_sq_head); 2294 2217 } 2295 2218 } 2296 2219 ··· 2304 2227 */ 2305 2228 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) 2306 2229 { 2307 - struct io_sq_ring *ring = ctx->sq_ring; 2230 + struct io_rings *rings = ctx->rings; 2231 + u32 *sq_array = ctx->sq_array; 2308 2232 unsigned head; 2309 2233 2310 2234 /* ··· 2318 2240 */ 2319 2241 head = ctx->cached_sq_head; 2320 2242 /* make sure SQ entry isn't read before tail */ 2321 - if (head == smp_load_acquire(&ring->r.tail)) 2243 + if (head == smp_load_acquire(&rings->sq.tail)) 2322 2244 return false; 2323 2245 2324 - head = READ_ONCE(ring->array[head & ctx->sq_mask]); 2246 + head = READ_ONCE(sq_array[head & ctx->sq_mask]); 2325 2247 if (head < ctx->sq_entries) { 2326 2248 s->index = head; 2327 2249 s->sqe = &ctx->sq_sqes[head]; 2250 + s->sequence = ctx->cached_sq_head; 2328 2251 ctx->cached_sq_head++; 2329 2252 return true; 2330 2253 } 2331 2254 2332 2255 /* drop invalid entries */ 2333 2256 ctx->cached_sq_head++; 2334 - ring->dropped++; 2257 + rings->sq_dropped++; 2335 2258 return false; 2336 2259 } 2337 2260 ··· 2341 2262 { 2342 2263 struct io_submit_state state, *statep = NULL; 2343 2264 struct io_kiocb *link = NULL; 2265 + struct io_kiocb *shadow_req = NULL; 2344 2266 bool prev_was_link = false; 2345 2267 int i, submitted = 0; 2346 2268 ··· 2356 2276 * that's the end of the chain. Submit the previous link. 
2357 2277 */ 2358 2278 if (!prev_was_link && link) { 2359 - io_queue_sqe(ctx, link, &link->submit); 2279 + io_queue_link_head(ctx, link, &link->submit, shadow_req, 2280 + true); 2360 2281 link = NULL; 2361 2282 } 2362 2283 prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0; 2284 + 2285 + if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) { 2286 + if (!shadow_req) { 2287 + shadow_req = io_get_req(ctx, NULL); 2288 + shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); 2289 + refcount_dec(&shadow_req->refs); 2290 + } 2291 + shadow_req->sequence = sqes[i].sequence; 2292 + } 2363 2293 2364 2294 if (unlikely(mm_fault)) { 2365 2295 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ··· 2378 2288 sqes[i].has_user = has_user; 2379 2289 sqes[i].needs_lock = true; 2380 2290 sqes[i].needs_fixed_file = true; 2381 - io_submit_sqe(ctx, &sqes[i], statep, &link); 2291 + io_submit_sqe(ctx, &sqes[i], statep, &link, true); 2382 2292 submitted++; 2383 2293 } 2384 2294 } 2385 2295 2386 2296 if (link) 2387 - io_queue_sqe(ctx, link, &link->submit); 2297 + io_queue_link_head(ctx, link, &link->submit, shadow_req, true); 2388 2298 if (statep) 2389 2299 io_submit_state_end(&state); 2390 2300 ··· 2456 2366 TASK_INTERRUPTIBLE); 2457 2367 2458 2368 /* Tell userspace we may need a wakeup call */ 2459 - ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP; 2369 + ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP; 2460 2370 /* make sure to read SQ tail after writing flags */ 2461 2371 smp_mb(); 2462 2372 ··· 2470 2380 schedule(); 2471 2381 finish_wait(&ctx->sqo_wait, &wait); 2472 2382 2473 - ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; 2383 + ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP; 2474 2384 continue; 2475 2385 } 2476 2386 finish_wait(&ctx->sqo_wait, &wait); 2477 2387 2478 - ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; 2388 + ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP; 2479 2389 } 2480 2390 2481 2391 i = 0; ··· 2516 2426 return 0; 2517 2427 } 2518 2428 2519 - static int 
io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) 2429 + static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit, 2430 + bool block_for_last) 2520 2431 { 2521 2432 struct io_submit_state state, *statep = NULL; 2522 2433 struct io_kiocb *link = NULL; 2434 + struct io_kiocb *shadow_req = NULL; 2523 2435 bool prev_was_link = false; 2524 2436 int i, submit = 0; 2525 2437 ··· 2531 2439 } 2532 2440 2533 2441 for (i = 0; i < to_submit; i++) { 2442 + bool force_nonblock = true; 2534 2443 struct sqe_submit s; 2535 2444 2536 2445 if (!io_get_sqring(ctx, &s)) ··· 2542 2449 * that's the end of the chain. Submit the previous link. 2543 2450 */ 2544 2451 if (!prev_was_link && link) { 2545 - io_queue_sqe(ctx, link, &link->submit); 2452 + io_queue_link_head(ctx, link, &link->submit, shadow_req, 2453 + force_nonblock); 2546 2454 link = NULL; 2547 2455 } 2548 2456 prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0; 2457 + 2458 + if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) { 2459 + if (!shadow_req) { 2460 + shadow_req = io_get_req(ctx, NULL); 2461 + shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); 2462 + refcount_dec(&shadow_req->refs); 2463 + } 2464 + shadow_req->sequence = s.sequence; 2465 + } 2549 2466 2550 2467 s.has_user = true; 2551 2468 s.needs_lock = false; 2552 2469 s.needs_fixed_file = false; 2553 2470 submit++; 2554 - io_submit_sqe(ctx, &s, statep, &link); 2471 + 2472 + /* 2473 + * The caller will block for events after submit, submit the 2474 + * last IO non-blocking. This is either the only IO it's 2475 + * submitting, or it already submitted the previous ones. This 2476 + * improves performance by avoiding an async punt that we don't 2477 + * need to do. 
2478 + */ 2479 + if (block_for_last && submit == to_submit) 2480 + force_nonblock = false; 2481 + 2482 + io_submit_sqe(ctx, &s, statep, &link, force_nonblock); 2555 2483 } 2556 2484 io_commit_sqring(ctx); 2557 2485 2558 2486 if (link) 2559 - io_queue_sqe(ctx, link, &link->submit); 2487 + io_queue_link_head(ctx, link, &link->submit, shadow_req, 2488 + block_for_last); 2560 2489 if (statep) 2561 2490 io_submit_state_end(statep); 2562 2491 ··· 2592 2477 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, 2593 2478 const sigset_t __user *sig, size_t sigsz) 2594 2479 { 2595 - struct io_cq_ring *ring = ctx->cq_ring; 2480 + struct io_rings *rings = ctx->rings; 2596 2481 int ret; 2597 2482 2598 - if (io_cqring_events(ring) >= min_events) 2483 + if (io_cqring_events(rings) >= min_events) 2599 2484 return 0; 2600 2485 2601 2486 if (sig) { ··· 2611 2496 return ret; 2612 2497 } 2613 2498 2614 - ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events); 2499 + ret = wait_event_interruptible(ctx->wait, io_cqring_events(rings) >= min_events); 2615 2500 restore_saved_sigmask_unless(ret == -ERESTARTSYS); 2616 2501 if (ret == -ERESTARTSYS) 2617 2502 ret = -EINTR; 2618 2503 2619 - return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0; 2504 + return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? 
ret : 0; 2620 2505 } 2621 2506 2622 2507 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx) ··· 2666 2551 2667 2552 static void io_finish_async(struct io_ring_ctx *ctx) 2668 2553 { 2554 + int i; 2555 + 2669 2556 io_sq_thread_stop(ctx); 2670 2557 2671 - if (ctx->sqo_wq) { 2672 - destroy_workqueue(ctx->sqo_wq); 2673 - ctx->sqo_wq = NULL; 2558 + for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) { 2559 + if (ctx->sqo_wq[i]) { 2560 + destroy_workqueue(ctx->sqo_wq[i]); 2561 + ctx->sqo_wq[i] = NULL; 2562 + } 2674 2563 } 2675 2564 } 2676 2565 ··· 2882 2763 } 2883 2764 2884 2765 /* Do QD, or 2 * CPUS, whatever is smallest */ 2885 - ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE, 2766 + ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq", 2767 + WQ_UNBOUND | WQ_FREEZABLE, 2886 2768 min(ctx->sq_entries - 1, 2 * num_online_cpus())); 2887 - if (!ctx->sqo_wq) { 2769 + if (!ctx->sqo_wq[0]) { 2770 + ret = -ENOMEM; 2771 + goto err; 2772 + } 2773 + 2774 + /* 2775 + * This is for buffered writes, where we want to limit the parallelism 2776 + * due to file locking in file systems. As "normal" buffered writes 2777 + * should parellelize on writeout quite nicely, limit us to having 2 2778 + * pending. This avoids massive contention on the inode when doing 2779 + * buffered async writes. 
2780 + */ 2781 + ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq", 2782 + WQ_UNBOUND | WQ_FREEZABLE, 2); 2783 + if (!ctx->sqo_wq[1]) { 2888 2784 ret = -ENOMEM; 2889 2785 goto err; 2890 2786 } 2891 2787 2892 2788 return 0; 2893 2789 err: 2894 - io_sq_thread_stop(ctx); 2790 + io_finish_async(ctx); 2895 2791 mmdrop(ctx->sqo_mm); 2896 2792 ctx->sqo_mm = NULL; 2897 2793 return ret; ··· 2955 2821 return (void *) __get_free_pages(gfp_flags, get_order(size)); 2956 2822 } 2957 2823 2824 + static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries, 2825 + size_t *sq_offset) 2826 + { 2827 + struct io_rings *rings; 2828 + size_t off, sq_array_size; 2829 + 2830 + off = struct_size(rings, cqes, cq_entries); 2831 + if (off == SIZE_MAX) 2832 + return SIZE_MAX; 2833 + 2834 + #ifdef CONFIG_SMP 2835 + off = ALIGN(off, SMP_CACHE_BYTES); 2836 + if (off == 0) 2837 + return SIZE_MAX; 2838 + #endif 2839 + 2840 + sq_array_size = array_size(sizeof(u32), sq_entries); 2841 + if (sq_array_size == SIZE_MAX) 2842 + return SIZE_MAX; 2843 + 2844 + if (check_add_overflow(off, sq_array_size, &off)) 2845 + return SIZE_MAX; 2846 + 2847 + if (sq_offset) 2848 + *sq_offset = off; 2849 + 2850 + return off; 2851 + } 2852 + 2958 2853 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries) 2959 2854 { 2960 - struct io_sq_ring *sq_ring; 2961 - struct io_cq_ring *cq_ring; 2962 - size_t bytes; 2855 + size_t pages; 2963 2856 2964 - bytes = struct_size(sq_ring, array, sq_entries); 2965 - bytes += array_size(sizeof(struct io_uring_sqe), sq_entries); 2966 - bytes += struct_size(cq_ring, cqes, cq_entries); 2857 + pages = (size_t)1 << get_order( 2858 + rings_size(sq_entries, cq_entries, NULL)); 2859 + pages += (size_t)1 << get_order( 2860 + array_size(sizeof(struct io_uring_sqe), sq_entries)); 2967 2861 2968 - return (bytes + PAGE_SIZE - 1) / PAGE_SIZE; 2862 + return pages; 2969 2863 } 2970 2864 2971 2865 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) ··· 3007 2845 
struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 3008 2846 3009 2847 for (j = 0; j < imu->nr_bvecs; j++) 3010 - put_page(imu->bvec[j].bv_page); 2848 + put_user_page(imu->bvec[j].bv_page); 3011 2849 3012 2850 if (ctx->account_mem) 3013 2851 io_unaccount_mem(ctx->user, imu->nr_bvecs); ··· 3151 2989 * if we did partial map, or found file backed vmas, 3152 2990 * release any pages we did get 3153 2991 */ 3154 - if (pret > 0) { 3155 - for (j = 0; j < pret; j++) 3156 - put_page(pages[j]); 3157 - } 2992 + if (pret > 0) 2993 + put_user_pages(pages, pret); 3158 2994 if (ctx->account_mem) 3159 2995 io_unaccount_mem(ctx->user, nr_pages); 3160 2996 kvfree(imu->bvec); ··· 3238 3078 } 3239 3079 #endif 3240 3080 3241 - io_mem_free(ctx->sq_ring); 3081 + io_mem_free(ctx->rings); 3242 3082 io_mem_free(ctx->sq_sqes); 3243 - io_mem_free(ctx->cq_ring); 3244 3083 3245 3084 percpu_ref_exit(&ctx->refs); 3246 3085 if (ctx->account_mem) ··· 3260 3101 * io_commit_cqring 3261 3102 */ 3262 3103 smp_rmb(); 3263 - if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head != 3264 - ctx->sq_ring->ring_entries) 3104 + if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head != 3105 + ctx->rings->sq_ring_entries) 3265 3106 mask |= EPOLLOUT | EPOLLWRNORM; 3266 - if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail) 3107 + if (READ_ONCE(ctx->rings->sq.head) != ctx->cached_cq_tail) 3267 3108 mask |= EPOLLIN | EPOLLRDNORM; 3268 3109 3269 3110 return mask; ··· 3308 3149 3309 3150 switch (offset) { 3310 3151 case IORING_OFF_SQ_RING: 3311 - ptr = ctx->sq_ring; 3152 + case IORING_OFF_CQ_RING: 3153 + ptr = ctx->rings; 3312 3154 break; 3313 3155 case IORING_OFF_SQES: 3314 3156 ptr = ctx->sq_sqes; 3315 - break; 3316 - case IORING_OFF_CQ_RING: 3317 - ptr = ctx->cq_ring; 3318 3157 break; 3319 3158 default: 3320 3159 return -EINVAL; ··· 3356 3199 * Just return the requested submit count, and wake the thread if 3357 3200 * we were asked to. 
3358 3201 */ 3202 + ret = 0; 3359 3203 if (ctx->flags & IORING_SETUP_SQPOLL) { 3360 3204 if (flags & IORING_ENTER_SQ_WAKEUP) 3361 3205 wake_up(&ctx->sqo_wait); 3362 3206 submitted = to_submit; 3363 - goto out_ctx; 3364 - } 3207 + } else if (to_submit) { 3208 + bool block_for_last = false; 3365 3209 3366 - ret = 0; 3367 - if (to_submit) { 3368 3210 to_submit = min(to_submit, ctx->sq_entries); 3369 3211 3212 + /* 3213 + * Allow last submission to block in a series, IFF the caller 3214 + * asked to wait for events and we don't currently have 3215 + * enough. This potentially avoids an async punt. 3216 + */ 3217 + if (to_submit == min_complete && 3218 + io_cqring_events(ctx->rings) < min_complete) 3219 + block_for_last = true; 3220 + 3370 3221 mutex_lock(&ctx->uring_lock); 3371 - submitted = io_ring_submit(ctx, to_submit); 3222 + submitted = io_ring_submit(ctx, to_submit, block_for_last); 3372 3223 mutex_unlock(&ctx->uring_lock); 3373 3224 } 3374 3225 if (flags & IORING_ENTER_GETEVENTS) { ··· 3391 3226 } 3392 3227 } 3393 3228 3394 - out_ctx: 3395 3229 io_ring_drop_ctx_refs(ctx, 1); 3396 3230 out_fput: 3397 3231 fdput(f); ··· 3407 3243 static int io_allocate_scq_urings(struct io_ring_ctx *ctx, 3408 3244 struct io_uring_params *p) 3409 3245 { 3410 - struct io_sq_ring *sq_ring; 3411 - struct io_cq_ring *cq_ring; 3412 - size_t size; 3246 + struct io_rings *rings; 3247 + size_t size, sq_array_offset; 3413 3248 3414 - sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries)); 3415 - if (!sq_ring) 3249 + size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset); 3250 + if (size == SIZE_MAX) 3251 + return -EOVERFLOW; 3252 + 3253 + rings = io_mem_alloc(size); 3254 + if (!rings) 3416 3255 return -ENOMEM; 3417 3256 3418 - ctx->sq_ring = sq_ring; 3419 - sq_ring->ring_mask = p->sq_entries - 1; 3420 - sq_ring->ring_entries = p->sq_entries; 3421 - ctx->sq_mask = sq_ring->ring_mask; 3422 - ctx->sq_entries = sq_ring->ring_entries; 3257 + ctx->rings = rings; 3258 + 
ctx->sq_array = (u32 *)((char *)rings + sq_array_offset); 3259 + rings->sq_ring_mask = p->sq_entries - 1; 3260 + rings->cq_ring_mask = p->cq_entries - 1; 3261 + rings->sq_ring_entries = p->sq_entries; 3262 + rings->cq_ring_entries = p->cq_entries; 3263 + ctx->sq_mask = rings->sq_ring_mask; 3264 + ctx->cq_mask = rings->cq_ring_mask; 3265 + ctx->sq_entries = rings->sq_ring_entries; 3266 + ctx->cq_entries = rings->cq_ring_entries; 3423 3267 3424 3268 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); 3425 3269 if (size == SIZE_MAX) ··· 3437 3265 if (!ctx->sq_sqes) 3438 3266 return -ENOMEM; 3439 3267 3440 - cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries)); 3441 - if (!cq_ring) 3442 - return -ENOMEM; 3443 - 3444 - ctx->cq_ring = cq_ring; 3445 - cq_ring->ring_mask = p->cq_entries - 1; 3446 - cq_ring->ring_entries = p->cq_entries; 3447 - ctx->cq_mask = cq_ring->ring_mask; 3448 - ctx->cq_entries = cq_ring->ring_entries; 3449 3268 return 0; 3450 3269 } 3451 3270 ··· 3540 3377 goto err; 3541 3378 3542 3379 memset(&p->sq_off, 0, sizeof(p->sq_off)); 3543 - p->sq_off.head = offsetof(struct io_sq_ring, r.head); 3544 - p->sq_off.tail = offsetof(struct io_sq_ring, r.tail); 3545 - p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask); 3546 - p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries); 3547 - p->sq_off.flags = offsetof(struct io_sq_ring, flags); 3548 - p->sq_off.dropped = offsetof(struct io_sq_ring, dropped); 3549 - p->sq_off.array = offsetof(struct io_sq_ring, array); 3380 + p->sq_off.head = offsetof(struct io_rings, sq.head); 3381 + p->sq_off.tail = offsetof(struct io_rings, sq.tail); 3382 + p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask); 3383 + p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries); 3384 + p->sq_off.flags = offsetof(struct io_rings, sq_flags); 3385 + p->sq_off.dropped = offsetof(struct io_rings, sq_dropped); 3386 + p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings; 
3550 3387 3551 3388 memset(&p->cq_off, 0, sizeof(p->cq_off)); 3552 - p->cq_off.head = offsetof(struct io_cq_ring, r.head); 3553 - p->cq_off.tail = offsetof(struct io_cq_ring, r.tail); 3554 - p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask); 3555 - p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries); 3556 - p->cq_off.overflow = offsetof(struct io_cq_ring, overflow); 3557 - p->cq_off.cqes = offsetof(struct io_cq_ring, cqes); 3389 + p->cq_off.head = offsetof(struct io_rings, cq.head); 3390 + p->cq_off.tail = offsetof(struct io_rings, cq.tail); 3391 + p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask); 3392 + p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries); 3393 + p->cq_off.overflow = offsetof(struct io_rings, cq_overflow); 3394 + p->cq_off.cqes = offsetof(struct io_rings, cqes); 3395 + 3396 + p->features = IORING_FEAT_SINGLE_MMAP; 3558 3397 return ret; 3559 3398 err: 3560 3399 io_ring_ctx_wait_and_kill(ctx);
+7 -1
include/uapi/linux/io_uring.h
··· 128 128 __u32 flags; 129 129 __u32 sq_thread_cpu; 130 130 __u32 sq_thread_idle; 131 - __u32 resv[5]; 131 + __u32 features; 132 + __u32 resv[4]; 132 133 struct io_sqring_offsets sq_off; 133 134 struct io_cqring_offsets cq_off; 134 135 }; 136 + 137 + /* 138 + * io_uring_params->features flags 139 + */ 140 + #define IORING_FEAT_SINGLE_MMAP (1U << 0) 135 141 136 142 /* 137 143 * io_uring_register(2) opcodes and arguments