atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

evented backend with single ordered publication path

flip the primary Io backend from Io.Threaded to Io.Evented. frame workers
run on a dedicated Io.Threaded runtime (pool_io) for CPU-heavy decode/validate,
then hand results off to the broadcaster fiber via an MPSC ring buffer
(BroadcastQueue).

key design: one publication path for all relay-sequenced events.
frame workers, the subscriber's inline path, and the admin ban handler
all use the same pattern:
acquire persist_order spinlock → dp.persist → resequence → queue.push → release.
the broadcaster fiber drains the queue FIFO (= seq order) and fans out
to consumers.

persist_order is an atomic spinlock (not an Io.Mutex) so that both the
Threaded workers and the Evented admin handler can take it — Io.Mutex
futex implementations are incompatible across the two Io domains.

- DiskPersist initialized with pool_io (workers are its main callers; the admin ban handler also calls persist)
- BroadcastQueue: lossless push that spin-waits for a free slot instead of dropping (matches Indigo semantics)
- admin ban: same ordered path as workers, no direct broadcast()
- pool_io wired to HttpContext for future cross-domain use

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 39134d1d 16872782

+254 -96
+4 -4
build.zig.zon
··· 5 5 .minimum_zig_version = "0.16.0", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.11.tar.gz", 9 - .hash = "zat-0.3.0-alpha.11-5PuC7nVhBQCNUnJEi_YUqQK6V8bbZacA-QN54nUunu4K", 8 + .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.15.tar.gz", 9 + .hash = "zat-0.3.0-alpha.15-5PuC7nVhBQCJEzz9LuzSbtLb68Wd0x_yjDgTP3EqV8dH", 10 10 }, 11 11 .websocket = .{ 12 - .url = "https://github.com/zzstoatzz/websocket.zig/archive/104608b.tar.gz", 13 - .hash = "websocket-0.1.0-ZPISdXjUAwC3rN7rT5NMG8HQJRug1NOboVWeX09SvSGv", 12 + .url = "https://github.com/zzstoatzz/websocket.zig/archive/ac3df25.tar.gz", 13 + .hash = "websocket-0.1.0-ZPISdUvvAwDQN3W3AYDxmzMj5ipuTnB3vpQinQPF9LqI", 14 14 }, 15 15 .pg = .{ 16 16 .url = "git+https://github.com/zzstoatzz/pg.zig?ref=dev#5ce2355b1d851075523709c7d3068dcdb0224322",
+16 -2
src/api/admin.zig
··· 74 74 log.debug("collection removeAll after ban failed: {s}", .{@errorName(err)}); 75 75 }; 76 76 77 - // emit #account event so downstream consumers see the takedown 77 + // emit #account event — same ordered publication path as workers: 78 + // persist_order spinlock → dp.persist → resequence → queue.push. 79 + // one publication path for all relay-sequenced events. 78 80 if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 81 + while (ctx.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 82 + std.atomic.spinLoopHint(); 83 + } 84 + 79 85 if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| { 80 86 ctx.bc.stats.relay_seq.store(relay_seq, .release); 81 87 const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 82 - ctx.bc.broadcast(relay_seq, broadcast_data); 88 + const owned = ctx.persist.allocator.dupe(u8, broadcast_data) catch { 89 + ctx.bc.persist_order.store(0, .release); 90 + log.warn("admin: failed to alloc broadcast data for {s}", .{did}); 91 + h.respondJson(conn, .ok, "{\"success\":true}"); 92 + return; 93 + }; 94 + ctx.bc.broadcast_queue.push(relay_seq, owned); 95 + ctx.bc.persist_order.store(0, .release); 83 96 log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 84 97 } else |err| { 98 + ctx.bc.persist_order.store(0, .release); 85 99 log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)}); 86 100 } 87 101 }
+2
src/api/router.zig
··· 4 4 //! top-level request router that delegates to xrpc and admin handler modules. 5 5 6 6 const std = @import("std"); 7 + const Io = std.Io; 7 8 const websocket = @import("websocket"); 8 9 const broadcaster = @import("../broadcaster.zig"); 9 10 const validator_mod = @import("../validator.zig"); ··· 28 29 resyncer: *resync_mod.Resyncer, 29 30 bc: *broadcaster.Broadcaster, 30 31 validator: *validator_mod.Validator, 32 + pool_io: Io, 31 33 }; 32 34 33 35 /// top-level HTTP request router — installed as bc.http_fallback
+4
src/backfill.zig
··· 59 59 errdefer self.running.store(false, .release); 60 60 61 61 self.source = try self.allocator.dupe(u8, source); 62 + errdefer { 63 + self.allocator.free(self.source); 64 + self.source = ""; 65 + } 62 66 self.future = try self.io.concurrent(run, .{self}); 63 67 } 64 68
+122 -18
src/broadcaster.zig
··· 231 231 return result; 232 232 } 233 233 234 + // --- broadcast queue (worker → fiber handoff) --- 235 + 236 + /// item produced by frame workers, consumed by the broadcaster fiber. 237 + /// data is heap-allocated by the worker; the broadcaster fiber frees it after broadcast. 238 + pub const BroadcastItem = struct { 239 + seq: u64, 240 + data: []const u8, 241 + }; 242 + 243 + /// MPSC ring buffer: multiple frame worker threads push, one broadcaster fiber pops. 244 + /// uses an atomic spinlock for push (no Io.Mutex — works from any execution context). 245 + /// pop is single-consumer (broadcaster fiber only), no locking needed. 246 + pub const BroadcastQueue = struct { 247 + const CAPACITY = 8192; 248 + 249 + items: [CAPACITY]BroadcastItem = undefined, 250 + head: std.atomic.Value(u32) = .{ .raw = 0 }, 251 + tail: std.atomic.Value(u32) = .{ .raw = 0 }, 252 + push_lock: std.atomic.Value(u32) = .{ .raw = 0 }, 253 + allocator: Allocator, 254 + 255 + pub fn init(allocator: Allocator) BroadcastQueue { 256 + return .{ .allocator = allocator }; 257 + } 258 + 259 + /// push an item (called by worker threads). spins until space is available. 260 + /// matches Indigo semantics: every persisted event reaches live broadcast. 261 + /// the broadcaster fiber drains at memory speed (no I/O), so spin is brief. 
262 + pub fn push(self: *BroadcastQueue, seq: u64, data: []const u8) void { 263 + while (true) { 264 + // acquire spinlock 265 + while (self.push_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 266 + std.atomic.spinLoopHint(); 267 + } 268 + 269 + const tail = self.tail.load(.monotonic); 270 + const next_tail = (tail + 1) % CAPACITY; 271 + if (next_tail == self.head.load(.acquire)) { 272 + // full — release lock, yield, retry 273 + self.push_lock.store(0, .release); 274 + std.atomic.spinLoopHint(); 275 + continue; 276 + } 277 + 278 + self.items[tail] = .{ .seq = seq, .data = data }; 279 + self.tail.store(next_tail, .release); 280 + self.push_lock.store(0, .release); 281 + return; 282 + } 283 + } 284 + 285 + /// pop an item (called by broadcaster fiber only). returns null if empty. 286 + pub fn pop(self: *BroadcastQueue) ?BroadcastItem { 287 + const head = self.head.load(.monotonic); 288 + if (head == self.tail.load(.acquire)) return null; // empty 289 + 290 + const item = self.items[head]; 291 + self.head.store((head + 1) % CAPACITY, .release); 292 + return item; 293 + } 294 + }; 295 + 234 296 // --- consumer --- 235 297 236 298 const ping_interval_ns: u64 = 30 * std.time.ns_per_s; ··· 370 432 allocator: Allocator, 371 433 consumers: std.ArrayListUnmanaged(*Consumer) = .empty, 372 434 consumers_mutex: Io.Mutex = Io.Mutex.init, 373 - broadcast_order: Io.Mutex = Io.Mutex.init, 435 + /// ordering spinlock for persist → queue push atomicity. 436 + /// atomic spinlock (not Io.Mutex) so both Threaded workers and Evented admin 437 + /// can participate in the same ordered publication path. 438 + persist_order: std.atomic.Value(u32) = .{ .raw = 0 }, 439 + /// worker → fiber handoff: frame workers push here, broadcaster fiber pops. 
440 + broadcast_queue: BroadcastQueue, 374 441 history: FrameHistory, 375 442 persist: ?*event_log_mod.DiskPersist = null, 376 443 stats: Stats = .{}, ··· 378 445 http_fallback: ?HttpFallbackFn = null, 379 446 http_fallback_ctx: ?*anyopaque = null, 380 447 io: Io, 448 + shutdown: *std.atomic.Value(bool), 381 449 382 - pub fn init(allocator: Allocator, io: Io) Broadcaster { 450 + pub fn init(allocator: Allocator, io: Io, shutdown: *std.atomic.Value(bool)) Broadcaster { 383 451 return .{ 384 452 .allocator = allocator, 453 + .broadcast_queue = BroadcastQueue.init(allocator), 385 454 .history = FrameHistory.init(allocator, io), 386 455 .stats = .{ .start_time = timestamp(io) }, 387 456 .error_frame = buildErrorFrame(allocator), 388 457 .io = io, 458 + .shutdown = shutdown, 389 459 }; 390 460 } 391 461 ··· 562 632 self.consumers_mutex.lockUncancelable(self.io); 563 633 defer self.consumers_mutex.unlock(self.io); 564 634 return self.consumers.items.len; 635 + } 636 + 637 + /// broadcast loop — runs as an Evented fiber, drains the broadcast queue 638 + /// and calls broadcast() for each item. this is the only path that touches 639 + /// consumers_mutex / consumer.mutex / consumer.cond. 
640 + pub fn runBroadcastLoop(self: *Broadcaster) void { 641 + log.info("broadcaster fiber started", .{}); 642 + while (!self.shutdown.load(.acquire)) { 643 + var drained: usize = 0; 644 + while (self.broadcast_queue.pop()) |item| { 645 + self.broadcast(item.seq, item.data); 646 + self.allocator.free(@constCast(item.data)); 647 + drained += 1; 648 + } 649 + if (drained == 0) { 650 + // no items — yield briefly so other fibers run 651 + self.io.sleep(Io.Duration.fromMilliseconds(1), .awake) catch return; 652 + } 653 + } 654 + // drain remaining items on shutdown 655 + while (self.broadcast_queue.pop()) |item| { 656 + self.broadcast(item.seq, item.data); 657 + self.allocator.free(@constCast(item.data)); 658 + } 659 + log.info("broadcaster fiber stopped", .{}); 565 660 } 566 661 }; 567 662 ··· 1068 1163 1069 1164 // --- tests --- 1070 1165 1166 + var test_shutdown: std.atomic.Value(bool) = .{ .raw = false }; 1167 + 1071 1168 test "broadcaster add and remove consumer" { 1072 - var b = Broadcaster.init(std.testing.allocator, std.testing.io); 1169 + var b = Broadcaster.init(std.testing.allocator, std.testing.io, &test_shutdown); 1073 1170 defer b.deinit(); 1074 1171 1075 1172 try std.testing.expectEqual(@as(u64, 0), b.stats.seq.load(.acquire)); ··· 1077 1174 } 1078 1175 1079 1176 test "broadcast updates stats and history" { 1080 - var b = Broadcaster.init(std.testing.allocator, std.testing.io); 1177 + var b = Broadcaster.init(std.testing.allocator, std.testing.io, &test_shutdown); 1081 1178 defer b.deinit(); 1082 1179 1083 1180 b.broadcast(1, "frame1"); ··· 1092 1189 } 1093 1190 1094 1191 test "frame history supports cursor replay" { 1095 - var b = Broadcaster.init(std.testing.allocator, std.testing.io); 1192 + var b = Broadcaster.init(std.testing.allocator, std.testing.io, &test_shutdown); 1096 1193 defer b.deinit(); 1097 1194 1098 1195 for (1..6) |i| { ··· 1304 1401 try std.testing.expect(p.getString("error") == null); 1305 1402 } 1306 1403 1307 - test "concurrent 
broadcast through ordering mutex produces monotonic sequences" { 1308 - // regression test: without broadcast_order serialization, concurrent 1309 - // subscriber threads can interleave persist (seq assignment) and broadcast, 1310 - // delivering frames out of order to consumers. 1311 - // 1312 - // this simulates the subscriber pattern: N threads each acquire the 1313 - // ordering lock, assign a seq (atomic increment, like persist), and 1314 - // broadcast. the ring buffer history must be strictly monotonic. 1404 + test "concurrent broadcast queue + drain produces monotonic sequences" { 1405 + // regression test: multiple worker threads push to the broadcast queue 1406 + // under persist_order lock, then the broadcaster fiber drains and broadcasts. 1407 + // the ring buffer history must be strictly monotonic. 1315 1408 1316 - var bc = Broadcaster.init(std.testing.allocator, std.testing.io); 1409 + var bc = Broadcaster.init(std.testing.allocator, std.testing.io, &test_shutdown); 1317 1410 defer bc.deinit(); 1318 1411 1319 1412 const num_threads = 8; ··· 1324 1417 const Worker = struct { 1325 1418 fn run(bc_ptr: *Broadcaster, counter: *std.atomic.Value(u64)) void { 1326 1419 for (0..frames_per_thread) |_| { 1327 - bc_ptr.broadcast_order.lockUncancelable(bc_ptr.io); 1328 - defer bc_ptr.broadcast_order.unlock(bc_ptr.io); 1329 - 1420 + while (bc_ptr.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 1421 + std.atomic.spinLoopHint(); 1422 + } 1330 1423 const seq = counter.fetchAdd(1, .monotonic) + 1; 1331 - bc_ptr.broadcast(seq, "x"); 1424 + const data = std.testing.allocator.dupe(u8, "x") catch { 1425 + bc_ptr.persist_order.store(0, .release); 1426 + continue; 1427 + }; 1428 + bc_ptr.broadcast_queue.push(seq, data); 1429 + bc_ptr.persist_order.store(0, .release); 1332 1430 } 1333 1431 } 1334 1432 }; ··· 1338 1436 t.* = std.Thread.spawn(.{}, Worker.run, .{ &bc, &seq_counter }) catch unreachable; 1339 1437 } 1340 1438 for (&threads) |*t| t.join(); 1439 + 
1440 + // drain the queue (simulating the broadcaster fiber) 1441 + while (bc.broadcast_queue.pop()) |item| { 1442 + bc.broadcast(item.seq, item.data); 1443 + std.testing.allocator.free(@constCast(item.data)); 1444 + } 1341 1445 1342 1446 // verify: ring buffer history has strictly monotonic sequences 1343 1447 const frames = try bc.history.framesSince(std.testing.allocator, 0);
+16 -5
src/frame_worker.zig
··· 273 273 else 274 274 .identity; 275 275 276 - // persist and broadcast under ordering lock 276 + // persist under ordering lock, then push to broadcast queue. 277 + // the broadcaster fiber (Evented) drains the queue and does fan-out — 278 + // worker threads never touch consumer state directly. 277 279 if (work.persist) |dp| { 278 280 const relay_seq = blk: { 279 - work.bc.broadcast_order.lockUncancelable(work.io); 280 - defer work.bc.broadcast_order.unlock(work.io); 281 + while (work.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 282 + std.atomic.spinLoopHint(); 283 + } 281 284 282 285 const seq = dp.persist(kind, uid, data) catch |err| { 286 + work.bc.persist_order.store(0, .release); 283 287 log.warn("persist failed: {s}", .{@errorName(err)}); 284 288 return; 285 289 }; 286 290 work.bc.stats.relay_seq.store(seq, .release); 287 291 const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data; 288 - work.bc.broadcast(seq, broadcast_data); 292 + // dupe for the broadcast queue — arena will free broadcast_data 293 + const owned = work.allocator.dupe(u8, broadcast_data) catch { 294 + work.bc.persist_order.store(0, .release); 295 + return; 296 + }; 297 + work.bc.broadcast_queue.push(seq, owned); 298 + work.bc.persist_order.store(0, .release); 289 299 break :blk seq; 290 300 }; 291 301 _ = relay_seq; ··· 310 320 } 311 321 } else { 312 322 const upstream_seq = payload.getUint("seq") orelse 0; 313 - work.bc.broadcast(upstream_seq, data); 323 + const owned = work.allocator.dupe(u8, data) catch return; 324 + work.bc.broadcast_queue.push(upstream_seq, owned); 314 325 } 315 326 } 316 327
+57 -17
src/main.zig
··· 47 47 pub const default_stack_size = 8 * 1024 * 1024; 48 48 49 49 // -- Io backend selection -- 50 - // Evented backends (Io.Uring, Io.Dispatch, Io.Kqueue) use fiber-local state 51 - // for futex wait/wake. Plain std.Thread workers (frame pool) calling 52 - // Io.Mutex with an Evented io segfault because they lack that state. 53 - // Use Threaded until frame workers are migrated to io.concurrent or 54 - // cross-boundary mutexes get a dedicated Threaded sync_io. 55 - const Backend = Io.Threaded; 50 + // Evented (fibers): network orchestration, subscriber connections, WS server, broadcasting. 51 + // Worker threads use a dedicated pool_io (Threaded) for their sync — they never touch Evented io. 52 + // The broadcast queue bridges workers → broadcaster fiber (atomics only, no Io dependency). 53 + const Backend = Io.Evented; 56 54 57 55 var backend: Backend = undefined; 58 56 var debug_threaded_io: Io.Threaded = undefined; ··· 160 158 }; 161 159 const allocator = if (build_options.use_gpa) gpa.allocator() else std.heap.c_allocator; 162 160 161 + // shared options for both debug and primary runtimes 162 + const io_opts: Io.Threaded.InitOptions = .{ 163 + .stack_size = default_stack_size, // 8MB (default is 16MB) 164 + .concurrent_limit = Io.Limit.limited(4096), // safety rail (steady-state ~2,800 hosts) 165 + }; 166 + 163 167 // init debug io (for std.debug.print thread safety) 164 - debug_threaded_io = Io.Threaded.init(allocator, .{}); 168 + debug_threaded_io = Io.Threaded.init(allocator, io_opts); 165 169 166 - // init primary runtime 170 + // init primary runtime (Evented: fibers for network orchestration) 167 171 if (Backend == Io.Threaded) { 168 - backend = Io.Threaded.init(allocator, .{}); 172 + backend = Io.Threaded.init(allocator, io_opts); 169 173 } else { 170 174 try Backend.init(&backend, allocator, .{}); 171 175 } 172 176 const io = backend.io(); 173 177 178 + // dedicated Threaded runtime for the frame worker pool. 
179 + // worker threads are plain std.Thread — they cannot use Evented io 180 + // (Evented futex calls ev.yield() which requires fiber context). 181 + // this io is used for: persist ordering mutex, timestamps, validator cache, 182 + // DID resolution HTTP, and thread pool internal sync. 183 + var pool_io_backend = Io.Threaded.init(allocator, .{ 184 + .stack_size = default_stack_size, 185 + }); 186 + const pool_io = pool_io_backend.io(); 187 + 174 188 log.info("io backend: {s}", .{if (Backend == Io.Threaded) "Threaded" else "Evented"}); 175 189 176 190 // parse config from env ··· 188 202 installSignalHandlers(); 189 203 190 204 // init components — pass io to network-facing modules 191 - var bc = broadcaster.Broadcaster.init(allocator, io); 205 + var bc = broadcaster.Broadcaster.init(allocator, io, &shutdown_flag); 192 206 defer bc.deinit(); 193 207 194 - var val = validator_mod.Validator.init(allocator, &bc.stats, io); 208 + // validator uses pool_io — its cache LRU and host resolvers are called from worker threads 209 + var val = validator_mod.Validator.init(allocator, &bc.stats, pool_io); 195 210 defer val.deinit(); 196 211 try val.start(); 197 212 198 213 // init disk persistence (indigo-compatible diskpersist format + Postgres index) 199 214 const database_url = getenv("DATABASE_URL") orelse "postgres://relay:relay@localhost:5432/relay"; 200 - var dp = event_log_mod.DiskPersist.init(allocator, data_dir, database_url, db_pool_size, io) catch |err| { 215 + var dp = event_log_mod.DiskPersist.init(allocator, data_dir, database_url, db_pool_size, pool_io) catch |err| { 201 216 log.err("failed to init disk persist at {s}: {s}", .{ data_dir, @errorName(err) }); 202 217 return err; 203 218 }; ··· 249 264 .frame_queue_capacity = frame_queue_capacity, 250 265 }, 251 266 io, 267 + pool_io, 252 268 ); 253 269 defer slurper.deinit(); 254 270 slurper.collection_index = &ci; ··· 257 273 // start: loads active hosts from DB, spawns subscriber threads 258 274 try 
slurper.start(); 259 275 276 + // start broadcaster fiber — drains broadcast queue, owns all consumer state. 277 + // this is the Evented-side sequencer: frame workers push results to the queue, 278 + // this fiber does the actual fan-out to downstream consumers. 279 + var broadcast_future = try io.concurrent(broadcaster.Broadcaster.runBroadcastLoop, .{&bc}); 280 + defer _ = broadcast_future.cancel(io); 281 + 260 282 // start GC loop (runs as background task — does disk I/O + malloc_trim) 261 283 var gc_future = try io.concurrent(gcLoop, .{ &dp, io }); 284 + defer _ = gc_future.cancel(io); 262 285 263 286 // wire HTTP fallback into broadcaster (all API endpoints served on WS port) 264 287 var http_context = api.HttpContext{ ··· 271 294 .resyncer = &resyncer, 272 295 .bc = &bc, 273 296 .validator = &val, 297 + .pool_io = pool_io, 274 298 }; 275 299 bc.http_fallback = api.handleHttpRequest; 276 300 bc.http_fallback_ctx = @ptrCast(&http_context); ··· 291 315 .slurper = &slurper, 292 316 }; 293 317 var metrics_future = try io.concurrent(MetricsServer.run, .{&metrics_srv}); 318 + defer _ = metrics_future.cancel(io); 294 319 295 320 // start downstream WebSocket server (also serves HTTP API via httpFallback) 296 321 log.info("relay listening on :{d} (ws+http), :{d} (metrics)", .{ port, metrics_port }); 297 322 log.info("seed host: {s}", .{upstream}); 298 323 log.info("data dir: {s} (retention: {d}h, max: {d} GB)", .{ data_dir, retention_hours, max_events_gb }); 299 324 300 - var server = try websocket.Server(broadcaster.Handler).init(allocator, .{ 325 + var server = try websocket.Server(broadcaster.Handler).init(allocator, io, .{ 301 326 .port = port, 302 327 .address = "0.0.0.0", 303 328 .max_conn = 4096, ··· 305 330 }); 306 331 defer server.deinit(); 307 332 308 - const server_thread = try server.listenInNewThread(&bc); 333 + // Io-native accept loop: fiber-based under Evented, thread-based under Threaded 334 + const ws_address = Io.net.Ip4Address.unspecified(port); 
335 + var ws_listener = (Io.net.IpAddress{ .ip4 = ws_address }).listen(io, .{ .reuse_address = true }) catch |err| { 336 + log.err("websocket server failed to listen on :{d}: {s}", .{ port, @errorName(err) }); 337 + return err; 338 + }; 339 + var server_future = try io.concurrent(runWsServer, .{ &server, &ws_listener, &bc }); 340 + defer _ = server_future.cancel(io); 309 341 310 342 // wait for shutdown signal 311 343 while (!shutdown_flag.load(.acquire)) { ··· 314 346 315 347 log.info("shutdown signal received, stopping...", .{}); 316 348 317 - // stop WebSocket server (closes all downstream connections) 318 - server.stop(); 319 - server_thread.join(); 349 + // stop WebSocket server: close listener to unblock accept, then cancel task 350 + ws_listener.deinit(io); 351 + server_future.cancel(io); 320 352 321 353 // cancel GC task 322 354 gc_future.cancel(io); 355 + 356 + // cancel broadcaster fiber (shutdown flag already set, it will drain remaining) 357 + broadcast_future.cancel(io); 323 358 324 359 // close metrics listener to unblock accept(), then cancel task 325 360 metrics_srv.server.deinit(io); ··· 329 364 } 330 365 331 366 const builtin = @import("builtin"); 367 + 368 + /// concrete wrapper for runIo — io.concurrent needs ArgsTuple, which can't handle anytype 369 + fn runWsServer(server: *websocket.Server(broadcaster.Handler), listener: *Io.net.Server, bc: *broadcaster.Broadcaster) void { 370 + server.runIo(listener, bc); 371 + } 332 372 333 373 fn gcLoop(dp: *event_log_mod.DiskPersist, io: Io) void { 334 374 const gc_interval: u64 = 10 * 60; // 10 minutes in seconds
+8 -2
src/slurper.zig
··· 247 247 crawl_future: ?Io.Future(void) = null, 248 248 249 249 io: Io, 250 + /// dedicated Threaded io for the frame worker pool — safe from plain OS threads 251 + pool_io: Io, 250 252 251 253 pub fn init( 252 254 allocator: Allocator, ··· 256 258 shutdown: *std.atomic.Value(bool), 257 259 options: Options, 258 260 io: Io, 261 + pool_io: Io, 259 262 ) Slurper { 260 263 return .{ 261 264 .allocator = allocator, ··· 265 268 .shutdown = shutdown, 266 269 .options = options, 267 270 .io = io, 271 + .pool_io = pool_io, 268 272 }; 269 273 } 270 274 ··· 277 281 self.ca_bundle = bundle; 278 282 log.info("loaded shared CA bundle", .{}); 279 283 280 - // create frame processing pool — worker threads handle heavy decode/validate/persist 284 + // create frame processing pool — worker threads use pool_io (Threaded), 285 + // safe from plain OS threads even when the app uses Evented io 281 286 self.frame_pool = try frame_worker_mod.FramePool.init(self.allocator, .{ 282 287 .num_workers = self.options.frame_workers, 283 288 .queue_capacity = self.options.frame_queue_capacity, 284 289 .stack_size = @import("main.zig").default_stack_size, 285 - }, self.io); 290 + }, self.pool_io); 286 291 log.info("frame pool started: {d} workers, queue capacity {d}", .{ self.options.frame_workers, self.options.frame_queue_capacity }); 287 292 288 293 // spawn worker startup in background so HTTP server + probes come up immediately. ··· 476 481 sub.collection_index = self.collection_index; 477 482 sub.resyncer = self.resyncer; 478 483 if (self.frame_pool) |*fp| sub.pool = fp; 484 + sub.pool_io = self.pool_io; 479 485 480 486 const future = try self.io.concurrent(runWorker, .{ self, host_id, sub }); 481 487
+25 -48
src/subscriber.zig
··· 208 208 collection_index: ?*collection_index_mod.CollectionIndex = null, 209 209 resyncer: ?*resync_mod.Resyncer = null, 210 210 pool: ?*frame_worker_mod.FramePool = null, 211 + /// dedicated Threaded io for frame workers — safe from plain OS threads 212 + pool_io: ?Io = null, 211 213 shutdown: *std.atomic.Value(bool), 212 214 last_upstream_seq: ?u64 = null, 213 215 last_cursor_flush: i64 = 0, ··· 351 353 .subscriber = self, 352 354 }; 353 355 354 - // spawn keepalive ping task (Go relay: 30s ticker goroutine in consumer.go) 355 - // prevents intermediate proxies (e.g. Cloudflare) from killing idle connections 356 - var ping_future = self.io.concurrent(pingLoop, .{ &client, self }) catch |err| { 357 - log.warn("host {s}: failed to spawn ping task: {s}", .{ self.options.hostname, @errorName(err) }); 358 - return err; 359 - }; 360 - 361 - defer ping_future.cancel(self.io); 362 - try client.readLoop(&handler); 363 - } 364 - 365 - /// periodic WebSocket ping to keep the connection alive. 366 - /// matches indigo relay: 30s interval, close after 4 consecutive failures. 367 - fn pingLoop(client: *websocket.Client, self: *Subscriber) void { 368 - var fail_count: u32 = 0; 369 - while (!self.shouldStop()) { 370 - // sleep in 1s increments so we can check shutdown. 371 - // return on any sleep error — critically, error.Canceled from 372 - // ping_future.cancel() must not be swallowed, otherwise deinit 373 - // frees the client while we're still running. 
374 - var elapsed: u32 = 0; 375 - while (elapsed < ping_interval_sec and !self.shouldStop()) { 376 - self.io.sleep(Io.Duration.fromSeconds(1), .awake) catch return; 377 - elapsed += 1; 378 - } 379 - if (self.shouldStop() or client.isClosed()) return; 380 - 381 - client.writeFrame(.ping, &.{}) catch { 382 - fail_count += 1; 383 - log.warn("host {s}: ping failed ({d}/{d})", .{ self.options.hostname, fail_count, max_ping_failures }); 384 - if (fail_count >= max_ping_failures) { 385 - log.err("host {s}: too many ping failures, closing connection", .{self.options.hostname}); 386 - client.close(.{}) catch {}; 387 - return; 388 - } 389 - continue; 390 - }; 391 - fail_count = 0; 392 - } 356 + // heartbeat read loop: merged keepalive + frame reading in a single task. 357 + // SO_RCVTIMEO fires after interval_ms, triggering a ping. closes after 358 + // max_failures consecutive idle intervals with no frames received. 359 + try client.readLoopWithHeartbeat(&handler, .{ 360 + .interval_ms = ping_interval_sec * 1000, 361 + .max_failures = max_ping_failures, 362 + }); 393 363 } 394 364 }; 395 365 ··· 503 473 .host_id = sub.options.host_id, 504 474 .hostname = sub.options.hostname, 505 475 .allocator = sub.allocator, 506 - .io = sub.io, 476 + .io = sub.pool_io orelse sub.io, // pool_io (Threaded) for worker-safe ops 507 477 .bc = sub.bc, 508 478 .validator = sub.validator, 509 479 .persist = sub.persist, ··· 728 698 else // is_identity (unknown types already filtered above) 729 699 .identity; 730 700 731 - // persist and get relay-assigned seq, broadcast raw bytes. 732 - // ordering mutex ensures frames are broadcast in seq order — 733 - // without it, concurrent subscriber threads can interleave 734 - // persist (seq assignment) and broadcast, delivering out-of-order. 701 + // persist and push to broadcast queue (broadcaster fiber handles fan-out). 702 + // ordering spinlock ensures frames are persisted + enqueued in seq order. 
735 703 if (sub.persist) |dp| { 736 704 const relay_seq = blk: { 737 - sub.bc.broadcast_order.lockUncancelable(sub.io); 738 - defer sub.bc.broadcast_order.unlock(sub.io); 705 + while (sub.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 706 + std.atomic.spinLoopHint(); 707 + } 739 708 740 709 const seq = dp.persist(kind, uid, data) catch |err| { 710 + sub.bc.persist_order.store(0, .release); 741 711 log.warn("persist failed: {s}", .{@errorName(err)}); 742 712 return; 743 713 }; 744 714 sub.bc.stats.relay_seq.store(seq, .release); 745 715 const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data; 746 - sub.bc.broadcast(seq, broadcast_data); 716 + const owned = sub.allocator.dupe(u8, broadcast_data) catch { 717 + sub.bc.persist_order.store(0, .release); 718 + return; 719 + }; 720 + sub.bc.broadcast_queue.push(seq, owned); 721 + sub.bc.persist_order.store(0, .release); 747 722 break :blk seq; 748 723 }; 749 724 _ = relay_seq; ··· 767 742 } 768 743 } 769 744 } else { 770 - sub.bc.broadcast(upstream_seq orelse 0, data); 745 + const seq = upstream_seq orelse 0; 746 + const owned = sub.allocator.dupe(u8, data) catch return; 747 + sub.bc.broadcast_queue.push(seq, owned); 771 748 } 772 749 } 773 750