atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix CPU regression: coalesce cursor flushes instead of FIFO queuing

the host_ops MPSC queue was processing every cursor flush (~348/sec at
1,391 hosts) as an individual DB write. when the queue filled, Evented
producers busy-spun — causing 5.4 cores sustained CPU.

split into two mechanisms:
- CursorMap: subscribers atomically store latest seq (one store, no lock).
worker thread sweeps every 5s, batch-flushes only changed cursors.
- HostOpsQueue: kept for rare ops only (failures, status updates).

also adds slot reuse via free list — unregister on subscriber exit
prevents slot exhaustion from host churn.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz cde39569 ed90a1b2

+167 -30
+139 -20
src/host_ops.zig
··· 1 - //! host ops queue — MPSC ring buffer for cross-Io host bookkeeping 1 + //! host ops — cross-Io host bookkeeping without pg.Pool from Evented fibers 2 2 //! 3 - //! subscriber fibers (Evented) push host operations into this queue. 4 - //! a single background thread (std.Thread on pool_io / Threaded) pops and 5 - //! executes them against DiskPersist (pg.Pool). this avoids Evented fibers 6 - //! calling pg.Pool.acquire(), which invokes a Threaded futex → NULL 7 - //! Thread.current() → heap corruption. 3 + //! two mechanisms: 8 4 //! 9 - //! same atomic spinlock MPSC pattern as BroadcastQueue (broadcaster.zig). 5 + //! 1. CursorMap — coalescing cursor store. subscribers atomically write their 6 + //! latest seq (nanoseconds, no lock). the worker thread sweeps every 5s and 7 + //! batch-flushes only changed cursors to the DB. this replaces the old FIFO 8 + //! cursor queue that caused ~350 individual UPDATEs/sec and busy-spin on full. 9 + //! 10 + //! 2. HostOpsQueue — MPSC ring for rare ops (increment_failures, reset_failures, 11 + //! update_status). same atomic spinlock pattern as BroadcastQueue. these ops 12 + //! are infrequent (connect/disconnect/error), so the queue never fills. 10 13 11 14 const std = @import("std"); 12 15 const event_log_mod = @import("event_log.zig"); ··· 14 17 const Io = std.Io; 15 18 const log = std.log.scoped(.relay); 16 19 20 + // --------------------------------------------------------------------------- 21 + // cursor coalescing map 22 + // --------------------------------------------------------------------------- 23 + 24 + pub const CursorMap = struct { 25 + const MAX_SLOTS = 4096; 26 + /// sentinel: slot is inactive (freed or never used) — worker skips these 27 + const EMPTY: u64 = 0; 28 + 29 + host_ids: [MAX_SLOTS]u64 = @splat(0), 30 + seqs: [MAX_SLOTS]std.atomic.Value(u64) = @splat(.{ .raw = EMPTY }), 31 + /// last value successfully written to DB (worker-thread only, no atomics) 32 + last_flushed: [MAX_SLOTS]u64 = @splat(0), 33 + /// high-water mark — slots [0..count) have been used at least once 34 + count: std.atomic.Value(u32) = .{ .raw = 0 }, 35 + 36 + /// free list for slot reuse (protected by free_lock spinlock) 37 + free_stack: [MAX_SLOTS]u32 = undefined, 38 + free_top: u32 = 0, 39 + free_lock: std.atomic.Value(u32) = .{ .raw = 0 }, 40 + 41 + /// register a host and return its slot index. called at spawn time (slurper). 42 + pub fn register(self: *CursorMap, host_id: u64, initial_seq: u64) ?u32 { 43 + // try reusing a freed slot first 44 + const slot = self.popFree() orelse blk: { 45 + const s = self.count.fetchAdd(1, .acq_rel); 46 + if (s >= MAX_SLOTS) { 47 + _ = self.count.fetchSub(1, .monotonic); 48 + log.warn("cursor_map: out of slots (max {d})", .{MAX_SLOTS}); 49 + return null; 50 + } 51 + break :blk s; 52 + }; 53 + // write host_id and last_flushed before making the slot visible via seqs 54 + self.host_ids[slot] = host_id; 55 + self.last_flushed[slot] = initial_seq; 56 + // release: makes host_ids write visible to worker thread's acquire load 57 + self.seqs[slot].store(initial_seq, .release); 58 + return slot; 59 + } 60 + 61 + /// return a slot to the free list. called when a subscriber exits (slurper). 62 + pub fn unregister(self: *CursorMap, slot: u32) void { 63 + // mark inactive first — worker sees EMPTY on next sweep and skips 64 + self.seqs[slot].store(EMPTY, .release); 65 + self.host_ids[slot] = 0; 66 + self.pushFree(slot); 67 + } 68 + 69 + /// update cursor (called from subscriber — any Io context). single atomic store. 70 + pub fn update(self: *CursorMap, slot: u32, seq: u64) void { 71 + self.seqs[slot].store(seq, .release); 72 + } 73 + 74 + /// sweep all slots, flush changed cursors to DB. called from worker thread only. 75 + fn flush(self: *CursorMap, persist: *event_log_mod.DiskPersist) void { 76 + const n = self.count.load(.acquire); 77 + var flushed: u32 = 0; 78 + for (0..n) |i| { 79 + const current = self.seqs[i].load(.acquire); 80 + // EMPTY means freed or never-used slot — skip 81 + if (current == EMPTY) continue; 82 + if (current != self.last_flushed[i]) { 83 + persist.updateHostSeq(self.host_ids[i], current) catch |err| { 84 + log.debug("cursor flush failed for host_id={d}: {s}", .{ self.host_ids[i], @errorName(err) }); 85 + continue; 86 + }; 87 + self.last_flushed[i] = current; 88 + flushed += 1; 89 + } 90 + } 91 + if (flushed > 0) { 92 + log.debug("cursor_map: flushed {d}/{d} cursors", .{ flushed, n }); 93 + } 94 + } 95 + 96 + fn popFree(self: *CursorMap) ?u32 { 97 + self.lockFree(); 98 + defer self.unlockFree(); 99 + if (self.free_top == 0) return null; 100 + self.free_top -= 1; 101 + return self.free_stack[self.free_top]; 102 + } 103 + 104 + fn pushFree(self: *CursorMap, slot: u32) void { 105 + self.lockFree(); 106 + defer self.unlockFree(); 107 + self.free_stack[self.free_top] = slot; 108 + self.free_top += 1; 109 + } 110 + 111 + fn lockFree(self: *CursorMap) void { 112 + while (self.free_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 113 + std.atomic.spinLoopHint(); 114 + } 115 + } 116 + 117 + fn unlockFree(self: *CursorMap) void { 118 + self.free_lock.store(0, .release); 119 + } 120 + }; 121 + 122 + // --------------------------------------------------------------------------- 123 + // rare ops queue (failures, status) 124 + // --------------------------------------------------------------------------- 125 + 17 126 pub const HostOp = struct { 18 127 host_id: u64, 19 128 kind: Kind, 20 129 payload: Payload, 21 130 22 131 pub const Kind = enum { 23 - flush_cursor, 24 132 increment_failures, 25 133 reset_failures, 26 134 update_status, 27 135 }; 28 136 29 137 pub const Payload = union { 30 - seq: u64, 31 138 /// pointer to subscriber's host_shutdown atomic — set by worker on exhaustion 32 139 host_shutdown: *std.atomic.Value(bool), 33 140 none: void, ··· 54 161 55 162 pub const HostOpsQueue = struct { 56 163 const CAPACITY = 4096; 164 + const CURSOR_FLUSH_INTERVAL_SEC: i64 = 5; 57 165 58 166 items: [CAPACITY]HostOp = undefined, 59 167 head: std.atomic.Value(u32) = .{ .raw = 0 }, ··· 61 169 push_lock: std.atomic.Value(u32) = .{ .raw = 0 }, 62 170 63 171 persist: *event_log_mod.DiskPersist, 172 + cursor_map: *CursorMap, 64 173 shutdown: *std.atomic.Value(bool), 65 174 max_consecutive_failures: u32, 66 175 67 - /// push an op (called from any Io context — Evented fibers or Threaded threads). 68 - /// spins until space is available. 176 + /// push a rare op (called from any Io context). spins until space is available. 177 + /// only used for failures/status — cursors go through CursorMap. 69 178 pub fn push(self: *HostOpsQueue, op: HostOp) void { 70 179 while (true) { 71 180 // acquire spinlock ··· 100 209 } 101 210 102 211 /// worker thread entry point. runs on pool_io (Threaded). 103 - /// pops ops and executes them against DiskPersist. 212 + /// drains rare ops immediately, sweeps cursor map every 5s. 104 213 pub fn run(self: *HostOpsQueue, pool_io: Io) void { 214 + var last_cursor_flush: i64 = timestamp(pool_io); 215 + 105 216 while (!self.shutdown.load(.acquire)) { 217 + // drain rare ops (failures, status) 106 218 var drained: u32 = 0; 107 219 while (self.pop()) |op| { 108 220 self.execute(op); 109 221 drained += 1; 110 222 } 111 223 224 + // periodic cursor sweep 225 + const now = timestamp(pool_io); 226 + if (now - last_cursor_flush >= CURSOR_FLUSH_INTERVAL_SEC) { 227 + self.cursor_map.flush(self.persist); 228 + last_cursor_flush = now; 229 + } 230 + 112 231 if (drained == 0) { 113 - // nothing to do — brief sleep via pool_io (Threaded, safe from std.Thread) 114 - pool_io.sleep(Io.Duration.fromMilliseconds(10), .awake) catch return; 232 + // brief sleep — rare ops are infrequent, cursor sweep is timer-driven 233 + pool_io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch return; 115 234 } 116 235 } 117 236 118 - // drain remaining ops on shutdown 237 + // final cursor flush + drain remaining ops on shutdown 238 + self.cursor_map.flush(self.persist); 119 239 while (self.pop()) |op| { 120 240 self.execute(op); 121 241 } ··· 123 243 124 244 fn execute(self: *HostOpsQueue, op: HostOp) void { 125 245 switch (op.kind) { 126 - .flush_cursor => { 127 - self.persist.updateHostSeq(op.host_id, op.payload.seq) catch |err| { 128 - log.debug("host_ops: cursor flush failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) }); 129 - }; 130 - }, 131 246 .increment_failures => { 132 247 const failures = self.persist.incrementHostFailures(op.host_id) catch 0; 133 248 if (failures >= self.max_consecutive_failures) { ··· 147 262 }; 148 263 }, 149 264 } 265 + } 266 + 267 + fn timestamp(io: Io) i64 { 268 + return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s)); 150 269 } 151 270 };
+7 -2
src/main.zig
··· 261 261 try resyncer.start(); 262 262 defer resyncer.deinit(); 263 263 264 - // init host ops queue — subscriber fibers (Evented) push DB ops here, 265 - // a background thread (Threaded) executes them. avoids cross-Io pg.Pool access. 264 + // init cursor map + host ops queue — subscriber fibers (Evented) write 265 + // cursor seqs to the coalescing map (atomic store, no lock) and push rare 266 + // ops (failures, status) to the MPSC queue. a background thread (Threaded) 267 + // sweeps cursors every 5s and drains rare ops immediately. 268 + var cursor_map: host_ops_mod.CursorMap = .{}; 266 269 var host_ops_queue: host_ops_mod.HostOpsQueue = .{ 267 270 .persist = &dp, 271 + .cursor_map = &cursor_map, 268 272 .shutdown = &shutdown_flag, 269 273 .max_consecutive_failures = 15, 270 274 }; ··· 294 298 slurper.collection_index = &ci; 295 299 slurper.resyncer = &resyncer; 296 300 slurper.host_ops = &host_ops_queue; 301 + slurper.cursor_map = &cursor_map; 297 302 298 303 // start: loads active hosts from DB, spawns subscriber threads 299 304 try slurper.start();
+12
src/slurper.zig
··· 230 230 collection_index: ?*collection_index_mod.CollectionIndex = null, 231 231 resyncer: ?*resync_mod.Resyncer = null, 232 232 host_ops: ?*host_ops_mod.HostOpsQueue = null, 233 + cursor_map: ?*host_ops_mod.CursorMap = null, 233 234 shutdown: *std.atomic.Value(bool), 234 235 options: Options, 235 236 ··· 489 490 if (self.frame_pool) |*fp| sub.pool = fp; 490 491 sub.pool_io = self.pool_io; 491 492 sub.host_ops = self.host_ops; 493 + if (self.cursor_map) |cm| { 494 + sub.cursor_map = cm; 495 + sub.cursor_slot = cm.register(host_id, last_seq); 496 + } 492 497 if (last_seq > 0) sub.last_upstream_seq = last_seq; 493 498 494 499 const future = try self.io.concurrent(runWorker, .{ self, host_id, sub }); ··· 505 510 /// worker thread wrapper — runs subscriber, cleans up on exit 506 511 fn runWorker(self: *Slurper, host_id: u64, sub: *subscriber_mod.Subscriber) void { 507 512 sub.run(); 513 + 514 + // return cursor slot to free list before destroying subscriber 515 + if (self.cursor_map) |cm| { 516 + if (sub.cursor_slot) |slot| { 517 + cm.unregister(slot); 518 + } 519 + } 508 520 509 521 // subscriber returned — remove from active workers 510 522 self.workers_mutex.lockUncancelable(self.io);
+9 -8
src/subscriber.zig
··· 211 211 pool: ?*frame_worker_mod.FramePool = null, 212 212 /// dedicated Threaded io for frame workers — safe from plain OS threads 213 213 pool_io: ?Io = null, 214 - /// host ops queue — pushes DB ops to a background thread (avoids cross-Io pg.Pool access) 214 + /// host ops queue — pushes rare DB ops to a background thread (avoids cross-Io pg.Pool access) 215 215 host_ops: ?*host_ops_mod.HostOpsQueue = null, 216 + /// coalescing cursor map — subscriber writes latest seq atomically, worker sweeps every 5s 217 + cursor_map: ?*host_ops_mod.CursorMap = null, 218 + cursor_slot: ?u32 = null, 216 219 shutdown: *std.atomic.Value(bool), 217 220 last_upstream_seq: ?u64 = null, 218 221 last_cursor_flush: i64 = 0, ··· 296 299 } 297 300 } 298 301 299 - /// flush cursor position to the host table (via host_ops queue — avoids cross-Io pg.Pool) 302 + /// flush cursor position (atomic store to coalescing map — worker sweeps every 5s) 300 303 fn flushCursor(self: *Subscriber) void { 301 304 if (self.options.host_id == 0) return; 302 305 const seq = self.last_upstream_seq orelse return; 303 - if (self.host_ops) |hq| { 304 - hq.push(.{ 305 - .host_id = self.options.host_id, 306 - .kind = .flush_cursor, 307 - .payload = .{ .seq = seq }, 308 - }); 306 + if (self.cursor_map) |cm| { 307 + if (self.cursor_slot) |slot| { 308 + cm.update(slot, seq); 309 + } 309 310 } 310 311 } 311 312