atproto relay implementation in zig (zlay.waow.tech)
//! host ops — background thread for host bookkeeping (cursor flush, failure tracking)
//!
//! two mechanisms:
//!
//! 1. CursorMap — coalescing cursor store. subscribers atomically write their
//!    latest seq (nanoseconds, no lock). the worker thread sweeps every 5s and
//!    batch-flushes only changed cursors to the DB. this replaces the old FIFO
//!    cursor queue that caused ~350 individual UPDATEs/sec and busy-spin on full.
//!
//! 2. HostOpsQueue — MPSC ring for rare ops (increment_failures, reset_failures,
//!    update_status). same atomic spinlock pattern as BroadcastQueue. these ops
//!    are infrequent (connect/disconnect/error), so the queue never fills.

const std = @import("std");
const event_log_mod = @import("event_log.zig");
const broadcaster = @import("broadcaster.zig");

const Io = std.Io;
const log = std.log.scoped(.relay);

// ---------------------------------------------------------------------------
// cursor coalescing map
// ---------------------------------------------------------------------------

pub const CursorMap = struct {
    const MAX_SLOTS = 4096;
    /// sentinel: slot is inactive (freed or never used) — worker skips these
    const EMPTY: u64 = 0;

    host_ids: [MAX_SLOTS]u64 = @splat(0),
    seqs: [MAX_SLOTS]std.atomic.Value(u64) = @splat(.{ .raw = EMPTY }),
    /// last value successfully written to DB (worker-thread only, no atomics)
    last_flushed: [MAX_SLOTS]u64 = @splat(0),
    /// high-water mark — slots [0..count) have been used at least once
    count: std.atomic.Value(u32) = .{ .raw = 0 },

    /// free list for slot reuse (protected by free_lock spinlock)
    free_stack: [MAX_SLOTS]u32 = undefined,
    free_top: u32 = 0,
    free_lock: std.atomic.Value(u32) = .{ .raw = 0 },

    /// register a host and return its slot index. called at spawn time (slurper).
    pub fn register(self: *CursorMap, host_id: u64, initial_seq: u64) ?u32 {
        // try reusing a freed slot first
        const slot = self.popFree() orelse blk: {
            const s = self.count.fetchAdd(1, .acq_rel);
            if (s >= MAX_SLOTS) {
                _ = self.count.fetchSub(1, .monotonic);
                log.warn("cursor_map: out of slots (max {d})", .{MAX_SLOTS});
                return null;
            }
            break :blk s;
        };
        // write host_id and last_flushed before making the slot visible via seqs
        self.host_ids[slot] = host_id;
        self.last_flushed[slot] = initial_seq;
        // release: makes host_ids write visible to worker thread's acquire load
        self.seqs[slot].store(initial_seq, .release);
        return slot;
    }

    /// return a slot to the free list. called when a subscriber exits (slurper).
    pub fn unregister(self: *CursorMap, slot: u32) void {
        // mark inactive first — worker sees EMPTY on next sweep and skips
        self.seqs[slot].store(EMPTY, .release);
        self.host_ids[slot] = 0;
        self.pushFree(slot);
    }

    /// update cursor (called from subscriber — any Io context). single atomic store.
    pub fn update(self: *CursorMap, slot: u32, seq: u64) void {
        self.seqs[slot].store(seq, .release);
    }

    /// sweep all slots, flush changed cursors to DB. called from worker thread only.
    fn flush(self: *CursorMap, persist: *event_log_mod.DiskPersist) void {
        const n = self.count.load(.acquire);
        var flushed: u32 = 0;
        for (0..n) |i| {
            const current = self.seqs[i].load(.acquire);
            // EMPTY means freed or never-used slot — skip
            if (current == EMPTY) continue;
            if (current != self.last_flushed[i]) {
                persist.updateHostSeq(self.host_ids[i], current) catch |err| {
                    log.debug("cursor flush failed for host_id={d}: {s}", .{ self.host_ids[i], @errorName(err) });
                    continue;
                };
                self.last_flushed[i] = current;
                flushed += 1;
            }
        }
        if (flushed > 0) {
            log.debug("cursor_map: flushed {d}/{d} cursors", .{ flushed, n });
        }
    }

    fn popFree(self: *CursorMap) ?u32 {
        self.lockFree();
        defer self.unlockFree();
        if (self.free_top == 0) return null;
        self.free_top -= 1;
        return self.free_stack[self.free_top];
    }

    fn pushFree(self: *CursorMap, slot: u32) void {
        self.lockFree();
        defer self.unlockFree();
        self.free_stack[self.free_top] = slot;
        self.free_top += 1;
    }

    fn lockFree(self: *CursorMap) void {
        while (self.free_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) {
            std.atomic.spinLoopHint();
        }
    }

    fn unlockFree(self: *CursorMap) void {
        self.free_lock.store(0, .release);
    }
};
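// subscriber-side usage, as an illustrative sketch (the real call sites live in
// the slurper; `cursor_map`, `resume_seq`, and `nextFrame` are placeholder names):
//
//     const slot = cursor_map.register(host_id, resume_seq) orelse return error.CursorMapFull;
//     defer cursor_map.unregister(slot);
//     while (try nextFrame()) |frame| {
//         // ... process the frame ...
//         cursor_map.update(slot, frame.seq); // one atomic store per event, no lock taken
//     }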
// ---------------------------------------------------------------------------
// rare ops queue (failures, status)
// ---------------------------------------------------------------------------

pub const HostOp = struct {
    host_id: u64,
    kind: Kind,
    payload: Payload,

    pub const Kind = enum {
        increment_failures,
        reset_failures,
        update_status,
        takedown_user,
    };

    pub const Payload = union {
        /// pointer to subscriber's host_shutdown atomic — set by worker on exhaustion
        host_shutdown: *std.atomic.Value(bool),
        none: void,
        status: Status,
        takedown: Takedown,

        pub const Status = struct {
            buf: [16]u8 = .{0} ** 16,
            len: u8 = 0,

            pub fn init(s: []const u8) Status {
                var result: Status = .{};
                const n: u8 = @intCast(@min(s.len, 16));
                @memcpy(result.buf[0..n], s[0..n]);
                result.len = n;
                return result;
            }

            pub fn slice(self: *const Status) []const u8 {
                return self.buf[0..self.len];
            }
        };

        /// inline buffer for takedown — #account CBOR frame is <200 bytes
        pub const Takedown = struct {
            uid: u64,
            frame_buf: [256]u8 = .{0} ** 256,
            frame_len: u16 = 0,

            pub fn frameSlice(self: *const Takedown) []const u8 {
                return self.frame_buf[0..self.frame_len];
            }
        };
    };
};
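// producer-side sketch (hypothetical call sites; `queue` is a *HostOpsQueue).
// Payload is an untagged union, so the active field must match `kind`:
// execute() picks which payload field to read based on the kind alone.
//
//     queue.push(.{ .host_id = id, .kind = .reset_failures, .payload = .{ .none = {} } });
//     queue.push(.{ .host_id = id, .kind = .update_status, .payload = .{
//         .status = HostOp.Payload.Status.init("active"), // status string is a placeholder
//     } });
//     queue.push(.{ .host_id = id, .kind = .increment_failures, .payload = .{
//         .host_shutdown = &host_shutdown, // worker flips this on exhaustion
//     } });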
pub const HostOpsQueue = struct {
    const CAPACITY = 4096;
    const CURSOR_FLUSH_INTERVAL_SEC: i64 = 5;

    items: [CAPACITY]HostOp = undefined,
    head: std.atomic.Value(u32) = .{ .raw = 0 },
    tail: std.atomic.Value(u32) = .{ .raw = 0 },
    push_lock: std.atomic.Value(u32) = .{ .raw = 0 },

    persist: *event_log_mod.DiskPersist,
    cursor_map: *CursorMap,
    shutdown: *std.atomic.Value(bool),
    max_consecutive_failures: u32,
    bc: ?*broadcaster.Broadcaster = null,

    /// push a rare op (called from any Io context). spins until space is available.
    /// only used for failures/status — cursors go through CursorMap.
    pub fn push(self: *HostOpsQueue, op: HostOp) void {
        while (true) {
            // acquire spinlock
            while (self.push_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) {
                std.atomic.spinLoopHint();
            }

            const tail = self.tail.load(.monotonic);
            const next_tail = (tail + 1) % CAPACITY;
            if (next_tail == self.head.load(.acquire)) {
                // full — release lock, yield, retry
                self.push_lock.store(0, .release);
                std.atomic.spinLoopHint();
                continue;
            }

            self.items[tail] = op;
            self.tail.store(next_tail, .release);
            self.push_lock.store(0, .release);
            return;
        }
    }

    /// pop an op (single-consumer — worker thread only).
    fn pop(self: *HostOpsQueue) ?HostOp {
        const head = self.head.load(.monotonic);
        if (head == self.tail.load(.acquire)) return null;

        const item = self.items[head];
        self.head.store((head + 1) % CAPACITY, .release);
        return item;
    }

    /// worker thread entry point. runs on pool_io (Threaded).
    /// drains rare ops + playback requests immediately, sweeps cursor map every 5s.
    pub fn run(self: *HostOpsQueue, pool_io: Io) void {
        var last_cursor_flush: i64 = timestamp(pool_io);

        while (!self.shutdown.load(.acquire)) {
            // drain rare ops (failures, status, takedowns)
            var drained: u32 = 0;
            while (self.pop()) |op| {
                self.execute(op);
                drained += 1;
            }

            // drain playback requests
            drained += self.drainPlaybackRequests();

            // periodic cursor sweep
            const now = timestamp(pool_io);
            if (now - last_cursor_flush >= CURSOR_FLUSH_INTERVAL_SEC) {
                self.cursor_map.flush(self.persist);
                last_cursor_flush = now;
            }

            if (drained == 0) {
                // brief sleep — rare ops are infrequent, cursor sweep is timer-driven
                pool_io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch return;
            }
        }

        // final cursor flush + drain remaining ops on shutdown
        self.cursor_map.flush(self.persist);
        while (self.pop()) |op| {
            self.execute(op);
        }
        _ = self.drainPlaybackRequests();
    }
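    // hosting the loop, as a sketch: one plain option is a dedicated OS thread
    // (the actual spawn in this repo goes through its Io plumbing; `queue` and
    // `shutdown_flag` are placeholder names):
    //
    //     const worker = try std.Thread.spawn(.{}, HostOpsQueue.run, .{ &queue, pool_io });
    //     // ... later, to stop:
    //     shutdown_flag.store(true, .release);
    //     worker.join(); // run() does a final cursor flush + drain before returning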
    /// drain all pending playback requests from the MPSC queue
    fn drainPlaybackRequests(self: *HostOpsQueue) u32 {
        var maybe_batch = self.persist.popPlaybackBatch();
        var count: u32 = 0;
        // Treiber stack pops in LIFO order — fine for playback (each request is independent)
        while (maybe_batch) |req| {
            maybe_batch = req.next.load(.acquire);
            self.persist.playback(req.since, req.allocator, &req.entries) catch |e| {
                req.err = e;
            };
            req.done.store(true, .release);
            count += 1;
        }
        return count;
    }

    fn execute(self: *HostOpsQueue, op: HostOp) void {
        switch (op.kind) {
            .increment_failures => {
                const failures = self.persist.incrementHostFailures(op.host_id) catch 0;
                if (failures >= self.max_consecutive_failures) {
                    log.warn("host_ops: host_id={d} exhausted after {d} failures", .{ op.host_id, failures });
                    self.persist.updateHostStatus(op.host_id, "exhausted") catch {};
                    op.payload.host_shutdown.store(true, .release);
                }
            },
            .reset_failures => {
                self.persist.resetHostFailures(op.host_id) catch |err| {
                    log.debug("host_ops: reset failures failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                };
            },
            .update_status => {
                self.persist.updateHostStatus(op.host_id, op.payload.status.slice()) catch |err| {
                    log.debug("host_ops: update status failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
                };
            },
            .takedown_user => {
                self.executeTakedown(op.payload.takedown);
            },
        }
    }

    /// execute takedown on pool_io: takeDownUser + persist + broadcast
    fn executeTakedown(self: *HostOpsQueue, td: HostOp.Payload.Takedown) void {
        self.persist.takeDownUser(td.uid) catch |err| {
            log.warn("host_ops: takedown failed for uid={d}: {s}", .{ td.uid, @errorName(err) });
            return;
        };

        if (td.frame_len == 0) return;
        const frame_bytes = td.frameSlice();
        const bc = self.bc orelse return;

        // persist the #account event under ordering lock
        while (bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) {
            std.atomic.spinLoopHint();
        }

        if (self.persist.persist(.account, td.uid, frame_bytes)) |relay_seq| {
            bc.stats.relay_seq.store(relay_seq, .release);

            const broadcast_data = broadcaster.resequenceFrame(self.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes;
            const owned = self.persist.allocator.dupe(u8, broadcast_data) catch {
                bc.persist_order.store(0, .release);
                log.warn("host_ops: failed to alloc broadcast data for takedown uid={d}", .{td.uid});
                return;
            };
            bc.broadcast_queue.push(relay_seq, owned, &bc.stats);
            bc.persist_order.store(0, .release);
            log.info("host_ops: emitted #account takedown for uid={d} (seq={d})", .{ td.uid, relay_seq });
        } else |err| {
            bc.persist_order.store(0, .release);
            log.warn("host_ops: persist #account takedown failed: {s}", .{@errorName(err)});
        }
    }

    fn timestamp(io: Io) i64 {
        return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s));
    }
};
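// takedown producer sketch (illustrative only; `encodeAccountFrame` is a
// hypothetical stand-in for wherever the #account CBOR frame gets built, and
// host_id is not read by the takedown path in this file):
//
//     var td: HostOp.Payload.Takedown = .{ .uid = uid };
//     const frame = encodeAccountFrame(&td.frame_buf, uid); // hypothetical encoder
//     td.frame_len = @intCast(frame.len);
//     queue.push(.{ .host_id = 0, .kind = .takedown_user, .payload = .{ .takedown = td } });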