atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix cross-Io heap corruption: subscriber pg.Pool access from Evented fibers

subscriber fibers (Evented) were calling DiskPersist methods directly. those
methods acquire pg.Pool connections, and pg.Pool.acquire() invokes a Threaded
futex; on an Evented fiber Thread.current() is NULL, which caused heap
corruption (~16min crash cycle).

add MPSC host_ops queue (atomic spinlock, same pattern as BroadcastQueue):
- subscriber pushes ops instead of calling dp.* directly
- single background thread (std.Thread on pool_io) pops and executes
- covers: cursor flush (~450/s), failure tracking, status updates
- cursor loaded at spawn time (slurper passes last_seq through)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 3533416f fadf9b56

+209 -33
+151
src/host_ops.zig
//! host ops queue — MPSC ring buffer for cross-Io host bookkeeping
//!
//! subscriber fibers (Evented) push host operations into this queue.
//! a single background thread (std.Thread on pool_io / Threaded) pops and
//! executes them against DiskPersist (pg.Pool). this avoids Evented fibers
//! calling pg.Pool.acquire(), which invokes a Threaded futex → NULL
//! Thread.current() → heap corruption.
//!
//! same atomic spinlock MPSC pattern as BroadcastQueue (broadcaster.zig).

const std = @import("std");
const event_log_mod = @import("event_log.zig");

const Io = std.Io;
const log = std.log.scoped(.relay);

/// one unit of host bookkeeping, copied by value into the queue.
///
/// `kind` selects which `payload` field is read by the worker:
///   .flush_cursor       → payload.seq
///   .increment_failures → payload.host_shutdown
///   .reset_failures     → payload is not read (use `.none`)
///   .update_status      → payload.status
pub const HostOp = struct {
    host_id: u64,
    kind: Kind,
    payload: Payload,

    pub const Kind = enum {
        flush_cursor,
        increment_failures,
        reset_failures,
        update_status,
    };

    /// untagged union — the active field is implied by `HostOp.kind`, so the
    /// producer must initialise the field that matches the kind it sets.
    pub const Payload = union {
        seq: u64,
        /// pointer to subscriber's host_shutdown atomic — set by worker on exhaustion.
        /// the worker dereferences this pointer when it executes the op, so the
        /// pointee must still be alive at that point.
        /// NOTE(review): nothing in this file enforces that lifetime — confirm
        /// the subscriber outlives its queued ops.
        host_shutdown: *std.atomic.Value(bool),
        none: void,
        status: Status,

        /// short status string stored inline so the op owns its bytes and no
        /// slice into caller memory crosses the queue.
        pub const Status = struct {
            buf: [16]u8 = .{0} ** 16,
            len: u8 = 0,

            /// copy `s` into the inline buffer. input longer than the buffer
            /// is truncated to the buffer length.
            pub fn init(s: []const u8) Status {
                var result: Status = .{};
                const n: u8 = @intCast(@min(s.len, result.buf.len));
                @memcpy(result.buf[0..n], s[0..n]);
                result.len = n;
                return result;
            }

            /// view of the stored bytes; valid as long as `self` is.
            pub fn slice(self: *const Status) []const u8 {
                return self.buf[0..self.len];
            }
        };
    };
};

/// fixed-capacity ring buffer of `HostOp`s: many producers (serialised by
/// `push_lock`), one consumer (`run`). one slot is always left empty to
/// distinguish full from empty, so at most CAPACITY - 1 ops are queued.
pub const HostOpsQueue = struct {
    const CAPACITY = 4096;

    items: [CAPACITY]HostOp = undefined,
    /// next slot to pop; written only by the consumer.
    head: std.atomic.Value(u32) = .{ .raw = 0 },
    /// next slot to fill; written only while holding `push_lock`.
    tail: std.atomic.Value(u32) = .{ .raw = 0 },
    /// producer spinlock: 0 = free, 1 = held.
    push_lock: std.atomic.Value(u32) = .{ .raw = 0 },

    persist: *event_log_mod.DiskPersist,
    shutdown: *std.atomic.Value(bool),
    /// failure count at which a host is marked "exhausted".
    max_consecutive_failures: u32,

    /// push an op (called from any Io context — Evented fibers or Threaded threads).
    /// spins until space is available: this busy-waits (spin-loop hint only),
    /// it does not park the caller.
    pub fn push(self: *HostOpsQueue, op: HostOp) void {
        while (true) {
            // acquire spinlock
            while (self.push_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) {
                std.atomic.spinLoopHint();
            }

            const tail = self.tail.load(.monotonic);
            const next_tail = (tail + 1) % CAPACITY;
            if (next_tail == self.head.load(.acquire)) {
                // full — release lock so other producers are not blocked behind
                // us, then retry once the consumer has advanced head
                self.push_lock.store(0, .release);
                std.atomic.spinLoopHint();
                continue;
            }

            // write the slot before publishing the new tail (release) so the
            // consumer's acquire load of tail observes a fully written item
            self.items[tail] = op;
            self.tail.store(next_tail, .release);
            self.push_lock.store(0, .release);
            return;
        }
    }

    /// pop an op (single-consumer — worker thread only).
    /// returns null when the queue is empty.
    fn pop(self: *HostOpsQueue) ?HostOp {
        const head = self.head.load(.monotonic);
        if (head == self.tail.load(.acquire)) return null;

        // copy out before releasing the slot back to producers
        const item = self.items[head];
        self.head.store((head + 1) % CAPACITY, .release);
        return item;
    }

    /// worker thread entry point. runs on pool_io (Threaded).
    /// pops ops and executes them against DiskPersist until `shutdown` is set,
    /// then drains whatever is still queued before returning.
    pub fn run(self: *HostOpsQueue, pool_io: Io) void {
        while (!self.shutdown.load(.acquire)) {
            var did_work = false;
            while (self.pop()) |op| {
                self.execute(op);
                did_work = true;
            }

            if (!did_work) {
                // nothing to do — brief sleep via pool_io (Threaded, safe from std.Thread).
                // on sleep failure leave the loop instead of returning, so the
                // final drain below still runs.
                pool_io.sleep(Io.Duration.fromMilliseconds(10), .awake) catch |err| {
                    log.warn("host_ops: worker sleep failed: {s}, draining and exiting", .{@errorName(err)});
                    break;
                };
            }
        }

        // drain remaining ops on shutdown
        while (self.pop()) |op| {
            self.execute(op);
        }
    }

    /// run one op against DiskPersist. failures are logged, never propagated:
    /// host bookkeeping is best-effort and must not stop the worker.
    fn execute(self: *HostOpsQueue, op: HostOp) void {
        switch (op.kind) {
            .flush_cursor => {
                self.persist.updateHostSeq(op.host_id, op.payload.seq) catch |err| {
                    log.debug("host_ops: cursor flush failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                };
            },
            .increment_failures => {
                // a failed increment is treated as a count of 0 (same fallback
                // as before), but is now logged instead of silently dropped
                const failures = self.persist.incrementHostFailures(op.host_id) catch |err| blk: {
                    log.debug("host_ops: increment failures failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                    break :blk 0;
                };
                if (failures >= self.max_consecutive_failures) {
                    log.warn("host_ops: host_id={d} exhausted after {d} failures", .{ op.host_id, failures });
                    self.persist.updateHostStatus(op.host_id, "exhausted") catch |err| {
                        log.debug("host_ops: update status failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                    };
                    // signal the subscriber to stop even if the status write failed
                    op.payload.host_shutdown.store(true, .release);
                }
            },
            .reset_failures => {
                self.persist.resetHostFailures(op.host_id) catch |err| {
                    log.debug("host_ops: reset failures failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                };
            },
            .update_status => {
                self.persist.updateHostStatus(op.host_id, op.payload.status.slice()) catch |err| {
                    log.debug("host_ops: update status failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                };
            },
        }
    }
};
+17
src/main.zig
··· 35 35 const backfill_mod = @import("backfill.zig"); 36 36 const cleaner_mod = @import("cleaner.zig"); 37 37 const resync_mod = @import("resync.zig"); 38 + const host_ops_mod = @import("host_ops.zig"); 38 39 const api = @import("api.zig"); 39 40 const build_options = @import("build_options"); 40 41 const malloc_trim: ?*const fn (pad: usize) callconv(.c) c_int = if (builtin.os.tag == .linux) ··· 260 261 try resyncer.start(); 261 262 defer resyncer.deinit(); 262 263 264 + // init host ops queue — subscriber fibers (Evented) push DB ops here, 265 + // a background thread (Threaded) executes them. avoids cross-Io pg.Pool access. 266 + var host_ops_queue: host_ops_mod.HostOpsQueue = .{ 267 + .persist = &dp, 268 + .shutdown = &shutdown_flag, 269 + .max_consecutive_failures = 15, 270 + }; 271 + const host_ops_thread = std.Thread.spawn(.{}, host_ops_mod.HostOpsQueue.run, .{ &host_ops_queue, pool_io }) catch |err| { 272 + log.err("failed to start host ops thread: {s}", .{@errorName(err)}); 273 + return err; 274 + }; 275 + 263 276 // init slurper (multi-host crawl manager) 264 277 var slurper = slurper_mod.Slurper.init( 265 278 allocator, ··· 280 293 defer slurper.deinit(); 281 294 slurper.collection_index = &ci; 282 295 slurper.resyncer = &resyncer; 296 + slurper.host_ops = &host_ops_queue; 283 297 284 298 // start: loads active hosts from DB, spawns subscriber threads 285 299 try slurper.start(); ··· 368 382 // join GC thread — it checks shutdown_flag and will exit its sleep loop. 369 383 // must complete before dp.deinit() runs (dp is stack-owned). 370 384 gc_thread.join(); 385 + 386 + // join host ops thread — drains remaining ops before dp.deinit() 387 + host_ops_thread.join(); 371 388 372 389 // cancel broadcaster fiber (shutdown flag already set, it will drain remaining) 373 390 broadcast_future.cancel(io);
+7 -3
src/slurper.zig
··· 20 20 const collection_index_mod = @import("collection_index.zig"); 21 21 const resync_mod = @import("resync.zig"); 22 22 const frame_worker_mod = @import("frame_worker.zig"); 23 + const host_ops_mod = @import("host_ops.zig"); 23 24 24 25 const Allocator = std.mem.Allocator; 25 26 const log = std.log.scoped(.relay); ··· 228 229 persist: *event_log_mod.DiskPersist, 229 230 collection_index: ?*collection_index_mod.CollectionIndex = null, 230 231 resyncer: ?*resync_mod.Resyncer = null, 232 + host_ops: ?*host_ops_mod.HostOpsQueue = null, 231 233 shutdown: *std.atomic.Value(bool), 232 234 options: Options, 233 235 ··· 453 455 self.persist.updateHostStatus(host_info.id, "active") catch {}; 454 456 self.persist.resetHostFailures(host_info.id) catch {}; 455 457 456 - try self.spawnWorker(host_info.id, hostname); 458 + try self.spawnWorker(host_info.id, hostname, host_info.last_seq); 457 459 log.info("added host {s} (id={d})", .{ hostname, host_info.id }); 458 460 } 459 461 460 462 /// spawn a subscriber thread for a host 461 - fn spawnWorker(self: *Slurper, host_id: u64, hostname: []const u8) !void { 463 + fn spawnWorker(self: *Slurper, host_id: u64, hostname: []const u8, last_seq: u64) !void { 462 464 const hostname_duped = try self.allocator.dupe(u8, hostname); 463 465 errdefer self.allocator.free(hostname_duped); 464 466 ··· 486 488 sub.resyncer = self.resyncer; 487 489 if (self.frame_pool) |*fp| sub.pool = fp; 488 490 sub.pool_io = self.pool_io; 491 + sub.host_ops = self.host_ops; 492 + if (last_seq > 0) sub.last_upstream_seq = last_seq; 489 493 490 494 const future = try self.io.concurrent(runWorker, .{ self, host_id, sub }); 491 495 ··· 550 554 var spawned: usize = 0; 551 555 for (hosts) |host| { 552 556 if (self.shutdown.load(.acquire)) break; 553 - self.spawnWorker(host.id, host.hostname) catch |err| { 557 + self.spawnWorker(host.id, host.hostname, host.last_seq) catch |err| { 554 558 log.warn("failed to spawn worker for {s}: {s}", .{ host.hostname, 
@errorName(err) }); 555 559 }; 556 560 spawned += 1;
+34 -30
src/subscriber.zig
··· 15 15 const collection_index_mod = @import("collection_index.zig"); 16 16 const resync_mod = @import("resync.zig"); 17 17 const frame_worker_mod = @import("frame_worker.zig"); 18 + const host_ops_mod = @import("host_ops.zig"); 18 19 19 20 const Allocator = std.mem.Allocator; 20 21 const Io = std.Io; ··· 210 211 pool: ?*frame_worker_mod.FramePool = null, 211 212 /// dedicated Threaded io for frame workers — safe from plain OS threads 212 213 pool_io: ?Io = null, 214 + /// host ops queue — pushes DB ops to a background thread (avoids cross-Io pg.Pool access) 215 + host_ops: ?*host_ops_mod.HostOpsQueue = null, 213 216 shutdown: *std.atomic.Value(bool), 214 217 last_upstream_seq: ?u64 = null, 215 218 last_cursor_flush: i64 = 0, ··· 256 259 var backoff: u64 = 1; 257 260 const max_backoff: u64 = 60; 258 261 259 - // load cursor from DB if we have a host_id 260 - if (self.options.host_id > 0) { 261 - if (self.persist) |dp| { 262 - const host_info = dp.getOrCreateHost(self.options.hostname) catch null; 263 - if (host_info) |info| { 264 - if (info.last_seq > 0) { 265 - self.last_upstream_seq = info.last_seq; 266 - log.info("host {s}: resuming from cursor {d}", .{ self.options.hostname, info.last_seq }); 267 - } 268 - } 269 - } 262 + // cursor is set at spawn time by slurper (avoids cross-Io pg.Pool access) 263 + if (self.last_upstream_seq) |seq| { 264 + log.info("host {s}: resuming from cursor {d}", .{ self.options.hostname, seq }); 270 265 } 271 266 272 267 while (!self.shouldStop()) { ··· 279 274 280 275 if (self.shouldStop()) return; 281 276 282 - // track failures for this host 277 + // track failures for this host (pushed to background thread via host_ops queue) 283 278 if (self.options.host_id > 0) { 284 - if (self.persist) |dp| { 285 - const failures = dp.incrementHostFailures(self.options.host_id) catch 0; 286 - if (failures >= max_consecutive_failures) { 287 - log.warn("host {s}: exhausted after {d} failures, stopping", .{ self.options.hostname, failures }); 
288 - dp.updateHostStatus(self.options.host_id, "exhausted") catch {}; 289 - return; 290 - } 279 + if (self.host_ops) |hq| { 280 + hq.push(.{ 281 + .host_id = self.options.host_id, 282 + .kind = .increment_failures, 283 + .payload = .{ .host_shutdown = &self.host_shutdown }, 284 + }); 291 285 } 292 286 } 293 287 ··· 302 296 } 303 297 } 304 298 305 - /// flush cursor position to the host table 299 + /// flush cursor position to the host table (via host_ops queue — avoids cross-Io pg.Pool) 306 300 fn flushCursor(self: *Subscriber) void { 307 301 if (self.options.host_id == 0) return; 308 302 const seq = self.last_upstream_seq orelse return; 309 - if (self.persist) |dp| { 310 - dp.updateHostSeq(self.options.host_id, seq) catch |err| { 311 - log.debug("host {s}: cursor flush failed: {s}", .{ self.options.hostname, @errorName(err) }); 312 - }; 303 + if (self.host_ops) |hq| { 304 + hq.push(.{ 305 + .host_id = self.options.host_id, 306 + .kind = .flush_cursor, 307 + .payload = .{ .seq = seq }, 308 + }); 313 309 } 314 310 } 315 311 ··· 348 344 try client.handshake(path, .{ .headers = host_header }); 349 345 log.info("host {s}: connected", .{self.options.hostname}); 350 346 351 - // reset failures on successful connect 347 + // reset failures on successful connect (via host_ops queue) 352 348 if (self.options.host_id > 0) { 353 - if (self.persist) |dp| { 354 - dp.resetHostFailures(self.options.host_id) catch {}; 349 + if (self.host_ops) |hq| { 350 + hq.push(.{ 351 + .host_id = self.options.host_id, 352 + .kind = .reset_failures, 353 + .payload = .{ .none = {} }, 354 + }); 355 355 } 356 356 } 357 357 ··· 400 400 log.warn("host {s}: error frame: {s}: {s}", .{ sub.options.hostname, err_name, err_msg }); 401 401 if (std.mem.eql(u8, err_name, "FutureCursor")) { 402 402 // our cursor is ahead of the PDS — set host to idle, stop this subscriber only 403 - if (sub.persist) |dp| { 404 - if (sub.options.host_id > 0) { 405 - dp.updateHostStatus(sub.options.host_id, "idle") catch {}; 
403 + if (sub.options.host_id > 0) { 404 + if (sub.host_ops) |hq| { 405 + hq.push(.{ 406 + .host_id = sub.options.host_id, 407 + .kind = .update_status, 408 + .payload = .{ .status = host_ops_mod.HostOp.Payload.Status.init("idle") }, 409 + }); 406 410 } 407 411 } 408 412 sub.host_shutdown.store(true, .release);