Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
// SPDX-License-Identifier: GPL-2.0
/*
 * Waiting for completion events
 */
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>

#include <trace/events/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "wait.h"

static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
			    int wake_flags, void *key)
{
	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);

	/*
	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
	 * the task, and the next invocation will do it.
	 */
	if (io_should_wake(iowq) || io_has_work(iowq->ctx))
		return autoremove_wake_function(curr, mode, wake_flags, key);
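	/*
	 * A negative return stops the waitqueue scan in __wake_up_common()
	 * without waking this entry.
	 */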
	return -1;
}

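/*
 * Run any pending local or generic task_work for this task. Returns 0 if
 * work was run or nothing is pending, -EINTR if no work was run and a
 * signal is pending.
 */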
int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
	if (io_local_work_pending(ctx)) {
		__set_current_state(TASK_RUNNING);
		if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
			return 0;
	}
	if (io_run_task_work() > 0)
		return 0;
	if (task_sigpending(current))
		return -EINTR;
	return 0;
}

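/*
 * Returns true if the task has io_uring requests inflight, used to decide
 * whether time spent waiting should be accounted as iowait.
 */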
static bool current_pending_io(void)
{
	struct io_uring_task *tctx = current->io_uring;

	if (!tctx)
		return false;
	return percpu_counter_read_positive(&tctx->inflight);
}

static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
{
	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);

	WRITE_ONCE(iowq->hit_timeout, 1);
	iowq->min_timeout = 0;
	wake_up_process(iowq->wq.private);
	return HRTIMER_NORESTART;
}

/*
 * Doing min_timeout portion. If we saw any timeouts, events, or have work,
 * wake up. If not, and we have a normal timeout, switch to that and keep
 * sleeping.
 */
static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
{
	struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
	struct io_ring_ctx *ctx = iowq->ctx;

	/* no general timeout, or shorter (or equal), we are done */
	if (iowq->timeout == KTIME_MAX ||
	    ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
		goto out_wake;
	/* work we may need to run, wake function will see if we need to wake */
	if (io_has_work(ctx))
		goto out_wake;
	/* got events since we started waiting, min timeout is done */
	scoped_guard(rcu) {
		struct io_rings *rings = io_get_rings(ctx);

		if (iowq->cq_min_tail != READ_ONCE(rings->cq.tail))
			goto out_wake;
		/* if we have any events and min timeout expired, we're done */
		if (io_cqring_events(ctx))
			goto out_wake;
	}
	/*
	 * If using deferred task_work running and application is waiting on
	 * more than one request, ensure we reset it now where we are switching
	 * to normal sleeps. Any request completion post min_wait should wake
	 * the task and return.
	 */
	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
		atomic_set(&ctx->cq_wait_nr, 1);
		smp_mb();
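		/*
		 * Order the cq_wait_nr store above against the work_llist
		 * check below; pairs with the task_work add side, which
		 * queues work before inspecting cq_wait_nr.
		 */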
		if (!llist_empty(&ctx->work_llist))
			goto out_wake;
	}

	/* any generated CQE posted past this time should wake us up */
	iowq->cq_tail = iowq->cq_min_tail;

	hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
	hrtimer_set_expires(timer, iowq->timeout);
	return HRTIMER_RESTART;
out_wake:
	return io_cqring_timer_wakeup(timer);
}

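/*
 * Arm an on-stack hrtimer for this wait: start with the min_timeout handler
 * if a minimum wait time was requested, otherwise use the plain timeout
 * handler directly. Returns -ETIME if the timeout fired, 0 otherwise.
 */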
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
				      clockid_t clock_id, ktime_t start_time)
{
	ktime_t timeout;

	if (iowq->min_timeout) {
		timeout = ktime_add_ns(iowq->min_timeout, start_time);
		hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
				       HRTIMER_MODE_ABS);
	} else {
		timeout = iowq->timeout;
		hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
				       HRTIMER_MODE_ABS);
	}

	hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
	hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);

	if (!READ_ONCE(iowq->hit_timeout))
		schedule();

	hrtimer_cancel(&iowq->t);
	destroy_hrtimer_on_stack(&iowq->t);
	__set_current_state(TASK_RUNNING);

	return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
}

static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
				     struct io_wait_queue *iowq,
				     struct ext_arg *ext_arg,
				     ktime_t start_time)
{
	int ret = 0;

	/*
	 * Mark us as being in io_wait if we have pending requests, so cpufreq
	 * can take into account that the task is waiting for IO - turns out
	 * to be important for low QD IO.
	 */
	if (ext_arg->iowait && current_pending_io())
		current->in_iowait = 1;
	if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
		ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
	else
		schedule();
	current->in_iowait = 0;
	return ret;
}

/* If this returns > 0, the caller should retry */
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
					  struct io_wait_queue *iowq,
					  struct ext_arg *ext_arg,
					  ktime_t start_time)
{
	if (unlikely(READ_ONCE(ctx->check_cq)))
		return 1;
	if (unlikely(io_local_work_pending(ctx)))
		return 1;
	if (unlikely(task_work_pending(current)))
		return 1;
	if (unlikely(task_sigpending(current)))
		return -EINTR;
	if (unlikely(io_should_wake(iowq)))
		return 0;

	return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
}

/*
 * Wait until events become available, if we don't already have some. The
 * application must reap them itself, as they reside on the shared cq ring.
 */
int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
		   struct ext_arg *ext_arg)
{
	struct io_wait_queue iowq;
	struct io_rings *rings;
	ktime_t start_time;
	int ret, nr_wait;

	min_events = min_t(int, min_events, ctx->cq_entries);

	if (!io_allowed_run_tw(ctx))
		return -EEXIST;
	if (io_local_work_pending(ctx))
		io_run_local_work(ctx, min_events,
				  max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
	io_run_task_work();

	if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
		io_cqring_do_overflow_flush(ctx);

	rcu_read_lock();
	rings = io_get_rings(ctx);
	if (__io_cqring_events_user(ctx) >= min_events) {
		rcu_read_unlock();
		return 0;
	}

	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
	iowq.wq.private = current;
	INIT_LIST_HEAD(&iowq.wq.entry);
	iowq.ctx = ctx;
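	/*
	 * cq_tail is the CQ tail value at which enough completions exist to
	 * satisfy min_events; cq_min_tail snapshots the tail at the start of
	 * the wait for the min_timeout logic. nr_wait is the number of
	 * completions still needed, used as the DEFER_TASKRUN wake threshold.
	 */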
	iowq.cq_tail = READ_ONCE(rings->cq.head) + min_events;
	iowq.cq_min_tail = READ_ONCE(rings->cq.tail);
	nr_wait = (int) iowq.cq_tail - READ_ONCE(rings->cq.tail);
	rcu_read_unlock();
	rings = NULL;
	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
	iowq.hit_timeout = 0;
	iowq.min_timeout = ext_arg->min_time;
	iowq.timeout = KTIME_MAX;
	start_time = io_get_time(ctx);

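	/*
	 * Without IORING_ENTER_ABS_TIMER the passed-in timespec is relative,
	 * so convert it to an absolute expiry based on the start time.
	 */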
	if (ext_arg->ts_set) {
		iowq.timeout = timespec64_to_ktime(ext_arg->ts);
		if (!(flags & IORING_ENTER_ABS_TIMER))
			iowq.timeout = ktime_add(iowq.timeout, start_time);
	}

	if (ext_arg->sig) {
#ifdef CONFIG_COMPAT
		if (in_compat_syscall())
			ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
						      ext_arg->argsz);
		else
#endif
			ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);

		if (ret)
			return ret;
	}

	io_napi_busy_loop(ctx, &iowq);

	trace_io_uring_cqring_wait(ctx, min_events);
	do {
		unsigned long check_cq;

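		/*
		 * With DEFER_TASKRUN there is no waitqueue entry; publish how
		 * many completions we still need in cq_wait_nr so the
		 * task_work add path knows when to wake us.
		 */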
		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
			atomic_set(&ctx->cq_wait_nr, nr_wait);
			set_current_state(TASK_INTERRUPTIBLE);
		} else {
			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
						  TASK_INTERRUPTIBLE);
		}

		ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
		__set_current_state(TASK_RUNNING);
		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);

		/*
		 * Run task_work after scheduling and before io_should_wake().
		 * If we got woken because of task_work being processed, run it
		 * now rather than let the caller do another wait loop.
		 */
		if (io_local_work_pending(ctx))
			io_run_local_work(ctx, nr_wait, nr_wait);
		io_run_task_work();

		/*
		 * Non-local task_work will be run on exit to userspace, but
		 * if we're using DEFER_TASKRUN, then we could have waited
		 * with a timeout for a number of requests. If the timeout
		 * hits, we could have some requests ready to process. Ensure
		 * this break is _after_ we have run task_work, to avoid
		 * deferring running potentially pending requests until the
		 * next time we wait for events.
		 */
		if (ret < 0)
			break;

		check_cq = READ_ONCE(ctx->check_cq);
		if (unlikely(check_cq)) {
			/* let the caller flush overflows, retry */
			if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
				io_cqring_do_overflow_flush(ctx);
			if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
				ret = -EBADR;
				break;
			}
		}

		if (io_should_wake(&iowq)) {
			ret = 0;
			break;
		}
		cond_resched();

		/* if min timeout has been hit, don't reset wait count */
		if (!iowq.hit_timeout)
			scoped_guard(rcu)
				nr_wait = (int) iowq.cq_tail -
					  READ_ONCE(io_get_rings(ctx)->cq.tail);
		else
			nr_wait = 1;
	} while (1);

	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
		finish_wait(&ctx->cq_wait, &iowq.wq);
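	/*
	 * If we are returning -EINTR, leave the temporary sigmask in place so
	 * signal delivery restores the original one; otherwise restore it now.
	 */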
	restore_saved_sigmask_unless(ret == -EINTR);

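	/* report success if completions are available for userspace to reap */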
	guard(rcu)();
	return READ_ONCE(io_get_rings(ctx)->cq.head) ==
	       READ_ONCE(io_get_rings(ctx)->cq.tail) ? ret : 0;
}