//! generic thread pool with key-partitioned queues //! //! each worker has its own bounded ring buffer + mutex + condvar. //! `submit(key, item, shutdown)` routes to `workers[key % N]` for per-key ordering. //! blocks when the target queue is full (backpressure to caller). //! items stored by value in pre-allocated ring buffer (zero alloc per submit). const std = @import("std"); const Io = std.Io; const Allocator = std.mem.Allocator; pub fn ThreadPool(comptime T: type, comptime processFn: fn (*T) void) type { return struct { const Self = @This(); pub const Config = struct { num_workers: u16 = 8, queue_capacity: u16 = 4096, stack_size: usize = 4 * 1024 * 1024, }; const Worker = struct { // ring buffer stored as a slice of T queue: []T, capacity: u16, head: u16 = 0, // next slot to read tail: u16 = 0, // next slot to write count: u16 = 0, mutex: Io.Mutex = Io.Mutex.init, cond: Io.Condition = Io.Condition.init, // "not empty" — workers wait here not_full: Io.Condition = Io.Condition.init, // "not full" — submitters wait here alive: bool = true, thread: ?std.Thread = null, io: Io, }; workers: []Worker, allocator: Allocator, io: Io, pub fn init(allocator: Allocator, config: Config, io: Io) !Self { const workers = try allocator.alloc(Worker, config.num_workers); for (workers) |*w| { w.* = .{ .queue = try allocator.alloc(T, config.queue_capacity), .capacity = config.queue_capacity, .io = io, }; } const self = Self{ .workers = workers, .allocator = allocator, .io = io, }; // spawn worker threads for (self.workers) |*w| { w.thread = try std.Thread.spawn( .{ .stack_size = config.stack_size }, workerLoop, .{w}, ); } return self; } /// submit an item for processing, routed by key. /// blocks if the target worker's queue is full (backpressure). /// returns false only if shutdown was requested while waiting. pub fn submit(self: *Self, key: u64, item: T, stop: *std.atomic.Value(bool)) bool { const idx = key % self.workers.len; const w = &self.workers[idx]; w.mutex.lockUncancelable(w.io); defer w.mutex.unlock(w.io); while (w.count == w.capacity) { if (stop.load(.acquire)) return false; // poll: release mutex, sleep briefly, reacquire // (Io.Condition has no timedWait — poll so stop check isn't starved) w.mutex.unlock(w.io); w.io.sleep(Io.Duration.fromMilliseconds(10), .awake) catch {}; w.mutex.lockUncancelable(w.io); } w.queue[w.tail] = item; w.tail = @intCast((@as(u32, w.tail) + 1) % @as(u32, w.capacity)); w.count += 1; w.cond.signal(w.io); return true; } /// drain remaining items and join all worker threads. pub fn shutdown(self: *Self) void { // signal all workers to stop for (self.workers) |*w| { w.mutex.lockUncancelable(w.io); w.alive = false; w.cond.signal(w.io); w.not_full.broadcast(w.io); // wake any blocked submitters w.mutex.unlock(w.io); } // join all threads for (self.workers) |*w| { if (w.thread) |t| { t.join(); w.thread = null; } } } /// free queue storage. pub fn deinit(self: *Self) void { for (self.workers) |*w| { self.allocator.free(w.queue); } self.allocator.free(self.workers); } /// total pending items across all workers (diagnostic). pub fn pendingCount(self: *Self) usize { var total: usize = 0; for (self.workers) |*w| { w.mutex.lockUncancelable(w.io); defer w.mutex.unlock(w.io); total += w.count; } return total; } fn workerLoop(w: *Worker) void { while (true) { var item: T = undefined; { w.mutex.lockUncancelable(w.io); defer w.mutex.unlock(w.io); while (w.count == 0 and w.alive) { w.cond.waitUncancelable(w.io, &w.mutex); } if (w.count == 0 and !w.alive) return; item = w.queue[w.head]; w.head = @intCast((@as(u32, w.head) + 1) % @as(u32, w.capacity)); w.count -= 1; w.not_full.signal(w.io); // wake one blocked submitter } processFn(&item); } } }; } // --- tests --- const testing = std.testing; test "basic submit and process" { const Item = struct { value: u32, processed: *std.atomic.Value(u32), }; const S = struct { fn process(item: *Item) void { _ = item.processed.fetchAdd(item.value, .monotonic); } }; var shutdown: std.atomic.Value(bool) = .{ .raw = false }; var counter: std.atomic.Value(u32) = .{ .raw = 0 }; var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{ .num_workers = 2, .queue_capacity = 64, .stack_size = 1 * 1024 * 1024, }, std.testing.io); // submit items for (0..10) |i| { const ok = pool.submit(i, .{ .value = @intCast(i + 1), .processed = &counter, }, &shutdown); try testing.expect(ok); } pool.shutdown(); defer pool.deinit(); // sum of 1..10 = 55 try testing.expectEqual(@as(u32, 55), counter.load(.acquire)); } test "per-key ordering preserved" { // items with the same key should be processed in FIFO order const Item = struct { seq: u32, results: *std.ArrayListUnmanaged(u32), mutex: *Io.Mutex, allocator: Allocator, io: Io, }; const S = struct { fn process(item: *Item) void { item.mutex.lockUncancelable(item.io); defer item.mutex.unlock(item.io); item.results.append(item.allocator, item.seq) catch {}; } }; var shutdown: std.atomic.Value(bool) = .{ .raw = false }; var results: std.ArrayListUnmanaged(u32) = .empty; defer results.deinit(testing.allocator); var mutex: Io.Mutex = Io.Mutex.init; var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{ .num_workers = 4, .queue_capacity = 64, .stack_size = 1 * 1024 * 1024, }, std.testing.io); // submit 20 items all with key=42 (same worker) for (0..20) |i| { const ok = pool.submit(42, .{ .seq = @intCast(i), .results = &results, .mutex = &mutex, .allocator = testing.allocator, .io = std.testing.io, }, &shutdown); try testing.expect(ok); } pool.shutdown(); defer pool.deinit(); try testing.expectEqual(@as(usize, 20), results.items.len); for (results.items, 0..) |val, i| { try testing.expectEqual(@as(u32, @intCast(i)), val); } } test "submit blocks when queue full, succeeds after drain" { const Item = struct { counter: *std.atomic.Value(u32), io: Io, }; const S = struct { fn process(item: *Item) void { // slow worker — gives time for queue to fill item.io.sleep(Io.Duration.fromMilliseconds(5), .awake) catch {}; _ = item.counter.fetchAdd(1, .monotonic); } }; var shutdown: std.atomic.Value(bool) = .{ .raw = false }; var counter: std.atomic.Value(u32) = .{ .raw = 0 }; var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{ .num_workers = 1, .queue_capacity = 4, .stack_size = 1 * 1024 * 1024, }, std.testing.io); // submit more items than capacity — submit blocks until slots open for (0..20) |_| { const ok = pool.submit(0, .{ .counter = &counter, .io = std.testing.io }, &shutdown); try testing.expect(ok); } pool.shutdown(); defer pool.deinit(); // all 20 should have been processed (none dropped) try testing.expectEqual(@as(u32, 20), counter.load(.acquire)); } test "submit returns false on shutdown" { const Item = struct { stop: *std.atomic.Value(bool), io: Io, }; const S = struct { fn process(item: *Item) void { // poll until shutdown — allows worker to exit promptly while (!item.stop.load(.acquire)) { item.io.sleep(Io.Duration.fromMilliseconds(5), .awake) catch {}; } } }; var shutdown: std.atomic.Value(bool) = .{ .raw = false }; var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{ .num_workers = 1, .queue_capacity = 2, .stack_size = 1 * 1024 * 1024, }, std.testing.io); // fill: 1 processing + 2 queued = capacity reached _ = pool.submit(0, .{ .stop = &shutdown, .io = std.testing.io }, &shutdown); _ = pool.submit(0, .{ .stop = &shutdown, .io = std.testing.io }, &shutdown); _ = pool.submit(0, .{ .stop = &shutdown, .io = std.testing.io }, &shutdown); // signal shutdown — next submit should return false shutdown.store(true, .release); const ok = pool.submit(0, .{ .stop = &shutdown, .io = std.testing.io }, &shutdown); try testing.expect(!ok); pool.shutdown(); defer pool.deinit(); } test "pendingCount reflects queued items" { const Item = struct { x: u32, io: Io }; const S = struct { fn process(item: *Item) void { // slow worker so items accumulate item.io.sleep(Io.Duration.fromMilliseconds(10), .awake) catch {}; } }; var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{ .num_workers = 1, .queue_capacity = 64, .stack_size = 1 * 1024 * 1024, }, std.testing.io); // initially empty try testing.expectEqual(@as(usize, 0), pool.pendingCount()); pool.shutdown(); defer pool.deinit(); } test "shutdown drains remaining items" { const Item = struct { counter: *std.atomic.Value(u32), }; const S = struct { fn process(item: *Item) void { _ = item.counter.fetchAdd(1, .monotonic); } }; var shutdown: std.atomic.Value(bool) = .{ .raw = false }; var counter: std.atomic.Value(u32) = .{ .raw = 0 }; var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{ .num_workers = 2, .queue_capacity = 64, .stack_size = 1 * 1024 * 1024, }, std.testing.io); for (0..30) |i| { _ = pool.submit(i, .{ .counter = &counter }, &shutdown); } pool.shutdown(); defer pool.deinit(); // all 30 should have been processed (shutdown drains) try testing.expectEqual(@as(u32, 30), counter.load(.acquire)); }