//! atproto relay implementation in Zig — zlay.waow.tech
//!
1//! generic thread pool with key-partitioned queues
2//!
3//! each worker has its own bounded ring buffer + mutex + condvar.
4//! `submit(key, item, shutdown)` routes to `workers[key % N]` for per-key ordering.
5//! blocks when the target queue is full (backpressure to caller).
6//! items stored by value in pre-allocated ring buffer (zero alloc per submit).
7
8const std = @import("std");
9const Io = std.Io;
10const Allocator = std.mem.Allocator;
11
pub fn ThreadPool(comptime T: type, comptime processFn: fn (*T) void) type {
    return struct {
        const Self = @This();

        pub const Config = struct {
            /// number of worker threads (one queue per worker); must be > 0
            num_workers: u16 = 8,
            /// per-worker ring buffer capacity in items; must be > 0
            queue_capacity: u16 = 4096,
            /// stack size for each worker thread
            stack_size: usize = 4 * 1024 * 1024,
        };

        const Worker = struct {
            // ring buffer stored as a slice of T (items held by value)
            queue: []T,
            capacity: u16,
            head: u16 = 0, // next slot to read
            tail: u16 = 0, // next slot to write
            count: u16 = 0,
            mutex: Io.Mutex = Io.Mutex.init,
            cond: Io.Condition = Io.Condition.init, // "not empty" — workers wait here
            // "not full" — signaled on dequeue and broadcast on shutdown.
            // submit() currently polls instead of waiting on it (see submit),
            // so this is advisory only.
            not_full: Io.Condition = Io.Condition.init,
            alive: bool = true,
            thread: ?std.Thread = null,
            io: Io,
        };

        workers: []Worker,
        allocator: Allocator,
        io: Io,

        /// allocate per-worker queues and spawn worker threads.
        /// on error, everything acquired so far is released: already-spawned
        /// threads are stopped and joined, then queue storage and the worker
        /// array are freed (no leaks on partial failure).
        pub fn init(allocator: Allocator, config: Config, io: Io) !Self {
            // zero workers would make submit's `key % len` divide by zero
            std.debug.assert(config.num_workers > 0);
            std.debug.assert(config.queue_capacity > 0);

            const workers = try allocator.alloc(Worker, config.num_workers);
            errdefer allocator.free(workers);

            // if a queue allocation fails mid-loop, free only the queues
            // that were actually allocated (errdefer sees the final value)
            var queues_ready: usize = 0;
            errdefer {
                for (workers[0..queues_ready]) |*w| allocator.free(w.queue);
            }

            for (workers) |*w| {
                w.* = .{
                    .queue = try allocator.alloc(T, config.queue_capacity),
                    .capacity = config.queue_capacity,
                    .io = io,
                };
                queues_ready += 1;
            }

            const self = Self{
                .workers = workers,
                .allocator = allocator,
                .io = io,
            };

            // spawn worker threads; if a spawn fails, stop and join the
            // workers already running BEFORE the errdefers above free queue
            // memory those threads may still be touching (errdefers are LIFO)
            var spawned: usize = 0;
            errdefer {
                for (workers[0..spawned]) |*w| {
                    w.mutex.lockUncancelable(w.io);
                    w.alive = false;
                    w.cond.signal(w.io);
                    w.mutex.unlock(w.io);
                }
                for (workers[0..spawned]) |*w| {
                    if (w.thread) |t| t.join();
                }
            }
            for (self.workers) |*w| {
                w.thread = try std.Thread.spawn(
                    .{ .stack_size = config.stack_size },
                    workerLoop,
                    .{w},
                );
                spawned += 1;
            }

            return self;
        }

        /// submit an item for processing, routed by key (same key → same
        /// worker, preserving per-key FIFO order). the item is copied by
        /// value into the ring buffer — no allocation per submit.
        /// blocks if the target worker's queue is full (backpressure).
        /// returns false — and does NOT enqueue — if `stop` was set while
        /// waiting, or if the pool has already been shut down (a dead worker
        /// would never drain the item or free a slot to wait on).
        pub fn submit(self: *Self, key: u64, item: T, stop: *std.atomic.Value(bool)) bool {
            const idx = key % self.workers.len;
            const w = &self.workers[idx];

            w.mutex.lockUncancelable(w.io);
            defer w.mutex.unlock(w.io);

            // refuse after shutdown: enqueued items would be silently lost
            if (!w.alive) return false;

            while (w.count == w.capacity) {
                if (stop.load(.acquire)) return false;
                if (!w.alive) return false;
                // poll: release mutex, sleep briefly, reacquire
                // (Io.Condition has no timedWait — poll so stop check isn't starved)
                w.mutex.unlock(w.io);
                w.io.sleep(Io.Duration.fromMilliseconds(10), .awake) catch {};
                w.mutex.lockUncancelable(w.io);
            }

            w.queue[w.tail] = item;
            w.tail = @intCast((@as(u32, w.tail) + 1) % @as(u32, w.capacity));
            w.count += 1;
            w.cond.signal(w.io); // wake the worker: queue is no longer empty
            return true;
        }

        /// drain remaining items and join all worker threads.
        /// workers finish everything still queued before exiting.
        pub fn shutdown(self: *Self) void {
            // signal all workers to stop
            for (self.workers) |*w| {
                w.mutex.lockUncancelable(w.io);
                w.alive = false;
                w.cond.signal(w.io);
                w.not_full.broadcast(w.io); // wake any blocked submitters
                w.mutex.unlock(w.io);
            }
            // join all threads
            for (self.workers) |*w| {
                if (w.thread) |t| {
                    t.join();
                    w.thread = null;
                }
            }
        }

        /// free queue storage. call shutdown() first — worker threads must
        /// not be running when their queues are freed.
        pub fn deinit(self: *Self) void {
            for (self.workers) |*w| {
                self.allocator.free(w.queue);
            }
            self.allocator.free(self.workers);
        }

        /// total pending items across all workers (diagnostic).
        /// takes each worker's mutex in turn, so the result is a point-in-time
        /// approximation, not a consistent snapshot across workers.
        pub fn pendingCount(self: *Self) usize {
            var total: usize = 0;
            for (self.workers) |*w| {
                w.mutex.lockUncancelable(w.io);
                defer w.mutex.unlock(w.io);
                total += w.count;
            }
            return total;
        }

        /// per-worker loop: dequeue one item under the mutex, then run
        /// processFn OUTSIDE the lock so slow handlers don't block submitters.
        /// exits only when the queue is empty AND alive is false (drain).
        fn workerLoop(w: *Worker) void {
            while (true) {
                var item: T = undefined;

                {
                    w.mutex.lockUncancelable(w.io);
                    defer w.mutex.unlock(w.io);

                    while (w.count == 0 and w.alive) {
                        w.cond.waitUncancelable(w.io, &w.mutex);
                    }

                    // empty and shut down: drained, time to exit
                    if (w.count == 0 and !w.alive) return;

                    item = w.queue[w.head];
                    w.head = @intCast((@as(u32, w.head) + 1) % @as(u32, w.capacity));
                    w.count -= 1;
                    w.not_full.signal(w.io); // wake one blocked submitter
                }

                processFn(&item);
            }
        }
    };
}
158
159// --- tests ---
160
161const testing = std.testing;
162
test "basic submit and process" {
    // every processed item folds its value into a shared atomic sum;
    // 1 + 2 + ... + 10 = 55 proves each submission was handled exactly once.
    const Job = struct {
        amount: u32,
        sink: *std.atomic.Value(u32),
    };

    const Handler = struct {
        fn process(job: *Job) void {
            _ = job.sink.fetchAdd(job.amount, .monotonic);
        }
    };

    var stop_flag: std.atomic.Value(bool) = .{ .raw = false };
    var sum: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Job, Handler.process).init(testing.allocator, .{
        .num_workers = 2,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);

    var i: usize = 0;
    while (i < 10) : (i += 1) {
        const accepted = pool.submit(i, .{
            .amount = @intCast(i + 1),
            .sink = &sum,
        }, &stop_flag);
        try testing.expect(accepted);
    }

    pool.shutdown();
    defer pool.deinit();

    try testing.expectEqual(@as(u32, 55), sum.load(.acquire));
}
198
test "per-key ordering preserved" {
    // items with the same key should be processed in FIFO order
    const Item = struct {
        seq: u32,
        results: *std.ArrayListUnmanaged(u32),
        mutex: *Io.Mutex,
        allocator: Allocator,
        io: Io,
    };

    const S = struct {
        fn process(item: *Item) void {
            item.mutex.lockUncancelable(item.io);
            defer item.mutex.unlock(item.io);
            // fail loudly rather than silently dropping the entry: a swallowed
            // OOM here would surface later as a confusing length mismatch
            item.results.append(item.allocator, item.seq) catch |err|
                std.debug.panic("results.append failed: {s}", .{@errorName(err)});
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var results: std.ArrayListUnmanaged(u32) = .empty;
    defer results.deinit(testing.allocator);
    var mutex: Io.Mutex = Io.Mutex.init;

    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 4,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);

    // submit 20 items all with key=42 (same worker)
    for (0..20) |i| {
        const ok = pool.submit(42, .{
            .seq = @intCast(i),
            .results = &results,
            .mutex = &mutex,
            .allocator = testing.allocator,
            .io = std.testing.io,
        }, &shutdown);
        try testing.expect(ok);
    }

    pool.shutdown();
    defer pool.deinit();

    // exactly 20 entries, in submission order
    try testing.expectEqual(@as(usize, 20), results.items.len);
    for (results.items, 0..) |val, i| {
        try testing.expectEqual(@as(u32, @intCast(i)), val);
    }
}
248
test "submit blocks when queue full, succeeds after drain" {
    const Job = struct {
        tally: *std.atomic.Value(u32),
        io: Io,
    };
    const Slow = struct {
        fn process(job: *Job) void {
            // deliberately slow so the queue backs up and submit must wait
            job.io.sleep(Io.Duration.fromMilliseconds(5), .awake) catch {};
            _ = job.tally.fetchAdd(1, .monotonic);
        }
    };

    var stop_flag: std.atomic.Value(bool) = .{ .raw = false };
    var tally: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Job, Slow.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 4,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);

    // push 5x the queue capacity; each submit blocks until a slot frees up
    var submitted: usize = 0;
    while (submitted < 20) : (submitted += 1) {
        const accepted = pool.submit(0, .{ .tally = &tally, .io = std.testing.io }, &stop_flag);
        try testing.expect(accepted);
    }

    pool.shutdown();
    defer pool.deinit();

    // nothing was dropped: all 20 items made it through
    try testing.expectEqual(@as(u32, 20), tally.load(.acquire));
}
282
test "submit returns false on shutdown" {
    const Item = struct {
        stop: *std.atomic.Value(bool),
        io: Io,
    };
    const S = struct {
        fn process(item: *Item) void {
            // poll until released — keeps the worker pinned on one item
            while (!item.stop.load(.acquire)) {
                item.io.sleep(Io.Duration.fromMilliseconds(5), .awake) catch {};
            }
        }
    };

    // two separate flags so the outcome does not depend on timing: with a
    // single shared flag, setting it lets the workers race to drain the
    // queue, and if they win, submit never checks the flag and returns true.
    // here workers block on `worker_release` (still false when we probe),
    // while the probing submit sees `submit_stop`, which is already true.
    var worker_release: std.atomic.Value(bool) = .{ .raw = false };
    var submit_stop: std.atomic.Value(bool) = .{ .raw = true };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 2,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);

    // fill: 1 processing + 2 queued = capacity reached; the worker cannot
    // drain because worker_release is still false
    _ = pool.submit(0, .{ .stop = &worker_release, .io = std.testing.io }, &worker_release);
    _ = pool.submit(0, .{ .stop = &worker_release, .io = std.testing.io }, &worker_release);
    _ = pool.submit(0, .{ .stop = &worker_release, .io = std.testing.io }, &worker_release);

    // queue is full and the stop flag is set — submit must return false
    const ok = pool.submit(0, .{ .stop = &worker_release, .io = std.testing.io }, &submit_stop);
    try testing.expect(!ok);

    // release the worker, then shut down
    worker_release.store(true, .release);
    pool.shutdown();
    defer pool.deinit();
}
317
test "pendingCount reflects queued items" {
    const Item = struct {
        gate: *std.atomic.Value(bool),
        io: Io,
    };
    const S = struct {
        fn process(item: *Item) void {
            // hold the worker until released so queued items stay queued
            while (!item.gate.load(.acquire)) {
                item.io.sleep(Io.Duration.fromMilliseconds(2), .awake) catch {};
            }
        }
    };

    var gate: std.atomic.Value(bool) = .{ .raw = false };
    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);

    // initially empty
    try testing.expectEqual(@as(usize, 0), pool.pendingCount());

    // with the single worker gated, it holds at most one item in flight,
    // so at least 4 of the 5 submissions must still be queued
    for (0..5) |_| {
        _ = pool.submit(0, .{ .gate = &gate, .io = std.testing.io }, &shutdown);
    }
    const pending = pool.pendingCount();
    try testing.expect(pending >= 4 and pending <= 5);

    gate.store(true, .release);
    pool.shutdown();
    defer pool.deinit();

    // shutdown drains everything
    try testing.expectEqual(@as(usize, 0), pool.pendingCount());
}
339
test "shutdown drains remaining items" {
    const Job = struct {
        tally: *std.atomic.Value(u32),
    };
    const Count = struct {
        fn process(job: *Job) void {
            _ = job.tally.fetchAdd(1, .monotonic);
        }
    };

    var stop_flag: std.atomic.Value(bool) = .{ .raw = false };
    var tally: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Job, Count.process).init(testing.allocator, .{
        .num_workers = 2,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);

    // spread 30 items across both workers via distinct keys
    var key: u64 = 0;
    while (key < 30) : (key += 1) {
        _ = pool.submit(key, .{ .tally = &tally }, &stop_flag);
    }

    pool.shutdown();
    defer pool.deinit();

    // shutdown must process everything still queued before joining
    try testing.expectEqual(@as(u32, 30), tally.load(.acquire));
}