atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

narrow persist_order to cover only dp.persist()

move resequenceFrame, heap dupe, and broadcast_queue.push() outside
the ordering lock. persist_order now covers only the DB persist call
and seq store — the minimum needed for monotonic sequence assignment.

this eliminates the cascade where producers spin on persist_order
while another producer is blocked in a full broadcast_queue.push().
slight out-of-order in the ring is acceptable — seq is embedded in
frame data and consumers/history track by seq.

metrics showed persist_order_spins_total dominating at ~1,100 hosts
(548M spins) while push_lock_spins was zero — confirming the critical
section width was the bottleneck.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz c8efaa16 fe8a08cd

+16 -25
+3 -5
src/api/admin.zig
··· 74 74 log.debug("collection removeAll after ban failed: {s}", .{@errorName(err)}); 75 75 }; 76 76 77 - // emit #account event — same ordered publication path as workers: 78 - // persist_order spinlock → dp.persist → resequence → queue.push. 79 - // one publication path for all relay-sequenced events. 77 + // emit #account event — persist under narrow ordering lock, resequence + enqueue outside. 80 78 if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 81 79 while (ctx.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 82 80 std.atomic.spinLoopHint(); ··· 84 82 85 83 if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| { 86 84 ctx.bc.stats.relay_seq.store(relay_seq, .release); 85 + ctx.bc.persist_order.store(0, .release); 86 + 87 87 const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 88 88 const owned = ctx.persist.allocator.dupe(u8, broadcast_data) catch { 89 - ctx.bc.persist_order.store(0, .release); 90 89 log.warn("admin: failed to alloc broadcast data for {s}", .{did}); 91 90 h.respondJson(conn, .ok, "{\"success\":true}"); 92 91 return; 93 92 }; 94 93 ctx.bc.broadcast_queue.push(relay_seq, owned, &ctx.bc.stats); 95 - ctx.bc.persist_order.store(0, .release); 96 94 log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 97 95 } else |err| { 98 96 ctx.bc.persist_order.store(0, .release);
+7 -11
src/frame_worker.zig
··· 273 273 else 274 274 .identity; 275 275 276 - // persist under ordering lock, then push to broadcast queue. 277 - // the broadcaster fiber (Evented) drains the queue and does fan-out — 278 - // worker threads never touch consumer state directly. 276 + // persist under narrow ordering lock (seq assignment only), then 277 + // resequence + enqueue outside the lock. slight out-of-order in the 278 + // ring is fine — seq is embedded in frame data and consumers track by seq. 279 279 if (work.persist) |dp| { 280 280 const relay_seq = blk: { 281 281 var spins: u64 = 0; ··· 293 293 return; 294 294 }; 295 295 work.bc.stats.relay_seq.store(seq, .release); 296 - const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data; 297 - // dupe for the broadcast queue — arena will free broadcast_data 298 - const owned = work.allocator.dupe(u8, broadcast_data) catch { 299 - work.bc.persist_order.store(0, .release); 300 - return; 301 - }; 302 - work.bc.broadcast_queue.push(seq, owned, &work.bc.stats); 303 296 work.bc.persist_order.store(0, .release); 304 297 break :blk seq; 305 298 }; 306 - _ = relay_seq; 299 + 300 + const broadcast_data = broadcaster.resequenceFrame(alloc, data, relay_seq) orelse data; 301 + const owned = work.allocator.dupe(u8, broadcast_data) catch return; 302 + work.bc.broadcast_queue.push(relay_seq, owned, &work.bc.stats); 307 303 308 304 // update per-DID state outside the ordering lock (Postgres round-trip) 309 305 if ((is_commit or is_sync) and uid > 0) {
+6 -9
src/subscriber.zig
··· 709 709 else // is_identity (unknown types already filtered above) 710 710 .identity; 711 711 712 - // persist and push to broadcast queue (broadcaster fiber handles fan-out). 713 - // ordering spinlock ensures frames are persisted + enqueued in seq order. 712 + // persist under narrow ordering lock (seq assignment only), then 713 + // resequence + enqueue outside the lock. 714 714 if (sub.persist) |dp| { 715 715 const relay_seq = blk: { 716 716 var spins: u64 = 0; ··· 728 728 return; 729 729 }; 730 730 sub.bc.stats.relay_seq.store(seq, .release); 731 - const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data; 732 - const owned = sub.allocator.dupe(u8, broadcast_data) catch { 733 - sub.bc.persist_order.store(0, .release); 734 - return; 735 - }; 736 - sub.bc.broadcast_queue.push(seq, owned, &sub.bc.stats); 737 731 sub.bc.persist_order.store(0, .release); 738 732 break :blk seq; 739 733 }; 740 - _ = relay_seq; 734 + 735 + const broadcast_data = broadcaster.resequenceFrame(alloc, data, relay_seq) orelse data; 736 + const owned = sub.allocator.dupe(u8, broadcast_data) catch return; 737 + sub.bc.broadcast_queue.push(relay_seq, owned, &sub.bc.stats); 741 738 742 739 // update per-DID state outside the ordering lock (Postgres round-trip) 743 740 if ((is_commit or is_sync) and uid > 0) {