Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1// SPDX-License-Identifier: GPL-2.0
2
3/*
4 * Test module for stress and performance analysis of workqueue.
5 *
6 * Benchmarks queue_work() throughput on an unbound workqueue to measure
7 * pool->lock contention under different affinity scope configurations
8 * (e.g., cache vs cache_shard).
9 *
10 * The affinity scope is changed between runs via the workqueue's sysfs
11 * affinity_scope attribute (WQ_SYSFS).
12 *
13 * Copyright (c) 2026 Meta Platforms, Inc. and affiliates
14 * Copyright (c) 2026 Breno Leitao <leitao@debian.org>
15 *
16 */
17#include <linux/init.h>
18#include <linux/kernel.h>
19#include <linux/module.h>
20#include <linux/workqueue.h>
21#include <linux/kthread.h>
22#include <linux/moduleparam.h>
23#include <linux/completion.h>
24#include <linux/atomic.h>
25#include <linux/slab.h>
26#include <linux/ktime.h>
27#include <linux/cpumask.h>
28#include <linux/sched.h>
29#include <linux/sort.h>
30#include <linux/fs.h>
31
/* Name of the benchmark workqueue; with WQ_SYSFS this is also its sysfs dir. */
#define WQ_NAME "bench_wq"
/* sysfs attribute written to switch the affinity scope between runs. */
#define SCOPE_PATH "/sys/bus/workqueue/devices/" WQ_NAME "/affinity_scope"

/* Number of benchmark kthreads; 0 means "one per online CPU" (see init). */
static int nr_threads;
module_param(nr_threads, int, 0444);
MODULE_PARM_DESC(nr_threads,
		 "Number of threads to spawn (default: 0 = num_online_cpus())");

/* How many work items each thread queues per run. */
static int wq_items = 50000;
module_param(wq_items, int, 0444);
MODULE_PARM_DESC(wq_items,
		 "Number of work items each thread queues (default: 50000)");

/* The unbound workqueue under test. */
static struct workqueue_struct *bench_wq;
/* Count of benchmark threads still running; last one signals all_done_comp. */
static atomic_t threads_done;
/* Released by run_bench() to start all threads at (roughly) the same time. */
static DECLARE_COMPLETION(start_comp);
/* Signaled by the last thread to finish its items. */
static DECLARE_COMPLETION(all_done_comp);
/*
 * Per-thread benchmark state.  One instance per kthread; bench_work_fn()
 * recovers the owning context from the embedded work_struct via
 * container_of().
 */
struct thread_ctx {
	struct completion work_done;	/* signaled when the queued item runs */
	struct work_struct work;	/* the single, serially reused work item */
	u64 *latencies;			/* per-item queue_work() latency in ns */
	int cpu;			/* CPU this kthread is bound to */
	int items;			/* number of items to queue (= wq_items) */
};
57
58static void bench_work_fn(struct work_struct *work)
59{
60 struct thread_ctx *ctx = container_of(work, struct thread_ctx, work);
61
62 complete(&ctx->work_done);
63}
64
65static int bench_kthread_fn(void *data)
66{
67 struct thread_ctx *ctx = data;
68 ktime_t t_start, t_end;
69 int i;
70
71 /* Wait for all threads to be ready */
72 wait_for_completion(&start_comp);
73
74 if (kthread_should_stop())
75 return 0;
76
77 for (i = 0; i < ctx->items; i++) {
78 reinit_completion(&ctx->work_done);
79 INIT_WORK(&ctx->work, bench_work_fn);
80
81 t_start = ktime_get();
82 queue_work(bench_wq, &ctx->work);
83 t_end = ktime_get();
84
85 ctx->latencies[i] = ktime_to_ns(ktime_sub(t_end, t_start));
86 wait_for_completion(&ctx->work_done);
87 }
88
89 if (atomic_dec_and_test(&threads_done))
90 complete(&all_done_comp);
91
92 /*
93 * Wait for kthread_stop() so the module text isn't freed
94 * while we're still executing.
95 */
96 while (!kthread_should_stop())
97 schedule();
98
99 return 0;
100}
101
102static int cmp_u64(const void *a, const void *b)
103{
104 u64 va = *(const u64 *)a;
105 u64 vb = *(const u64 *)b;
106
107 if (va < vb)
108 return -1;
109 if (va > vb)
110 return 1;
111 return 0;
112}
113
114static int __init set_affn_scope(const char *scope)
115{
116 struct file *f;
117 loff_t pos = 0;
118 ssize_t ret;
119
120 f = filp_open(SCOPE_PATH, O_WRONLY, 0);
121 if (IS_ERR(f)) {
122 pr_err("test_workqueue: open %s failed: %ld\n",
123 SCOPE_PATH, PTR_ERR(f));
124 return PTR_ERR(f);
125 }
126
127 ret = kernel_write(f, scope, strlen(scope), &pos);
128 filp_close(f, NULL);
129
130 if (ret < 0) {
131 pr_err("test_workqueue: write '%s' failed: %zd\n", scope, ret);
132 return ret;
133 }
134
135 return 0;
136}
137
138static int __init run_bench(int n_threads, const char *scope, const char *label)
139{
140 struct task_struct **tasks;
141 unsigned long total_items;
142 struct thread_ctx *ctxs;
143 u64 *all_latencies;
144 ktime_t start, end;
145 int cpu, i, j, ret;
146 s64 elapsed_us;
147
148 ret = set_affn_scope(scope);
149 if (ret)
150 return ret;
151
152 ctxs = kcalloc(n_threads, sizeof(*ctxs), GFP_KERNEL);
153 if (!ctxs)
154 return -ENOMEM;
155
156 tasks = kcalloc(n_threads, sizeof(*tasks), GFP_KERNEL);
157 if (!tasks) {
158 kfree(ctxs);
159 return -ENOMEM;
160 }
161
162 total_items = (unsigned long)n_threads * wq_items;
163 all_latencies = kvmalloc_array(total_items, sizeof(u64), GFP_KERNEL);
164 if (!all_latencies) {
165 kfree(tasks);
166 kfree(ctxs);
167 return -ENOMEM;
168 }
169
170 /* Allocate per-thread latency arrays */
171 for (i = 0; i < n_threads; i++) {
172 ctxs[i].latencies = kvmalloc_array(wq_items, sizeof(u64),
173 GFP_KERNEL);
174 if (!ctxs[i].latencies) {
175 while (--i >= 0)
176 kvfree(ctxs[i].latencies);
177 kvfree(all_latencies);
178 kfree(tasks);
179 kfree(ctxs);
180 return -ENOMEM;
181 }
182 }
183
184 atomic_set(&threads_done, n_threads);
185 reinit_completion(&all_done_comp);
186 reinit_completion(&start_comp);
187
188 /* Create kthreads, each bound to a different online CPU */
189 i = 0;
190 for_each_online_cpu(cpu) {
191 if (i >= n_threads)
192 break;
193
194 ctxs[i].cpu = cpu;
195 ctxs[i].items = wq_items;
196 init_completion(&ctxs[i].work_done);
197
198 tasks[i] = kthread_create(bench_kthread_fn, &ctxs[i],
199 "wq_bench/%d", cpu);
200 if (IS_ERR(tasks[i])) {
201 ret = PTR_ERR(tasks[i]);
202 pr_err("test_workqueue: failed to create kthread %d: %d\n",
203 i, ret);
204 /* Unblock threads waiting on start_comp before stopping them */
205 complete_all(&start_comp);
206 while (--i >= 0)
207 kthread_stop(tasks[i]);
208 goto out_free;
209 }
210
211 kthread_bind(tasks[i], cpu);
212 wake_up_process(tasks[i]);
213 i++;
214 }
215
216 /* Start timing and release all threads */
217 start = ktime_get();
218 complete_all(&start_comp);
219
220 /* Wait for all threads to finish the benchmark */
221 wait_for_completion(&all_done_comp);
222
223 /* Drain any remaining work */
224 flush_workqueue(bench_wq);
225
226 /* Ensure all kthreads have fully exited before module memory is freed */
227 for (i = 0; i < n_threads; i++)
228 kthread_stop(tasks[i]);
229
230 end = ktime_get();
231 elapsed_us = ktime_us_delta(end, start);
232
233 /* Merge all per-thread latencies and sort for percentile calculation */
234 j = 0;
235 for (i = 0; i < n_threads; i++) {
236 memcpy(&all_latencies[j], ctxs[i].latencies,
237 wq_items * sizeof(u64));
238 j += wq_items;
239 }
240
241 sort(all_latencies, total_items, sizeof(u64), cmp_u64, NULL);
242
243 pr_info("test_workqueue: %-16s %llu items/sec\tp50=%llu\tp90=%llu\tp95=%llu ns\n",
244 label,
245 elapsed_us ? div_u64(total_items * 1000000ULL, elapsed_us) : 0,
246 all_latencies[total_items * 50 / 100],
247 all_latencies[total_items * 90 / 100],
248 all_latencies[total_items * 95 / 100]);
249
250 ret = 0;
251out_free:
252 for (i = 0; i < n_threads; i++)
253 kvfree(ctxs[i].latencies);
254 kvfree(all_latencies);
255 kfree(tasks);
256 kfree(ctxs);
257
258 return ret;
259}
260
/*
 * Affinity scopes to benchmark, one run each, written verbatim to the sysfs
 * attribute.  NOTE(review): "cache_shard" does not appear to be an upstream
 * scope name — presumably provided by a local patch; on kernels without it
 * the sysfs write fails and that run is skipped with an error message.
 */
static const char * const bench_scopes[] = {
	"cpu", "smt", "cache_shard", "cache", "numa", "system",
};
264
265static int __init test_workqueue_init(void)
266{
267 int n_threads = min(nr_threads ?: num_online_cpus(), num_online_cpus());
268 int i;
269
270 if (wq_items <= 0) {
271 pr_err("test_workqueue: wq_items must be > 0\n");
272 return -EINVAL;
273 }
274
275 bench_wq = alloc_workqueue(WQ_NAME, WQ_UNBOUND | WQ_SYSFS, 0);
276 if (!bench_wq)
277 return -ENOMEM;
278
279 pr_info("test_workqueue: running %d threads, %d items/thread\n",
280 n_threads, wq_items);
281
282 for (i = 0; i < ARRAY_SIZE(bench_scopes); i++)
283 run_bench(n_threads, bench_scopes[i], bench_scopes[i]);
284
285 destroy_workqueue(bench_wq);
286
287 /* Return -EAGAIN so the module doesn't stay loaded after the benchmark */
288 return -EAGAIN;
289}
290
/* No module_exit(): init always returns -EAGAIN, so the module never loads. */
module_init(test_workqueue_init);
MODULE_AUTHOR("Breno Leitao <leitao@debian.org>");
MODULE_DESCRIPTION("Stress/performance benchmark for workqueue subsystem");
MODULE_LICENSE("GPL");