AT Protocol relay implementation in Zig — zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

init: zlay relay — standalone AT Protocol firehose relay

subscriber (upstream firehose), validator (DID key cache + sig verification),
broadcaster (WebSocket fan-out), event_log (SQLite persistence), ring_buffer.

depends on zat SDK for CBOR/CAR/MST/crypto primitives.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz f1693b35

+3108
+9
.dockerignore
··· 1 + .git 2 + .github 3 + .claude 4 + zig-out 5 + zig-cache 6 + .zig-cache 7 + data 8 + *.o 9 + *.swp
+5
.gitignore
··· 1 + .zig-cache/ 2 + zig-out/ 3 + data/ 4 + .env 5 + .env.*
+14
Dockerfile
# --- build stage ---
FROM alpine:3.21 AS builder
RUN apk add --no-cache zig sqlite-dev
WORKDIR /src
COPY . .
RUN zig build -Doptimize=ReleaseSafe

# --- runtime stage ---
FROM alpine:3.21
RUN apk add --no-cache sqlite-libs ca-certificates
# build.zig installs the executable as "zlay" (addExecutable .name = "zlay"),
# not "relay" — copy it under the name the ENTRYPOINT expects.
COPY --from=builder /src/zig-out/bin/zlay /usr/local/bin/relay
RUN mkdir -p /data/events
ENV RELAY_DATA_DIR=/data/events
ENV RELAY_DB_PATH=/data/relay.sqlite
EXPOSE 3000 3001
ENTRYPOINT ["/usr/local/bin/relay"]
+65
build.zig
const std = @import("std");

/// Build graph for the zlay relay: one executable (installed as `zlay`) plus
/// a per-file unit-test step, all sharing the same dependency imports
/// (zat SDK, websocket.zig, zqlite).
pub fn build(b: *std.Build) void {
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    const zat_dep = b.dependency("zat", .{
        .target = target,
        .optimize = optimize,
    });
    const websocket_dep = b.dependency("websocket", .{
        .target = target,
        .optimize = optimize,
    });
    const zqlite_dep = b.dependency("zqlite", .{
        .target = target,
        .optimize = optimize,
    });

    // one shared import table so the executable and every test module see
    // the same module set
    const imports: []const std.Build.Module.Import = &.{
        .{ .name = "zat", .module = zat_dep.module("zat") },
        .{ .name = "websocket", .module = websocket_dep.module("websocket") },
        .{ .name = "zqlite", .module = zqlite_dep.module("zqlite") },
    };

    // relay executable
    const relay_mod = b.createModule(.{
        .root_source_file = b.path("src/main.zig"),
        .target = target,
        .optimize = optimize,
        .imports = imports,
    });
    const relay_exe = b.addExecutable(.{
        .name = "zlay",
        .root_module = relay_mod,
    });
    // libc is linked for the sqlite dependency (zqlite); matches the test
    // binaries below
    relay_exe.linkLibC();
    b.installArtifact(relay_exe);

    // `zig build run [-- args…]`
    const run_cmd = b.addRunArtifact(relay_exe);
    if (b.args) |args| run_cmd.addArgs(args);
    const run_step = b.step("run", "run the relay");
    run_step.dependOn(&run_cmd.step);

    // `zig build test` — one test compilation per listed source file
    const test_step = b.step("test", "run unit tests");
    const test_files = .{
        "src/broadcaster.zig",
        "src/validator.zig",
        "src/subscriber.zig",
        "src/event_log.zig",
    };
    inline for (test_files) |file| {
        const unit_test = b.addTest(.{
            .root_module = b.createModule(.{
                .root_source_file = b.path(file),
                .target = target,
                .optimize = optimize,
                .imports = imports,
            }),
        });
        unit_test.linkLibC();
        test_step.dependOn(&b.addRunArtifact(unit_test).step);
    }
}
+25
build.zig.zon
··· 1 + .{ 2 + .name = .zlay, 3 + .version = "0.0.1", 4 + .fingerprint = 0x31343ede133f3e58, 5 + .minimum_zig_version = "0.15.0", 6 + .dependencies = .{ 7 + .zat = .{ 8 + .url = "https://tangled.org/zat.dev/zat/archive/0071362.tar.gz", 9 + .hash = "zat-0.2.6-5PuC7ka3BAD826XlwCMgmCA21Xy7TKoQBlVlfvEHEENm", 10 + }, 11 + .websocket = .{ 12 + .url = "https://github.com/karlseguin/websocket.zig/archive/97fefafa59cc78ce177cff540b8685cd7f699276.tar.gz", 13 + .hash = "websocket-0.1.0-ZPISdRlzAwBB_Bz2UMMqxYqF6YEVTIBoFsbzwPUJTHIc", 14 + }, 15 + .zqlite = .{ 16 + .url = "https://github.com/karlseguin/zqlite.zig/archive/05a88d6758753e1c63fdd45b211dde2057094b0c.tar.gz", 17 + .hash = "zqlite-0.0.1-RWLaYz6bmAAT7E_jxopXf-j5Ea8VQldnxsd6TU8sa0Bb", 18 + }, 19 + }, 20 + .paths = .{ 21 + "build.zig", 22 + "build.zig.zon", 23 + "src", 24 + }, 25 + }
+661
src/broadcaster.zig
//! relay broadcaster — fan-out firehose frames to downstream consumers
//!
//! features:
//! - ref-counted shared frames (one copy per broadcast, not per consumer)
//! - ConsumerTooSlow CBOR error frame before disconnect
//! - ping keepalive (sent after 30s of send inactivity; the 5s pong deadline
//!   constant is declared but not yet enforced in this file)
//! - non-blocking broadcast via per-consumer ring buffers + write threads
//! - health/stats HTTP endpoints via websocket handshake routing

const std = @import("std");
const websocket = @import("websocket");
const zat = @import("zat");
const ring_buffer = @import("ring_buffer.zig");
const event_log_mod = @import("event_log.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

// --- stats ---

/// Shared counters, updated atomically from multiple threads.
/// `seq` is the last upstream sequence seen by `broadcast`; `relay_seq` is
/// only read here (it is written elsewhere — not in this file).
pub const Stats = struct {
    seq: std.atomic.Value(i64) = .{ .raw = 0 },
    relay_seq: std.atomic.Value(u64) = .{ .raw = 0 },
    consumer_count: std.atomic.Value(u64) = .{ .raw = 0 },
    frames_in: std.atomic.Value(u64) = .{ .raw = 0 },
    frames_out: std.atomic.Value(u64) = .{ .raw = 0 },
    validated: std.atomic.Value(u64) = .{ .raw = 0 },
    failed: std.atomic.Value(u64) = .{ .raw = 0 },
    skipped: std.atomic.Value(u64) = .{ .raw = 0 },
    decode_errors: std.atomic.Value(u64) = .{ .raw = 0 },
    cache_hits: std.atomic.Value(u64) = .{ .raw = 0 },
    cache_misses: std.atomic.Value(u64) = .{ .raw = 0 },
    slow_consumers: std.atomic.Value(u64) = .{ .raw = 0 },
    start_time: i64 = 0,
};

// --- shared frame (ref-counted) ---

/// One broadcast frame shared by every consumer's queue.
/// `create` returns with ref_count = 1 (the creator's reference); `acquire`
/// adds a reference, `release` drops one and frees data + self at zero.
pub const SharedFrame = struct {
    data: []const u8,
    ref_count: std.atomic.Value(u32),
    allocator: Allocator,

    /// duplicate `data` and return a frame owning the copy (refcount 1).
    pub fn create(allocator: Allocator, data: []const u8) !*SharedFrame {
        const frame = try allocator.create(SharedFrame);
        errdefer allocator.destroy(frame);
        const duped = try allocator.dupe(u8, data);
        frame.* = .{
            .data = duped,
            .ref_count = .{ .raw = 1 },
            .allocator = allocator,
        };
        return frame;
    }

    pub fn acquire(self: *SharedFrame) void {
        _ = self.ref_count.fetchAdd(1, .monotonic);
    }

    pub fn release(self: *SharedFrame) void {
        // .acq_rel so the final releaser observes all prior writers before
        // freeing
        if (self.ref_count.fetchSub(1, .acq_rel) == 1) {
            self.allocator.free(self.data);
            self.allocator.destroy(self);
        }
    }
};

// --- ConsumerTooSlow error frame ---

/// build the ConsumerTooSlow CBOR error frame using the SDK encoder.
/// header: {op: -1, t: "#info"}, payload: {error: "ConsumerTooSlow",
/// message: "consumer buffer full"}. returns null on any encode/alloc
/// failure (the broadcaster then simply skips sending it). caller frees.
fn buildErrorFrame(allocator: Allocator) ?[]const u8 {
    const cbor = zat.cbor;

    // header: {op: -1, t: "#info"}
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .negative = -1 } },
        .{ .key = "t", .value = .{ .text = "#info" } },
    } };

    // payload: {error: "ConsumerTooSlow", message: "consumer buffer full"}
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "error", .value = .{ .text = "ConsumerTooSlow" } },
        .{ .key = "message", .value = .{ .text = "consumer buffer full" } },
    } };

    const header_bytes = cbor.encodeAlloc(allocator, header) catch return null;
    const payload_bytes = cbor.encodeAlloc(allocator, payload) catch {
        allocator.free(header_bytes);
        return null;
    };

    // frame = header || payload, concatenated into one buffer
    var frame = allocator.alloc(u8, header_bytes.len + payload_bytes.len) catch {
        allocator.free(header_bytes);
        allocator.free(payload_bytes);
        return null;
    };
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);

    allocator.free(header_bytes);
    allocator.free(payload_bytes);

    return frame;
}

// --- consumer ---

const ping_interval_ns: u64 = 30 * std.time.ns_per_s;
// NOTE(review): declared but not referenced in this file — the pong deadline
// is not enforced here; confirm whether the websocket layer handles it.
const ping_timeout_ns: u64 = 5 * std.time.ns_per_s;

/// One downstream subscriber: a bounded queue of shared frames drained by a
/// dedicated write thread. `enqueue`/`enqueueRaw` are called by the
/// broadcaster; everything else runs on the write thread.
pub const Consumer = struct {
    const BUFFER_CAP = 8192;

    conn: *websocket.Conn,
    allocator: Allocator,
    // fixed-capacity ring of frame pointers, guarded by `mutex`
    buf: [BUFFER_CAP]*SharedFrame = undefined,
    write_pos: usize = 0,
    read_pos: usize = 0,
    buf_len: usize = 0,
    alive: std.atomic.Value(bool) = .{ .raw = true },
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    thread: ?std.Thread = null,
    // only touched by the write thread (after writeLoop starts)
    last_send_time: i128 = 0,

    /// push a shared frame to this consumer's send buffer.
    /// acquires a reference. returns false if full (consumer too slow).
    pub fn enqueue(self: *Consumer, frame: *SharedFrame) bool {
        self.mutex.lock();
        defer self.mutex.unlock();

        if (self.buf_len == BUFFER_CAP) return false;

        frame.acquire();
        self.buf[self.write_pos] = frame;
        self.write_pos = (self.write_pos + 1) % BUFFER_CAP;
        self.buf_len += 1;
        self.cond.signal();
        return true;
    }

    /// push raw bytes (for replay, error frames). allocates a new SharedFrame.
    pub fn enqueueRaw(self: *Consumer, data: []const u8) bool {
        const frame = SharedFrame.create(self.allocator, data) catch return false;
        // enqueue acquires its own reference; release our creator reference
        // either way so the queue (or nobody) ends up owning the frame.
        if (self.enqueue(frame)) {
            frame.release(); // drop our ref, consumer write loop holds one
            return true;
        }
        frame.release();
        return false;
    }

    /// pop the oldest frame. caller must hold `mutex`.
    fn dequeue(self: *Consumer) ?*SharedFrame {
        if (self.buf_len == 0) return null;
        const frame = self.buf[self.read_pos];
        self.read_pos = (self.read_pos + 1) % BUFFER_CAP;
        self.buf_len -= 1;
        return frame;
    }

    /// write-thread main loop: drain the queue to the websocket, pinging on
    /// idle. always falls through to the drain loop on exit so queued frame
    /// references are released (the original returned early on write error
    /// and leaked them).
    fn writeLoop(self: *Consumer) void {
        self.last_send_time = std.time.nanoTimestamp();

        while (self.alive.load(.acquire)) {
            var frame: ?*SharedFrame = null;
            {
                self.mutex.lock();
                defer self.mutex.unlock();
                while (self.buf_len == 0 and self.alive.load(.acquire)) {
                    // wake every 100ms to check ping timer
                    self.cond.timedWait(&self.mutex, 100 * std.time.ns_per_ms) catch {};
                }
                frame = self.dequeue();
            }
            if (frame) |f| {
                defer f.release();
                self.conn.writeBin(f.data) catch {
                    self.alive.store(false, .release);
                    // fix: break (not return) so the drain below still runs
                    // and buffered frames are released
                    break;
                };
                self.last_send_time = std.time.nanoTimestamp();
            } else {
                // no data — check if we should send a ping
                self.maybePing();
            }
        }
        // drain remaining buffered frames so their references are released
        while (true) {
            self.mutex.lock();
            const frame = self.dequeue();
            self.mutex.unlock();
            if (frame) |f| {
                f.release();
            } else break;
        }
    }

    /// send a keepalive ping if nothing has been written for
    /// `ping_interval_ns`. runs on the write thread only.
    fn maybePing(self: *Consumer) void {
        const now = std.time.nanoTimestamp();
        const elapsed: u64 = @intCast(@max(0, now - self.last_send_time));
        if (elapsed >= ping_interval_ns) {
            self.conn.writePing(&.{}) catch {
                self.alive.store(false, .release);
                return;
            };
            self.last_send_time = now;
        }
    }

    /// stop and join the write thread. idempotent for the thread handle.
    pub fn shutdown(self: *Consumer) void {
        self.alive.store(false, .release);
        self.cond.signal();
        if (self.thread) |t| t.join();
        self.thread = null;
    }
};

// --- broadcaster ---

const history_capacity = 50_000;
const FrameHistory = ring_buffer.RingBuffer(history_capacity);

/// Fans incoming firehose frames out to all registered consumers, keeps an
/// in-memory history ring for cursor replay, and optionally replays from
/// disk persistence.
pub const Broadcaster = struct {
    allocator: Allocator,
    consumers: std.ArrayListUnmanaged(*Consumer) = .{},
    consumers_mutex: std.Thread.Mutex = .{},
    history: FrameHistory,
    persist: ?*event_log_mod.DiskPersist = null,
    stats: Stats = .{},
    // pre-built ConsumerTooSlow frame; null if building it failed at init
    error_frame: ?[]const u8 = null,

    pub fn init(allocator: Allocator) Broadcaster {
        return .{
            .allocator = allocator,
            .history = FrameHistory.init(allocator),
            .stats = .{ .start_time = std.time.timestamp() },
            .error_frame = buildErrorFrame(allocator),
        };
    }

    pub fn deinit(self: *Broadcaster) void {
        self.consumers_mutex.lock();
        defer self.consumers_mutex.unlock();
        for (self.consumers.items) |consumer| {
            consumer.shutdown();
            self.allocator.destroy(consumer);
        }
        self.consumers.deinit(self.allocator);
        self.history.deinit();
        if (self.error_frame) |ef| self.allocator.free(ef);
    }

    /// register a new consumer and start its write thread.
    /// returned pointer stays valid until removeConsumer/deinit.
    pub fn addConsumer(self: *Broadcaster, conn: *websocket.Conn) !*Consumer {
        const consumer = try self.allocator.create(Consumer);
        consumer.* = .{
            .conn = conn,
            .allocator = self.allocator,
        };
        consumer.thread = std.Thread.spawn(.{}, Consumer.writeLoop, .{consumer}) catch {
            self.allocator.destroy(consumer);
            return error.ThreadSpawnFailed;
        };

        self.consumers_mutex.lock();
        defer self.consumers_mutex.unlock();
        self.consumers.append(self.allocator, consumer) catch {
            consumer.shutdown();
            self.allocator.destroy(consumer);
            return error.OutOfMemory;
        };
        _ = self.stats.consumer_count.fetchAdd(1, .monotonic);
        log.info("consumer connected ({d} total)", .{self.consumers.items.len});
        return consumer;
    }

    /// remove a consumer (normal disconnect path), join its write thread and
    /// free it.
    pub fn removeConsumer(self: *Broadcaster, consumer: *Consumer) void {
        var remaining: usize = 0;
        {
            self.consumers_mutex.lock();
            defer self.consumers_mutex.unlock();
            for (self.consumers.items, 0..) |c, i| {
                if (c == consumer) {
                    _ = self.consumers.swapRemove(i);
                    break;
                }
            }
            // fix: capture the count while still holding the mutex — the
            // original read items.len after unlocking (data race)
            remaining = self.consumers.items.len;
        }
        consumer.shutdown();
        _ = self.stats.consumer_count.fetchSub(1, .monotonic);
        log.info("consumer disconnected ({d} remaining)", .{remaining});
        self.allocator.destroy(consumer);
    }

    /// broadcast a frame to all consumers. non-blocking — just enqueues.
    /// drops slow consumers whose buffers are full after sending
    /// ConsumerTooSlow. also reaps consumers whose write thread died.
    pub fn broadcast(self: *Broadcaster, seq: i64, data: []const u8) void {
        _ = self.stats.frames_in.fetchAdd(1, .monotonic);
        self.stats.seq.store(seq, .release);

        // add to history for cursor replay
        _ = self.history.push(seq, data);

        // create one shared frame for all consumers
        const frame = SharedFrame.create(self.allocator, data) catch return;
        defer frame.release(); // release broadcaster's reference

        self.consumers_mutex.lock();
        defer self.consumers_mutex.unlock();

        var frames_sent: u64 = 0;
        var i: usize = 0;
        while (i < self.consumers.items.len) {
            const consumer = self.consumers.items[i];
            if (!consumer.alive.load(.acquire)) {
                // write thread exited on its own — reap here
                _ = self.consumers.swapRemove(i);
                consumer.shutdown();
                _ = self.stats.consumer_count.fetchSub(1, .monotonic);
                self.allocator.destroy(consumer);
                continue;
            }
            if (consumer.enqueue(frame)) {
                frames_sent += 1;
            } else {
                // buffer full — send ConsumerTooSlow error, then drop
                self.dropSlowConsumer(consumer);
                _ = self.consumers.swapRemove(i);
                _ = self.stats.consumer_count.fetchSub(1, .monotonic);
                _ = self.stats.slow_consumers.fetchAdd(1, .monotonic);
                continue;
            }
            i += 1;
        }
        _ = self.stats.frames_out.fetchAdd(frames_sent, .monotonic);
    }

    /// notify, close, and asynchronously reap a consumer whose buffer is
    /// full. called with `consumers_mutex` held (from broadcast), hence the
    /// detached cleanup thread: joining the write thread inline would block
    /// the broadcast path.
    fn dropSlowConsumer(self: *Broadcaster, consumer: *Consumer) void {
        log.warn("dropping slow consumer (ConsumerTooSlow)", .{});
        // try to send error frame directly (bypass buffer since it's full).
        // NOTE(review): this writes on conn from the broadcast thread while
        // the write thread may also be writing — confirm the websocket
        // library serializes concurrent writes on one Conn.
        if (self.error_frame) |ef| {
            consumer.conn.writeBin(ef) catch {};
        }
        consumer.alive.store(false, .release);
        consumer.cond.signal();
        consumer.conn.close(.{}) catch {};
        const alloc = self.allocator;
        const Cleanup = struct {
            fn run(c: *Consumer, a: Allocator) void {
                c.shutdown();
                a.destroy(c);
            }
        };
        if (std.Thread.spawn(.{}, Cleanup.run, .{ consumer, alloc })) |t| {
            t.detach();
        } else |err| {
            // fix: the original `catch null` leaked the consumer and never
            // joined its write thread when spawn failed. fall back to
            // synchronous cleanup — this briefly blocks broadcast, but only
            // on a rare resource-exhaustion path.
            log.warn("cleanup thread spawn failed ({s}); joining inline", .{@errorName(err)});
            consumer.shutdown();
            alloc.destroy(consumer);
        }
    }

    /// two-phase cursor replay: disk (diskpersist) first, then in-memory ring
    /// buffer. the consumer is already in the live broadcast list, so frames
    /// arriving during replay are buffered — no gap possible.
    pub fn replayTo(self: *Broadcaster, consumer: *Consumer, cursor: i64) void {
        // phase 1: disk replay from diskpersist
        if (self.persist) |dp| {
            const cursor_u64: u64 = if (cursor > 0) @intCast(cursor) else 0;
            var entries: std.ArrayListUnmanaged(event_log_mod.PlaybackEntry) = .{};
            defer {
                for (entries.items) |e| self.allocator.free(e.data);
                entries.deinit(self.allocator);
            }

            dp.playback(cursor_u64, self.allocator, &entries) catch |err| {
                log.warn("disk replay failed: {s}, falling back to memory", .{@errorName(err)});
                self.replayFromMemory(consumer, cursor);
                return;
            };

            var replayed: usize = 0;
            for (entries.items) |entry| {
                if (!consumer.enqueueRaw(entry.data)) {
                    log.warn("replay buffer full after {d} frames", .{replayed});
                    break;
                }
                replayed += 1;
            }
            if (replayed > 0) {
                log.info("replayed {d} frames from disk (cursor={d})", .{ replayed, cursor });
            }
            return;
        }

        // phase 2 fallback: in-memory ring buffer (no persist configured)
        self.replayFromMemory(consumer, cursor);
    }

    /// replay frames with seq > cursor from the in-memory history ring.
    /// holds the history mutex for the whole scan (enqueueRaw copies data,
    /// so entries are not retained past the lock).
    fn replayFromMemory(self: *Broadcaster, consumer: *Consumer, cursor: i64) void {
        self.history.mutex.lock();
        defer self.history.mutex.unlock();

        var i: usize = 0;
        while (i < self.history.len) : (i += 1) {
            const idx = (self.history.read_pos + i) % history_capacity;
            const entry = self.history.entries[idx];
            if (entry.seq > cursor) {
                if (!consumer.enqueueRaw(entry.data)) break;
            }
        }
    }

    pub fn consumerCount(self: *Broadcaster) usize {
        self.consumers_mutex.lock();
        defer self.consumers_mutex.unlock();
        return self.consumers.items.len;
    }
};

// --- websocket handler ---

/// Per-connection websocket handler wiring a connection to the broadcaster.
pub const Handler = struct {
    consumer: ?*Consumer,
    broadcaster: *Broadcaster,
    conn: *websocket.Conn,
    cursor: ?i64,

    /// called during handshake — validate path, parse cursor.
    /// do NOT start the write thread here (handshake reply hasn't been sent
    /// yet). returns error.NotFound for any path other than subscribeRepos.
    pub fn init(handshake: *const websocket.Handshake, conn: *websocket.Conn, ctx: *Broadcaster) !Handler {
        const url = handshake.url;
        const path = if (std.mem.indexOfScalar(u8, url, '?')) |qi| url[0..qi] else url;

        if (!std.mem.eql(u8, path, "/xrpc/com.atproto.sync.subscribeRepos")) {
            return error.NotFound;
        }

        // parse cursor (replay itself is deferred until afterInit, when the
        // connection is fully upgraded); malformed values are ignored
        var cursor: ?i64 = null;
        if (std.mem.indexOf(u8, url, "cursor=")) |cursor_start| {
            const value_start = cursor_start + "cursor=".len;
            const value_end = std.mem.indexOfScalarPos(u8, url, value_start, '&') orelse url.len;
            const cursor_str = url[value_start..value_end];
            cursor = std.fmt.parseInt(i64, cursor_str, 10) catch null;
        }

        return .{ .consumer = null, .broadcaster = ctx, .conn = conn, .cursor = cursor };
    }

    /// called AFTER the websocket handshake reply has been sent to the
    /// client. safe to start the consumer write thread and begin sending.
    pub fn afterInit(self: *Handler, ctx: *Broadcaster) !void {
        const consumer = ctx.addConsumer(self.conn) catch return error.Unexpected;
        self.consumer = consumer;

        if (self.cursor) |cursor| {
            ctx.replayTo(consumer, cursor);
        }
    }

    pub fn clientMessage(_: *Handler, _: []const u8) !void {
        // relay consumers are read-only
    }

    pub fn close(self: *Handler) void {
        if (self.consumer) |c| self.broadcaster.removeConsumer(c);
    }
};

/// render Prometheus text-format metrics into `buf`.
/// returns "" if `buf` is too small (callers get an empty body rather than
/// a partial/invalid exposition).
pub fn formatPrometheusMetrics(stats: *const Stats, buf: []u8) []const u8 {
    const uptime: i64 = std.time.timestamp() - stats.start_time;
    return std.fmt.bufPrint(buf,
        \\# TYPE relay_frames_received_total counter
        \\relay_frames_received_total {d}
        \\
        \\# TYPE relay_frames_broadcast_total counter
        \\relay_frames_broadcast_total {d}
        \\
        \\# TYPE relay_consumers_active gauge
        \\relay_consumers_active {d}
        \\
        \\# TYPE relay_validation_total counter
        \\relay_validation_total{{result="validated"}} {d}
        \\relay_validation_total{{result="failed"}} {d}
        \\relay_validation_total{{result="skipped"}} {d}
        \\
        \\# TYPE relay_decode_errors_total counter
        \\relay_decode_errors_total {d}
        \\
        \\# TYPE relay_cache_total counter
        \\relay_cache_total{{result="hit"}} {d}
        \\relay_cache_total{{result="miss"}} {d}
        \\
        \\# TYPE relay_slow_consumers_total counter
        \\relay_slow_consumers_total {d}
        \\
        \\# TYPE relay_upstream_seq gauge
        \\relay_upstream_seq {d}
        \\
        \\# TYPE relay_seq gauge
        \\relay_seq {d}
        \\
        \\# TYPE relay_uptime_seconds gauge
        \\relay_uptime_seconds {d}
        \\
    , .{
        stats.frames_in.load(.acquire),
        stats.frames_out.load(.acquire),
        stats.consumer_count.load(.acquire),
        stats.validated.load(.acquire),
        stats.failed.load(.acquire),
        stats.skipped.load(.acquire),
        stats.decode_errors.load(.acquire),
        stats.cache_hits.load(.acquire),
        stats.cache_misses.load(.acquire),
        stats.slow_consumers.load(.acquire),
        stats.seq.load(.acquire),
        stats.relay_seq.load(.acquire),
        uptime,
    }) catch "";
}

/// render a full HTTP response with a JSON stats body into `buf`.
/// falls back to a canned 500 response on formatting failure.
pub fn formatStatsResponse(stats: *const Stats, buf: []u8) []const u8 {
    var json_buf: [2048]u8 = undefined;
    const json = std.fmt.bufPrint(&json_buf,
        \\{{"seq":{d},"relay_seq":{d},"consumers":{d},"frames_in":{d},"frames_out":{d},"validated":{d},"failed":{d},"skipped":{d},"decode_errors":{d},"cache_hits":{d},"cache_misses":{d},"slow_consumers":{d},"uptime_seconds":{d}}}
    , .{
        stats.seq.load(.acquire),
        stats.relay_seq.load(.acquire),
        stats.consumer_count.load(.acquire),
        stats.frames_in.load(.acquire),
        stats.frames_out.load(.acquire),
        stats.validated.load(.acquire),
        stats.failed.load(.acquire),
        stats.skipped.load(.acquire),
        stats.decode_errors.load(.acquire),
        stats.cache_hits.load(.acquire),
        stats.cache_misses.load(.acquire),
        stats.slow_consumers.load(.acquire),
        std.time.timestamp() - stats.start_time,
    }) catch return "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\n\r\n";

    return std.fmt.bufPrint(
        buf,
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {d}\r\nConnection: close\r\n\r\n{s}",
        .{ json.len, json },
    ) catch "HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\n\r\n";
}

// --- tests ---

test "broadcaster add and remove consumer" {
    var b = Broadcaster.init(std.testing.allocator);
    defer b.deinit();

    try std.testing.expectEqual(@as(i64, 0), b.stats.seq.load(.acquire));
    try std.testing.expectEqual(@as(usize, 0), b.consumerCount());
}

test "broadcast updates stats and history" {
    var b = Broadcaster.init(std.testing.allocator);
    defer b.deinit();

    b.broadcast(1, "frame1");
    b.broadcast(2, "frame2");
    b.broadcast(3, "frame3");

    try std.testing.expectEqual(@as(i64, 3), b.stats.seq.load(.acquire));
    try std.testing.expectEqual(@as(u64, 3), b.stats.frames_in.load(.acquire));
    try std.testing.expectEqual(@as(i64, 1), b.history.oldestSeq().?);
    try std.testing.expectEqual(@as(i64, 3), b.history.newestSeq().?);
}

test "frame history supports cursor replay" {
    var b = Broadcaster.init(std.testing.allocator);
    defer b.deinit();

    for (1..6) |i| {
        var buf: [32]u8 = undefined;
        const data = std.fmt.bufPrint(&buf, "frame{d}", .{i}) catch unreachable;
        b.broadcast(@intCast(i), data);
    }

    const frames = try b.history.framesSince(std.testing.allocator, 3);
    defer {
        for (frames) |f| std.testing.allocator.free(f.data);
        std.testing.allocator.free(frames);
    }

    try std.testing.expectEqual(@as(usize, 2), frames.len);
    try std.testing.expectEqual(@as(i64, 4), frames[0].seq);
    try std.testing.expectEqual(@as(i64, 5), frames[1].seq);
}

test "shared frame ref counting" {
    const frame = try SharedFrame.create(std.testing.allocator, "hello");
    frame.acquire(); // ref=2
    frame.acquire(); // ref=3
    frame.release(); // ref=2
    frame.release(); // ref=1
    frame.release(); // ref=0 → freed
}

test "error frame is valid CBOR" {
    const cbor = zat.cbor;
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const frame = buildErrorFrame(alloc) orelse return error.FrameBuildFailed;

    // decode header
    const h_result = try cbor.decode(alloc, frame);
    const h = h_result.value;
    try std.testing.expectEqual(@as(i64, -1), h.getInt("op").?);
    try std.testing.expectEqualStrings("#info", h.getString("t").?);

    // decode payload
    const p = try cbor.decodeAll(alloc, frame[h_result.consumed..]);
    try std.testing.expectEqualStrings("ConsumerTooSlow", p.getString("error").?);
    try std.testing.expectEqualStrings("consumer buffer full", p.getString("message").?);
}

test "formatPrometheusMetrics produces valid output" {
    var stats = Stats{ .start_time = std.time.timestamp() - 60 };
    stats.seq.store(99999, .release);
    stats.relay_seq.store(12345, .release);
    stats.consumer_count.store(3, .release);
    stats.frames_in.store(10000, .release);
    stats.frames_out.store(9000, .release);
    stats.validated.store(500, .release);
    stats.failed.store(2, .release);
    stats.skipped.store(9498, .release);
    stats.cache_hits.store(400, .release);
    stats.cache_misses.store(100, .release);

    var buf: [4096]u8 = undefined;
    const output = formatPrometheusMetrics(&stats, &buf);

    try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_received_total 10000") != null);
    try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_broadcast_total 9000") != null);
    try std.testing.expect(std.mem.indexOf(u8, output, "relay_consumers_active 3") != null);
    try std.testing.expect(std.mem.indexOf(u8, output, "relay_validation_total{result=\"validated\"} 500") != null);
    try std.testing.expect(std.mem.indexOf(u8, output, "relay_validation_total{result=\"failed\"} 2") != null);
    try std.testing.expect(std.mem.indexOf(u8, output, "relay_upstream_seq 99999") != null);
    try std.testing.expect(std.mem.indexOf(u8, output, "relay_seq 12345") != null);
    try std.testing.expect(std.mem.indexOf(u8, output, "# TYPE relay_uptime_seconds gauge") != null);
}

test "formatStatsResponse produces valid JSON" {
    var stats = Stats{ .start_time = std.time.timestamp() - 42 };
    stats.seq.store(100, .release);
    stats.frames_in.store(200, .release);
    stats.consumer_count.store(5, .release);

    var buf: [4096]u8 = undefined;
    const response = formatStatsResponse(&stats, &buf);

    try std.testing.expect(std.mem.startsWith(u8, response, "HTTP/1.1 200 OK"));
    try std.testing.expect(std.mem.indexOf(u8, response, "\"seq\":100") != null);
    try std.testing.expect(std.mem.indexOf(u8, response, "\"consumers\":5") != null);
    try std.testing.expect(std.mem.indexOf(u8, response, "\"frames_in\":200") != null);
    try std.testing.expect(std.mem.indexOf(u8, response, "\"slow_consumers\":0") != null);
}
+990
src/event_log.zig
//! disk persistence matching indigo's diskpersist format
//!
//! append-only log files with relay-assigned sequence numbers.
//! SQLite metadata index for fast cursor→file lookup.
//!
//! on-disk entry format (28-byte LE header + CBOR payload):
//!   [4B flags LE] [4B kind LE] [4B payload_len LE] [8B uid LE] [8B seq LE] [payload]
//!
//! file naming: evts-{startSeq} (rotated every events_per_file events)
//!
//! see: indigo cmd/relay/stream/persist/diskpersist/diskpersist.go

const std = @import("std");
const zqlite = @import("zqlite");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

// --- constants matching indigo ---

pub const header_size: usize = 28; // 4 + 4 + 4 + 8 + 8

pub const EvtKind = enum(u32) {
    commit = 1,
    handle = 2, // deprecated
    tombstone = 3, // deprecated
    identity = 4,
    account = 5,
    sync = 6,
};

pub const EvtFlags = struct {
    pub const takedown: u32 = 1;
    pub const rebased: u32 = 2;
};

const default_events_per_file: u32 = 10_000;
const default_flush_interval_ms: u64 = 100;
const default_flush_threshold: usize = 400;

// --- header ---

/// fixed-size on-disk record header. all fields little-endian.
pub const EvtHeader = struct {
    flags: u32,
    kind: u32,
    len: u32, // payload length (not including header)
    uid: u64,
    seq: u64,

    /// serialize into a caller-provided 28-byte buffer.
    pub fn encode(self: EvtHeader, buf: *[header_size]u8) void {
        std.mem.writeInt(u32, buf[0..4], self.flags, .little);
        std.mem.writeInt(u32, buf[4..8], self.kind, .little);
        std.mem.writeInt(u32, buf[8..12], self.len, .little);
        std.mem.writeInt(u64, buf[12..20], self.uid, .little);
        std.mem.writeInt(u64, buf[20..28], self.seq, .little);
    }

    /// parse a 28-byte on-disk header.
    pub fn decode(buf: *const [header_size]u8) EvtHeader {
        return .{
            .flags = std.mem.readInt(u32, buf[0..4], .little),
            .kind = std.mem.readInt(u32, buf[4..8], .little),
            .len = std.mem.readInt(u32, buf[8..12], .little),
            .uid = std.mem.readInt(u64, buf[12..20], .little),
            .seq = std.mem.readInt(u64, buf[20..28], .little),
        };
    }
};

// --- persist job (write buffer entry) ---

const PersistJob = struct {
    data: []u8, // header + payload, owned by the write buffer until flushed
    seq: u64, // assigned seq (for broadcast ordering)
};

// --- disk persistence ---

pub const DiskPersist = struct {
    allocator: Allocator,
    dir_path: []const u8,
    dir: std.fs.Dir,
    db: zqlite.Conn,
    current_file: ?std.fs.File = null,
    current_file_path: ?[]const u8 = null,

    // sequence state (protected by `mutex`)
    cur_seq: u64 = 1,
    event_counter: u64 = 0,

    // config
    events_per_file: u32 = default_events_per_file,
    retention_hours: u64 = 72, // 3 days

    // DID → UID cache (matches indigo's bidirectional ARC cache).
    // keys are heap-duplicated and owned by the cache.
    did_cache: std.StringHashMapUnmanaged(u64) = .{},
    did_cache_mutex: std.Thread.Mutex = .{},

    // write buffer (flushed periodically or when threshold hit)
    outbuf: std.ArrayListUnmanaged(u8) = .{},
    evtbuf: std.ArrayListUnmanaged(PersistJob) = .{},
    mutex: std.Thread.Mutex = .{},

    // flush thread
    flush_thread: ?std.Thread = null,
    alive: std.atomic.Value(bool) = .{ .raw = true },
    flush_cond: std.Thread.Condition = .{},

    /// open (or create) the event directory and SQLite index, then recover
    /// sequence state from any existing log files.
    pub fn init(allocator: Allocator, dir_path: []const u8, db_path: []const u8) !DiskPersist {
        // ensure directory exists
        std.fs.cwd().makePath(dir_path) catch |err| switch (err) {
            error.PathAlreadyExists => {},
            else => return err,
        };

        var dir = try std.fs.cwd().openDir(dir_path, .{ .iterate = true });
        errdefer dir.close();

        // ensure db parent directory exists
        if (std.fs.path.dirname(db_path)) |parent| {
            std.fs.cwd().makePath(parent) catch |err| switch (err) {
                error.PathAlreadyExists => {},
                else => return err,
            };
        }

        // open SQLite (path must be NUL-terminated for the C API)
        const db_path_z = try allocator.dupeZ(u8, db_path);
        defer allocator.free(db_path_z);
        var db = try zqlite.open(db_path_z, zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite);
        errdefer db.close();

        // pragmas: WAL for concurrent readers, bounded busy-wait, relaxed fsync
        try db.execNoArgs("PRAGMA journal_mode=WAL");
        try db.execNoArgs("PRAGMA busy_timeout=5000");
        try db.execNoArgs("PRAGMA synchronous=NORMAL");

        // create tables
        try db.execNoArgs(
            \\CREATE TABLE IF NOT EXISTS log_file_refs (
            \\  id INTEGER PRIMARY KEY AUTOINCREMENT,
            \\  path TEXT NOT NULL,
            \\  archived INTEGER NOT NULL DEFAULT 0,
            \\  seq_start INTEGER NOT NULL,
            \\  created_at TEXT NOT NULL DEFAULT (datetime('now'))
            \\)
        );
        try db.execNoArgs(
            \\CREATE TABLE IF NOT EXISTS account (
            \\  uid INTEGER PRIMARY KEY AUTOINCREMENT,
            \\  did TEXT NOT NULL UNIQUE,
            \\  status TEXT NOT NULL DEFAULT 'active',
            \\  rev TEXT,
            \\  commit_data_cid BLOB,
            \\  created_at TEXT NOT NULL DEFAULT (datetime('now'))
            \\)
        );

        var self = DiskPersist{
            .allocator = allocator,
            .dir_path = try allocator.dupe(u8, dir_path),
            .dir = dir,
            .db = db,
        };

        // recover from existing log files
        try self.resumeLog();

        return self;
    }

    /// stop the flush thread, flush any buffered events, and release all
    /// owned resources. deallocation never fails.
    pub fn deinit(self: *DiskPersist) void {
        // stop flush thread. signal under the mutex so a wakeup between the
        // loop's alive-check and its wait cannot be missed.
        self.alive.store(false, .release);
        {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.flush_cond.signal();
        }
        if (self.flush_thread) |t| t.join();

        // flush remaining
        self.mutex.lock();
        self.flushLocked() catch {};
        self.mutex.unlock();

        // free write buffer (anything flushLocked could not write)
        for (self.evtbuf.items) |job| self.allocator.free(job.data);
        self.evtbuf.deinit(self.allocator);
        self.outbuf.deinit(self.allocator);

        // free DID cache keys (the cache owns its duplicated keys)
        {
            var it = self.did_cache.iterator();
            while (it.next()) |entry| self.allocator.free(entry.key_ptr.*);
            self.did_cache.deinit(self.allocator);
        }

        if (self.current_file) |f| f.close();
        if (self.current_file_path) |p| self.allocator.free(p);
        self.dir.close();
        self.db.close();
        self.allocator.free(self.dir_path);
    }

    /// start the background flush thread
    pub fn start(self: *DiskPersist) !void {
        self.flush_thread = try std.Thread.spawn(.{}, flushLoop, .{self});
    }

    /// insert a did→uid mapping into the in-memory cache.
    /// best-effort: allocation failures are swallowed — the database remains
    /// the source of truth. duplicates the key so the cache owns it; avoids
    /// leaking a dupe when the key is already present.
    fn cacheDidUid(self: *DiskPersist, did: []const u8, uid: u64) void {
        self.did_cache_mutex.lock();
        defer self.did_cache_mutex.unlock();
        const gop = self.did_cache.getOrPut(self.allocator, did) catch return;
        if (!gop.found_existing) {
            // the map currently holds the caller's slice as key; replace it
            // with an owned copy, or roll the entry back if that fails.
            gop.key_ptr.* = self.allocator.dupe(u8, did) catch {
                _ = self.did_cache.remove(did);
                return;
            };
        }
        gop.value_ptr.* = uid;
    }

    /// resolve a DID to a numeric UID. creates a new account row on first encounter.
    /// matches indigo's Relay.DidToUid → Account.UID mapping.
    pub fn uidForDid(self: *DiskPersist, did: []const u8) !u64 {
        // fast path: check in-memory cache
        {
            self.did_cache_mutex.lock();
            defer self.did_cache_mutex.unlock();
            if (self.did_cache.get(did)) |uid| return uid;
        }

        // check database
        if (self.db.row(
            "SELECT uid FROM account WHERE did = ?",
            .{did},
        )) |maybe_row| {
            if (maybe_row) |r| {
                defer r.deinit();
                const uid: u64 = @intCast(r.int(0));
                self.cacheDidUid(did, uid);
                return uid;
            }
        } else |_| {}

        // create new account row (ignore if already exists from concurrent insert)
        self.db.exec(
            "INSERT OR IGNORE INTO account (did) VALUES (?)",
            .{did},
        ) catch |err| {
            log.warn("failed to create account for {s}: {s}", .{ did, @errorName(err) });
            return err;
        };

        // read back the UID (whether we just created it or it already existed)
        const row = try self.db.row(
            "SELECT uid FROM account WHERE did = ?",
            .{did},
        ) orelse return error.AccountCreationFailed;
        defer row.deinit();
        const uid: u64 = @intCast(row.int(0));

        self.cacheDidUid(did, uid);
        return uid;
    }

    /// per-DID sync state for chain tracking
    pub const AccountState = struct {
        rev: []const u8,
        data_cid: []const u8,
    };

    /// get stored sync state for a user. caller owns both returned slices.
    pub fn getAccountState(self: *DiskPersist, uid: u64, allocator: Allocator) !?AccountState {
        const row = (try self.db.row(
            "SELECT rev, commit_data_cid FROM account WHERE uid = ? AND rev IS NOT NULL",
            .{@as(i64, @intCast(uid))},
        )) orelse return null;
        defer row.deinit();
        const rev = row.text(0);
        const data_cid = row.blob(1);
        if (rev.len == 0 or data_cid.len == 0) return null;
        return .{
            .rev = try allocator.dupe(u8, rev),
            .data_cid = try allocator.dupe(u8, data_cid),
        };
    }

    /// update stored sync state after a verified commit
    pub fn updateAccountState(self: *DiskPersist, uid: u64, rev: []const u8, data_cid: []const u8) !void {
        try self.db.exec(
            "UPDATE account SET rev = ?, commit_data_cid = ? WHERE uid = ?",
            .{ rev, data_cid, @as(i64, @intCast(uid)) },
        );
    }

    /// persist an event. assigns a sequence number. returns the assigned seq.
    /// the event is buffered and will be flushed to disk asynchronously.
    pub fn persist(self: *DiskPersist, kind: EvtKind, uid: u64, payload: []const u8) !u64 {
        // build the on-disk record: header + payload
        const total_len = header_size + payload.len;
        const data = try self.allocator.alloc(u8, total_len);

        // write header (seq filled in later under lock)
        const header = EvtHeader{
            .flags = 0,
            .kind = @intFromEnum(kind),
            .len = @intCast(payload.len),
            .uid = uid,
            .seq = 0, // placeholder
        };
        header.encode(data[0..header_size]);
        @memcpy(data[header_size..], payload);

        self.mutex.lock();
        defer self.mutex.unlock();

        // assign seq
        const seq = self.cur_seq;
        self.cur_seq += 1;
        std.mem.writeInt(u64, data[20..28], seq, .little);

        // ownership handoff: reserve evtbuf space first so that once `data`
        // is recorded in evtbuf no further fallible step can trigger a free
        // here — evtbuf/flushLocked/deinit own it from then on. (previously a
        // function-wide errdefer freed `data` even after evtbuf held it,
        // causing a double free if outbuf.appendSlice or flushLocked failed.)
        self.evtbuf.ensureUnusedCapacity(self.allocator, 1) catch |err| {
            self.allocator.free(data);
            return err;
        };
        self.outbuf.appendSlice(self.allocator, data) catch |err| {
            self.allocator.free(data);
            return err;
        };
        self.evtbuf.appendAssumeCapacity(.{ .data = data, .seq = seq });
        self.event_counter += 1;

        // flush if threshold hit. on failure the event stays buffered and the
        // background flush loop retries; the error still propagates.
        if (self.evtbuf.items.len >= default_flush_threshold) {
            try self.flushLocked();
        }

        return seq;
    }

    /// playback events with seq > since, appending PlaybackEntry records to
    /// `result`. caller frees each entry's `data`.
    pub fn playback(self: *DiskPersist, since: u64, allocator: Allocator, result: *std.ArrayListUnmanaged(PlaybackEntry)) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        // find the log file containing `since`
        var start_files: std.ArrayListUnmanaged(LogFileRef) = .{};
        // free paths in the same defer as the list so paths duped before a
        // failed append are not leaked on the error path.
        defer {
            for (start_files.items) |f| allocator.free(f.path);
            start_files.deinit(allocator);
        }

        if (since > 0) {
            // find file whose seq_start is just before `since`
            if (self.db.row("SELECT id, path, seq_start FROM log_file_refs WHERE seq_start <= ? ORDER BY seq_start DESC LIMIT 1", .{since})) |row| {
                if (row) |r| {
                    defer r.deinit();
                    try start_files.append(allocator, .{
                        .path = try allocator.dupe(u8, r.text(1)),
                        .seq_start = @intCast(r.int(2)),
                    });
                }
            } else |_| {}
        }

        // find all subsequent files
        {
            var rows = try self.db.rows("SELECT id, path, seq_start FROM log_file_refs WHERE seq_start > ? ORDER BY seq_start ASC", .{since});
            defer rows.deinit();
            while (rows.next()) |r| {
                try start_files.append(allocator, .{
                    .path = try allocator.dupe(u8, r.text(1)),
                    .seq_start = @intCast(r.int(2)),
                });
            }
        }

        // read events from each file; missing files are skipped (may have
        // been garbage-collected between the query and the open).
        for (start_files.items) |ref| {
            const file = self.dir.openFile(ref.path, .{}) catch continue;
            defer file.close();
            try readEventsFrom(allocator, file, since, result);
        }
    }

    /// last assigned sequence number, or null if no event was ever persisted.
    pub fn lastSeq(self: *DiskPersist) ?u64 {
        // cur_seq is written under `mutex` in persist(); take it here too so
        // concurrent callers do not race.
        self.mutex.lock();
        defer self.mutex.unlock();
        if (self.cur_seq <= 1) return null;
        return self.cur_seq - 1;
    }

    /// garbage-collect log files older than the retention period
    pub fn gc(self: *DiskPersist) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        const cutoff_hours = self.retention_hours;
        // SQLite datetime modifier, bound as a parameter (not interpolated)
        const cutoff_sql = try std.fmt.allocPrint(self.allocator, "-{d} hours", .{cutoff_hours});
        defer self.allocator.free(cutoff_sql);

        // find expired refs
        var expired: std.ArrayListUnmanaged(GcRef) = .{};
        defer {
            for (expired.items) |e| self.allocator.free(e.path);
            expired.deinit(self.allocator);
        }

        {
            var rows = try self.db.rows(
                "SELECT id, path FROM log_file_refs WHERE created_at < datetime('now', ?)",
                .{cutoff_sql},
            );
            defer rows.deinit();
            while (rows.next()) |r| {
                try expired.append(self.allocator, .{
                    .id = r.int(0),
                    .path = try self.allocator.dupe(u8, r.text(1)),
                });
            }
        }

        for (expired.items) |ref| {
            // skip current file
            if (self.current_file_path) |cur| {
                if (std.mem.eql(u8, ref.path, cur)) continue;
            }

            // delete db record first (prevents playback from finding it)
            self.db.exec("DELETE FROM log_file_refs WHERE id = ?", .{ref.id}) catch |err| {
                log.warn("gc: failed to delete db record {d}: {s}", .{ ref.id, @errorName(err) });
                continue;
            };

            // delete file
            self.dir.deleteFile(ref.path) catch |err| {
                log.warn("gc: failed to delete {s}: {s}", .{ ref.path, @errorName(err) });
            };
        }

        if (expired.items.len > 0) {
            log.info("gc: removed {d} expired log files", .{expired.items.len});
        }
    }

    /// take down all events for a user (set flag + zero payload)
    pub fn takeDownUser(self: *DiskPersist, uid: u64) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        // iterate all log files
        var refs: std.ArrayListUnmanaged([]const u8) = .{};
        defer {
            for (refs.items) |p| self.allocator.free(p);
            refs.deinit(self.allocator);
        }

        {
            var rows = try self.db.rows("SELECT path FROM log_file_refs ORDER BY seq_start DESC", .{});
            defer rows.deinit();
            while (rows.next()) |r| {
                try refs.append(self.allocator, try self.allocator.dupe(u8, r.text(0)));
            }
        }

        // best effort per file: a missing/unprocessable file is logged, not fatal
        for (refs.items) |path| {
            const file = self.dir.openFile(path, .{ .mode = .read_write }) catch continue;
            defer file.close();
            mutateUserEventsInFile(file, uid) catch |err| {
                log.warn("takedown: failed to process {s}: {s}", .{ path, @errorName(err) });
            };
        }
    }

    // --- internals ---

    /// recover cur_seq and the append position from the most recent log file,
    /// or start a fresh file when none exists / the file is unreadable.
    fn resumeLog(self: *DiskPersist) !void {
        // find most recent log file
        const r = try self.db.row("SELECT id, path, seq_start FROM log_file_refs ORDER BY seq_start DESC LIMIT 1", .{});
        if (r) |row| {
            defer row.deinit();
            const path = row.text(1);
            const seq_start: u64 = @intCast(row.int(2));

            const file = self.dir.openFile(path, .{ .mode = .read_write }) catch {
                // file missing, start fresh
                try self.initLogFile();
                return;
            };

            // scan for last seq
            const last_seq = scanForLastSeq(file) catch {
                file.close();
                try self.initLogFile();
                return;
            };

            if (last_seq) |ls| {
                self.cur_seq = ls + 1;
                log.info("recovered seq from disk: last_seq={d}", .{ls});
            } else {
                self.cur_seq = if (seq_start > 0) seq_start else 1;
            }

            // seek to end for appending
            const stat = try file.stat();
            try file.seekTo(stat.size);

            self.current_file = file;
            self.current_file_path = try self.allocator.dupe(u8, path);
        } else {
            try self.initLogFile();
        }
    }

    fn initLogFile(self: *DiskPersist) !void {
        try self.createLogFile(self.cur_seq);
    }

    /// rotate to a new log file named evts-{start_seq} and register it in the
    /// SQLite index. resets the per-file event counter.
    fn createLogFile(self: *DiskPersist, start_seq: u64) !void {
        var name_buf: [64]u8 = undefined;
        // "evts-" + 20 digits max fits 64 bytes, so bufPrint cannot fail
        const name = std.fmt.bufPrint(&name_buf, "evts-{d}", .{start_seq}) catch unreachable;

        if (self.current_file) |f| f.close();
        if (self.current_file_path) |p| self.allocator.free(p);

        self.current_file = try self.dir.createFile(name, .{ .truncate = false });
        self.current_file_path = try self.allocator.dupe(u8, name);

        // register in SQLite
        try self.db.exec(
            "INSERT INTO log_file_refs (path, seq_start) VALUES (?, ?)",
            .{ name, @as(i64, @intCast(start_seq)) },
        );

        self.event_counter = 0;
    }

    /// write buffered events to the current file and clear the buffers.
    /// caller must hold `mutex`. on write failure the buffers are kept so a
    /// later flush can retry.
    fn flushLocked(self: *DiskPersist) !void {
        if (self.evtbuf.items.len == 0) return;

        // write buffered bytes to current file
        const file = self.current_file orelse return;
        file.writeAll(self.outbuf.items) catch |err| {
            log.err("flush: write failed: {s}", .{@errorName(err)});
            return err;
        };

        // clear buffers
        self.outbuf.clearRetainingCapacity();

        // free job data
        for (self.evtbuf.items) |job| {
            self.allocator.free(job.data);
        }
        self.evtbuf.clearRetainingCapacity();

        // check if we need to rotate
        if (self.event_counter >= self.events_per_file) {
            self.createLogFile(self.cur_seq) catch |err| {
                log.err("flush: log rotation failed: {s}", .{@errorName(err)});
            };
        }
    }

    /// background thread body: flush every default_flush_interval_ms or when
    /// signalled (e.g. by deinit).
    fn flushLoop(self: *DiskPersist) void {
        while (self.alive.load(.acquire)) {
            // wait for flush interval or signal
            {
                self.mutex.lock();
                defer self.mutex.unlock();
                self.flush_cond.timedWait(&self.mutex, default_flush_interval_ms * std.time.ns_per_ms) catch {};
                self.flushLocked() catch {};
            }
        }
    }
};

// --- playback types ---

pub const PlaybackEntry = struct {
    seq: u64,
    kind: u32,
    uid: u64,
    data: []const u8, // payload only (owned by caller)
};

const LogFileRef = struct {
    path: []const u8,
    seq_start: u64,
};

const GcRef = struct {
    id: i64,
    path: []const u8,
};

// --- file-level operations ---

/// read all events with seq > since from `file` into `result`.
/// best-effort: a short read or allocation failure ends the scan early
/// rather than failing playback of already-collected entries.
fn readEventsFrom(allocator: Allocator, file: std.fs.File, since: u64, result: *std.ArrayListUnmanaged(PlaybackEntry)) !void {
    const file_size = (try file.stat()).size;

    // if since > 0, scan to the right position
    if (since > 0) {
        seekToSeq(file, since, file_size) catch return;
    }

    // read events
    while (true) {
        var hdr_buf: [header_size]u8 = undefined;
        const n = file.readAll(&hdr_buf) catch break;
        if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);

        // skip taken down / rebased events
        if (hdr.flags & (EvtFlags.takedown | EvtFlags.rebased) != 0) {
            file.seekBy(@intCast(hdr.len)) catch break;
            continue;
        }

        if (hdr.seq <= since) {
            file.seekBy(@intCast(hdr.len)) catch break;
            continue;
        }

        // read payload
        const data = allocator.alloc(u8, hdr.len) catch break;
        const read_n = file.readAll(data) catch {
            allocator.free(data);
            break;
        };
        if (read_n < hdr.len) {
            allocator.free(data);
            break;
        }

        result.append(allocator, .{
            .seq = hdr.seq,
            .kind = hdr.kind,
            .uid = hdr.uid,
            .data = data,
        }) catch {
            allocator.free(data);
            break;
        };
    }
}

/// scan file headers to seek to the first event with seq > target
fn seekToSeq(file: std.fs.File, target: u64, file_size: u64) !void {
    try file.seekTo(0);
    var pos: u64 = 0;
    while (pos + header_size <= file_size) {
        var hdr_buf: [header_size]u8 = undefined;
        const n = try file.readAll(&hdr_buf);
        if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);
        if (hdr.seq > target) {
            // seek back to start of this header
            try file.seekTo(pos);
            return;
        }
        pos += header_size + hdr.len;
        try file.seekTo(pos);
    }
}

/// scan a file for the last sequence number; null for an empty file.
fn scanForLastSeq(file: std.fs.File) !?u64 {
    try file.seekTo(0);
    const file_size = (try file.stat()).size;

    var last_seq: ?u64 = null;
    var pos: u64 = 0;
    while (pos + header_size <= file_size) {
        var hdr_buf: [header_size]u8 = undefined;
        try file.seekTo(pos);
        const n = try file.readAll(&hdr_buf);
        if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);
        last_seq = hdr.seq;
        pos += header_size + hdr.len;
    }
    return last_seq;
}

/// set takedown flag + zero payload for all events belonging to uid
fn mutateUserEventsInFile(file: std.fs.File, uid: u64) !void {
    const file_size = (try file.stat()).size;
    var pos: u64 = 0;

    while (pos + header_size <= file_size) {
        var hdr_buf: [header_size]u8 = undefined;
        try file.seekTo(pos);
        const n = try file.readAll(&hdr_buf);
        if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);

        if (hdr.uid == uid and (hdr.flags & EvtFlags.takedown) == 0) {
            // set takedown flag (first 4 bytes of the header)
            const new_flags = hdr.flags | EvtFlags.takedown;
            var flags_buf: [4]u8 = undefined;
            std.mem.writeInt(u32, &flags_buf, new_flags, .little);
            try file.seekTo(pos);
            try file.writeAll(&flags_buf);

            // zero the payload in 4 KiB chunks
            const payload_start = pos + header_size;
            try file.seekTo(payload_start);
            const zeros: [4096]u8 = [_]u8{0} ** 4096;
            var remaining: u64 = hdr.len;
            while (remaining > 0) {
                const chunk = @min(remaining, zeros.len);
                try file.writeAll(zeros[0..chunk]);
                remaining -= chunk;
            }
        }

        pos += header_size + hdr.len;
    }
}

// === tests ===

test "header encode/decode roundtrip" {
    const hdr = EvtHeader{
        .flags = 0,
        .kind = @intFromEnum(EvtKind.commit),
        .len = 1234,
        .uid = 42,
        .seq = 99999,
    };
    var buf: [header_size]u8 = undefined;
    hdr.encode(&buf);
    const decoded = EvtHeader.decode(&buf);
    try std.testing.expectEqual(hdr.flags, decoded.flags);
    try std.testing.expectEqual(hdr.kind, decoded.kind);
    try std.testing.expectEqual(hdr.len, decoded.len);
    try std.testing.expectEqual(hdr.uid, decoded.uid);
    try std.testing.expectEqual(hdr.seq, decoded.seq);
}

test "header is little-endian" {
    const hdr = EvtHeader{ .flags = 0, .kind = 1, .len = 256, .uid = 0, .seq = 1 };
    var buf: [header_size]u8 = undefined;
    hdr.encode(&buf);
    // len=256 in LE is 0x00 0x01 0x00 0x00
    try std.testing.expectEqual(@as(u8, 0x00), buf[8]);
    try std.testing.expectEqual(@as(u8, 0x01), buf[9]);
}

test "persist and playback" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path});
    defer std.testing.allocator.free(db_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
    defer dp.deinit();

    // persist some events (sync flush, no background thread)
    const seq1 = try dp.persist(.commit, 1, "payload-one");
    const seq2 = try dp.persist(.identity, 2, "payload-two");
    const seq3 = try dp.persist(.commit, 1, "payload-three");

    try std.testing.expectEqual(@as(u64, 1), seq1);
    try std.testing.expectEqual(@as(u64, 2), seq2);
    try std.testing.expectEqual(@as(u64, 3), seq3);

    // flush manually
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // playback from cursor=0 → all events
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    try std.testing.expectEqual(@as(usize, 3), entries.items.len);
    try std.testing.expectEqualStrings("payload-one", entries.items[0].data);
    try std.testing.expectEqual(@as(u64, 1), entries.items[0].seq);
    try std.testing.expectEqualStrings("payload-two", entries.items[1].data);
    try std.testing.expectEqualStrings("payload-three", entries.items[2].data);
}

test "playback with cursor" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path});
    defer std.testing.allocator.free(db_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
    defer dp.deinit();

    _ = try dp.persist(.commit, 1, "a");
    _ = try dp.persist(.commit, 2, "b");
    _ = try dp.persist(.commit, 3, "c");
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // playback from cursor=2 → only seq 3
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(2, std.testing.allocator, &entries);

    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqual(@as(u64, 3), entries.items[0].seq);
    try std.testing.expectEqualStrings("c", entries.items[0].data);
}

test "seq recovery after reinit" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path});
    defer std.testing.allocator.free(db_path);

    // write some events
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
        defer dp.deinit();
        _ = try dp.persist(.commit, 1, "x");
        _ = try dp.persist(.commit, 2, "y");
        _ = try dp.persist(.account, 3, "z");
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // reinit — should recover seq
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
        defer dp.deinit();
        try std.testing.expectEqual(@as(u64, 3), dp.lastSeq().?);
        const seq4 = try dp.persist(.commit, 1, "w");
        try std.testing.expectEqual(@as(u64, 4), seq4);
    }
}

test "takedown zeros payload" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path});
    defer std.testing.allocator.free(db_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
    defer dp.deinit();

    _ = try dp.persist(.commit, 42, "secret-data");
    _ = try dp.persist(.commit, 99, "other-data");
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // take down user 42
    try dp.takeDownUser(42);

    // playback should skip user 42's events
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqualStrings("other-data", entries.items[0].data);
}

test "uidForDid assigns and caches UIDs" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path});
    defer std.testing.allocator.free(db_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
    defer dp.deinit();

    // first call creates the account
    const uid1 = try dp.uidForDid("did:plc:alice");
    try std.testing.expect(uid1 > 0);

    // same DID returns same UID (from cache)
    const uid1_again = try dp.uidForDid("did:plc:alice");
    try std.testing.expectEqual(uid1, uid1_again);

    // different DID gets a different UID
    const uid2 = try dp.uidForDid("did:plc:bob");
    try std.testing.expect(uid2 > 0);
    try std.testing.expect(uid1 != uid2);
}

test "uidForDid survives reinit" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path});
    defer std.testing.allocator.free(db_path);

    var uid1: u64 = undefined;
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
        defer dp.deinit();
        uid1 = try dp.uidForDid("did:plc:carol");
    }

    // reinit — UID should be the same from database
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
        defer dp.deinit();
        const uid1_again = try dp.uidForDid("did:plc:carol");
        try std.testing.expectEqual(uid1, uid1_again);
    }
}

test "takedown with real UIDs" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path});
    defer std.testing.allocator.free(db_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path);
    defer dp.deinit();

    const alice_uid = try dp.uidForDid("did:plc:alice");
    const bob_uid = try dp.uidForDid("did:plc:bob");

    _ = try dp.persist(.commit, alice_uid, "alice-post");
    _ = try dp.persist(.commit, bob_uid, "bob-post");
    _ = try dp.persist(.commit, alice_uid, "alice-post-2");
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // take down alice
    try dp.takeDownUser(alice_uid);

    // playback should only have bob's event
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqualStrings("bob-post", entries.items[0].data);
    try std.testing.expectEqual(bob_uid, entries.items[0].uid);
}

/// resolve a TmpDir to an absolute path. caller owns the returned slice.
fn tmpDirRealPath(allocator: Allocator, tmp: std.testing.TmpDir) ![]const u8 {
    var buf: [std.fs.max_path_bytes]u8 = undefined;
    const real = try tmp.dir.realpath(".", &buf);
    return try allocator.dupe(u8, real);
}
+335
src/main.zig
//! zat relay — AT Protocol firehose relay server
//!
//! subscribes to an upstream firehose, validates frames via DID resolution
//! and signature verification, persists to disk with relay-assigned seq
//! numbers, and rebroadcasts to downstream consumers over WebSocket.
//!
//! endpoints:
//!   /xrpc/com.atproto.sync.subscribeRepos — firehose WebSocket (supports ?cursor=N)
//!   /_health — HTTP 200 JSON health check
//!   /_stats — HTTP 200 JSON with relay statistics

const std = @import("std");
const websocket = @import("websocket");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const subscriber_mod = @import("subscriber.zig");
const event_log_mod = @import("event_log.zig");

const log = std.log.scoped(.relay);

// set by the signal handler; polled by every long-running thread
var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false };

/// HTTP server state — shared so main can close the socket to unblock accept()
const HttpServer = struct {
    server: std.net.Server,
    stats: *broadcaster.Stats,
    persist: *event_log_mod.DiskPersist,
    sub: *subscriber_mod.Subscriber,

    /// accept loop, run on its own thread. exits when shutdown_flag is set;
    /// main unblocks a pending accept() by closing the listener stream.
    fn run(self: *HttpServer) void {
        while (!shutdown_flag.load(.acquire)) {
            const conn = self.server.accept() catch |err| {
                // accept fails with an error once the listener is closed
                // during shutdown; only log when it is an unexpected failure
                if (shutdown_flag.load(.acquire)) return;
                log.debug("http accept error: {s}", .{@errorName(err)});
                continue;
            };
            // serves one request per connection; defined later in this file
            handleHttpConn(conn.stream, self.stats, self.persist, self.sub);
        }
    }
};

/// process entry point: read config from env, wire up broadcaster /
/// validator / disk persistence / subscriber, start the worker threads,
/// then block until a shutdown signal and tear everything down in order.
pub fn main() !void {
    var gpa: std.heap.GeneralPurposeAllocator(.{}) = .init;
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // parse config from env (parseEnvInt is defined later in this file)
    const port = parseEnvInt(u16, "RELAY_PORT", 3000);
    const http_port = parseEnvInt(u16, "RELAY_HTTP_PORT", 3001);
    const upstream = std.posix.getenv("RELAY_UPSTREAM") orelse "bsky.network";
    const data_dir = std.posix.getenv("RELAY_DATA_DIR") orelse "data/events";
    const retention_hours = parseEnvInt(u64, "RELAY_RETENTION_HOURS", 72);

    // install signal handlers (including SIGPIPE ignore)
    installSignalHandlers();

    // init components. note: later components hold pointers into earlier
    // ones (&bc, &val, &dp), so the defer/teardown order below matters.
    var bc = broadcaster.Broadcaster.init(allocator);
    defer bc.deinit();

    var val = validator_mod.Validator.init(allocator, &bc.stats);
    defer val.deinit();
    try val.start();

    // init disk persistence (indigo-compatible diskpersist format + SQLite index)
    const db_path = std.posix.getenv("RELAY_DB_PATH") orelse "data/relay.sqlite";
    var dp = event_log_mod.DiskPersist.init(allocator, data_dir, db_path) catch |err| {
        log.err("failed to init disk persist at {s}: {s}", .{ data_dir, @errorName(err) });
        return err;
    };
    defer dp.deinit();
    dp.retention_hours = retention_hours;

    if (dp.lastSeq()) |last| {
        log.info("event log recovered: last_seq={d}", .{last});
    }

    // start flush thread
    try dp.start();

    // wire persist into broadcaster for cursor replay
    bc.persist = &dp;

    var sub = subscriber_mod.Subscriber.init(
        allocator,
        &bc,
        &val,
        &dp,
        &shutdown_flag,
        .{ .upstream_host = upstream },
    );

    // start upstream subscriber thread
    const sub_thread = try std.Thread.spawn(.{}, subscriber_mod.Subscriber.run, .{&sub});

    // start GC thread (runs every 10 minutes)
    const gc_thread = try std.Thread.spawn(.{}, gcLoop, .{&dp});

    // start HTTP health/stats server
    const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, http_port);
    var http_srv = HttpServer{
        .server = address.listen(.{ .reuse_address = true }) catch |err| {
            log.err("http server failed to listen on :{d}: {s}", .{ http_port, @errorName(err) });
            return err;
        },
        .stats = &bc.stats,
        .persist = &dp,
        .sub = &sub,
    };
    const http_thread = try std.Thread.spawn(.{}, HttpServer.run, .{&http_srv});

    // start downstream WebSocket server
    log.info("relay listening on :{d} (ws), :{d} (http)", .{ port, http_port });
    log.info("upstream: {s}", .{upstream});
    log.info("data dir: {s} (retention: {d}h)", .{ data_dir, retention_hours });

    var server = try websocket.Server(broadcaster.Handler).init(allocator, .{
        .port = port,
        .max_conn = 4096,
        .max_message_size = 5 * 1024 * 1024,
    });
    defer server.deinit();

    const server_thread = try server.listenInNewThread(&bc);

    // wait for shutdown signal (set by signalHandler below); poll at 100ms
    while (!shutdown_flag.load(.acquire)) {
        std.posix.nanosleep(0, 100 * std.time.ns_per_ms);
    }

    log.info("shutdown signal received, stopping...", .{});

    // teardown order: producers and consumers first, listener last, so no
    // thread touches bc/dp after the deferred deinits run.

    // stop WebSocket server (closes all downstream connections)
    server.stop();
    server_thread.join();

    // wait for subscriber to finish
    sub_thread.join();

    // wait for GC thread
    gc_thread.join();

    // close HTTP listener socket to unblock accept(), then join
    http_srv.server.stream.close();
    http_thread.join();

    log.info("relay stopped cleanly", .{});
}

/// background thread: run DiskPersist.gc on a 10-minute cadence. sleeps in
/// 1-second slices so shutdown is observed within ~1s instead of ~10min.
fn gcLoop(dp: *event_log_mod.DiskPersist) void {
    const gc_interval: u64 = 10 * 60; // 10 minutes in seconds
    while (!shutdown_flag.load(.acquire)) {
        // sleep in small increments to check shutdown
        var remaining: u64 = gc_interval;
        while (remaining > 0 and !shutdown_flag.load(.acquire)) {
            const chunk = @min(remaining, 1);
            std.posix.nanosleep(chunk, 0);
            remaining -= chunk;
        }
        if (shutdown_flag.load(.acquire)) return;

        dp.gc() catch |err| {
            log.warn("event log GC failed: {s}", .{@errorName(err)});
        };
    }
}

/// POSIX signal handler: async-signal-safe — only flips the atomic flag;
/// all actual shutdown work happens on the main thread.
fn signalHandler(_: c_int) callconv(.c) void {
    shutdown_flag.store(true, .release);
}
/// install process signal handlers: SIGINT/SIGTERM trigger graceful shutdown,
/// SIGPIPE is ignored so writes to disconnected peers surface as errors
/// instead of killing the process.
fn installSignalHandlers() void {
    const act: std.posix.Sigaction = .{
        .handler = .{ .handler = signalHandler },
        .mask = 0,
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.INT, &act, null);
    std.posix.sigaction(std.posix.SIG.TERM, &act, null);

    // ignore SIGPIPE — writing to disconnected consumers must not crash the process
    const ignore_act: std.posix.Sigaction = .{
        .handler = .{ .handler = std.posix.SIG.IGN },
        .mask = 0,
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null);
}

/// handle one HTTP connection on the health/stats/admin port.
/// minimal parser: a single read() is assumed to contain the request line,
/// headers, and (for small POSTs) the body.
/// NOTE(review): a request split across TCP segments will be mis-parsed;
/// acceptable for internal health/admin traffic — confirm if exposed publicly.
fn handleHttpConn(stream: std.net.Stream, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, sub: *subscriber_mod.Subscriber) void {
    defer stream.close();

    // read request (headers + body for small POSTs)
    var buf: [8192]u8 = undefined;
    const n = stream.read(&buf) catch return;
    if (n == 0) return;
    const request = buf[0..n];

    // parse first line: "METHOD /path HTTP/1.1"
    const line_end = std.mem.indexOfScalar(u8, request, '\n') orelse return;
    const first_line = request[0..line_end];

    const method_end = std.mem.indexOfScalar(u8, first_line, ' ') orelse return;
    const method = first_line[0..method_end];

    const path_start = method_end + 1;
    const rest = first_line[path_start..];
    const path_end = std.mem.indexOfScalar(u8, rest, ' ') orelse rest.len;
    const path = rest[0..path_end];

    // find body (after \r\n\r\n)
    const header_end = std.mem.indexOf(u8, request, "\r\n\r\n");
    const body: []const u8 = if (header_end) |he| request[he + 4 ..] else "";

    if (std.mem.eql(u8, method, "GET")) {
        handleGet(stream, path, stats);
    } else if (std.mem.eql(u8, method, "POST")) {
        handlePost(stream, path, request[0 .. header_end orelse n], body, persist, sub);
    } else {
        httpRespond(stream, "405 Method Not Allowed", "text/plain", "method not allowed");
    }
}

/// route GET requests: health check, JSON stats, Prometheus metrics.
fn handleGet(stream: std.net.Stream, path: []const u8, stats: *broadcaster.Stats) void {
    if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) {
        httpRespond(stream, "200 OK", "application/json", "{\"status\":\"ok\"}");
    } else if (std.mem.eql(u8, path, "/_stats")) {
        var stats_buf: [4096]u8 = undefined;
        const response = broadcaster.formatStatsResponse(stats, &stats_buf);
        // writeAll, not write: a single write() may be short and truncate the response
        stream.writeAll(response) catch {};
    } else if (std.mem.eql(u8, path, "/metrics")) {
        var metrics_buf: [4096]u8 = undefined;
        const body = broadcaster.formatPrometheusMetrics(stats, &metrics_buf);
        var resp_buf: [8192]u8 = undefined;
        const response = std.fmt.bufPrint(&resp_buf, "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4; charset=utf-8\r\nContent-Length: {d}\r\nConnection: close\r\n\r\n{s}", .{ body.len, body }) catch return;
        stream.writeAll(response) catch {};
    } else {
        httpRespond(stream, "404 Not Found", "text/plain", "not found");
    }
}

/// route POST requests: admin repo takedown and requestCrawl.
fn handlePost(stream: std.net.Stream, path: []const u8, headers: []const u8, body: []const u8, persist: *event_log_mod.DiskPersist, sub: *subscriber_mod.Subscriber) void {
    if (std.mem.eql(u8, path, "/admin/repo/ban")) {
        handleBan(stream, headers, body, persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) {
        handleRequestCrawl(stream, body, sub);
    } else {
        httpRespond(stream, "404 Not Found", "text/plain", "not found");
    }
}

/// constant-time byte comparison for secrets. a length mismatch returns
/// early — only the content comparison must not leak timing.
fn constantTimeEql(a: []const u8, b: []const u8) bool {
    if (a.len != b.len) return false;
    var diff: u8 = 0;
    for (a, b) |x, y| diff |= x ^ y;
    return diff == 0;
}

/// admin endpoint: take down a repo by DID.
/// requires RELAY_ADMIN_PASSWORD to be configured and presented as a Bearer token.
fn handleBan(stream: std.net.Stream, headers: []const u8, body: []const u8, persist: *event_log_mod.DiskPersist) void {
    // check admin password
    const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse {
        httpRespond(stream, "403 Forbidden", "application/json", "{\"error\":\"admin endpoint not configured\"}");
        return;
    };

    // extract Authorization: Bearer <token>
    const auth_value = findHeader(headers, "authorization") orelse {
        httpRespond(stream, "401 Unauthorized", "application/json", "{\"error\":\"missing authorization header\"}");
        return;
    };
    const bearer_prefix = "Bearer ";
    if (!std.mem.startsWith(u8, auth_value, bearer_prefix)) {
        httpRespond(stream, "401 Unauthorized", "application/json", "{\"error\":\"invalid authorization scheme\"}");
        return;
    }
    const token = auth_value[bearer_prefix.len..];
    // constant-time compare — std.mem.eql short-circuits on the first
    // mismatched byte and would leak how much of the token is correct
    if (!constantTimeEql(token, admin_pw)) {
        httpRespond(stream, "401 Unauthorized", "application/json", "{\"error\":\"invalid token\"}");
        return;
    }

    // parse JSON body for "did" field
    const parsed = std.json.parseFromSlice(struct { did: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        httpRespond(stream, "400 Bad Request", "application/json", "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}");
        return;
    };
    defer parsed.deinit();
    const did = parsed.value.did;

    // resolve DID → UID and take down
    const uid = persist.uidForDid(did) catch {
        httpRespond(stream, "500 Internal Server Error", "application/json", "{\"error\":\"failed to resolve DID\"}");
        return;
    };
    persist.takeDownUser(uid) catch {
        httpRespond(stream, "500 Internal Server Error", "application/json", "{\"error\":\"takedown failed\"}");
        return;
    };

    log.info("admin: banned {s} (uid={d})", .{ did, uid });
    httpRespond(stream, "200 OK", "application/json", "{\"success\":true}");
}

/// com.atproto.sync.requestCrawl: record a hostname for the subscriber.
fn handleRequestCrawl(stream: std.net.Stream, body: []const u8, sub: *subscriber_mod.Subscriber) void {
    const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, sub.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        httpRespond(stream, "400 Bad Request", "application/json", "{\"error\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}");
        return;
    };
    defer parsed.deinit();

    sub.addCrawlRequest(parsed.value.hostname) catch {
        httpRespond(stream, "500 Internal Server Error", "application/json", "{\"error\":\"failed to store crawl request\"}");
        return;
    };

    log.info("crawl requested: {s}", .{parsed.value.hostname});
    httpRespond(stream, "200 OK", "application/json", "{\"success\":true}");
}

/// find an HTTP header by case-insensitive name; returns the trimmed value,
/// or null if the header is absent. `headers` is the raw pre-body request text.
fn findHeader(headers: []const u8, name: []const u8) ?[]const u8 {
    var iter = std.mem.splitScalar(u8, headers, '\n');
    while (iter.next()) |line| {
        const trimmed = std.mem.trimRight(u8, line, "\r");
        const colon = std.mem.indexOfScalar(u8, trimmed, ':') orelse continue;
        const key = std.mem.trim(u8, trimmed[0..colon], " ");
        if (std.ascii.eqlIgnoreCase(key, name)) {
            return std.mem.trim(u8, trimmed[colon + 1 ..], " ");
        }
    }
    return null;
}

/// write a complete HTTP/1.1 response (status line, headers, body).
/// best-effort: formatting or socket errors are swallowed — the connection
/// is closed by the caller either way.
fn httpRespond(stream: std.net.Stream, status: []const u8, content_type: []const u8, body: []const u8) void {
    var buf: [4096]u8 = undefined;
    const response = std.fmt.bufPrint(&buf, "HTTP/1.1 {s}\r\nContent-Type: {s}\r\nContent-Length: {d}\r\nConnection: close\r\n\r\n{s}", .{ status, content_type, body.len, body }) catch return;
    // writeAll retries on short writes; a bare write() could truncate the response
    stream.writeAll(response) catch {};
}

/// parse an integer environment variable, falling back to `default` when
/// the variable is unset or unparseable.
fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {
    const val = std.posix.getenv(key) orelse return default;
    return std.fmt.parseInt(T, val, 10) catch default;
}
+256
src/ring_buffer.zig
//! fixed-size ring buffer for firehose frames
//!
//! used for per-consumer send buffers (non-blocking broadcast)
//! and global frame history (cursor replay).

const std = @import("std");
const Allocator = std.mem.Allocator;

pub const Frame = struct {
    seq: i64,
    data: []const u8, // owned by the ring buffer

    pub const empty: Frame = .{ .seq = 0, .data = &.{} };
};

/// thread-safe fixed-size ring buffer of frames.
/// push overwrites oldest entries when full.
/// data is duped into the buffer and freed on overwrite/deinit.
pub fn RingBuffer(comptime capacity: usize) type {
    return struct {
        entries: [capacity]Frame = [_]Frame{Frame.empty} ** capacity,
        write_pos: usize = 0, // next write position
        read_pos: usize = 0, // next read position (for pop)
        len: usize = 0,
        allocator: Allocator,
        mutex: std.Thread.Mutex = .{},

        const Self = @This();

        pub fn init(allocator: Allocator) Self {
            return .{ .allocator = allocator };
        }

        /// release every frame still held by the buffer.
        pub fn deinit(self: *Self) void {
            // walk live entries from oldest to newest and free their payloads
            for (0..self.len) |offset| {
                const entry = self.entries[(self.read_pos + offset) % capacity];
                if (entry.data.len > 0) self.allocator.free(entry.data);
            }
            self.* = undefined;
        }

        /// push a frame. if full, overwrites oldest. returns false if alloc failed.
        pub fn push(self: *Self, seq: i64, data: []const u8) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.pushUnlocked(seq, data);
        }

        fn pushUnlocked(self: *Self, seq: i64, data: []const u8) bool {
            // copy the payload first — on allocation failure the buffer is untouched
            const copy = self.allocator.dupe(u8, data) catch return false;

            if (self.len < capacity) {
                self.len += 1;
            } else {
                // full: evict the oldest entry and advance the read cursor past it
                const evicted = self.entries[self.write_pos];
                if (evicted.data.len > 0) self.allocator.free(evicted.data);
                self.read_pos = (self.read_pos + 1) % capacity;
            }

            self.entries[self.write_pos] = .{ .seq = seq, .data = copy };
            self.write_pos = (self.write_pos + 1) % capacity;
            return true;
        }

        /// pop the oldest frame. caller owns the returned data.
        pub fn pop(self: *Self) ?Frame {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.popUnlocked();
        }

        fn popUnlocked(self: *Self) ?Frame {
            if (self.len == 0) return null;
            const oldest = self.entries[self.read_pos];
            // clear the slot so its payload is never double-freed
            self.entries[self.read_pos] = Frame.empty;
            self.read_pos = (self.read_pos + 1) % capacity;
            self.len -= 1;
            return oldest;
        }

        /// number of frames currently buffered
        pub fn count(self: *Self) usize {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.len;
        }

        /// check if buffer is full
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.len == capacity;
        }

        /// get all frames with seq > cursor, ordered by seq.
        /// caller owns the returned slice AND frame data.
        pub fn framesSince(self: *Self, allocator: Allocator, cursor: i64) ![]const Frame {
            self.mutex.lock();
            defer self.mutex.unlock();

            var collected: std.ArrayList(Frame) = .{};
            errdefer {
                // on failure, release everything duped so far
                for (collected.items) |frame| allocator.free(frame.data);
                collected.deinit(allocator);
            }

            // scan oldest → newest; matching frames are copied for the caller
            for (0..self.len) |offset| {
                const entry = self.entries[(self.read_pos + offset) % capacity];
                if (entry.seq <= cursor) continue;
                const copy = try allocator.dupe(u8, entry.data);
                try collected.append(allocator, .{ .seq = entry.seq, .data = copy });
            }
            return try collected.toOwnedSlice(allocator);
        }

        /// oldest seq in the buffer, or null if empty
        pub fn oldestSeq(self: *Self) ?i64 {
            self.mutex.lock();
            defer self.mutex.unlock();
            return if (self.len == 0) null else self.entries[self.read_pos].seq;
        }

        /// newest seq in the buffer, or null if empty
        pub fn newestSeq(self: *Self) ?i64 {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.len == 0) return null;
            // write_pos points at the NEXT slot; the newest entry is one behind it
            const last = if (self.write_pos == 0) capacity - 1 else self.write_pos - 1;
            return self.entries[last].seq;
        }
    };
}

// === tests ===

test "push and pop" {
    var rb = RingBuffer(4).init(std.testing.allocator);
    defer rb.deinit();

    try std.testing.expect(rb.push(1, "hello"));
    try std.testing.expect(rb.push(2, "world"));
    try std.testing.expectEqual(@as(usize, 2), rb.count());

    const first = rb.pop().?;
    defer std.testing.allocator.free(first.data);
    try std.testing.expectEqual(@as(i64, 1), first.seq);
    try std.testing.expectEqualStrings("hello", first.data);

    const second = rb.pop().?;
    defer std.testing.allocator.free(second.data);
    try std.testing.expectEqual(@as(i64, 2), second.seq);

    try std.testing.expect(rb.pop() == null);
}

test "overwrite when full" {
    var rb = RingBuffer(3).init(std.testing.allocator);
    defer rb.deinit();

    try std.testing.expect(rb.push(1, "a"));
    try std.testing.expect(rb.push(2, "b"));
    try std.testing.expect(rb.push(3, "c"));
    try std.testing.expectEqual(@as(usize, 3), rb.count());

    // push overwrites oldest (seq=1)
    try std.testing.expect(rb.push(4, "d"));
    try std.testing.expectEqual(@as(usize, 3), rb.count());

    const oldest = rb.pop().?;
    defer std.testing.allocator.free(oldest.data);
    try std.testing.expectEqual(@as(i64, 2), oldest.seq);
}

test "framesSince" {
    var rb = RingBuffer(8).init(std.testing.allocator);
    defer rb.deinit();

    for (1..6) |i| {
        try std.testing.expect(rb.push(@intCast(i), "data"));
    }

    const frames = try rb.framesSince(std.testing.allocator, 3);
    defer {
        for (frames) |f| std.testing.allocator.free(f.data);
        std.testing.allocator.free(frames);
    }

    try std.testing.expectEqual(@as(usize, 2), frames.len);
    try std.testing.expectEqual(@as(i64, 4), frames[0].seq);
    try std.testing.expectEqual(@as(i64, 5), frames[1].seq);
}

test "oldestSeq and newestSeq" {
    var rb = RingBuffer(4).init(std.testing.allocator);
    defer rb.deinit();

    try std.testing.expect(rb.oldestSeq() == null);
    try std.testing.expect(rb.newestSeq() == null);

    try std.testing.expect(rb.push(10, "x"));
    try std.testing.expect(rb.push(20, "y"));
    try std.testing.expect(rb.push(30, "z"));

    try std.testing.expectEqual(@as(i64, 10), rb.oldestSeq().?);
    try std.testing.expectEqual(@as(i64, 30), rb.newestSeq().?);
}

test "empty buffer operations" {
    var rb = RingBuffer(4).init(std.testing.allocator);
    defer rb.deinit();

    try std.testing.expectEqual(@as(usize, 0), rb.count());
    try std.testing.expect(!rb.isFull());
    try std.testing.expect(rb.pop() == null);

    const frames = try rb.framesSince(std.testing.allocator, 0);
    defer std.testing.allocator.free(frames);
    try std.testing.expectEqual(@as(usize, 0), frames.len);
}

test "wrap-around with pop and push" {
    var rb = RingBuffer(3).init(std.testing.allocator);
    defer rb.deinit();

    // fill
    try std.testing.expect(rb.push(1, "a"));
    try std.testing.expect(rb.push(2, "b"));
    try std.testing.expect(rb.push(3, "c"));

    // pop two
    const a = rb.pop().?;
    std.testing.allocator.free(a.data);
    const b = rb.pop().?;
    std.testing.allocator.free(b.data);

    // push two more (wraps around)
    try std.testing.expect(rb.push(4, "d"));
    try std.testing.expect(rb.push(5, "e"));

    try std.testing.expectEqual(@as(usize, 3), rb.count());
    try std.testing.expectEqual(@as(i64, 3), rb.oldestSeq().?);
    try std.testing.expectEqual(@as(i64, 5), rb.newestSeq().?);
}
+308
src/subscriber.zig
//! relay subscriber — connects to upstream firehose and feeds frames
//! through validation into the broadcaster with relay-assigned seq numbers.
//!
//! decodes firehose frames using the zat SDK's CBOR codec, extracts typed
//! event data, validates commit frames, and persists all events to disk
//! with relay-assigned sequence numbers before broadcast.

const std = @import("std");
const websocket = @import("websocket");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const event_log_mod = @import("event_log.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

pub const Options = struct {
    upstream_host: []const u8 = "bsky.network",
    max_message_size: usize = 5 * 1024 * 1024,
};

pub const Subscriber = struct {
    allocator: Allocator,
    options: Options,
    bc: *broadcaster.Broadcaster,
    validator: *validator_mod.Validator,
    persist: ?*event_log_mod.DiskPersist,
    shutdown: *std.atomic.Value(bool),
    last_upstream_seq: ?i64 = null,
    crawl_requests: std.ArrayListUnmanaged([]const u8) = .{},
    crawl_mutex: std.Thread.Mutex = .{},

    /// record a requestCrawl hostname (duped; owned by the subscriber).
    /// thread-safe — called from the HTTP admin thread.
    pub fn addCrawlRequest(self: *Subscriber, hostname: []const u8) !void {
        const duped = try self.allocator.dupe(u8, hostname);
        errdefer self.allocator.free(duped);
        self.crawl_mutex.lock();
        defer self.crawl_mutex.unlock();
        try self.crawl_requests.append(self.allocator, duped);
    }

    pub fn init(
        allocator: Allocator,
        bc: *broadcaster.Broadcaster,
        val: *validator_mod.Validator,
        persist: ?*event_log_mod.DiskPersist,
        shutdown: *std.atomic.Value(bool),
        options: Options,
    ) Subscriber {
        return .{
            .allocator = allocator,
            .options = options,
            .bc = bc,
            .validator = val,
            .persist = persist,
            .shutdown = shutdown,
        };
    }

    /// free all stored crawl-request hostnames. previously these leaked —
    /// nothing ever released the duped strings.
    pub fn deinit(self: *Subscriber) void {
        self.crawl_mutex.lock();
        defer self.crawl_mutex.unlock();
        for (self.crawl_requests.items) |hostname| {
            self.allocator.free(hostname);
        }
        self.crawl_requests.deinit(self.allocator);
    }

    /// run the subscriber loop. reconnects with exponential backoff.
    /// blocks until shutdown flag is set.
    pub fn run(self: *Subscriber) void {
        var backoff: u64 = 1;
        const max_backoff: u64 = 60;

        while (!self.shutdown.load(.acquire)) {
            log.info("connecting to upstream {s}...", .{self.options.upstream_host});

            // connectAndRead resets `backoff` to 1 after a successful
            // handshake — previously the backoff was never reset, so a
            // long-lived connection that dropped still waited the full
            // escalated delay before reconnecting.
            self.connectAndRead(&backoff) catch |err| {
                if (self.shutdown.load(.acquire)) return;
                log.err("upstream error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
            };

            if (self.shutdown.load(.acquire)) return;

            // backoff sleep in small increments so we can check shutdown
            var remaining: u64 = backoff;
            while (remaining > 0 and !self.shutdown.load(.acquire)) {
                const chunk = @min(remaining, 1);
                std.posix.nanosleep(chunk, 0);
                remaining -= chunk;
            }
            backoff = @min(backoff * 2, max_backoff);
        }
    }

    /// connect to the upstream firehose (TLS WebSocket on :443) and read
    /// frames until the connection closes or errors. on a successful
    /// handshake, resets `backoff` so the next reconnect is prompt.
    fn connectAndRead(self: *Subscriber, backoff: *u64) !void {
        var path_buf: [256]u8 = undefined;
        var w: std.Io.Writer = .fixed(&path_buf);

        try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos");
        if (self.last_upstream_seq) |cursor| {
            // resume from the last upstream seq we saw to avoid gaps
            try w.print("?cursor={d}", .{cursor});
        }
        const path = w.buffered();

        var client = try websocket.Client.init(self.allocator, .{
            .host = self.options.upstream_host,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
        });
        defer client.deinit();

        var host_header_buf: [256]u8 = undefined;
        // previously the bufPrint fallback passed the bare hostname (no
        // "Host: " prefix), producing a malformed handshake — fail instead
        const host_header = std.fmt.bufPrint(
            &host_header_buf,
            "Host: {s}\r\n",
            .{self.options.upstream_host},
        ) catch return error.HostTooLong;

        try client.handshake(path, .{ .headers = host_header });
        log.info("connected to upstream firehose", .{});

        // reset backoff on successful connect
        backoff.* = 1;

        var handler = FrameHandler{
            .subscriber = self,
        };
        try client.readLoop(&handler);
    }
};

/// per-connection WebSocket callback target: decodes, validates, persists,
/// and rebroadcasts each upstream frame.
const FrameHandler = struct {
    subscriber: *Subscriber,

    /// handle one upstream frame: [header map][payload map] in CBOR.
    /// malformed frames are counted and dropped; the connection stays up.
    pub fn serverMessage(self: *FrameHandler, data: []const u8) !void {
        const sub = self.subscriber;

        // decode frame using SDK CBOR codec: [header map] [payload map]
        // arena scopes all decode temporaries to this one frame
        var arena = std.heap.ArenaAllocator.init(sub.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        const header_result = zat.cbor.decode(alloc, data) catch |err| {
            log.debug("frame header decode failed: {s} (len={d})", .{ @errorName(err), data.len });
            _ = sub.bc.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };
        const header = header_result.value;
        const payload_data = data[header_result.consumed..];

        // check op field (1 = message, -1 = error)
        const op = header.getInt("op") orelse return;
        if (op == -1) return; // error frame from upstream, skip

        const frame_type = header.getString("t") orelse return;
        const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| {
            log.debug("frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type });
            _ = sub.bc.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };

        // extract seq for cursor tracking (all event types have seq)
        const upstream_seq = payload.getInt("seq");
        if (upstream_seq) |s| {
            sub.last_upstream_seq = s;
            sub.bc.stats.seq.store(s, .release);
        }

        // route by frame type
        const is_commit = std.mem.eql(u8, frame_type, "#commit");

        // extract DID: "repo" for commits, "did" for identity/account
        const did: ?[]const u8 = if (is_commit)
            payload.getString("repo")
        else
            payload.getString("did");

        // validate commit frames using pre-decoded payload
        var commit_data_cid: ?[]const u8 = null;
        var commit_rev: ?[]const u8 = null;
        if (is_commit) {
            const result = sub.validator.validateCommit(payload);
            if (!result.valid) return;
            commit_data_cid = result.data_cid;
            commit_rev = result.commit_rev;
        }

        // determine event kind for persistence
        // NOTE(review): every non-commit frame (#identity, #account, ...) is
        // recorded as .identity — confirm whether EvtKind distinguishes them
        const kind: event_log_mod.EvtKind = if (is_commit) .commit else .identity;

        // resolve DID → numeric UID for event header (0 = unknown)
        const uid: u64 = if (sub.persist) |dp| blk: {
            break :blk if (did) |d| dp.uidForDid(d) catch 0 else 0;
        } else 0;

        // persist and get relay-assigned seq, broadcast raw bytes
        if (sub.persist) |dp| {
            const relay_seq = dp.persist(kind, uid, data) catch |err| {
                // best-effort: a persist failure must not stall the firehose,
                // so fall back to broadcasting with the upstream seq
                log.warn("persist failed: {s}", .{@errorName(err)});
                sub.bc.broadcast(upstream_seq orelse 0, data);
                return;
            };

            // update per-DID state after successful commit validation
            if (is_commit and uid > 0) {
                if (commit_data_cid) |data_cid| {
                    if (commit_rev) |rev| {
                        dp.updateAccountState(uid, rev, data_cid) catch |err| {
                            log.debug("account state update failed: {s}", .{@errorName(err)});
                        };
                    }
                }
            }

            sub.bc.stats.relay_seq.store(relay_seq, .release);
            sub.bc.broadcast(@intCast(relay_seq), data);
        } else {
            sub.bc.broadcast(upstream_seq orelse 0, data);
        }
    }

    pub fn close(_: *FrameHandler) void {
        log.info("upstream connection closed", .{});
    }
};

// --- tests ---

test "decode frame via SDK and extract fields" {
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // build a commit frame using SDK encoder
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#commit" } },
    } };
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 12345 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const payload_bytes = try cbor.encodeAlloc(alloc, payload);

    var frame = try alloc.alloc(u8, header_bytes.len + payload_bytes.len);
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);

    // decode using SDK (same path as FrameHandler.serverMessage)
    const h_result = try cbor.decode(alloc, frame);
    const h = h_result.value;
    const p_data = frame[h_result.consumed..];
    const p = try cbor.decodeAll(alloc, p_data);

    try std.testing.expectEqualStrings("#commit", h.getString("t").?);
    try std.testing.expectEqual(@as(i64, 1), h.getInt("op").?);
    try std.testing.expectEqual(@as(i64, 12345), p.getInt("seq").?);
    try std.testing.expectEqualStrings("did:plc:test123", p.getString("repo").?);
}

test "decode identity frame via SDK" {
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#identity" } },
    } };
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:alice" } },
        .{ .key = "seq", .value = .{ .unsigned = 99 } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const payload_bytes = try cbor.encodeAlloc(alloc, payload);

    var frame = try alloc.alloc(u8, header_bytes.len + payload_bytes.len);
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);

    const h_result = try cbor.decode(alloc, frame);
    const h = h_result.value;
    const p_data = frame[h_result.consumed..];
    const p = try cbor.decodeAll(alloc, p_data);

    try std.testing.expectEqualStrings("#identity", h.getString("t").?);
    try std.testing.expectEqualStrings("did:plc:alice", p.getString("did").?);
    try std.testing.expectEqual(@as(i64, 99), p.getInt("seq").?);
}

test "error frame (op=-1) is detected" {
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .negative = -1 } },
        .{ .key = "t", .value = .{ .text = "#info" } },
    } };

    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const h_result = try cbor.decode(alloc, header_bytes);
    const h = h_result.value;

    try std.testing.expectEqual(@as(i64, -1), h.getInt("op").?);
}
+440
src/validator.zig
//! relay frame validator — DID key resolution + real signature verification
//!
//! validates firehose commit frames by verifying the commit signature against
//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
//! and queues background resolution. no frame is ever blocked on network I/O.

const std = @import("std");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

/// decoded and cached signing key for a DID
const CachedKey = struct {
    key_type: zat.multicodec.KeyType,
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    len: u8, // number of valid bytes in `raw`
    resolve_time: i64 = 0, // epoch seconds when resolved
};

pub const ValidationResult = struct {
    valid: bool,
    skipped: bool,
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};

/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    rev_clock_skew: i64 = 300, // 5 minutes
};

pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    // DID → signing key cache (decoded, ready for verification).
    // keys are heap-duped DIDs owned by the cache; values are inline.
    cache: std.StringHashMapUnmanaged(CachedKey) = .{},
    cache_mutex: std.Thread.Mutex = .{},
    // background resolve queue (heap-duped DIDs, owned by the queue)
    queue: std.ArrayListUnmanaged([]const u8) = .{},
    queue_mutex: std.Thread.Mutex = .{},
    queue_cond: std.Thread.Condition = .{},
    resolver_thread: ?std.Thread = null,
    alive: std.atomic.Value(bool) = .{ .raw = true },

    pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator {
        return initWithConfig(allocator, stats, .{});
    }

    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
        };
    }

    /// stop the resolver thread (if started) and free all owned memory.
    pub fn deinit(self: *Validator) void {
        self.alive.store(false, .release);
        // wake the resolver while holding the queue mutex: signaling without
        // it can race the resolver between its predicate check and its wait
        // (lost wakeup), leaving shutdown to stall until the 1s wait timeout.
        {
            self.queue_mutex.lock();
            defer self.queue_mutex.unlock();
            self.queue_cond.broadcast();
        }
        if (self.resolver_thread) |t| t.join();

        // free cache keys (CachedKey is inline, no separate free needed)
        var cache_it = self.cache.iterator();
        while (cache_it.next()) |entry| {
            self.allocator.free(entry.key_ptr.*);
        }
        self.cache.deinit(self.allocator);

        // free queued DIDs
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
    }

    /// start the background resolver thread
    pub fn start(self: *Validator) !void {
        self.resolver_thread = try std.Thread.spawn(.{}, resolveLoop, .{self});
    }

    /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder).
    /// on cache miss, queues background resolution and skips.
    /// returned slices borrow from `payload` / its backing bytes — valid only
    /// as long as the caller keeps the frame alive.
    pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // extract DID from decoded payload
        const did = payload.getString("repo") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        // check cache for pre-resolved signing key
        const cached_key: ?CachedKey = blk: {
            self.cache_mutex.lock();
            defer self.cache_mutex.unlock();
            break :blk self.cache.get(did);
        };

        if (cached_key == null) {
            // cache miss — queue for background resolution, skip validation
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // cache hit — do structure checks + signature verification
        if (self.verifyCommit(payload, did, cached_key.?)) |vr| {
            _ = self.stats.validated.fetchAdd(1, .monotonic);
            return vr;
        } else |err| {
            log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) });
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }
    }

    fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult {
        // commit structure checks first (cheap, no allocation)
        try self.checkCommitStructure(payload);

        // extract blocks (raw CAR bytes) from the pre-decoded payload
        const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame;

        // blocks size check
        if (blocks.len > 2 * 1024 * 1024) return error.InvalidFrame;

        // build public key for verification
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.key_type,
            .raw = cached_key.raw[0..cached_key.len],
        };

        // run real signature verification (needs its own arena for CAR/MST temporaries)
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        // NOTE(review): the returned data_cid/commit_rev must point into
        // `blocks` (caller-owned), NOT into the arena — the arena is freed
        // before return. verify this against the zat SDK contract.

        // try sync 1.1 path: extract ops and use verifyCommitDiff
        if (self.config.verify_commit_diff) {
            if (self.extractOps(alloc, payload)) |msg_ops| {
                // get stored prev_data from payload
                const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) {
                    .cid => |c| c.raw,
                    .null => null,
                    else => null,
                } else null;

                const diff_result = try zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{
                    .expected_did = expected_did,
                    .skip_inversion = prev_data == null,
                });

                return .{
                    .valid = true,
                    .skipped = false,
                    .data_cid = diff_result.data_cid,
                    .commit_rev = diff_result.commit_rev,
                };
            }
        }

        // fallback: legacy verification (signature + optional MST walk)
        const result = try zat.verifyCommitCar(alloc, blocks, public_key, .{
            .verify_mst = self.config.verify_mst,
            .expected_did = expected_did,
        });

        return .{
            .valid = true,
            .skipped = false,
            .data_cid = null,
            .commit_rev = result.commit_rev,
        };
    }

    /// extract ops from payload and convert to mst.Operation array.
    /// all allocations come from `alloc` (the caller's arena); returns null
    /// when there are no usable ops or on allocation failure (caller then
    /// falls back to legacy verification).
    fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation {
        _ = self;
        const ops_array = payload.getArray("ops") orelse return null;
        var ops: std.ArrayListUnmanaged(zat.MstOperation) = .{};
        for (ops_array) |op| {
            const action = op.getString("action") orelse continue;
            const collection = op.getString("collection") orelse continue;
            const rkey = op.getString("rkey") orelse continue;

            // build path: "collection/rkey"
            const path = std.fmt.allocPrint(alloc, "{s}/{s}", .{ collection, rkey }) catch return null;

            // extract CID values
            const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) {
                .cid => |c| c.raw,
                else => null,
            } else null;

            var value: ?[]const u8 = null;
            var prev: ?[]const u8 = null;

            if (std.mem.eql(u8, action, "create")) {
                value = cid_value;
            } else if (std.mem.eql(u8, action, "update")) {
                // update carries both the new value and, when the frame
                // includes it, the previous CID (sync 1.1); absent "prev"
                // is recovered from the prevData chain during inversion.
                value = cid_value;
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else if (std.mem.eql(u8, action, "delete")) {
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else continue;

            ops.append(alloc, .{
                .path = path,
                .value = value,
                .prev = prev,
            }) catch return null;
        }

        if (ops.items.len == 0) return null;
        return ops.items;
    }

    fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void {
        // check repo field is a valid DID
        const repo = payload.getString("repo") orelse return error.InvalidFrame;
        if (zat.Did.parse(repo) == null) return error.InvalidFrame;

        // check rev is a valid TID
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) return error.InvalidFrame;
        }

        // check ops count
        if (payload.get("ops")) |ops_value| {
            switch (ops_value) {
                .array => |ops| {
                    if (ops.len > self.config.max_ops) return error.InvalidFrame;
                    // validate each op has valid collection/rkey
                    for (ops) |op| {
                        if (op.getString("collection")) |coll| {
                            if (zat.Nsid.parse(coll) == null) return error.InvalidFrame;
                        }
                        if (op.getString("rkey")) |rk| {
                            if (zat.Rkey.parse(rk) == null) return error.InvalidFrame;
                        }
                    }
                },
                else => return error.InvalidFrame,
            }
        }
    }

    fn queueResolve(self: *Validator, did: []const u8) void {
        // check if already cached (race between validate and resolver)
        {
            self.cache_mutex.lock();
            defer self.cache_mutex.unlock();
            if (self.cache.contains(did)) return;
        }

        self.queue_mutex.lock();
        defer self.queue_mutex.unlock();

        // dedupe: a hot DID misses on every frame until resolved; don't
        // enqueue (and allocate) the same DID more than once. queue stays
        // small, so a linear scan under the mutex is fine.
        for (self.queue.items) |pending| {
            if (std.mem.eql(u8, pending, did)) return;
        }

        const duped = self.allocator.dupe(u8, did) catch return;
        self.queue.append(self.allocator, duped) catch {
            self.allocator.free(duped);
            return;
        };
        self.queue_cond.signal();
    }

    fn resolveLoop(self: *Validator) void {
        var resolver = zat.DidResolver.init(self.allocator);
        defer resolver.deinit();

        while (self.alive.load(.acquire)) {
            var did: ?[]const u8 = null;
            {
                self.queue_mutex.lock();
                defer self.queue_mutex.unlock();
                while (self.queue.items.len == 0 and self.alive.load(.acquire)) {
                    // 1s timeout bounds shutdown latency even if a wakeup is missed
                    self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {};
                }
                if (self.queue.items.len > 0) {
                    did = self.queue.orderedRemove(0);
                }
            }

            if (did) |d| {
                defer self.allocator.free(d);

                // skip if already cached (resolved while queued)
                {
                    self.cache_mutex.lock();
                    defer self.cache_mutex.unlock();
                    if (self.cache.contains(d)) continue;
                }

                // resolve DID → signing key
                const parsed = zat.Did.parse(d) orelse continue;
                var doc = resolver.resolve(parsed) catch |err| {
                    log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) });
                    continue;
                };
                defer doc.deinit();

                // extract and decode signing key
                const vm = doc.signingKey() orelse continue;
                const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue;
                defer self.allocator.free(key_bytes);
                const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue;

                // only keys that fit the inline 33-byte slot are cacheable;
                // skip anything larger (e.g. uncompressed) instead of
                // overrunning the buffer in @memcpy below.
                if (public_key.raw.len > 33) {
                    log.debug("unsupported key size {d} for {s}", .{ public_key.raw.len, d });
                    continue;
                }

                // store decoded key in cache (fixed-size, no pointer chasing)
                var cached = CachedKey{
                    .key_type = public_key.key_type,
                    .raw = undefined,
                    .len = @intCast(public_key.raw.len),
                    .resolve_time = std.time.timestamp(),
                };
                @memcpy(cached.raw[0..public_key.raw.len], public_key.raw);

                const did_duped = self.allocator.dupe(u8, d) catch continue;

                self.cache_mutex.lock();
                defer self.cache_mutex.unlock();
                self.cache.put(self.allocator, did_duped, cached) catch {
                    self.allocator.free(did_duped);
                };
            }
        }
    }

    /// cache size (for diagnostics)
    pub fn cacheSize(self: *Validator) usize {
        self.cache_mutex.lock();
        defer self.cache_mutex.unlock();
        return self.cache.count();
    }
};

// --- tests ---

test "validateCommit skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // build a commit payload using SDK
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateCommit skips when no repo field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // payload without "repo" field
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "checkCommitStructure rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "not-a-did" } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}

test "checkCommitStructure accepts valid commit" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    try v.checkCommitStructure(payload);
}

test "checkCommitStructure rejects too many ops" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .max_ops = 2 });
    defer v.deinit();

    // build ops array with 3 items (over limit of 2)
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
    };

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}