atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 367 lines 12 kB view raw
//! Generic thread pool with key-partitioned queues.
//!
//! Each worker has its own bounded ring buffer + mutex + condvar.
//! `submit(key, item, stop)` routes to `workers[key % N]` for per-key ordering.
//! Blocks when the target queue is full (backpressure to caller).
//! Items are stored by value in a pre-allocated ring buffer (zero alloc per submit).

const std = @import("std");
const Io = std.Io;
const Allocator = std.mem.Allocator;

/// Build a thread-pool type whose workers call `processFn` on items of type `T`.
///
/// Each worker owns one fixed-capacity FIFO; an item's key selects the worker,
/// so items submitted with the same key are handed to `processFn` in the order
/// they were enqueued.
pub fn ThreadPool(comptime T: type, comptime processFn: fn (*T) void) type {
    return struct {
        const Self = @This();

        pub const Config = struct {
            /// Number of worker threads (and queues). Must be non-zero.
            num_workers: u16 = 8,
            /// Slots per worker queue. Must be non-zero.
            queue_capacity: u16 = 4096,
            /// Stack size passed to `std.Thread.spawn` for each worker.
            stack_size: usize = 4 * 1024 * 1024,
        };

        const Worker = struct {
            // ring buffer stored as a slice of T
            queue: []T,
            capacity: u16,
            head: u16 = 0, // next slot to read
            tail: u16 = 0, // next slot to write
            count: u16 = 0,
            mutex: Io.Mutex = Io.Mutex.init,
            cond: Io.Condition = Io.Condition.init, // "not empty" — the worker waits here
            not_full: Io.Condition = Io.Condition.init, // "not full" — signalled on dequeue
            alive: bool = true,
            thread: ?std.Thread = null,
            io: Io,

            /// Ring index following `index`, wrapping at `capacity`.
            /// Widened to u32 so `index + 1` cannot overflow u16.
            fn advance(self: *const Worker, index: u16) u16 {
                return @intCast((@as(u32, index) + 1) % @as(u32, self.capacity));
            }
        };

        workers: []Worker,
        allocator: Allocator,
        io: Io,

        /// Allocate the queues and start one thread per worker.
        ///
        /// Returns `error.InvalidConfig` when `num_workers` or `queue_capacity`
        /// is zero, and propagates allocation / thread-spawn errors. On any
        /// error, threads that were already started are stopped and joined and
        /// all memory allocated here is released.
        /// On success the caller must call `shutdown` and then `deinit`.
        pub fn init(allocator: Allocator, config: Config, io: Io) !Self {
            // zero workers would make `key % N` divide by zero; zero capacity
            // would make every submit wait forever on a queue that is always full
            if (config.num_workers == 0 or config.queue_capacity == 0) {
                return error.InvalidConfig;
            }

            const workers = try allocator.alloc(Worker, config.num_workers);
            errdefer allocator.free(workers);

            // number of leading workers whose queue has been allocated
            var queues_ready: usize = 0;
            errdefer {
                for (workers[0..queues_ready]) |*w| allocator.free(w.queue);
            }

            for (workers) |*w| {
                w.* = .{
                    .queue = try allocator.alloc(T, config.queue_capacity),
                    .capacity = config.queue_capacity,
                    .io = io,
                };
                queues_ready += 1;
            }

            var self = Self{
                .workers = workers,
                .allocator = allocator,
                .io = io,
            };
            // errdefers run in reverse order: join started threads first,
            // then free the queues and the worker slice they were using
            errdefer self.shutdown();

            // spawn worker threads; each receives a pointer into the heap-allocated slice
            for (self.workers) |*w| {
                w.thread = try std.Thread.spawn(
                    .{ .stack_size = config.stack_size },
                    workerLoop,
                    .{w},
                );
            }

            return self;
        }

        /// Submit an item for processing, routed by `key % workers.len`.
        ///
        /// Blocks while the target worker's queue is full (backpressure).
        /// Returns false only if `stop` was observed true while the queue was
        /// full; in that case the item is not enqueued. `stop` is not consulted
        /// when a slot is immediately available.
        /// Items enqueued after `shutdown` has run are never processed, because
        /// the worker has already exited.
        pub fn submit(self: *Self, key: u64, item: T, stop: *std.atomic.Value(bool)) bool {
            const idx: usize = @intCast(key % self.workers.len);
            const w = &self.workers[idx];

            w.mutex.lockUncancelable(w.io);
            defer w.mutex.unlock(w.io);

            while (w.count == w.capacity) {
                if (stop.load(.acquire)) return false;
                // poll: release mutex, sleep briefly, reacquire
                // (Io.Condition has no timedWait — poll so stop check isn't starved)
                w.mutex.unlock(w.io);
                // a failed sleep only shortens the poll interval, so it is ignored
                w.io.sleep(Io.Duration.fromMilliseconds(10), .awake) catch {};
                w.mutex.lockUncancelable(w.io);
            }

            w.queue[w.tail] = item;
            w.tail = w.advance(w.tail);
            w.count += 1;
            w.cond.signal(w.io);
            return true;
        }

        /// Drain remaining items and join all worker threads.
        ///
        /// Each worker keeps processing until its queue is empty, then exits.
        /// Calling this again is harmless: the threads are already joined and
        /// cleared, so only the flags are set again.
        pub fn shutdown(self: *Self) void {
            // signal all workers to stop
            for (self.workers) |*w| {
                w.mutex.lockUncancelable(w.io);
                w.alive = false;
                w.cond.signal(w.io);
                w.not_full.broadcast(w.io); // wake anything waiting on "not full"
                w.mutex.unlock(w.io);
            }
            // join all threads
            for (self.workers) |*w| {
                if (w.thread) |t| {
                    t.join();
                    w.thread = null;
                }
            }
        }

        /// Free queue storage and the worker slice.
        ///
        /// Does not stop the workers: call `shutdown` first, otherwise running
        /// threads would keep using the freed memory.
        pub fn deinit(self: *Self) void {
            for (self.workers) |*w| {
                self.allocator.free(w.queue);
            }
            self.allocator.free(self.workers);
        }

        /// Total pending items across all workers (diagnostic).
        ///
        /// Each queue is locked and read in turn, so the sum is not a snapshot
        /// of one instant. Items currently inside `processFn` are not counted.
        pub fn pendingCount(self: *Self) usize {
            var total: usize = 0;
            for (self.workers) |*w| {
                w.mutex.lockUncancelable(w.io);
                defer w.mutex.unlock(w.io); // runs at the end of each iteration
                total += w.count;
            }
            return total;
        }

        /// Thread entry point: pop items in FIFO order and run `processFn` on
        /// each, outside the lock. Returns once the worker has been marked not
        /// alive and its queue is empty.
        fn workerLoop(w: *Worker) void {
            while (true) {
                var item: T = dequeue: {
                    w.mutex.lockUncancelable(w.io);
                    defer w.mutex.unlock(w.io);

                    while (w.count == 0 and w.alive) {
                        w.cond.waitUncancelable(w.io, &w.mutex);
                    }

                    // the wait loop only exits with an empty queue when !alive
                    if (w.count == 0) return;

                    const next = w.queue[w.head];
                    w.head = w.advance(w.head);
                    w.count -= 1;
                    w.not_full.signal(w.io); // a slot just opened
                    break :dequeue next;
                };

                processFn(&item);
            }
        }
    };
}

// --- tests ---

const testing = std.testing;

test "basic submit and process" {
    const Item = struct {
        value: u32,
        processed: *std.atomic.Value(u32),
    };

    const S = struct {
        fn process(item: *Item) void {
            _ = item.processed.fetchAdd(item.value, .monotonic);
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 2,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);
    defer pool.deinit();
    defer pool.shutdown(); // joins workers even when an expectation fails early

    // submit items
    for (0..10) |i| {
        const ok = pool.submit(i, .{
            .value = @intCast(i + 1),
            .processed = &counter,
        }, &shutdown);
        try testing.expect(ok);
    }

    pool.shutdown();

    // sum of 1..10 = 55
    try testing.expectEqual(@as(u32, 55), counter.load(.acquire));
}

test "per-key ordering preserved" {
    // items with the same key should be processed in FIFO order
    const Item = struct {
        seq: u32,
        results: *std.ArrayListUnmanaged(u32),
        mutex: *Io.Mutex,
        allocator: Allocator,
        io: Io,
    };

    const S = struct {
        fn process(item: *Item) void {
            item.mutex.lockUncancelable(item.io);
            defer item.mutex.unlock(item.io);
            // processFn cannot return an error; a failed append shows up as a
            // short `results` list in the length check below
            item.results.append(item.allocator, item.seq) catch {};
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var results: std.ArrayListUnmanaged(u32) = .empty;
    defer results.deinit(testing.allocator);
    var mutex: Io.Mutex = Io.Mutex.init;

    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 4,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);
    defer pool.deinit();
    defer pool.shutdown(); // workers must be joined before `results` is freed

    // submit 20 items all with key=42 (same worker)
    for (0..20) |i| {
        const ok = pool.submit(42, .{
            .seq = @intCast(i),
            .results = &results,
            .mutex = &mutex,
            .allocator = testing.allocator,
            .io = std.testing.io,
        }, &shutdown);
        try testing.expect(ok);
    }

    pool.shutdown();

    try testing.expectEqual(@as(usize, 20), results.items.len);
    for (results.items, 0..) |val, i| {
        try testing.expectEqual(@as(u32, @intCast(i)), val);
    }
}

test "submit blocks when queue full, succeeds after drain" {
    const Item = struct {
        counter: *std.atomic.Value(u32),
        io: Io,
    };
    const S = struct {
        fn process(item: *Item) void {
            // slow worker — gives time for queue to fill
            item.io.sleep(Io.Duration.fromMilliseconds(5), .awake) catch {};
            _ = item.counter.fetchAdd(1, .monotonic);
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 4,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);
    defer pool.deinit();
    defer pool.shutdown();

    // submit more items than capacity — submit blocks until slots open
    for (0..20) |_| {
        const ok = pool.submit(0, .{ .counter = &counter, .io = std.testing.io }, &shutdown);
        try testing.expect(ok);
    }

    pool.shutdown();

    // all 20 should have been processed (none dropped)
    try testing.expectEqual(@as(u32, 20), counter.load(.acquire));
}

test "submit returns false on shutdown" {
    const Item = struct {
        release: *std.atomic.Value(bool),
        io: Io,
    };
    const S = struct {
        fn process(item: *Item) void {
            // park the worker until the test releases it, so the queue stays
            // full for as long as the test needs it to be
            while (!item.release.load(.acquire)) {
                item.io.sleep(Io.Duration.fromMilliseconds(5), .awake) catch {};
            }
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var release: std.atomic.Value(bool) = .{ .raw = false };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 2,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);
    defer pool.deinit();
    defer pool.shutdown();
    // runs first (defers are LIFO): unpark the worker so shutdown can join it
    defer release.store(true, .release);

    // fill: 1 processing + 2 queued = capacity reached
    try testing.expect(pool.submit(0, .{ .release = &release, .io = std.testing.io }, &shutdown));
    try testing.expect(pool.submit(0, .{ .release = &release, .io = std.testing.io }, &shutdown));
    try testing.expect(pool.submit(0, .{ .release = &release, .io = std.testing.io }, &shutdown));

    // signal shutdown — the worker is still parked, so the queue is still full
    // and the next submit must return false
    shutdown.store(true, .release);
    const ok = pool.submit(0, .{ .release = &release, .io = std.testing.io }, &shutdown);
    try testing.expect(!ok);
}

test "pendingCount reflects queued items" {
    const Item = struct { x: u32, io: Io };
    const S = struct {
        fn process(item: *Item) void {
            // slow worker so items accumulate
            item.io.sleep(Io.Duration.fromMilliseconds(10), .awake) catch {};
        }
    };

    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);
    defer pool.deinit();
    defer pool.shutdown();

    // initially empty
    try testing.expectEqual(@as(usize, 0), pool.pendingCount());
}

test "shutdown drains remaining items" {
    const Item = struct {
        counter: *std.atomic.Value(u32),
    };
    const S = struct {
        fn process(item: *Item) void {
            _ = item.counter.fetchAdd(1, .monotonic);
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 2,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    }, std.testing.io);
    defer pool.deinit();
    defer pool.shutdown();

    for (0..30) |i| {
        _ = pool.submit(i, .{ .counter = &counter }, &shutdown);
    }

    pool.shutdown();

    // all 30 should have been processed (shutdown drains)
    try testing.expectEqual(@as(u32, 30), counter.load(.acquire));
}