atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

port I/O-bound thread spawns to io.concurrent

phase 2 of the 0.16 migration — long-lived I/O loops now run as
std.Io concurrent tasks instead of OS threads. CPU-bound frame
worker pool stays on std.Thread.

- subscriber.zig: ping thread → ping future
- slurper.zig: per-host workers, startup, crawl queue → futures
- broadcaster.zig: per-consumer write loop → future
- validator.zig: resolver threads → resolver futures
- event_log.zig: flush thread → flush future
- main.zig: GC loop, metrics server → futures
- backfill.zig, resync.zig, cleaner.zig: thread → future
- all shutdown paths: thread.join() → future.cancel(io)

remaining std.Thread.spawn (intentional):
- thread_pool.zig: CPU-bound keyed frame workers
- broadcaster.zig: fire-and-forget slow consumer cleanup (detached)
- broadcaster.zig: test helper

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+60 -68
+4 -4
src/backfill.zig
··· 26 26 collection_index: *collection_index_mod.CollectionIndex, 27 27 db: *pg.Pool, 28 28 running: std.atomic.Value(bool), 29 - thread: ?std.Thread, 29 + future: ?Io.Future(void), 30 30 source: []const u8, 31 31 io: Io, 32 32 ··· 41 41 .collection_index = collection_index, 42 42 .db = db, 43 43 .running = .{ .raw = false }, 44 - .thread = null, 44 + .future = null, 45 45 .source = "", 46 46 .io = io, 47 47 }; ··· 59 59 errdefer self.running.store(false, .release); 60 60 61 61 self.source = try self.allocator.dupe(u8, source); 62 - self.thread = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, run, .{self}); 62 + self.future = try self.io.concurrent(run, .{self}); 63 63 } 64 64 65 65 fn run(self: *Backfiller) void { 66 66 defer { 67 67 self.allocator.free(self.source); 68 68 self.source = ""; 69 - self.thread = null; 69 + self.future = null; 70 70 self.running.store(false, .release); 71 71 } 72 72
+4 -4
src/broadcaster.zig
··· 248 248 alive: std.atomic.Value(bool) = .{ .raw = true }, 249 249 mutex: Io.Mutex = Io.Mutex.init, 250 250 cond: Io.Condition = Io.Condition.init, 251 - thread: ?std.Thread = null, 251 + future: ?Io.Future(void) = null, 252 252 last_send_time: i128 = 0, 253 253 io: Io, 254 254 ··· 346 346 pub fn shutdown(self: *Consumer) void { 347 347 self.alive.store(false, .release); 348 348 self.cond.signal(self.io); 349 - if (self.thread) |t| t.join(); 350 - self.thread = null; 349 + if (self.future) |*f| f.cancel(self.io); 350 + self.future = null; 351 351 } 352 352 }; 353 353 ··· 408 408 .allocator = self.allocator, 409 409 .io = self.io, 410 410 }; 411 - consumer.thread = std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, Consumer.writeLoop, .{consumer}) catch { 411 + consumer.future = self.io.concurrent(Consumer.writeLoop, .{consumer}) catch { 412 412 self.allocator.destroy(consumer); 413 413 return error.ThreadSpawnFailed; 414 414 };
+4 -4
src/cleaner.zig
··· 18 18 collection_index: *collection_index_mod.CollectionIndex, 19 19 db: *pg.Pool, 20 20 running: std.atomic.Value(bool), 21 - thread: ?std.Thread, 21 + future: ?Io.Future(void), 22 22 scanned: std.atomic.Value(u64), 23 23 removed: std.atomic.Value(u64), 24 24 ··· 34 34 .collection_index = collection_index, 35 35 .db = db, 36 36 .running = .{ .raw = false }, 37 - .thread = null, 37 + .future = null, 38 38 .scanned = .{ .raw = 0 }, 39 39 .removed = .{ .raw = 0 }, 40 40 }; ··· 53 53 54 54 self.scanned.store(0, .release); 55 55 self.removed.store(0, .release); 56 - self.thread = try std.Thread.spawn(.{}, run, .{self}); 56 + self.future = try self.io.concurrent(run, .{self}); 57 57 } 58 58 59 59 fn run(self: *Cleaner) void { 60 60 defer { 61 - self.thread = null; 61 + self.future = null; 62 62 self.running.store(false, .release); 63 63 } 64 64
+5 -5
src/event_log.zig
··· 103 103 evtbuf: std.ArrayListUnmanaged(PersistJob) = .empty, 104 104 mutex: Io.Mutex = Io.Mutex.init, 105 105 106 - // flush thread 107 - flush_thread: ?std.Thread = null, 106 + // flush task 107 + flush_future: ?Io.Future(void) = null, 108 108 alive: std.atomic.Value(bool) = .{ .raw = true }, 109 109 110 110 io: Io, ··· 253 253 } 254 254 255 255 pub fn deinit(self: *DiskPersist) void { 256 - // stop flush thread (loop checks alive after each sleep interval) 256 + // stop flush task (loop checks alive after each sleep interval) 257 257 self.alive.store(false, .release); 258 - if (self.flush_thread) |t| t.join(); 258 + if (self.flush_future) |*f| f.cancel(self.io); 259 259 260 260 // flush remaining 261 261 self.mutex.lockUncancelable(self.io); ··· 278 278 279 279 /// start the background flush thread 280 280 pub fn start(self: *DiskPersist) !void { 281 - self.flush_thread = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, flushLoop, .{self}); 281 + self.flush_future = try self.io.concurrent(flushLoop, .{self}); 282 282 } 283 283 284 284 pub const UidResult = struct {
+7 -7
src/main.zig
··· 250 250 // start: loads active hosts from DB, spawns subscriber threads 251 251 try slurper.start(); 252 252 253 - // start GC loop (runs as background thread — does disk I/O + malloc_trim) 254 - const gc_thread = try std.Thread.spawn(.{}, gcLoop, .{ &dp, io }); 253 + // start GC loop (runs as background task — does disk I/O + malloc_trim) 254 + var gc_future = try io.concurrent(gcLoop, .{ &dp, io }); 255 255 256 256 // wire HTTP fallback into broadcaster (all API endpoints served on WS port) 257 257 var http_context = api.HttpContext{ ··· 283 283 .bc = &bc, 284 284 .slurper = &slurper, 285 285 }; 286 - const metrics_thread = try std.Thread.spawn(.{}, MetricsServer.run, .{&metrics_srv}); 286 + var metrics_future = try io.concurrent(MetricsServer.run, .{&metrics_srv}); 287 287 288 288 // start downstream WebSocket server (also serves HTTP API via httpFallback) 289 289 log.info("relay listening on :{d} (ws+http), :{d} (metrics)", .{ port, metrics_port }); ··· 311 311 server.stop(); 312 312 server_thread.join(); 313 313 314 - // wait for GC thread 315 - gc_thread.join(); 314 + // cancel GC task 315 + gc_future.cancel(io); 316 316 317 - // close metrics listener to unblock accept(), then join 317 + // close metrics listener to unblock accept(), then cancel task 318 318 metrics_srv.server.deinit(io); 319 - metrics_thread.join(); 319 + metrics_future.cancel(io); 320 320 321 321 log.info("relay stopped cleanly", .{}); 322 322 }
+8 -12
src/resync.zig
··· 46 46 cond: Io.Condition, 47 47 48 48 running: std.atomic.Value(bool), 49 - thread: ?std.Thread, 49 + future: ?Io.Future(void), 50 50 51 51 // stats 52 52 processed: std.atomic.Value(u64), ··· 69 69 .mutex = Io.Mutex.init, 70 70 .cond = Io.Condition.init, 71 71 .running = .{ .raw = false }, 72 - .thread = null, 72 + .future = null, 73 73 .processed = .{ .raw = 0 }, 74 74 .failed = .{ .raw = 0 }, 75 75 .dropped = .{ .raw = 0 }, ··· 79 79 /// start the background worker thread. 80 80 pub fn start(self: *Resyncer) !void { 81 81 self.running.store(true, .release); 82 - self.thread = try std.Thread.spawn( 83 - .{}, 84 - run, 85 - .{self}, 86 - ); 82 + self.future = try self.io.concurrent(run, .{self}); 87 83 } 88 84 89 85 /// enqueue a DID for resync. non-blocking, drops if queue full or inputs too long. ··· 232 228 233 229 pub fn deinit(self: *Resyncer) void { 234 230 self.stop(); 235 - if (self.thread) |t| t.join(); 236 - self.thread = null; 231 + if (self.future) |*f| f.cancel(self.io); 232 + self.future = null; 237 233 } 238 234 239 235 const DescribeRepoResponse = struct { ··· 256 252 .mutex = Io.Mutex.init, 257 253 .cond = Io.Condition.init, 258 254 .running = .{ .raw = true }, 259 - .thread = null, 255 + .future = null, 260 256 .processed = .{ .raw = 0 }, 261 257 .failed = .{ .raw = 0 }, 262 258 .dropped = .{ .raw = 0 }, ··· 284 280 .mutex = Io.Mutex.init, 285 281 .cond = Io.Condition.init, 286 282 .running = .{ .raw = true }, 287 - .thread = null, 283 + .future = null, 288 284 .processed = .{ .raw = 0 }, 289 285 .failed = .{ .raw = 0 }, 290 286 .dropped = .{ .raw = 0 }, ··· 306 302 .mutex = Io.Mutex.init, 307 303 .cond = Io.Condition.init, 308 304 .running = .{ .raw = true }, 309 - .thread = null, 305 + .future = null, 310 306 .processed = .{ .raw = 0 }, 311 307 .failed = .{ .raw = 0 }, 312 308 .dropped = .{ .raw = 0 },
+17 -17
src/slurper.zig
··· 213 213 } 214 214 215 215 const WorkerEntry = struct { 216 - thread: std.Thread, 216 + future: Io.Future(void), 217 217 subscriber: *subscriber_mod.Subscriber, 218 218 }; 219 219 ··· 242 242 crawl_mutex: Io.Mutex = Io.Mutex.init, 243 243 crawl_cond: Io.Condition = Io.Condition.init, 244 244 245 - // background threads 246 - startup_thread: ?std.Thread = null, 247 - crawl_thread: ?std.Thread = null, 245 + // background tasks 246 + startup_future: ?Io.Future(void) = null, 247 + crawl_future: ?Io.Future(void) = null, 248 248 249 249 io: Io, 250 250 ··· 287 287 288 288 // spawn worker startup in background so HTTP server + probes come up immediately. 289 289 // pullHosts + listActiveHosts + spawnWorker all happen in the background thread. 290 - self.startup_thread = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, spawnWorkers, .{self}); 291 - self.crawl_thread = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, processCrawlQueue, .{self}); 290 + self.startup_future = try self.io.concurrent(spawnWorkers, .{self}); 291 + self.crawl_future = try self.io.concurrent(processCrawlQueue, .{self}); 292 292 } 293 293 294 294 /// pull PDS host list from the seed relay's com.atproto.sync.listHosts endpoint. ··· 477 477 sub.resyncer = self.resyncer; 478 478 if (self.frame_pool) |*fp| sub.pool = fp; 479 479 480 - const thread = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, runWorker, .{ self, host_id, sub }); 480 + const future = try self.io.concurrent(runWorker, .{ self, host_id, sub }); 481 481 482 482 self.workers_mutex.lockUncancelable(self.io); 483 483 defer self.workers_mutex.unlock(self.io); 484 484 try self.workers.put(self.allocator, host_id, .{ 485 - .thread = thread, 485 + .future = future, 486 486 .subscriber = sub, 487 487 }); 488 488 _ = self.bc.stats.connected_inbound.fetchAdd(1, .monotonic); ··· 590 590 591 591 /// shutdown all workers and clean up 592 592 pub fn deinit(self: *Slurper) void { 593 - // join background threads 594 - if (self.startup_thread) |t| t.join(); 593 + // cancel background tasks 594 + if (self.startup_future) |*f| f.cancel(self.io); 595 595 self.crawl_cond.signal(self.io); 596 - if (self.crawl_thread) |t| t.join(); 596 + if (self.crawl_future) |*f| f.cancel(self.io); 597 597 598 - // collect threads to join (can't join while holding workers_mutex) 599 - var threads_to_join: std.ArrayListUnmanaged(std.Thread) = .empty; 600 - defer threads_to_join.deinit(self.allocator); 598 + // collect futures to cancel (can't cancel while holding workers_mutex) 599 + var futures_to_cancel: std.ArrayListUnmanaged(Io.Future(void)) = .empty; 600 + defer futures_to_cancel.deinit(self.allocator); 601 601 602 602 { 603 603 self.workers_mutex.lockUncancelable(self.io); 604 604 defer self.workers_mutex.unlock(self.io); 605 605 var it = self.workers.iterator(); 606 606 while (it.next()) |entry| { 607 - threads_to_join.append(self.allocator, entry.value_ptr.thread) catch {}; 607 + futures_to_cancel.append(self.allocator, entry.value_ptr.future) catch {}; 608 608 } 609 609 } 610 610 611 - // join all reader threads FIRST (they stop submitting to pool) 612 - for (threads_to_join.items) |t| t.join(); 611 + // cancel all subscriber tasks FIRST (they stop submitting to pool) 612 + for (futures_to_cancel.items) |*f| f.cancel(self.io); 613 613 614 614 // then drain + join pool workers (processes remaining queued frames) 615 615 if (self.frame_pool) |*fp| {
+4 -8
src/subscriber.zig
··· 351 351 .subscriber = self, 352 352 }; 353 353 354 - // spawn keepalive ping thread (Go relay: 30s ticker goroutine in consumer.go) 354 + // spawn keepalive ping task (Go relay: 30s ticker goroutine in consumer.go) 355 355 // prevents intermediate proxies (e.g. Cloudflare) from killing idle connections 356 - const ping_thread = std.Thread.spawn( 357 - .{ .stack_size = @import("main.zig").default_stack_size }, 358 - pingLoop, 359 - .{ &client, self }, 360 - ) catch |err| { 361 - log.warn("host {s}: failed to spawn ping thread: {s}", .{ self.options.hostname, @errorName(err) }); 356 + var ping_future = self.io.concurrent(pingLoop, .{ &client, self }) catch |err| { 357 + log.warn("host {s}: failed to spawn ping task: {s}", .{ self.options.hostname, @errorName(err) }); 362 358 return err; 363 359 }; 364 360 365 - defer ping_thread.join(); 361 + defer ping_future.cancel(self.io); 366 362 try client.readLoop(&handler); 367 363 } 368 364
+7 -7
src/validator.zig
··· 55 55 queued_set: std.StringHashMapUnmanaged(void) = .empty, 56 56 queue_mutex: Io.Mutex = Io.Mutex.init, 57 57 queue_cond: Io.Condition = Io.Condition.init, 58 - resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads, 58 + resolver_futures: [max_resolver_threads]?Io.Future(void) = .{null} ** max_resolver_threads, 59 59 alive: std.atomic.Value(bool) = .{ .raw = true }, 60 60 max_cache_size: u32 = 250_000, 61 61 io: Io, ··· 88 88 pub fn deinit(self: *Validator) void { 89 89 self.alive.store(false, .release); 90 90 self.queue_cond.broadcast(self.io); 91 - for (&self.resolver_threads) |*t| { 92 - if (t.*) |thread| { 93 - thread.join(); 94 - t.* = null; 91 + for (&self.resolver_futures) |*slot| { 92 + if (slot.*) |*f| { 93 + f.cancel(self.io); 95 94 } 95 + slot.* = null; 96 96 } 97 97 98 98 if (self.host_resolver_inited) { ··· 118 118 self.cache.capacity = self.max_cache_size; 119 119 const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads); 120 120 const count = @min(n, max_resolver_threads); 121 - for (self.resolver_threads[0..count]) |*t| { 122 - t.* = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, resolveLoop, .{self}); 121 + for (self.resolver_futures[0..count]) |*slot| { 122 + slot.* = try self.io.concurrent(resolveLoop, .{self}); 123 123 } 124 124 125 125 // init host authority resolver pool (reused across calls)