atproto relay implementation in zig
zlay.waow.tech
//! host ops — background thread for host bookkeeping (cursor flush, failure tracking)
//!
//! two mechanisms:
//!
//! 1. CursorMap — coalescing cursor store. subscribers atomically write their
//!    latest seq (a nanosecond-scale atomic store, no lock). the worker thread
//!    sweeps every 5s and batch-flushes only changed cursors to the DB. this
//!    replaces the old FIFO cursor queue that caused ~350 individual UPDATEs/sec
//!    and busy-spinning when full.
//!
//! 2. HostOpsQueue — MPSC ring for rare ops (increment_failures, reset_failures,
//!    update_status). same atomic spinlock pattern as BroadcastQueue. these ops
//!    are infrequent (connect/disconnect/error), so the queue never fills.

const std = @import("std");
const event_log_mod = @import("event_log.zig");
const broadcaster = @import("broadcaster.zig");

const Io = std.Io;
const log = std.log.scoped(.relay);

// ---------------------------------------------------------------------------
// cursor coalescing map
// ---------------------------------------------------------------------------

pub const CursorMap = struct {
    const MAX_SLOTS = 4096;
    /// sentinel: slot is inactive (freed or never used) — worker skips these
    const EMPTY: u64 = 0;

    host_ids: [MAX_SLOTS]u64 = @splat(0),
    seqs: [MAX_SLOTS]std.atomic.Value(u64) = @splat(.{ .raw = EMPTY }),
    /// last value successfully written to DB (worker-thread only, no atomics)
    last_flushed: [MAX_SLOTS]u64 = @splat(0),
    /// high-water mark — slots [0..count) have been used at least once
    count: std.atomic.Value(u32) = .{ .raw = 0 },

    /// free list for slot reuse (protected by free_lock spinlock)
    free_stack: [MAX_SLOTS]u32 = undefined,
    free_top: u32 = 0,
    free_lock: std.atomic.Value(u32) = .{ .raw = 0 },

    /// register a host and return its slot index. called at spawn time (slurper).
    pub fn register(self: *CursorMap, host_id: u64, initial_seq: u64) ?u32 {
        // try reusing a freed slot first
        const slot = self.popFree() orelse blk: {
            const s = self.count.fetchAdd(1, .acq_rel);
            if (s >= MAX_SLOTS) {
                _ = self.count.fetchSub(1, .monotonic);
                log.warn("cursor_map: out of slots (max {d})", .{MAX_SLOTS});
                return null;
            }
            break :blk s;
        };
        // write host_id and last_flushed before making the slot visible via seqs
        self.host_ids[slot] = host_id;
        self.last_flushed[slot] = initial_seq;
        // release: makes host_ids write visible to worker thread's acquire load
        self.seqs[slot].store(initial_seq, .release);
        return slot;
    }

    /// return a slot to the free list. called when a subscriber exits (slurper).
    pub fn unregister(self: *CursorMap, slot: u32) void {
        // mark inactive first — worker sees EMPTY on next sweep and skips
        self.seqs[slot].store(EMPTY, .release);
        self.host_ids[slot] = 0;
        self.pushFree(slot);
    }

    /// update cursor (called from subscriber — any Io context). single atomic store.
    pub fn update(self: *CursorMap, slot: u32, seq: u64) void {
        self.seqs[slot].store(seq, .release);
    }

    /// sweep all slots, flush changed cursors to DB. called from worker thread only.
    fn flush(self: *CursorMap, persist: *event_log_mod.DiskPersist) void {
        const n = self.count.load(.acquire);
        var flushed: u32 = 0;
        for (0..n) |i| {
            const current = self.seqs[i].load(.acquire);
            // EMPTY means freed or never-used slot — skip
            if (current == EMPTY) continue;
            if (current != self.last_flushed[i]) {
                persist.updateHostSeq(self.host_ids[i], current) catch |err| {
                    log.debug("cursor flush failed for host_id={d}: {s}", .{ self.host_ids[i], @errorName(err) });
                    continue;
                };
                self.last_flushed[i] = current;
                flushed += 1;
            }
        }
        if (flushed > 0) {
            log.debug("cursor_map: flushed {d}/{d} cursors", .{ flushed, n });
        }
    }

    fn popFree(self: *CursorMap) ?u32 {
        self.lockFree();
        defer self.unlockFree();
        if (self.free_top == 0) return null;
        self.free_top -= 1;
        return self.free_stack[self.free_top];
    }

    fn pushFree(self: *CursorMap, slot: u32) void {
        self.lockFree();
        defer self.unlockFree();
        self.free_stack[self.free_top] = slot;
        self.free_top += 1;
    }

    fn lockFree(self: *CursorMap) void {
        while (self.free_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) {
            std.atomic.spinLoopHint();
        }
    }

    fn unlockFree(self: *CursorMap) void {
        self.free_lock.store(0, .release);
    }
};
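
// a minimal sketch of the slot lifecycle in this file's own terms: register at
// spawn, update from the subscriber (one release store), unregister on exit,
// then reuse of the freed slot. flush() is left out since it needs a live
// DiskPersist; the host ids and seqs here are made-up values.
test "CursorMap slot lifecycle (illustrative)" {
    var map: CursorMap = .{};
    const slot = map.register(42, 100) orelse return error.OutOfSlots;
    map.update(slot, 200); // subscriber side: a single atomic store
    try std.testing.expectEqual(@as(u64, 200), map.seqs[slot].load(.acquire));
    map.unregister(slot); // slot reads EMPTY until reused
    const reused = map.register(43, 300) orelse return error.OutOfSlots;
    try std.testing.expectEqual(slot, reused); // freed slot came back off the free stack
}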

// ---------------------------------------------------------------------------
// rare ops queue (failures, status)
// ---------------------------------------------------------------------------

pub const HostOp = struct {
    host_id: u64,
    kind: Kind,
    payload: Payload,

    pub const Kind = enum {
        increment_failures,
        reset_failures,
        update_status,
        takedown_user,
    };

    pub const Payload = union {
        /// pointer to subscriber's host_shutdown atomic — set by worker on exhaustion
        host_shutdown: *std.atomic.Value(bool),
        none: void,
        status: Status,
        takedown: Takedown,

        pub const Status = struct {
            buf: [16]u8 = .{0} ** 16,
            len: u8 = 0,

            pub fn init(s: []const u8) Status {
                var result: Status = .{};
                const n: u8 = @intCast(@min(s.len, 16));
                @memcpy(result.buf[0..n], s[0..n]);
                result.len = n;
                return result;
            }

            pub fn slice(self: *const Status) []const u8 {
                return self.buf[0..self.len];
            }
        };

        /// inline buffer for takedown — #account CBOR frame is <200 bytes
        pub const Takedown = struct {
            uid: u64,
            frame_buf: [256]u8 = .{0} ** 256,
            frame_len: u16 = 0,

            pub fn frameSlice(self: *const Takedown) []const u8 {
                return self.frame_buf[0..self.frame_len];
            }
        };
    };
};
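
// a small illustrative check of the Status payload's fixed-size buffer: init()
// truncates to 16 bytes and slice() returns only the used prefix. the input
// string here is made up.
test "HostOp.Payload.Status truncates to its inline buffer (illustrative)" {
    const s = HostOp.Payload.Status.init("disconnected-with-reason");
    try std.testing.expectEqual(@as(u8, 16), s.len);
    try std.testing.expectEqualStrings("disconnected-wit", s.slice());
}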

pub const HostOpsQueue = struct {
    const CAPACITY = 4096;
    const CURSOR_FLUSH_INTERVAL_SEC: i64 = 5;

    items: [CAPACITY]HostOp = undefined,
    head: std.atomic.Value(u32) = .{ .raw = 0 },
    tail: std.atomic.Value(u32) = .{ .raw = 0 },
    push_lock: std.atomic.Value(u32) = .{ .raw = 0 },

    persist: *event_log_mod.DiskPersist,
    cursor_map: *CursorMap,
    shutdown: *std.atomic.Value(bool),
    max_consecutive_failures: u32,
    bc: ?*broadcaster.Broadcaster = null,

    /// push a rare op (called from any Io context). spins until space is available.
    /// only used for failures/status — cursors go through CursorMap.
    pub fn push(self: *HostOpsQueue, op: HostOp) void {
        while (true) {
            // acquire spinlock
            while (self.push_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) {
                std.atomic.spinLoopHint();
            }

            const tail = self.tail.load(.monotonic);
            const next_tail = (tail + 1) % CAPACITY;
            if (next_tail == self.head.load(.acquire)) {
                // full — release lock, yield, retry
                self.push_lock.store(0, .release);
                std.atomic.spinLoopHint();
                continue;
            }

            self.items[tail] = op;
            self.tail.store(next_tail, .release);
            self.push_lock.store(0, .release);
            return;
        }
    }

    /// pop an op (single-consumer — worker thread only).
    fn pop(self: *HostOpsQueue) ?HostOp {
        const head = self.head.load(.monotonic);
        if (head == self.tail.load(.acquire)) return null;

        const item = self.items[head];
        self.head.store((head + 1) % CAPACITY, .release);
        return item;
    }

    /// worker thread entry point. runs on pool_io (Threaded).
    /// drains rare ops + playback requests immediately, sweeps cursor map every 5s.
    pub fn run(self: *HostOpsQueue, pool_io: Io) void {
        var last_cursor_flush: i64 = timestamp(pool_io);

        while (!self.shutdown.load(.acquire)) {
            // drain rare ops (failures, status, takedowns)
            var drained: u32 = 0;
            while (self.pop()) |op| {
                self.execute(op);
                drained += 1;
            }

            // drain playback requests
            drained += self.drainPlaybackRequests();

            // periodic cursor sweep
            const now = timestamp(pool_io);
            if (now - last_cursor_flush >= CURSOR_FLUSH_INTERVAL_SEC) {
                self.cursor_map.flush(self.persist);
                last_cursor_flush = now;
            }

            if (drained == 0) {
                // brief sleep — rare ops are infrequent, cursor sweep is timer-driven
                pool_io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch return;
            }
        }

        // final cursor flush + drain remaining ops on shutdown
        self.cursor_map.flush(self.persist);
        while (self.pop()) |op| {
            self.execute(op);
        }
        _ = self.drainPlaybackRequests();
    }

    /// drain all pending playback requests from the MPSC queue
    fn drainPlaybackRequests(self: *HostOpsQueue) u32 {
        var maybe_batch = self.persist.popPlaybackBatch();
        var count: u32 = 0;
        // Treiber stack pops in LIFO order — fine for playback (each request is independent)
        while (maybe_batch) |req| {
            maybe_batch = req.next.load(.acquire);
            self.persist.playback(req.since, req.allocator, &req.entries) catch |e| {
                req.err = e;
            };
            req.done.store(true, .release);
            count += 1;
        }
        return count;
    }
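
    // the producer side of this handshake lives in event_log.zig and is assumed
    // to look roughly like the sketch below (names are illustrative, not the
    // actual API): push a request node onto the Treiber stack, then wait for
    // this worker to flip `done`.
    //
    //   var req: PlaybackRequest = .{ .since = cursor, .allocator = alloc };
    //   persist.pushPlaybackRequest(&req);
    //   while (!req.done.load(.acquire)) { /* sleep or park the task */ }
    //   if (req.err) |e| return e;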

    fn execute(self: *HostOpsQueue, op: HostOp) void {
        switch (op.kind) {
            .increment_failures => {
                const failures = self.persist.incrementHostFailures(op.host_id) catch 0;
                if (failures >= self.max_consecutive_failures) {
                    log.warn("host_ops: host_id={d} exhausted after {d} failures", .{ op.host_id, failures });
                    self.persist.updateHostStatus(op.host_id, "exhausted") catch {};
                    op.payload.host_shutdown.store(true, .release);
                }
            },
            .reset_failures => {
                self.persist.resetHostFailures(op.host_id) catch |err| {
                    log.debug("host_ops: reset failures failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                };
            },
            .update_status => {
                self.persist.updateHostStatus(op.host_id, op.payload.status.slice()) catch |err| {
                    log.debug("host_ops: update status failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                };
            },
            .takedown_user => {
                self.executeTakedown(op.payload.takedown);
            },
        }
    }

    /// execute takedown on pool_io: takeDownUser + persist + broadcast
    fn executeTakedown(self: *HostOpsQueue, td: HostOp.Payload.Takedown) void {
        self.persist.takeDownUser(td.uid) catch |err| {
            log.warn("host_ops: takedown failed for uid={d}: {s}", .{ td.uid, @errorName(err) });
            return;
        };

        if (td.frame_len == 0) return;
        const frame_bytes = td.frameSlice();
        const bc = self.bc orelse return;

        // persist the #account event under ordering lock
        while (bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) {
            std.atomic.spinLoopHint();
        }

        if (self.persist.persist(.account, td.uid, frame_bytes)) |relay_seq| {
            bc.stats.relay_seq.store(relay_seq, .release);

            const broadcast_data = broadcaster.resequenceFrame(self.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes;
            const owned = self.persist.allocator.dupe(u8, broadcast_data) catch {
                bc.persist_order.store(0, .release);
                log.warn("host_ops: failed to alloc broadcast data for takedown uid={d}", .{td.uid});
                return;
            };
            bc.broadcast_queue.push(relay_seq, owned, &bc.stats);
            bc.persist_order.store(0, .release);
            log.info("host_ops: emitted #account takedown for uid={d} (seq={d})", .{ td.uid, relay_seq });
        } else |err| {
            bc.persist_order.store(0, .release);
            log.warn("host_ops: persist #account takedown failed: {s}", .{@errorName(err)});
        }
    }
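
    // note: persist_order is held across persist() and the broadcast push so the
    // relay_seq order in the event log matches enqueue order on the broadcast
    // queue — the same ordering the live firehose path is assumed to maintain.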

    fn timestamp(io: Io) i64 {
        return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s));
    }
};
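
// a hedged sketch of the ring's FIFO behavior; push/pop never touch persist,
// cursor_map, or shutdown, so undefined placeholders are safe in this test.
test "HostOpsQueue push/pop is FIFO (illustrative)" {
    var q: HostOpsQueue = .{
        .persist = undefined,
        .cursor_map = undefined,
        .shutdown = undefined,
        .max_consecutive_failures = 0,
    };
    q.push(.{ .host_id = 1, .kind = .reset_failures, .payload = .{ .none = {} } });
    q.push(.{ .host_id = 2, .kind = .reset_failures, .payload = .{ .none = {} } });
    try std.testing.expectEqual(@as(u64, 1), q.pop().?.host_id);
    try std.testing.expectEqual(@as(u64, 2), q.pop().?.host_id);
    try std.testing.expect(q.pop() == null);
}

// assumed wiring at startup (the spawn site is outside this file; names like
// `shutdown_flag` and `pool_io` are placeholders, not the relay's real ones):
//
//   var cursors: CursorMap = .{};
//   var ops: HostOpsQueue = .{
//       .persist = &persist,
//       .cursor_map = &cursors,
//       .shutdown = &shutdown_flag,
//       .max_consecutive_failures = 10,
//       .bc = &bc,
//   };
//   const worker = try std.Thread.spawn(.{}, HostOpsQueue.run, .{ &ops, pool_io });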