Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "funnel-workqueue.h"
7
8#include <linux/atomic.h>
9#include <linux/cache.h>
10#include <linux/completion.h>
11#include <linux/err.h>
12#include <linux/kthread.h>
13#include <linux/percpu.h>
14
15#include "funnel-queue.h"
16#include "logger.h"
17#include "memory-alloc.h"
18#include "numeric.h"
19#include "permassert.h"
20#include "string-utils.h"
21
22#include "completion.h"
23#include "status-codes.h"
24
25static DEFINE_PER_CPU(unsigned int, service_queue_rotor);
26
/**
 * DOC: Work queue definition.
 *
 * Two flavors of work queue exist. A "simple" queue has exactly one worker thread. A
 * "round-robin" queue owns a set of simple queues and hands incoming work to them in (roughly)
 * round-robin order. Callers see both flavors through the common sub-structure below, even though
 * the two implementations share little beyond it.
 */
struct vdo_work_queue {
	/* Name of just this work queue (e.g., "cpuQ12"); a heap copy owned by the queue */
	char *name;
	/* True when this is embedded in a round_robin_work_queue, false for a simple queue */
	bool round_robin_mode;
	/* The vdo_thread this queue was created for */
	struct vdo_thread *owner;
	/* Life cycle hooks, priority limits, etc. */
	const struct vdo_work_queue_type *type;
};
43
44struct simple_work_queue {
45 struct vdo_work_queue common;
46 struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
47 void *private;
48
49 /*
50 * The fields above are unchanged after setup but often read, and are good candidates for
51 * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
52 * The fields below are often modified as we sleep and wake, so we want a separate cache
53 * line for performance.
54 */
55
56 /* Any (0 or 1) worker threads waiting for new work to do */
57 wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
58 /* Hack to reduce wakeup calls if the worker thread is running */
59 atomic_t idle;
60
61 /* These are infrequently used so in terms of performance we don't care where they land. */
62 struct task_struct *thread;
63 /* Notify creator once worker has initialized */
64 struct completion *started;
65};
66
67struct round_robin_work_queue {
68 struct vdo_work_queue common;
69 struct simple_work_queue **service_queues;
70 unsigned int num_service_queues;
71};
72
73static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
74{
75 return ((queue == NULL) ?
76 NULL : container_of(queue, struct simple_work_queue, common));
77}
78
79static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
80{
81 return ((queue == NULL) ?
82 NULL :
83 container_of(queue, struct round_robin_work_queue, common));
84}
85
86/* Processing normal completions. */
87
88/*
89 * Dequeue and return the next waiting completion, if any.
90 *
91 * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
92 * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
93 * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
94 * enforcement of priorities becomes necessary, this function will need fixing.
95 */
96static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
97{
98 int i;
99
100 for (i = queue->common.type->max_priority; i >= 0; i--) {
101 struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]);
102
103 if (link != NULL)
104 return container_of(link, struct vdo_completion, work_queue_entry_link);
105 }
106
107 return NULL;
108}
109
110static void enqueue_work_queue_completion(struct simple_work_queue *queue,
111 struct vdo_completion *completion)
112{
113 VDO_ASSERT_LOG_ONLY(completion->my_queue == NULL,
114 "completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
115 completion, completion->callback, queue, completion->my_queue);
116 if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
117 completion->priority = queue->common.type->default_priority;
118
119 if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority,
120 "priority is in range for queue") != VDO_SUCCESS)
121 completion->priority = 0;
122
123 completion->my_queue = &queue->common;
124
125 /* Funnel queue handles the synchronization for the put. */
126 vdo_funnel_queue_put(queue->priority_lists[completion->priority],
127 &completion->work_queue_entry_link);
128
129 /*
130 * Due to how funnel queue synchronization is handled (just atomic operations), the
131 * simplest safe implementation here would be to wake-up any waiting threads after
132 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
133 * item to the queue, the consumer thread may not see this since it is not guaranteed to
134 * have the same view of the queue as a producer thread.
135 *
136 * However, the above is wasteful so instead we attempt to minimize the number of thread
137 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
138 * able to determine when the worker thread might be asleep or going to sleep. We use
139 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
140 * waking the worker thread, so multiple wakeups aren't tried at once.
141 *
142 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
143 * first is any better or worse for other platforms, even other x86 configurations.
144 */
145 smp_mb();
146 if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
147 return;
148
149 /* There's a maximum of one thread in this list. */
150 wake_up(&queue->waiting_worker_threads);
151}
152
153static void run_start_hook(struct simple_work_queue *queue)
154{
155 if (queue->common.type->start != NULL)
156 queue->common.type->start(queue->private);
157}
158
159static void run_finish_hook(struct simple_work_queue *queue)
160{
161 if (queue->common.type->finish != NULL)
162 queue->common.type->finish(queue->private);
163}
164
165/*
166 * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
167 * for us to shut down.
168 *
169 * If kthread_should_stop says it's time to stop but we have pending completions return a
170 * completion.
171 *
172 * Also update statistics relating to scheduler interactions.
173 */
174static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
175{
176 struct vdo_completion *completion;
177 DEFINE_WAIT(wait);
178
179 while (true) {
180 prepare_to_wait(&queue->waiting_worker_threads, &wait,
181 TASK_INTERRUPTIBLE);
182 /*
183 * Don't set the idle flag until a wakeup will not be lost.
184 *
185 * Force synchronization between setting the idle flag and checking the funnel
186 * queue; the producer side will do them in the reverse order. (There's still a
187 * race condition we've chosen to allow, because we've got a timeout below that
188 * unwedges us if we hit it, but this may narrow the window a little.)
189 */
190 atomic_set(&queue->idle, 1);
191 smp_mb(); /* store-load barrier between "idle" and funnel queue */
192
193 completion = poll_for_completion(queue);
194 if (completion != NULL)
195 break;
196
197 /*
198 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
199 * above. Otherwise, schedule() will put the thread to sleep and might miss a
200 * wakeup from kthread_stop() call in vdo_finish_work_queue().
201 */
202 if (kthread_should_stop())
203 break;
204
205 schedule();
206
207 /*
208 * Most of the time when we wake, it should be because there's work to do. If it
209 * was a spurious wakeup, continue looping.
210 */
211 completion = poll_for_completion(queue);
212 if (completion != NULL)
213 break;
214 }
215
216 finish_wait(&queue->waiting_worker_threads, &wait);
217 atomic_set(&queue->idle, 0);
218
219 return completion;
220}
221
222static void process_completion(struct simple_work_queue *queue,
223 struct vdo_completion *completion)
224{
225 if (VDO_ASSERT(completion->my_queue == &queue->common,
226 "completion %px from queue %px marked as being in this queue (%px)",
227 completion, queue, completion->my_queue) == VDO_SUCCESS)
228 completion->my_queue = NULL;
229
230 vdo_run_completion(completion);
231}
232
/*
 * Main loop of a worker thread: run the start hook, process completions until asked to stop with
 * nothing left to do, then run the finish hook.
 */
static void service_work_queue(struct simple_work_queue *queue)
{
	run_start_hook(queue);

	for (;;) {
		/* Fast path: grab work without touching the wait queue. */
		struct vdo_completion *next = poll_for_completion(queue);

		if (next == NULL) {
			next = wait_for_next_completion(queue);
			if (next == NULL) {
				/* Nothing queued and kthread_should_stop() was triggered. */
				break;
			}
		}

		process_completion(queue, next);

		/*
		 * Yield if the kernel says something else wants this CPU. Doing so speeds up some
		 * performance tests; the "something else" may well be another VDO thread.
		 */
		cond_resched();
	}

	run_finish_hook(queue);
}
260
261static int work_queue_runner(void *ptr)
262{
263 struct simple_work_queue *queue = ptr;
264
265 complete(queue->started);
266 service_work_queue(queue);
267 return 0;
268}
269
270/* Creation & teardown */
271
272static void free_simple_work_queue(struct simple_work_queue *queue)
273{
274 unsigned int i;
275
276 for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
277 vdo_free_funnel_queue(queue->priority_lists[i]);
278 vdo_free(queue->common.name);
279 vdo_free(queue);
280}
281
282static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
283{
284 struct simple_work_queue **queue_table = queue->service_queues;
285 unsigned int count = queue->num_service_queues;
286 unsigned int i;
287
288 queue->service_queues = NULL;
289
290 for (i = 0; i < count; i++)
291 free_simple_work_queue(queue_table[i]);
292 vdo_free(queue_table);
293 vdo_free(queue->common.name);
294 vdo_free(queue);
295}
296
297void vdo_free_work_queue(struct vdo_work_queue *queue)
298{
299 if (queue == NULL)
300 return;
301
302 vdo_finish_work_queue(queue);
303
304 if (queue->round_robin_mode)
305 free_round_robin_work_queue(as_round_robin_work_queue(queue));
306 else
307 free_simple_work_queue(as_simple_work_queue(queue));
308}
309
310static int make_simple_work_queue(const char *thread_name_prefix, const char *name,
311 struct vdo_thread *owner, void *private,
312 const struct vdo_work_queue_type *type,
313 struct simple_work_queue **queue_ptr)
314{
315 DECLARE_COMPLETION_ONSTACK(started);
316 struct simple_work_queue *queue;
317 int i;
318 struct task_struct *thread = NULL;
319 int result;
320
321 VDO_ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
322 "queue priority count %u within limit %u", type->max_priority,
323 VDO_WORK_Q_MAX_PRIORITY);
324
325 result = vdo_allocate(1, "simple work queue", &queue);
326 if (result != VDO_SUCCESS)
327 return result;
328
329 queue->private = private;
330 queue->started = &started;
331 queue->common.type = type;
332 queue->common.owner = owner;
333 init_waitqueue_head(&queue->waiting_worker_threads);
334
335 result = vdo_duplicate_string(name, "queue name", &queue->common.name);
336 if (result != VDO_SUCCESS) {
337 vdo_free(queue);
338 return -ENOMEM;
339 }
340
341 for (i = 0; i <= type->max_priority; i++) {
342 result = vdo_make_funnel_queue(&queue->priority_lists[i]);
343 if (result != VDO_SUCCESS) {
344 free_simple_work_queue(queue);
345 return result;
346 }
347 }
348
349 thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
350 queue->common.name);
351 if (IS_ERR(thread)) {
352 free_simple_work_queue(queue);
353 return (int) PTR_ERR(thread);
354 }
355
356 queue->thread = thread;
357
358 /*
359 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
360 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
361 * cleanup code.
362 *
363 * Eventually we should just make that path safe too, and then we won't need this
364 * synchronization.
365 */
366 wait_for_completion(&started);
367
368 *queue_ptr = queue;
369 return VDO_SUCCESS;
370}
371
372/**
373 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
374 * be distributed to them in round-robin fashion.
375 * @thread_name_prefix: A prefix for the thread names to identify them as a vdo thread.
376 * @name: A base name to identify this queue.
377 * @owner: The vdo_thread structure to manage this queue.
378 * @type: The type of queue to create.
379 * @thread_count: The number of actual threads handling this queue.
380 * @thread_privates: An array of private contexts, one for each thread; may be NULL.
381 * @queue_ptr: A pointer to return the new work queue.
382 *
383 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
384 * of the actual number of queues and threads allocated here, code outside of the queue
385 * implementation will treat this as a single zone.
386 */
387int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
388 struct vdo_thread *owner, const struct vdo_work_queue_type *type,
389 unsigned int thread_count, void *thread_privates[],
390 struct vdo_work_queue **queue_ptr)
391{
392 struct round_robin_work_queue *queue;
393 int result;
394 char thread_name[TASK_COMM_LEN];
395 unsigned int i;
396
397 if (thread_count == 1) {
398 struct simple_work_queue *simple_queue;
399 void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);
400
401 result = make_simple_work_queue(thread_name_prefix, name, owner, context,
402 type, &simple_queue);
403 if (result == VDO_SUCCESS)
404 *queue_ptr = &simple_queue->common;
405 return result;
406 }
407
408 result = vdo_allocate(1, "round-robin work queue", &queue);
409 if (result != VDO_SUCCESS)
410 return result;
411
412 result = vdo_allocate(thread_count, "subordinate work queues", &queue->service_queues);
413 if (result != VDO_SUCCESS) {
414 vdo_free(queue);
415 return result;
416 }
417
418 queue->num_service_queues = thread_count;
419 queue->common.round_robin_mode = true;
420 queue->common.owner = owner;
421
422 result = vdo_duplicate_string(name, "queue name", &queue->common.name);
423 if (result != VDO_SUCCESS) {
424 vdo_free(queue->service_queues);
425 vdo_free(queue);
426 return -ENOMEM;
427 }
428
429 *queue_ptr = &queue->common;
430
431 for (i = 0; i < thread_count; i++) {
432 void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);
433
434 snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
435 result = make_simple_work_queue(thread_name_prefix, thread_name, owner,
436 context, type, &queue->service_queues[i]);
437 if (result != VDO_SUCCESS) {
438 queue->num_service_queues = i;
439 /* Destroy previously created subordinates. */
440 vdo_free_work_queue(vdo_forget(*queue_ptr));
441 return result;
442 }
443 }
444
445 return VDO_SUCCESS;
446}
447
448static void finish_simple_work_queue(struct simple_work_queue *queue)
449{
450 if (queue->thread == NULL)
451 return;
452
453 /* Tells the worker thread to shut down and waits for it to exit. */
454 kthread_stop(queue->thread);
455 queue->thread = NULL;
456}
457
458static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
459{
460 struct simple_work_queue **queue_table = queue->service_queues;
461 unsigned int count = queue->num_service_queues;
462 unsigned int i;
463
464 for (i = 0; i < count; i++)
465 finish_simple_work_queue(queue_table[i]);
466}
467
468/* No enqueueing of completions should be done once this function is called. */
469void vdo_finish_work_queue(struct vdo_work_queue *queue)
470{
471 if (queue == NULL)
472 return;
473
474 if (queue->round_robin_mode)
475 finish_round_robin_work_queue(as_round_robin_work_queue(queue));
476 else
477 finish_simple_work_queue(as_simple_work_queue(queue));
478}
479
480/* Debugging dumps */
481
482static void dump_simple_work_queue(struct simple_work_queue *queue)
483{
484 const char *thread_status = "no threads";
485 char task_state_report = '-';
486
487 if (queue->thread != NULL) {
488 task_state_report = task_state_to_char(queue->thread);
489 thread_status = atomic_read(&queue->idle) ? "idle" : "running";
490 }
491
492 vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
493 thread_status, task_state_report);
494
495 /* ->waiting_worker_threads wait queue status? anyone waiting? */
496}
497
498/*
499 * Write to the buffer some info about the completion, for logging. Since the common use case is
500 * dumping info about a lot of completions to syslog all at once, the format favors brevity over
501 * readability.
502 */
503void vdo_dump_work_queue(struct vdo_work_queue *queue)
504{
505 if (queue->round_robin_mode) {
506 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
507 unsigned int i;
508
509 for (i = 0; i < round_robin->num_service_queues; i++)
510 dump_simple_work_queue(round_robin->service_queues[i]);
511 } else {
512 dump_simple_work_queue(as_simple_work_queue(queue));
513 }
514}
515
/*
 * Write a short, printable name for a function pointer into a buffer.
 *
 * @pointer: The function address to describe; may be NULL.
 * @buffer: Destination for the NUL-terminated name.
 * @buffer_length: Size of @buffer in bytes. If it is 0 nothing is written or read.
 *
 * A NULL pointer is rendered as "-". Otherwise the pointer is formatted with "%ps" and the result
 * is cut at the first space, if any (presumably to drop whatever annotation follows the symbol
 * name -- confirm against the printk format documentation).
 */
static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
{
	char *space;

	/*
	 * With no room at all, snprintf() would store nothing and the strchr() below would scan
	 * memory that was never written; "buffer_length - 1" would also wrap around.
	 */
	if (buffer_length == 0)
		return;

	if (pointer == NULL) {
		/*
		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
		 * sometimes use this when logging lots of data; don't be so verbose.
		 */
		strscpy(buffer, "-", buffer_length);
		return;
	}

	/*
	 * Use a pragma to defeat gcc's format checking, which doesn't understand that "%ps"
	 * actually does support a precision spec in Linux kernel code.
	 *
	 * The "*" precision is fetched from the argument list as an int, so the size_t length is
	 * converted explicitly; with format checking disabled the compiler would not flag a
	 * mismatched argument type.
	 */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat"
	snprintf(buffer, buffer_length, "%.*ps", (int) (buffer_length - 1), pointer);
#pragma GCC diagnostic pop

	space = strchr(buffer, ' ');
	if (space != NULL)
		*space = '\0';
}
541
542void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
543 size_t length)
544{
545 size_t current_length =
546 scnprintf(buffer, length, "%.*s/", TASK_COMM_LEN,
547 (completion->my_queue == NULL ? "-" : completion->my_queue->name));
548
549 if (current_length < length - 1) {
550 get_function_name((void *) completion->callback, buffer + current_length,
551 length - current_length);
552 }
553}
554
555/* Completion submission */
556/*
557 * If the completion has a timeout that has already passed, the timeout handler function may be
558 * invoked by this function.
559 */
560void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
561 struct vdo_completion *completion)
562{
563 /*
564 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
565 * on.
566 */
567 struct simple_work_queue *simple_queue = NULL;
568
569 if (!queue->round_robin_mode) {
570 simple_queue = as_simple_work_queue(queue);
571 } else {
572 struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
573
574 /*
575 * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
576 * Any patterns that might develop are likely to be disrupted by random ordering of
577 * multiple completions and migration between cores, unless the load is so light as
578 * to be regular in ordering of tasks and the threads are confined to individual
579 * cores; with a load that light we won't care.
580 */
581 unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
582 unsigned int index = rotor % round_robin->num_service_queues;
583
584 simple_queue = round_robin->service_queues[index];
585 }
586
587 enqueue_work_queue_completion(simple_queue, completion);
588}
589
590/* Misc */
591
592/*
593 * Return the work queue pointer recorded at initialization time in the work-queue stack handle
594 * initialized on the stack of the current thread, if any.
595 */
596static struct simple_work_queue *get_current_thread_work_queue(void)
597{
598 /*
599 * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
600 * the queue for the thread which was interrupted. However, the interrupted thread may have
601 * been processing a completion, in which case starting to process another would violate
602 * our concurrency assumptions.
603 */
604 if (in_interrupt())
605 return NULL;
606
607 if (kthread_func(current) != work_queue_runner)
608 /* Not a VDO work queue thread. */
609 return NULL;
610
611 return kthread_data(current);
612}
613
614struct vdo_work_queue *vdo_get_current_work_queue(void)
615{
616 struct simple_work_queue *queue = get_current_thread_work_queue();
617
618 return (queue == NULL) ? NULL : &queue->common;
619}
620
621struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
622{
623 return queue->owner;
624}
625
626/**
627 * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
628 * queue, or NULL if none or if the current thread is not a
629 * work queue thread.
630 */
631void *vdo_get_work_queue_private_data(void)
632{
633 struct simple_work_queue *queue = get_current_thread_work_queue();
634
635 return (queue != NULL) ? queue->private : NULL;
636}
637
638bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
639 const struct vdo_work_queue_type *type)
640{
641 return (queue->type == type);
642}