atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

add pipeline contention metrics + zero-consumer fast path

instrumentation for the ~1,300 host CPU cliff:
- relay_persist_order_spins_total: spin iterations on the ordering lock
- relay_broadcast_queue_full_total: spin iterations on full broadcast queue
- relay_broadcast_queue_depth_hwm: high-water mark of queue depth
- relay_broadcast_no_consumers_total: frames that skipped SharedFrame alloc

zero-consumer fast path: when no consumers are connected, broadcast()
returns after history.push() without allocating SharedFrame or taking
consumers_mutex. saves one heap alloc + one mutex per frame.

also includes the cursor coalesce fix (CursorMap) and slot reuse
(free list with unregister on subscriber exit) from previous commits.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+72 -7
+1 -1
src/api/admin.zig
··· 91 91 h.respondJson(conn, .ok, "{\"success\":true}"); 92 92 return; 93 93 }; 94 - ctx.bc.broadcast_queue.push(relay_seq, owned); 94 + ctx.bc.broadcast_queue.push(relay_seq, owned, &ctx.bc.stats); 95 95 ctx.bc.persist_order.store(0, .release); 96 96 log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 97 97 } else |err| {
+57 -2
src/broadcaster.zig
··· 57 57 host_authority_time_us: std.atomic.Value(u64) = .{ .raw = 0 }, 58 58 // frame pool memory pressure 59 59 pool_queued_bytes: std.atomic.Value(u64) = .{ .raw = 0 }, 60 + // persist/broadcast pipeline contention 61 + persist_order_spins: std.atomic.Value(u64) = .{ .raw = 0 }, 62 + broadcast_queue_full: std.atomic.Value(u64) = .{ .raw = 0 }, 63 + broadcast_queue_depth_hwm: std.atomic.Value(u32) = .{ .raw = 0 }, 64 + broadcast_no_consumers: std.atomic.Value(u64) = .{ .raw = 0 }, 60 65 start_time: i64 = 0, 61 66 }; 62 67 ··· 256 261 return .{ .allocator = allocator }; 257 262 } 258 263 264 + /// approximate queue depth (may be slightly stale — for metrics only) 265 + pub fn depth(self: *const BroadcastQueue) u32 { 266 + const tail = self.tail.load(.monotonic); 267 + const head = self.head.load(.monotonic); 268 + return if (tail >= head) tail - head else CAPACITY - head + tail; 269 + } 270 + 259 271 /// push an item (called by worker threads). spins until space is available. 260 272 /// matches Indigo semantics: every persisted event reaches live broadcast. 261 273 /// the broadcaster fiber drains at memory speed (no I/O), so spin is brief. 262 - pub fn push(self: *BroadcastQueue, seq: u64, data: []const u8) void { 274 + pub fn push(self: *BroadcastQueue, seq: u64, data: []const u8, stats: *Stats) void { 275 + var full_spins: u32 = 0; 263 276 while (true) { 264 277 // acquire spinlock 265 278 while (self.push_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { ··· 271 284 if (next_tail == self.head.load(.acquire)) { 272 285 // full — release lock, yield, retry 273 286 self.push_lock.store(0, .release); 287 + full_spins += 1; 274 288 std.atomic.spinLoopHint(); 275 289 continue; 276 290 } ··· 278 292 self.items[tail] = .{ .seq = seq, .data = data }; 279 293 self.tail.store(next_tail, .release); 280 294 self.push_lock.store(0, .release); 295 + 296 + if (full_spins > 0) { 297 + _ = stats.broadcast_queue_full.fetchAdd(full_spins, .monotonic); 298 + } 299 + // update high-water mark (racy but directionally correct) 300 + const d = self.depth(); 301 + const hwm = stats.broadcast_queue_depth_hwm.load(.monotonic); 302 + if (d > hwm) { 303 + _ = stats.broadcast_queue_depth_hwm.cmpxchgWeak(hwm, d, .monotonic, .monotonic); 304 + } 281 305 return; 282 306 } 283 307 } ··· 522 546 // add to history for cursor replay 523 547 _ = self.history.push(seq, data); 524 548 549 + // fast path: skip SharedFrame allocation when nobody is listening 550 + if (self.stats.consumer_count.load(.acquire) == 0) { 551 + _ = self.stats.broadcast_no_consumers.fetchAdd(1, .monotonic); 552 + return; 553 + } 554 + 525 555 // create one shared frame for all consumers 526 556 const frame = SharedFrame.create(self.allocator, data) catch return; 527 557 defer frame.release(); // release broadcaster's reference ··· 933 963 stats.host_authority_time_us.load(.acquire), 934 964 stats.pool_queued_bytes.load(.acquire), 935 965 }) catch return w.buffered(); 966 + 967 + // pipeline contention metrics (separate print to stay under 32-arg limit) 968 + w.print( 969 + \\# TYPE relay_persist_order_spins_total counter 970 + \\# HELP relay_persist_order_spins_total spin iterations waiting for persist ordering lock 971 + \\relay_persist_order_spins_total {d} 972 + \\ 973 + \\# TYPE relay_broadcast_queue_full_total counter 974 + \\# HELP relay_broadcast_queue_full_total spin iterations on full broadcast queue 975 + \\relay_broadcast_queue_full_total {d} 976 + \\ 977 + \\# TYPE relay_broadcast_queue_depth_hwm gauge 978 + \\# HELP relay_broadcast_queue_depth_hwm high-water mark of broadcast queue depth 979 + \\relay_broadcast_queue_depth_hwm {d} 980 + \\ 981 + \\# TYPE relay_broadcast_no_consumers_total counter 982 + \\# HELP relay_broadcast_no_consumers_total frames skipped broadcast (no consumers) 983 + \\relay_broadcast_no_consumers_total {d} 984 + \\ 985 + , .{ 986 + stats.persist_order_spins.load(.acquire), 987 + stats.broadcast_queue_full.load(.acquire), 988 + stats.broadcast_queue_depth_hwm.load(.acquire), 989 + stats.broadcast_no_consumers.load(.acquire), 990 + }) catch {}; 936 991 937 992 // validation failure breakdown by reason 938 993 w.print( ··· 1424 1479 bc_ptr.persist_order.store(0, .release); 1425 1480 continue; 1426 1481 }; 1427 - bc_ptr.broadcast_queue.push(seq, data); 1482 + bc_ptr.broadcast_queue.push(seq, data, &bc_ptr.stats); 1428 1483 bc_ptr.persist_order.store(0, .release); 1429 1484 } 1430 1485 }
+7 -2
src/frame_worker.zig
··· 278 278 // worker threads never touch consumer state directly. 279 279 if (work.persist) |dp| { 280 280 const relay_seq = blk: { 281 + var spins: u64 = 0; 281 282 while (work.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 283 + spins += 1; 282 284 std.atomic.spinLoopHint(); 285 + } 286 + if (spins > 0) { 287 + _ = work.bc.stats.persist_order_spins.fetchAdd(spins, .monotonic); 283 288 } 284 289 285 290 const seq = dp.persist(kind, uid, data) catch |err| { ··· 294 299 work.bc.persist_order.store(0, .release); 295 300 return; 296 301 }; 297 - work.bc.broadcast_queue.push(seq, owned); 302 + work.bc.broadcast_queue.push(seq, owned, &work.bc.stats); 298 303 work.bc.persist_order.store(0, .release); 299 304 break :blk seq; 300 305 }; ··· 321 326 } else { 322 327 const upstream_seq = payload.getUint("seq") orelse 0; 323 328 const owned = work.allocator.dupe(u8, data) catch return; 324 - work.bc.broadcast_queue.push(upstream_seq, owned); 329 + work.bc.broadcast_queue.push(upstream_seq, owned, &work.bc.stats); 325 330 } 326 331 } 327 332
+7 -2
src/subscriber.zig
··· 713 713 // ordering spinlock ensures frames are persisted + enqueued in seq order. 714 714 if (sub.persist) |dp| { 715 715 const relay_seq = blk: { 716 + var spins: u64 = 0; 716 717 while (sub.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 718 + spins += 1; 717 719 std.atomic.spinLoopHint(); 720 + } 721 + if (spins > 0) { 722 + _ = sub.bc.stats.persist_order_spins.fetchAdd(spins, .monotonic); 718 723 } 719 724 720 725 const seq = dp.persist(kind, uid, data) catch |err| { ··· 728 733 sub.bc.persist_order.store(0, .release); 729 734 return; 730 735 }; 731 - sub.bc.broadcast_queue.push(seq, owned); 736 + sub.bc.broadcast_queue.push(seq, owned, &sub.bc.stats); 732 737 sub.bc.persist_order.store(0, .release); 733 738 break :blk seq; 734 739 }; ··· 755 760 } else { 756 761 const seq = upstream_seq orelse 0; 757 762 const owned = sub.allocator.dupe(u8, data) catch return; 758 - sub.bc.broadcast_queue.push(seq, owned); 763 + sub.bc.broadcast_queue.push(seq, owned, &sub.bc.stats); 759 764 } 760 765 } 761 766