Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git

Merge tag 'io_uring-5.11-2021-01-10' of git://git.kernel.dk/linux-block

Pull io_uring fixes from Jens Axboe:
"A bit larger than I had hoped at this point, but it's all changes that
will be directed towards stable anyway. In detail:

- Fix a merge window regression on error return (Matthew)

- Remove useless variable declaration/assignment (Ye Bin)

- IOPOLL fixes (Pavel)

- Exit and cancelation fixes (Pavel)

- fasync lockdep complaint fix (Pavel)

- Ensure SQPOLL is synchronized with creator lifetime (Pavel)"

* tag 'io_uring-5.11-2021-01-10' of git://git.kernel.dk/linux-block:
io_uring: stop SQPOLL submit on creator's death
io_uring: add warn_once for io_uring_flush()
io_uring: inline io_uring_attempt_task_drop()
io_uring: io_rw_reissue lockdep annotations
io_uring: synchronise ev_posted() with waitqueues
io_uring: dont kill fasync under completion_lock
io_uring: trigger eventfd for IOPOLL
io_uring: Fix return value from alloc_fixed_file_ref_node
io_uring: Delete useless variable ‘id’ in io_prep_async_work
io_uring: cancel more aggressively in exit_work
io_uring: drop file refs after task cancel
io_uring: patch up IOPOLL overflow_flush sync
io_uring: synchronise IOPOLL on task_submit fail
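
The user-visible effect of the SQPOLL lifetime change is that io_uring_enter(2) on a ring whose SQPOLL creator has died now fails with -EOWNERDEAD instead of quietly submitting nothing. A minimal userspace sketch of checking for that error follows; the raw syscall wrapper and the helper names (ring_enter, submit_and_wait) are illustrative assumptions, not part of this series, and assume __NR_io_uring_enter is exposed by <sys/syscall.h>.

#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

/* illustrative wrapper around the io_uring_enter(2) syscall */
static int ring_enter(int ring_fd, unsigned int to_submit,
		      unsigned int min_complete, unsigned int flags)
{
	return syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete,
		       flags, NULL, 0);
}

static int submit_and_wait(int ring_fd, unsigned int to_submit)
{
	int ret = ring_enter(ring_fd, to_submit, 1, IORING_ENTER_GETEVENTS);

	if (ret < 0 && errno == EOWNERDEAD) {
		/* the SQPOLL creator died; this ring can no longer submit */
		fprintf(stderr, "ring %d: SQPOLL owner is dead, recreate the ring\n",
			ring_fd);
		return -1;
	}
	return ret;
}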

fs/io_uring.c | +167 -89
@@ -262 +262 @@
 	unsigned int		drain_next: 1;
 	unsigned int		eventfd_async: 1;
 	unsigned int		restricted: 1;
+	unsigned int		sqo_dead: 1;
 
 	/*
 	 * Ring buffer of indices into array of io_uring_sqe, which is
@@ -993 +992 @@
 	ACCT_PINNED,
 };
 
+static void __io_uring_cancel_task_requests(struct io_ring_ctx *ctx,
+					    struct task_struct *task);
+
 static void destroy_fixed_file_ref_node(struct fixed_file_ref_node *ref_node);
 static struct fixed_file_ref_node *alloc_fixed_file_ref_node(
 			struct io_ring_ctx *ctx);
@@ -1346 +1342 @@
 
 	/* order cqe stores with ring update */
 	smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
-
-	if (wq_has_sleeper(&ctx->cq_wait)) {
-		wake_up_interruptible(&ctx->cq_wait);
-		kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
-	}
 }
 
 static void io_put_identity(struct io_uring_task *tctx, struct io_kiocb *req)
@@ -1519 +1520 @@
 {
 	const struct io_op_def *def = &io_op_defs[req->opcode];
 	struct io_ring_ctx *ctx = req->ctx;
-	struct io_identity *id;
 
 	io_req_init_async(req);
-	id = req->work.identity;
 
 	if (req->flags & REQ_F_FORCE_ASYNC)
 		req->work.flags |= IO_WQ_WORK_CONCURRENT;
@@ -1701 +1704 @@
 
 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 {
+	/* see waitqueue_active() comment */
+	smp_mb();
+
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
 	if (ctx->sq_data && waitqueue_active(&ctx->sq_data->wait))
 		wake_up(&ctx->sq_data->wait);
 	if (io_should_trigger_evfd(ctx))
 		eventfd_signal(ctx->cq_ev_fd, 1);
+	if (waitqueue_active(&ctx->cq_wait)) {
+		wake_up_interruptible(&ctx->cq_wait);
+		kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
+	}
+}
+
+static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
+{
+	/* see waitqueue_active() comment */
+	smp_mb();
+
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (waitqueue_active(&ctx->wait))
+			wake_up(&ctx->wait);
+	}
+	if (io_should_trigger_evfd(ctx))
+		eventfd_signal(ctx->cq_ev_fd, 1);
+	if (waitqueue_active(&ctx->cq_wait)) {
+		wake_up_interruptible(&ctx->cq_wait);
+		kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
+	}
 }
 
 /* Returns true if there are no backlogged entries after the flush */
-static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force,
-				     struct task_struct *tsk,
-				     struct files_struct *files)
+static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force,
+				       struct task_struct *tsk,
+				       struct files_struct *files)
 {
 	struct io_rings *rings = ctx->rings;
 	struct io_kiocb *req, *tmp;
@@ -1787 +1766 @@
 	}
 
 	return all_flushed;
+}
+
+static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force,
+				     struct task_struct *tsk,
+				     struct files_struct *files)
+{
+	if (test_bit(0, &ctx->cq_check_overflow)) {
+		/* iopoll syncs against uring_lock, not completion_lock */
+		if (ctx->flags & IORING_SETUP_IOPOLL)
+			mutex_lock(&ctx->uring_lock);
+		__io_cqring_overflow_flush(ctx, force, tsk, files);
+		if (ctx->flags & IORING_SETUP_IOPOLL)
+			mutex_unlock(&ctx->uring_lock);
+	}
 }
 
 static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags)
@@ -2162 +2127 @@
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
-	if (!__io_sq_thread_acquire_mm(ctx) &&
-	    !__io_sq_thread_acquire_files(ctx)) {
-		mutex_lock(&ctx->uring_lock);
+	mutex_lock(&ctx->uring_lock);
+	if (!ctx->sqo_dead &&
+	    !__io_sq_thread_acquire_mm(ctx) &&
+	    !__io_sq_thread_acquire_files(ctx))
 		__io_queue_sqe(req, NULL);
-		mutex_unlock(&ctx->uring_lock);
-	} else {
+	else
 		__io_req_task_cancel(req, -EFAULT);
-	}
+	mutex_unlock(&ctx->uring_lock);
 }
 
 static void io_req_task_submit(struct callback_head *cb)
@@ -2348 +2313 @@
 	io_free_req(req);
 }
 
-static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
+static unsigned io_cqring_events(struct io_ring_ctx *ctx)
 {
-	if (test_bit(0, &ctx->cq_check_overflow)) {
-		/*
-		 * noflush == true is from the waitqueue handler, just ensure
-		 * we wake up the task, and the next invocation will flush the
-		 * entries. We cannot safely to it from here.
-		 */
-		if (noflush)
-			return -1U;
-
-		io_cqring_overflow_flush(ctx, false, NULL, NULL);
-	}
-
 	/* See comment at the top of this file */
 	smp_rmb();
 	return __io_cqring_events(ctx);
@@ -2447 +2424 @@
 	}
 
 	io_commit_cqring(ctx);
-	if (ctx->flags & IORING_SETUP_SQPOLL)
-		io_cqring_ev_posted(ctx);
+	io_cqring_ev_posted_iopoll(ctx);
 	io_req_free_batch_finish(ctx, &rb);
 
 	if (!list_empty(&again))
@@ -2573 +2551 @@
 		 * If we do, we can potentially be spinning for commands that
 		 * already triggered a CQE (eg in error).
 		 */
-		if (io_cqring_events(ctx, false))
+		if (test_bit(0, &ctx->cq_check_overflow))
+			__io_cqring_overflow_flush(ctx, false, NULL, NULL);
+		if (io_cqring_events(ctx))
 			break;
 
 		/*
@@ -2691 +2667 @@
 		return false;
 	if ((res != -EAGAIN && res != -EOPNOTSUPP) || io_wq_current_is_worker())
 		return false;
+
+	lockdep_assert_held(&req->ctx->uring_lock);
 
 	ret = io_sq_thread_acquire_mm_files(req->ctx, req);
@@ -6852 +6826 @@
 
 	/* if we have a backlog and couldn't flush it all, return BUSY */
 	if (test_bit(0, &ctx->sq_check_overflow)) {
-		if (!io_cqring_overflow_flush(ctx, false, NULL, NULL))
+		if (!__io_cqring_overflow_flush(ctx, false, NULL, NULL))
 			return -EBUSY;
 	}
@@ -6954 +6928 @@
 		if (!list_empty(&ctx->iopoll_list))
 			io_do_iopoll(ctx, &nr_events, 0);
 
-		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)))
+		if (to_submit && !ctx->sqo_dead &&
+		    likely(!percpu_ref_is_dying(&ctx->refs)))
 			ret = io_submit_sqes(ctx, to_submit);
 		mutex_unlock(&ctx->uring_lock);
 	}
@@ -7116 +7089 @@
 	unsigned nr_timeouts;
 };
 
-static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
+static inline bool io_should_wake(struct io_wait_queue *iowq)
 {
 	struct io_ring_ctx *ctx = iowq->ctx;
 
@@ -7125 +7098 @@
 	 * started waiting. For timeouts, we always want to return to userspace,
 	 * regardless of event count.
 	 */
-	return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
+	return io_cqring_events(ctx) >= iowq->to_wait ||
 		atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
 }
 
@@ -7135 +7108 @@
 	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
 							wq);
 
-	/* use noflush == true, as we can't safely rely on locking context */
-	if (!io_should_wake(iowq, true))
-		return -1;
-
-	return autoremove_wake_function(curr, mode, wake_flags, key);
+	/*
+	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
+	 * the task, and the next invocation will do it.
+	 */
+	if (io_should_wake(iowq) || test_bit(0, &iowq->ctx->cq_check_overflow))
+		return autoremove_wake_function(curr, mode, wake_flags, key);
+	return -1;
 }
 
 static int io_run_task_work_sig(void)
@@ -7178 +7149 @@
 	int ret = 0;
 
 	do {
-		if (io_cqring_events(ctx, false) >= min_events)
+		io_cqring_overflow_flush(ctx, false, NULL, NULL);
+		if (io_cqring_events(ctx) >= min_events)
 			return 0;
 		if (!io_run_task_work())
 			break;
@@ -7207 +7177 @@
 	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
 	trace_io_uring_cqring_wait(ctx, min_events);
 	do {
+		io_cqring_overflow_flush(ctx, false, NULL, NULL);
 		prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
 						TASK_INTERRUPTIBLE);
 		/* make sure we run task_work before checking for signals */
@@ -7216 +7185 @@
 			continue;
 		else if (ret < 0)
 			break;
-		if (io_should_wake(&iowq, false))
+		if (io_should_wake(&iowq))
 			break;
+		if (test_bit(0, &ctx->cq_check_overflow))
+			continue;
 		if (uts) {
 			timeout = schedule_timeout(timeout);
 			if (timeout == 0) {
@@ -7717 +7684 @@
 
 	ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL);
 	if (!ref_node)
-		return ERR_PTR(-ENOMEM);
+		return NULL;
 
 	if (percpu_ref_init(&ref_node->refs, io_file_data_ref_zero,
 			    0, GFP_KERNEL)) {
 		kfree(ref_node);
-		return ERR_PTR(-ENOMEM);
+		return NULL;
 	}
 	INIT_LIST_HEAD(&ref_node->node);
 	INIT_LIST_HEAD(&ref_node->file_list);
@@ -7816 +7783 @@
 	}
 
 	ref_node = alloc_fixed_file_ref_node(ctx);
-	if (IS_ERR(ref_node)) {
+	if (!ref_node) {
 		io_sqe_files_unregister(ctx);
-		return PTR_ERR(ref_node);
+		return -ENOMEM;
 	}
 
 	io_sqe_files_set_node(file_data, ref_node);
@@ -7918 +7885 @@
 		return -EINVAL;
 
 	ref_node = alloc_fixed_file_ref_node(ctx);
-	if (IS_ERR(ref_node))
-		return PTR_ERR(ref_node);
+	if (!ref_node)
+		return -ENOMEM;
 
 	done = 0;
 	fds = u64_to_user_ptr(up->fds);
@@ -8657 +8624 @@
 	smp_rmb();
 	if (!io_sqring_full(ctx))
 		mask |= EPOLLOUT | EPOLLWRNORM;
-	if (io_cqring_events(ctx, false))
+	io_cqring_overflow_flush(ctx, false, NULL, NULL);
+	if (io_cqring_events(ctx))
 		mask |= EPOLLIN | EPOLLRDNORM;
 
 	return mask;
@@ -8697 +8663 @@
 	 * as nobody else will be looking for them.
 	 */
 	do {
-		io_iopoll_try_reap_events(ctx);
+		__io_uring_cancel_task_requests(ctx, NULL);
 	} while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20));
 	io_ring_ctx_free(ctx);
 }
@@ -8713 +8679 @@
 {
 	mutex_lock(&ctx->uring_lock);
 	percpu_ref_kill(&ctx->refs);
+
+	if (WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) && !ctx->sqo_dead))
+		ctx->sqo_dead = 1;
+
 	/* if force is set, the ring is going away. always drop after that */
 	ctx->cq_overflow_flushed = 1;
 	if (ctx->rings)
-		io_cqring_overflow_flush(ctx, true, NULL, NULL);
+		__io_cqring_overflow_flush(ctx, true, NULL, NULL);
 	mutex_unlock(&ctx->uring_lock);
 
 	io_kill_timeouts(ctx, NULL, NULL);
@@ -8856 +8818 @@
 		enum io_wq_cancel cret;
 		bool ret = false;
 
-		cret = io_wq_cancel_cb(ctx->io_wq, io_cancel_task_cb, &cancel, true);
-		if (cret != IO_WQ_CANCEL_NOTFOUND)
-			ret = true;
+		if (ctx->io_wq) {
+			cret = io_wq_cancel_cb(ctx->io_wq, io_cancel_task_cb,
+					       &cancel, true);
+			ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
+		}
 
 		/* SQPOLL thread does its own polling */
 		if (!(ctx->flags & IORING_SETUP_SQPOLL)) {
@@ -8879 +8839 @@
 		}
 	}
 
+static void io_disable_sqo_submit(struct io_ring_ctx *ctx)
+{
+	WARN_ON_ONCE(ctx->sqo_task != current);
+
+	mutex_lock(&ctx->uring_lock);
+	ctx->sqo_dead = 1;
+	mutex_unlock(&ctx->uring_lock);
+
+	/* make sure callers enter the ring to get error */
+	io_ring_set_wakeup_flag(ctx);
+}
+
 /*
  * We need to iteratively cancel requests, in case a request has dependent
  * hard links. These persist even for failure of cancelations, hence keep
@@ -8902 +8850 @@
 	struct task_struct *task = current;
 
 	if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) {
+		/* for SQPOLL only sqo_task has task notes */
+		io_disable_sqo_submit(ctx);
 		task = ctx->sq_data->thread;
 		atomic_inc(&task->io_uring->in_idle);
 		io_sq_thread_park(ctx->sq_data);
 	}
 
 	io_cancel_defer_files(ctx, task, files);
-	io_ring_submit_lock(ctx, (ctx->flags & IORING_SETUP_IOPOLL));
 	io_cqring_overflow_flush(ctx, true, task, files);
-	io_ring_submit_unlock(ctx, (ctx->flags & IORING_SETUP_IOPOLL));
 
 	if (!files)
 		__io_uring_cancel_task_requests(ctx, task);
@@ -8983 +8931 @@
 		fput(file);
 }
 
-/*
- * Drop task note for this file if we're the only ones that hold it after
- * pending fput()
- */
-static void io_uring_attempt_task_drop(struct file *file)
+static void io_uring_remove_task_files(struct io_uring_task *tctx)
 {
-	if (!current->io_uring)
-		return;
-	/*
-	 * fput() is pending, will be 2 if the only other ref is our potential
-	 * task file note. If the task is exiting, drop regardless of count.
-	 */
-	if (fatal_signal_pending(current) || (current->flags & PF_EXITING) ||
-	    atomic_long_read(&file->f_count) == 2)
+	struct file *file;
+	unsigned long index;
+
+	xa_for_each(&tctx->xa, index, file)
 		io_uring_del_task_file(file);
 }
@@ -9000 +8956 @@
 
 	/* make sure overflow events are dropped */
 	atomic_inc(&tctx->in_idle);
-
-	xa_for_each(&tctx->xa, index, file) {
-		struct io_ring_ctx *ctx = file->private_data;
-
-		io_uring_cancel_task_requests(ctx, files);
-		if (files)
-			io_uring_del_task_file(file);
-	}
-
+	xa_for_each(&tctx->xa, index, file)
+		io_uring_cancel_task_requests(file->private_data, files);
 	atomic_dec(&tctx->in_idle);
+
+	if (files)
+		io_uring_remove_task_files(tctx);
 }
 
 static s64 tctx_inflight(struct io_uring_task *tctx)
@@ -9068 +9028 @@
 	} while (1);
 
 	atomic_dec(&tctx->in_idle);
+
+	io_uring_remove_task_files(tctx);
 }
 
 static int io_uring_flush(struct file *file, void *data)
 {
-	io_uring_attempt_task_drop(file);
+	struct io_uring_task *tctx = current->io_uring;
+	struct io_ring_ctx *ctx = file->private_data;
+
+	if (!tctx)
+		return 0;
+
+	/* we should have cancelled and erased it before PF_EXITING */
+	WARN_ON_ONCE((current->flags & PF_EXITING) &&
+		     xa_load(&tctx->xa, (unsigned long)file));
+
+	/*
+	 * fput() is pending, will be 2 if the only other ref is our potential
+	 * task file note. If the task is exiting, drop regardless of count.
+	 */
+	if (atomic_long_read(&file->f_count) != 2)
+		return 0;
+
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		/* there is only one file note, which is owned by sqo_task */
+		WARN_ON_ONCE((ctx->sqo_task == current) ==
+			     !xa_load(&tctx->xa, (unsigned long)file));
+
+		io_disable_sqo_submit(ctx);
+	}
+
+	if (!(ctx->flags & IORING_SETUP_SQPOLL) || ctx->sqo_task == current)
+		io_uring_del_task_file(file);
 	return 0;
 }
@@ -9174 +9106 @@
 
 #endif /* !CONFIG_MMU */
 
-static void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
+static int io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
 {
+	int ret = 0;
 	DEFINE_WAIT(wait);
 
 	do {
@@ -9185 +9116 @@
 
 		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);
 
+		if (unlikely(ctx->sqo_dead)) {
+			ret = -EOWNERDEAD;
+			goto out;
+		}
+
 		if (!io_sqring_full(ctx))
 			break;
 
@@ -9197 +9123 @@
 	} while (!signal_pending(current));
 
 	finish_wait(&ctx->sqo_sq_wait, &wait);
+out:
+	return ret;
 }
 
 static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz,
@@ -9270 +9194 @@
 	 */
 	ret = 0;
 	if (ctx->flags & IORING_SETUP_SQPOLL) {
-		if (!list_empty_careful(&ctx->cq_overflow_list)) {
-			bool needs_lock = ctx->flags & IORING_SETUP_IOPOLL;
+		io_cqring_overflow_flush(ctx, false, NULL, NULL);
 
-			io_ring_submit_lock(ctx, needs_lock);
-			io_cqring_overflow_flush(ctx, false, NULL, NULL);
-			io_ring_submit_unlock(ctx, needs_lock);
-		}
+		ret = -EOWNERDEAD;
+		if (unlikely(ctx->sqo_dead))
+			goto out;
 		if (flags & IORING_ENTER_SQ_WAKEUP)
 			wake_up(&ctx->sq_data->wait);
-		if (flags & IORING_ENTER_SQ_WAIT)
-			io_sqpoll_wait_sq(ctx);
+		if (flags & IORING_ENTER_SQ_WAIT) {
+			ret = io_sqpoll_wait_sq(ctx);
+			if (ret)
+				goto out;
+		}
 		submitted = to_submit;
 	} else if (to_submit) {
 		ret = io_uring_add_task_file(ctx, f.file);
@@ -9708 +9631 @@
 	trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
 	return ret;
 err:
+	io_disable_sqo_submit(ctx);
 	io_ring_ctx_wait_and_kill(ctx);
 	return ret;
 }
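
The smp_mb() added in front of the waitqueue_active() checks above follows the pattern documented in the waitqueue_active() kerneldoc: the waker must order its store of the new event before the lockless waitqueue check, while the waiter gets the pairing barrier from the set_current_state() inside prepare_to_wait(). A schematic of that pairing, with a hypothetical example_ctx standing in for io_ring_ctx (kernel-style sketch, not code from this series):

#include <linux/wait.h>
#include <linux/sched.h>

struct example_ctx {
	wait_queue_head_t	wait;
	unsigned int		tail;	/* stands in for the CQ tail */
};

/* waker side, analogous to io_cqring_ev_posted(): publish, barrier, check */
static void post_event(struct example_ctx *ec)
{
	WRITE_ONCE(ec->tail, ec->tail + 1);	/* make the new event visible */
	smp_mb();				/* pairs with set_current_state() in the waiter */
	if (waitqueue_active(&ec->wait))	/* lockless check is now safe */
		wake_up(&ec->wait);
}

/* waiter side, analogous to io_cqring_wait(): queue first, then re-check */
static void wait_for_event(struct example_ctx *ec, unsigned int seen)
{
	DEFINE_WAIT(w);

	for (;;) {
		/* prepare_to_wait() -> set_current_state() supplies the barrier */
		prepare_to_wait(&ec->wait, &w, TASK_INTERRUPTIBLE);
		if (READ_ONCE(ec->tail) != seen)
			break;
		schedule();
	}
	finish_wait(&ec->wait, &w);
}

Either the waker sees the waiter already queued, or the waiter sees the updated tail after prepare_to_wait(); with the two barriers in place, missing both is ruled out.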