atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

collectiondir: standalone collection directory service

extract collection directory into a second binary sharing the same build
tree. subscribes to relay firehose, maintains its own RocksDB index, and
serves listReposByCollection on port 2510.

- build.zig: add collectiondir executable target
- Dockerfile.runtime: copy collectiondir binary
- collection_index: add default CF handle + getMeta/putMeta/deleteMeta
- collectiondir.zig: HTTP server, health checks, admin endpoints
- collectiondir_subscriber.zig: firehose consumer with cursor in RocksDB
- collectiondir_backfill.zig: RocksDB-backed progress, no postgres

all state in RocksDB — no shared postgres dependency.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz cbe6907c 1639565a

+1214
+1
Dockerfile.runtime
··· 1 1 FROM debian:bookworm-slim 2 2 RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/* 3 3 COPY zig-out/bin/zlay /usr/local/bin/zlay 4 + COPY zig-out/bin/collectiondir /usr/local/bin/collectiondir 4 5 RUN mkdir -p /data/events /data/collection-index 5 6 ENV RELAY_DATA_DIR=/data/events 6 7 EXPOSE 3000 3001
+21
build.zig
··· 64 64 const run_step = b.step("run", "run the relay"); 65 65 run_step.dependOn(&run_relay.step); 66 66 67 + // collectiondir executable (shadow collection directory service) 68 + const collectiondir_mod = b.createModule(.{ 69 + .root_source_file = b.path("src/collectiondir.zig"), 70 + .target = target, 71 + .optimize = optimize, 72 + .imports = imports, 73 + }); 74 + collectiondir_mod.addImport("build_options", build_options.createModule()); 75 + const collectiondir = b.addExecutable(.{ 76 + .name = "collectiondir", 77 + .root_module = collectiondir_mod, 78 + }); 79 + collectiondir.linkLibC(); 80 + collectiondir.linkLibCpp(); 81 + b.installArtifact(collectiondir); 82 + 83 + const run_collectiondir = b.addRunArtifact(collectiondir); 84 + if (b.args) |args| run_collectiondir.addArgs(args); 85 + const run_collectiondir_step = b.step("run-collectiondir", "run the collection directory service"); 86 + run_collectiondir_step.dependOn(&run_collectiondir.step); 87 + 67 88 // tests 68 89 const test_step = b.step("test", "run unit tests"); 69 90 const test_files = .{
+48
src/collection_index.zig
··· 19 19 20 20 pub const CollectionIndex = struct { 21 21 db: rocksdb.DB, 22 + default: rocksdb.ColumnFamilyHandle, 22 23 rbc: rocksdb.ColumnFamilyHandle, 23 24 cbr: rocksdb.ColumnFamilyHandle, 24 25 allocator: Allocator, ··· 50 51 defer allocator.free(families); 51 52 52 53 // find column family handles by name 54 + var default: ?rocksdb.ColumnFamilyHandle = null; 53 55 var rbc: ?rocksdb.ColumnFamilyHandle = null; 54 56 var cbr: ?rocksdb.ColumnFamilyHandle = null; 55 57 for (families) |cf| { 58 + if (std.mem.eql(u8, cf.name, "default")) default = cf.handle; 56 59 if (std.mem.eql(u8, cf.name, "rbc")) rbc = cf.handle; 57 60 if (std.mem.eql(u8, cf.name, "cbr")) cbr = cf.handle; 58 61 } ··· 61 64 62 65 return .{ 63 66 .db = db, 67 + .default = default orelse return error.MissingColumnFamily, 64 68 .rbc = rbc orelse return error.MissingColumnFamily, 65 69 .cbr = cbr orelse return error.MissingColumnFamily, 66 70 .allocator = allocator, 71 + }; 72 + } 73 + 74 + // --- metadata helpers (default CF) --- 75 + 76 + /// read a metadata value from the default column family. 77 + /// caller must call .deinit() on the returned Data to free rocksdb memory. 78 + pub fn getMeta(self: *CollectionIndex, key: []const u8) ?rocksdb.Data { 79 + var err_str: ?rocksdb.Data = null; 80 + const val = self.db.get(self.default, key, &err_str) catch { 81 + if (err_str) |e| { 82 + log.debug("getMeta({s}) failed: {s}", .{ key, e.data }); 83 + e.deinit(); 84 + } 85 + return null; 86 + }; 87 + return val; 88 + } 89 + 90 + /// write a metadata value to the default column family 91 + pub fn putMeta(self: *CollectionIndex, key: []const u8, value: []const u8) void { 92 + var err_str: ?rocksdb.Data = null; 93 + var batch = rocksdb.WriteBatch.init(); 94 + defer batch.deinit(); 95 + batch.put(self.default, key, value); 96 + self.db.write(batch, &err_str) catch { 97 + if (err_str) |e| { 98 + log.warn("putMeta({s}) failed: {s}", .{ key, e.data }); 99 + e.deinit(); 100 + } 101 + }; 102 + } 103 + 104 + /// delete a metadata key from the default column family 105 + pub fn deleteMeta(self: *CollectionIndex, key: []const u8) void { 106 + var err_str: ?rocksdb.Data = null; 107 + var batch = rocksdb.WriteBatch.init(); 108 + defer batch.deinit(); 109 + batch.delete(self.default, key); 110 + self.db.write(batch, &err_str) catch { 111 + if (err_str) |e| { 112 + log.warn("deleteMeta({s}) failed: {s}", .{ key, e.data }); 113 + e.deinit(); 114 + } 67 115 }; 68 116 } 69 117
+407
src/collectiondir.zig
··· 1 + //! collectiondir — standalone collection directory service 2 + //! 3 + //! subscribes to the relay's firehose and maintains an independent RocksDB 4 + //! collection index. no postgres — all state (cursor, backfill progress, 5 + //! collection index) lives in RocksDB. 6 + //! 7 + //! reuses collection_index.zig (unchanged) and resync.zig (unchanged, stage 2). 8 + //! backfill uses collectiondir_backfill.zig (RocksDB-backed progress). 9 + //! 10 + //! LISTEN_PORT (default 2510): HTTP API 11 + //! GET /xrpc/com.atproto.sync.listReposByCollection 12 + //! GET /_health — readiness (subscriber connected + cursor fresh) 13 + //! GET /_healthz — liveness 14 + //! GET /metrics 15 + //! GET /admin/backfill-collections — backfill status 16 + //! POST /admin/backfill-collections — trigger backfill 17 + //! 18 + //! env: 19 + //! RELAY_URL — relay firehose host (e.g. "zlay" for in-cluster) 20 + //! RELAY_PORT — relay firehose port (default 3000) 21 + //! RELAY_TLS — "true" for wss:// (default "false") 22 + //! COLLECTION_INDEX_DIR — RocksDB data directory 23 + //! LISTEN_PORT — HTTP listen port (default 2510) 24 + //! RELAY_ADMIN_PASSWORD — admin auth token 25 + 26 + const std = @import("std"); 27 + const http = std.http; 28 + const collection_index_mod = @import("collection_index.zig"); 29 + const backfill_mod = @import("collectiondir_backfill.zig"); 30 + const collectiondir_sub = @import("collectiondir_subscriber.zig"); 31 + 32 + const log = std.log.scoped(.collectiondir); 33 + 34 + /// 8 MiB stacks — matches relay's default_stack_size for ReleaseSafe 35 + pub const default_stack_size = 8 * 1024 * 1024; 36 + 37 + var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false }; 38 + 39 + pub fn main() !void { 40 + const allocator = std.heap.c_allocator; 41 + 42 + // parse config from env 43 + const listen_port = parseEnvInt(u16, "LISTEN_PORT", 2510); 44 + const relay_url = std.posix.getenv("RELAY_URL") orelse "zlay"; 45 + const relay_port = parseEnvInt(u16, "RELAY_PORT", 3000); 46 + const use_tls = if (std.posix.getenv("RELAY_TLS")) |v| std.mem.eql(u8, v, "true") else false; 47 + const ci_dir = std.posix.getenv("COLLECTION_INDEX_DIR") orelse "data/collection-index"; 48 + 49 + // install signal handlers 50 + installSignalHandlers(); 51 + 52 + // init RocksDB collection index (all state lives here) 53 + var ci = collection_index_mod.CollectionIndex.open(allocator, ci_dir) catch |err| { 54 + log.err("failed to open collection index at {s}: {s}", .{ ci_dir, @errorName(err) }); 55 + return err; 56 + }; 57 + defer ci.deinit(); 58 + 59 + // init backfiller (RocksDB-backed progress) 60 + var backfiller = backfill_mod.Backfiller.init(allocator, &ci); 61 + 62 + // init firehose subscriber (cursor in RocksDB) 63 + var subscriber = collectiondir_sub.Subscriber.init( 64 + allocator, 65 + relay_url, 66 + relay_port, 67 + use_tls, 68 + &ci, 69 + &shutdown_flag, 70 + ); 71 + 72 + // start subscriber thread 73 + const sub_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, collectiondir_sub.Subscriber.run, .{&subscriber}); 74 + 75 + // start HTTP server 76 + log.info("collectiondir listening on :{d}", .{listen_port}); 77 + log.info("relay upstream: {s}:{d} (tls={s})", .{ relay_url, relay_port, if (use_tls) "true" else "false" }); 78 + log.info("collection index: {s}", .{ci_dir}); 79 + 80 + var http_ctx = HttpContext{ 81 + .collection_index = &ci, 82 + .backfiller = &backfiller, 83 + .subscriber = &subscriber, 84 + }; 85 + 86 + const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, listen_port); 87 + var server = address.listen(.{ .reuse_address = true }) catch |err| { 88 + log.err("failed to listen on :{d}: {s}", .{ listen_port, @errorName(err) }); 89 + return err; 90 + }; 91 + 92 + while (!shutdown_flag.load(.acquire)) { 93 + const conn = server.accept() catch |err| { 94 + if (shutdown_flag.load(.acquire)) break; 95 + log.debug("accept error: {s}", .{@errorName(err)}); 96 + continue; 97 + }; 98 + const timeout = std.posix.timeval{ .sec = 5, .usec = 0 }; 99 + std.posix.setsockopt(conn.stream.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, std.mem.asBytes(&timeout)) catch {}; 100 + handleConnection(conn.stream, &http_ctx); 101 + } 102 + 103 + log.info("shutdown signal received, stopping...", .{}); 104 + server.stream.close(); 105 + sub_thread.join(); 106 + log.info("collectiondir stopped cleanly", .{}); 107 + } 108 + 109 + const HttpContext = struct { 110 + collection_index: *collection_index_mod.CollectionIndex, 111 + backfiller: *backfill_mod.Backfiller, 112 + subscriber: *collectiondir_sub.Subscriber, 113 + }; 114 + 115 + fn handleConnection(stream: std.net.Stream, ctx: *HttpContext) void { 116 + defer stream.close(); 117 + 118 + var recv_buf: [4096]u8 = undefined; 119 + var send_buf: [4096]u8 = undefined; 120 + var connection_reader = stream.reader(&recv_buf); 121 + var connection_writer = stream.writer(&send_buf); 122 + var server = http.Server.init(connection_reader.interface(), &connection_writer.interface); 123 + 124 + var request = server.receiveHead() catch return; 125 + const target = request.head.target; 126 + const method = request.head.method; 127 + 128 + const qmark = std.mem.indexOfScalar(u8, target, '?'); 129 + const path = target[0..(qmark orelse target.len)]; 130 + const query = if (qmark) |q| target[q + 1 ..] else ""; 131 + 132 + if (method == .GET) { 133 + if (std.mem.eql(u8, path, "/_healthz")) { 134 + respond(&request, .ok, "application/json", "{\"status\":\"ok\"}"); 135 + } else if (std.mem.eql(u8, path, "/_health")) { 136 + handleReadiness(&request, ctx); 137 + } else if (std.mem.eql(u8, path, "/metrics")) { 138 + var metrics_buf: [8192]u8 = undefined; 139 + const body = ctx.subscriber.formatMetrics(&metrics_buf); 140 + respond(&request, .ok, "text/plain; version=0.0.4; charset=utf-8", body); 141 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { 142 + handleListReposByCollection(&request, query, ctx.collection_index); 143 + } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 144 + if (!checkAdmin(&request)) return; 145 + handleBackfillStatus(&request, ctx.backfiller); 146 + } else { 147 + respond(&request, .not_found, "text/plain", "not found"); 148 + } 149 + } else if (method == .POST) { 150 + if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 151 + if (!checkAdmin(&request)) return; 152 + handleBackfillTrigger(&request, query, ctx.backfiller); 153 + } else { 154 + respond(&request, .not_found, "text/plain", "not found"); 155 + } 156 + } else { 157 + respond(&request, .method_not_allowed, "text/plain", "method not allowed"); 158 + } 159 + } 160 + 161 + /// readiness: subscriber connected + cursor advancing 162 + fn handleReadiness(request: *http.Server.Request, ctx: *HttpContext) void { 163 + const stats = &ctx.subscriber.stats; 164 + const connected = stats.connected.load(.monotonic); 165 + const last_event_time = stats.last_event_time.load(.monotonic); 166 + const now = std.time.timestamp(); 167 + 168 + // subscriber must be connected and have received an event within the last 60s 169 + // (allows for startup grace — last_event_time == 0 means never received) 170 + const stale_threshold = 60; 171 + const cursor_fresh = connected and (last_event_time == 0 or (now - last_event_time) < stale_threshold); 172 + 173 + if (cursor_fresh) { 174 + var buf: [256]u8 = undefined; 175 + const body = std.fmt.bufPrint(&buf, "{{\"status\":\"ok\",\"connected\":{},\"last_seq\":{d}}}", .{ 176 + connected, stats.last_seq.load(.monotonic), 177 + }) catch "{\"status\":\"ok\"}"; 178 + respond(request, .ok, "application/json", body); 179 + } else { 180 + var buf: [256]u8 = undefined; 181 + const body = std.fmt.bufPrint(&buf, "{{\"status\":\"unhealthy\",\"connected\":{},\"cursor_fresh\":{}}}", .{ 182 + connected, cursor_fresh, 183 + }) catch "{\"status\":\"unhealthy\"}"; 184 + respond(request, .service_unavailable, "application/json", body); 185 + } 186 + } 187 + 188 + fn handleListReposByCollection( 189 + request: *http.Server.Request, 190 + query: []const u8, 191 + ci: *collection_index_mod.CollectionIndex, 192 + ) void { 193 + const collection = queryParam(query, "collection") orelse { 194 + respond(request, .bad_request, "application/json", "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}"); 195 + return; 196 + }; 197 + 198 + if (collection.len == 0 or !std.mem.containsAtLeast(u8, collection, 1, ".")) { 199 + respond(request, .bad_request, "application/json", "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}"); 200 + return; 201 + } 202 + 203 + const limit_str = queryParam(query, "limit") orelse "500"; 204 + const limit = std.fmt.parseInt(usize, limit_str, 10) catch { 205 + respond(request, .bad_request, "application/json", "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 206 + return; 207 + }; 208 + if (limit < 1 or limit > 2000) { 209 + respond(request, .bad_request, "application/json", "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..2000\"}"); 210 + return; 211 + } 212 + 213 + var cursor_buf: [256]u8 = undefined; 214 + const cursor_did = queryParamDecoded(query, "cursor", &cursor_buf); 215 + 216 + var did_buf: [65536]u8 = undefined; 217 + const ci_result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch { 218 + respond(request, .internal_server_error, "application/json", "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}"); 219 + return; 220 + }; 221 + 222 + var buf: [65536]u8 = undefined; 223 + var fbs = std.io.fixedBufferStream(&buf); 224 + const w = fbs.writer(); 225 + 226 + w.writeAll("{\"repos\":[") catch return; 227 + for (0..ci_result.count) |i| { 228 + if (i > 0) w.writeByte(',') catch return; 229 + w.writeAll("{\"did\":\"") catch return; 230 + w.writeAll(ci_result.getDid(i)) catch return; 231 + w.writeAll("\"}") catch return; 232 + } 233 + w.writeByte(']') catch return; 234 + 235 + if (ci_result.last_did) |last| { 236 + if (ci_result.count >= limit) { 237 + w.writeAll(",\"cursor\":\"") catch return; 238 + w.writeAll(last) catch return; 239 + w.writeAll("\"") catch return; 240 + } 241 + } 242 + 243 + w.writeByte('}') catch return; 244 + respond(request, .ok, "application/json", fbs.getWritten()); 245 + } 246 + 247 + fn handleBackfillStatus(request: *http.Server.Request, backfiller: *backfill_mod.Backfiller) void { 248 + const body = backfiller.getStatus(backfiller.allocator) catch { 249 + respond(request, .internal_server_error, "application/json", "{\"error\":\"failed to query backfill status\"}"); 250 + return; 251 + }; 252 + defer backfiller.allocator.free(body); 253 + respond(request, .ok, "application/json", body); 254 + } 255 + 256 + fn handleBackfillTrigger(request: *http.Server.Request, query: []const u8, backfiller: *backfill_mod.Backfiller) void { 257 + const source = queryParam(query, "source") orelse "bsky.network"; 258 + backfiller.start(source) catch |err| { 259 + switch (err) { 260 + error.AlreadyRunning => { 261 + respond(request, .conflict, "application/json", "{\"error\":\"backfill already in progress\"}"); 262 + }, 263 + else => { 264 + respond(request, .internal_server_error, "application/json", "{\"error\":\"failed to start backfill\"}"); 265 + }, 266 + } 267 + return; 268 + }; 269 + var buf: [256]u8 = undefined; 270 + const resp_body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch { 271 + respond(request, .ok, "application/json", "{\"status\":\"started\"}"); 272 + return; 273 + }; 274 + respond(request, .ok, "application/json", resp_body); 275 + } 276 + 277 + // --- auth --- 278 + 279 + fn checkAdmin(request: *http.Server.Request) bool { 280 + const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse { 281 + respond(request, .forbidden, "application/json", "{\"error\":\"admin endpoint not configured\"}"); 282 + return false; 283 + }; 284 + 285 + var it = request.iterateHeaders(); 286 + while (it.next()) |header| { 287 + if (std.ascii.eqlIgnoreCase(header.name, "authorization")) { 288 + const bearer_prefix = "Bearer "; 289 + if (!std.mem.startsWith(u8, header.value, bearer_prefix)) { 290 + respond(request, .unauthorized, "application/json", "{\"error\":\"invalid authorization scheme\"}"); 291 + return false; 292 + } 293 + const token = header.value[bearer_prefix.len..]; 294 + if (!std.mem.eql(u8, token, admin_pw)) { 295 + respond(request, .unauthorized, "application/json", "{\"error\":\"invalid token\"}"); 296 + return false; 297 + } 298 + return true; 299 + } 300 + } 301 + 302 + respond(request, .unauthorized, "application/json", "{\"error\":\"missing authorization header\"}"); 303 + return false; 304 + } 305 + 306 + // --- response helpers --- 307 + 308 + fn respond(request: *http.Server.Request, status: http.Status, content_type: []const u8, body: []const u8) void { 309 + request.respond(body, .{ 310 + .status = status, 311 + .keep_alive = false, 312 + .extra_headers = &.{ 313 + .{ .name = "content-type", .value = content_type }, 314 + .{ .name = "server", .value = "collectiondir (zlay)" }, 315 + }, 316 + }) catch {}; 317 + } 318 + 319 + // --- query string helpers --- 320 + 321 + fn queryParam(query: []const u8, name: []const u8) ?[]const u8 { 322 + if (query.len == 0) return null; 323 + var iter = std.mem.splitScalar(u8, query, '&'); 324 + while (iter.next()) |pair| { 325 + const eq = std.mem.indexOfScalar(u8, pair, '=') orelse continue; 326 + if (std.mem.eql(u8, pair[0..eq], name)) { 327 + return pair[eq + 1 ..]; 328 + } 329 + } 330 + return null; 331 + } 332 + 333 + fn queryParamDecoded(query: []const u8, name: []const u8, buf: []u8) ?[]const u8 { 334 + const raw = queryParam(query, name) orelse return null; 335 + var i: usize = 0; 336 + var out: usize = 0; 337 + while (i < raw.len) { 338 + if (raw[i] == '%' and i + 2 < raw.len) { 339 + const hi = hexVal(raw[i + 1]) orelse { 340 + if (out >= buf.len) return null; 341 + buf[out] = raw[i]; 342 + out += 1; 343 + i += 1; 344 + continue; 345 + }; 346 + const lo = hexVal(raw[i + 2]) orelse { 347 + if (out >= buf.len) return null; 348 + buf[out] = raw[i]; 349 + out += 1; 350 + i += 1; 351 + continue; 352 + }; 353 + if (out >= buf.len) return null; 354 + buf[out] = (@as(u8, hi) << 4) | @as(u8, lo); 355 + out += 1; 356 + i += 3; 357 + } else if (raw[i] == '+') { 358 + if (out >= buf.len) return null; 359 + buf[out] = ' '; 360 + out += 1; 361 + i += 1; 362 + } else { 363 + if (out >= buf.len) return null; 364 + buf[out] = raw[i]; 365 + out += 1; 366 + i += 1; 367 + } 368 + } 369 + return buf[0..out]; 370 + } 371 + 372 + fn hexVal(c: u8) ?u4 { 373 + return switch (c) { 374 + '0'...'9' => @intCast(c - '0'), 375 + 'a'...'f' => @intCast(c - 'a' + 10), 376 + 'A'...'F' => @intCast(c - 'A' + 10), 377 + else => null, 378 + }; 379 + } 380 + 381 + // --- helpers --- 382 + 383 + fn signalHandler(_: c_int) callconv(.c) void { 384 + shutdown_flag.store(true, .release); 385 + } 386 + 387 + fn installSignalHandlers() void { 388 + const act: std.posix.Sigaction = .{ 389 + .handler = .{ .handler = signalHandler }, 390 + .mask = std.posix.sigemptyset(), 391 + .flags = 0, 392 + }; 393 + std.posix.sigaction(std.posix.SIG.INT, &act, null); 394 + std.posix.sigaction(std.posix.SIG.TERM, &act, null); 395 + 396 + const ignore_act: std.posix.Sigaction = .{ 397 + .handler = .{ .handler = std.posix.SIG.IGN }, 398 + .mask = std.posix.sigemptyset(), 399 + .flags = 0, 400 + }; 401 + std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null); 402 + } 403 + 404 + fn parseEnvInt(comptime T: type, key: []const u8, default: T) T { 405 + const val = std.posix.getenv(key) orelse return default; 406 + return std.fmt.parseInt(T, val, 10) catch default; 407 + }
+423
src/collectiondir_backfill.zig
··· 1 + //! collectiondir backfill — discovers collections and imports DIDs from a source relay 2 + //! 3 + //! same logic as backfill.zig but stores progress in RocksDB default CF instead 4 + //! of postgres. no external database dependency. 5 + //! 6 + //! progress keys: "bf:{collection}\0{source}" → "{cursor}\0{count}\0{0|1}" 7 + //! (cursor string, imported count, completed flag) 8 + 9 + const std = @import("std"); 10 + const http = std.http; 11 + const collection_index_mod = @import("collection_index.zig"); 12 + 13 + const Allocator = std.mem.Allocator; 14 + const log = std.log.scoped(.backfill); 15 + 16 + pub const Backfiller = struct { 17 + allocator: Allocator, 18 + collection_index: *collection_index_mod.CollectionIndex, 19 + running: std.atomic.Value(bool), 20 + thread: ?std.Thread, 21 + source: []const u8, 22 + 23 + pub fn init( 24 + allocator: Allocator, 25 + collection_index: *collection_index_mod.CollectionIndex, 26 + ) Backfiller { 27 + return .{ 28 + .allocator = allocator, 29 + .collection_index = collection_index, 30 + .running = .{ .raw = false }, 31 + .thread = null, 32 + .source = "", 33 + }; 34 + } 35 + 36 + pub fn isRunning(self: *Backfiller) bool { 37 + return self.running.load(.acquire); 38 + } 39 + 40 + pub fn start(self: *Backfiller, source: []const u8) !void { 41 + if (self.running.cmpxchgStrong(false, true, .acq_rel, .acquire) != null) { 42 + return error.AlreadyRunning; 43 + } 44 + errdefer self.running.store(false, .release); 45 + 46 + self.source = try self.allocator.dupe(u8, source); 47 + self.thread = try std.Thread.spawn(.{ .stack_size = 8 * 1024 * 1024 }, run, .{self}); 48 + } 49 + 50 + fn run(self: *Backfiller) void { 51 + defer { 52 + self.allocator.free(self.source); 53 + self.source = ""; 54 + self.thread = null; 55 + self.running.store(false, .release); 56 + } 57 + 58 + const collections = self.discoverCollections() catch |err| { 59 + log.err("collection discovery failed: {s}", .{@errorName(err)}); 60 + return; 61 + }; 62 + defer { 63 + for (collections) |c| self.allocator.free(c); 64 + self.allocator.free(collections); 65 + } 66 + 67 + log.info("discovered {d} collections to backfill from {s}", .{ collections.len, self.source }); 68 + 69 + for (collections) |collection| { 70 + self.backfillCollection(collection) catch |err| { 71 + log.warn("backfill failed for {s}: {s}", .{ collection, @errorName(err) }); 72 + }; 73 + } 74 + 75 + log.info("backfill complete", .{}); 76 + } 77 + 78 + fn discoverCollections(self: *Backfiller) ![][]const u8 { 79 + var seen: std.StringHashMapUnmanaged(void) = .{}; 80 + defer seen.deinit(self.allocator); 81 + 82 + // source 1: lexicon garden 83 + const garden = self.fetchLexiconGarden() catch |err| blk: { 84 + log.warn("lexicon garden fetch failed: {s}", .{@errorName(err)}); 85 + break :blk &[_][]const u8{}; 86 + }; 87 + defer { 88 + for (garden) |c| self.allocator.free(c); 89 + self.allocator.free(garden); 90 + } 91 + for (garden) |c| { 92 + if (!seen.contains(c)) { 93 + const duped = try self.allocator.dupe(u8, c); 94 + errdefer self.allocator.free(duped); 95 + try seen.put(self.allocator, duped, {}); 96 + } 97 + } 98 + 99 + // source 2: observed collections from RBC scan 100 + const observed = self.collection_index.listKnownCollections(self.allocator) catch |err| blk: { 101 + log.warn("RBC scan failed: {s}", .{@errorName(err)}); 102 + break :blk &[_][]const u8{}; 103 + }; 104 + defer { 105 + for (observed) |c| self.allocator.free(c); 106 + self.allocator.free(observed); 107 + } 108 + for (observed) |c| { 109 + if (!seen.contains(c)) { 110 + const duped = try self.allocator.dupe(u8, c); 111 + errdefer self.allocator.free(duped); 112 + try seen.put(self.allocator, duped, {}); 113 + } 114 + } 115 + 116 + const result = try self.allocator.alloc([]const u8, seen.count()); 117 + var i: usize = 0; 118 + var key_iter = seen.keyIterator(); 119 + while (key_iter.next()) |key| { 120 + result[i] = key.*; 121 + i += 1; 122 + } 123 + return result; 124 + } 125 + 126 + fn fetchLexiconGarden(self: *Backfiller) ![][]const u8 { 127 + var client: http.Client = .{ .allocator = self.allocator }; 128 + defer client.deinit(); 129 + 130 + var aw: std.Io.Writer.Allocating = .init(self.allocator); 131 + defer aw.deinit(); 132 + 133 + const result = client.fetch(.{ 134 + .location = .{ .url = "https://lexicon.garden/llms.txt" }, 135 + .response_writer = &aw.writer, 136 + .method = .GET, 137 + }) catch return error.FetchFailed; 138 + 139 + if (result.status != .ok) return error.FetchFailed; 140 + 141 + const body = aw.written(); 142 + 143 + var nsids: std.ArrayListUnmanaged([]const u8) = .{}; 144 + defer nsids.deinit(self.allocator); 145 + 146 + var lines = std.mem.splitScalar(u8, body, '\n'); 147 + while (lines.next()) |line| { 148 + const backtick_start = std.mem.indexOf(u8, line, "- [`") orelse continue; 149 + const nsid_start = backtick_start + 4; 150 + const rest = line[nsid_start..]; 151 + const backtick_end = std.mem.indexOf(u8, rest, "`](") orelse continue; 152 + const nsid = rest[0..backtick_end]; 153 + if (!std.mem.containsAtLeast(u8, nsid, 1, ".")) continue; 154 + 155 + const duped = try self.allocator.dupe(u8, nsid); 156 + errdefer self.allocator.free(duped); 157 + try nsids.append(self.allocator, duped); 158 + } 159 + 160 + log.info("lexicon garden: found {d} NSIDs", .{nsids.items.len}); 161 + return try nsids.toOwnedSlice(self.allocator); 162 + } 163 + 164 + // --- progress storage in RocksDB default CF --- 165 + // key: "bf:{collection}\0{source}" 166 + // value: "{cursor}\0{count}\0{0|1}" (cursor, imported count, completed) 167 + 168 + fn progressKey(self: *const Backfiller, collection: []const u8, buf: []u8) ?[]const u8 { 169 + const prefix = "bf:"; 170 + const needed = prefix.len + collection.len + 1 + self.source.len; 171 + if (needed > buf.len) return null; 172 + @memcpy(buf[0..prefix.len], prefix); 173 + @memcpy(buf[prefix.len..][0..collection.len], collection); 174 + buf[prefix.len + collection.len] = 0; 175 + @memcpy(buf[prefix.len + collection.len + 1 ..][0..self.source.len], self.source); 176 + return buf[0..needed]; 177 + } 178 + 179 + const Progress = struct { 180 + cursor: []const u8, 181 + imported: i64, 182 + completed: bool, 183 + /// rocksdb-owned memory — caller must call deinit() when done 184 + _data: @import("rocksdb").Data, 185 + 186 + pub fn deinit(self: Progress) void { 187 + self._data.deinit(); 188 + } 189 + }; 190 + 191 + fn loadProgress(self: *Backfiller, collection: []const u8) ?Progress { 192 + var key_buf: [512]u8 = undefined; 193 + const key = self.progressKey(collection, &key_buf) orelse return null; 194 + const data = self.collection_index.getMeta(key) orelse return null; 195 + const val = data.data; 196 + 197 + // parse: "{cursor}\0{count}\0{0|1}" 198 + const sep1 = std.mem.indexOfScalar(u8, val, 0) orelse { 199 + data.deinit(); 200 + return null; 201 + }; 202 + const rest = val[sep1 + 1 ..]; 203 + const sep2 = std.mem.indexOfScalar(u8, rest, 0) orelse { 204 + data.deinit(); 205 + return null; 206 + }; 207 + 208 + return .{ 209 + .cursor = val[0..sep1], 210 + .imported = std.fmt.parseInt(i64, rest[0..sep2], 10) catch 0, 211 + .completed = rest.len > sep2 + 1 and rest[sep2 + 1] == '1', 212 + ._data = data, 213 + }; 214 + } 215 + 216 + fn saveProgress(self: *Backfiller, collection: []const u8, cursor: []const u8, imported: i64, completed: bool) void { 217 + var key_buf: [512]u8 = undefined; 218 + const key = self.progressKey(collection, &key_buf) orelse return; 219 + 220 + var val_buf: [512]u8 = undefined; 221 + var pos: usize = 0; 222 + if (cursor.len > 0) { 223 + @memcpy(val_buf[0..cursor.len], cursor); 224 + pos = cursor.len; 225 + } 226 + val_buf[pos] = 0; 227 + pos += 1; 228 + const count_str = std.fmt.bufPrint(val_buf[pos..], "{d}", .{imported}) catch return; 229 + pos += count_str.len; 230 + val_buf[pos] = 0; 231 + pos += 1; 232 + val_buf[pos] = if (completed) '1' else '0'; 233 + pos += 1; 234 + 235 + self.collection_index.putMeta(key, val_buf[0..pos]); 236 + } 237 + 238 + fn backfillCollection(self: *Backfiller, collection: []const u8) !void { 239 + // check saved progress 240 + var cursor: ?[]const u8 = null; 241 + defer if (cursor) |c| self.allocator.free(c); 242 + var imported: i64 = 0; 243 + 244 + if (self.loadProgress(collection)) |progress| { 245 + defer progress.deinit(); 246 + if (progress.completed) return; // already done 247 + imported = progress.imported; 248 + if (progress.cursor.len > 0) { 249 + cursor = try self.allocator.dupe(u8, progress.cursor); 250 + log.info("{s}: resuming from cursor (imported {d} so far)", .{ collection, imported }); 251 + } 252 + } 253 + 254 + var client: http.Client = .{ .allocator = self.allocator }; 255 + defer client.deinit(); 256 + 257 + var page_count: usize = 0; 258 + while (true) { 259 + const fetch_result = self.fetchPage(&client, collection, cursor) catch |err| { 260 + log.warn("{s}: fetch page failed: {s}", .{ collection, @errorName(err) }); 261 + break; 262 + }; 263 + defer { 264 + for (fetch_result.dids) |d| self.allocator.free(d); 265 + self.allocator.free(fetch_result.dids); 266 + if (fetch_result.next_cursor) |nc| self.allocator.free(nc); 267 + } 268 + 269 + for (fetch_result.dids) |did| { 270 + self.collection_index.addCollection(did, collection) catch {}; 271 + imported += 1; 272 + } 273 + 274 + page_count += 1; 275 + 276 + const new_cursor = fetch_result.next_cursor orelse ""; 277 + self.saveProgress(collection, new_cursor, imported, false); 278 + 279 + if (fetch_result.next_cursor) |nc| { 280 + if (cursor) |old| self.allocator.free(old); 281 + cursor = self.allocator.dupe(u8, nc) catch break; 282 + std.posix.nanosleep(0, 100 * std.time.ns_per_ms); 283 + } else { 284 + self.saveProgress(collection, "", imported, true); 285 + log.info("{s}: complete ({d} DIDs, {d} pages)", .{ collection, imported, page_count }); 286 + break; 287 + } 288 + } 289 + } 290 + 291 + fn fetchPage(self: *Backfiller, client: *http.Client, collection: []const u8, cursor: ?[]const u8) !FetchResult { 292 + var url_buf: [1024]u8 = undefined; 293 + const url = if (cursor) |c| 294 + std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000&cursor={s}", .{ self.source, collection, c }) catch return error.UrlTooLong 295 + else 296 + std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000", .{ self.source, collection }) catch return error.UrlTooLong; 297 + 298 + var aw: std.Io.Writer.Allocating = .init(self.allocator); 299 + defer aw.deinit(); 300 + 301 + const result = client.fetch(.{ 302 + .location = .{ .url = url }, 303 + .response_writer = &aw.writer, 304 + .method = .GET, 305 + }) catch return error.FetchFailed; 306 + 307 + if (result.status != .ok) return error.FetchFailed; 308 + 309 + const body = aw.written(); 310 + 311 + const parsed = std.json.parseFromSlice(ListReposResponse, self.allocator, body, .{ .ignore_unknown_fields = true }) catch return error.ParseFailed; 312 + defer parsed.deinit(); 313 + 314 + const repos = parsed.value.repos orelse return .{ 315 + .dids = try self.allocator.alloc([]const u8, 0), 316 + .next_cursor = null, 317 + }; 318 + 319 + var dids: std.ArrayListUnmanaged([]const u8) = .{}; 320 + defer dids.deinit(self.allocator); 321 + 322 + for (repos) |repo| { 323 + const duped = self.allocator.dupe(u8, repo.did) catch continue; 324 + dids.append(self.allocator, duped) catch { 325 + self.allocator.free(duped); 326 + continue; 327 + }; 328 + } 329 + 330 + const next_cursor = if (parsed.value.cursor) |c| 331 + self.allocator.dupe(u8, c) catch null 332 + else 333 + null; 334 + 335 + return .{ 336 + .dids = dids.toOwnedSlice(self.allocator) catch return error.OutOfMemory, 337 + .next_cursor = next_cursor, 338 + }; 339 + } 340 + 341 + const FetchResult = struct { 342 + dids: [][]const u8, 343 + next_cursor: ?[]const u8, 344 + }; 345 + 346 + const ListReposResponse = struct { 347 + repos: ?[]const RepoEntry = null, 348 + cursor: ?[]const u8 = null, 349 + }; 350 + 351 + const RepoEntry = struct { 352 + did: []const u8, 353 + }; 354 + 355 + /// return status summary for the admin endpoint 356 + pub fn getStatus(self: *Backfiller, allocator: Allocator) ![]u8 { 357 + var list: std.ArrayListUnmanaged(u8) = .{}; 358 + defer list.deinit(allocator); 359 + const w = list.writer(allocator); 360 + 361 + var total: usize = 0; 362 + var completed: usize = 0; 363 + var total_imported: i64 = 0; 364 + 365 + // scan all bf: keys in the default CF 366 + const ci = self.collection_index; 367 + var err_str: ?@import("rocksdb").Data = null; 368 + var iter = ci.db.iterator(ci.default, .forward, "bf:"); 369 + defer iter.deinit(); 370 + 371 + var first_detail = true; 372 + var details: std.ArrayListUnmanaged(u8) = .{}; 373 + defer details.deinit(allocator); 374 + const dw = details.writer(allocator); 375 + 376 + while (iter.next(&err_str) catch null) |entry| { 377 + const key_data = entry[0].data; 378 + if (!std.mem.startsWith(u8, key_data, "bf:")) break; 379 + 380 + const val_data = entry[1].data; 381 + 382 + // parse key: "bf:{collection}\0{source}" 383 + const key_rest = key_data[3..]; // skip "bf:" 384 + const sep = std.mem.indexOfScalar(u8, key_rest, 0) orelse continue; 385 + const collection = key_rest[0..sep]; 386 + const source = key_rest[sep + 1 ..]; 387 + 388 + // parse value: "{cursor}\0{count}\0{0|1}" 389 + const val_sep1 = std.mem.indexOfScalar(u8, val_data, 0) orelse continue; 390 + const val_rest = val_data[val_sep1 + 1 ..]; 391 + const val_sep2 = std.mem.indexOfScalar(u8, val_rest, 0) orelse continue; 392 + const count = std.fmt.parseInt(i64, val_rest[0..val_sep2], 10) catch 0; 393 + const is_completed = val_rest.len > val_sep2 + 1 and val_rest[val_sep2 + 1] == '1'; 394 + 395 + total += 1; 396 + if (is_completed) completed += 1; 397 + total_imported += count; 398 + 399 + if (!first_detail) dw.writeByte(',') catch {}; 400 + first_detail = false; 401 + 402 + std.fmt.format(dw, "{{\"collection\":\"{s}\",\"source\":\"{s}\",\"imported\":{d},\"completed\":{}}}", .{ 403 + collection, 404 + source, 405 + count, 406 + is_completed, 407 + }) catch {}; 408 + } 409 + 410 + std.fmt.format(w, "{{\"running\":{},\"total\":{d},\"completed\":{d},\"in_progress\":{d},\"total_imported\":{d},\"collections\":[", .{ 411 + self.isRunning(), 412 + total, 413 + completed, 414 + total - completed, 415 + total_imported, 416 + }) catch return error.FormatError; 417 + 418 + w.writeAll(details.items) catch {}; 419 + w.writeAll("]}") catch {}; 420 + 421 + return try list.toOwnedSlice(allocator); 422 + } 423 + };
+314
src/collectiondir_subscriber.zig
··· 1 + //! collectiondir firehose subscriber — consumes relay's subscribeRepos stream 2 + //! 3 + //! simpler than the relay's subscriber.zig — no validation, no host management, 4 + //! no rate limiting. just CBOR frame decode → collection index updates. 5 + //! 6 + //! on #commit: extract ops, call trackCommitOps 7 + //! on #account: if inactive, removeAll from index 8 + //! on #sync: count only (resync needs PDS hostname — requires DID doc resolution, stage 2) 9 + //! 10 + //! cursor persisted to RocksDB default CF (key "cursor"), not postgres. 11 + 12 + const std = @import("std"); 13 + const websocket = @import("websocket"); 14 + const zat = @import("zat"); 15 + const collection_index_mod = @import("collection_index.zig"); 16 + 17 + const Allocator = std.mem.Allocator; 18 + const log = std.log.scoped(.collectiondir); 19 + 20 + const cursor_flush_interval_sec = 4; 21 + 22 + pub const Stats = struct { 23 + commits: std.atomic.Value(u64) = .{ .raw = 0 }, 24 + accounts: std.atomic.Value(u64) = .{ .raw = 0 }, 25 + syncs: std.atomic.Value(u64) = .{ .raw = 0 }, 26 + identities: std.atomic.Value(u64) = .{ .raw = 0 }, 27 + decode_errors: std.atomic.Value(u64) = .{ .raw = 0 }, 28 + ops_tracked: std.atomic.Value(u64) = .{ .raw = 0 }, 29 + accounts_removed: std.atomic.Value(u64) = .{ .raw = 0 }, 30 + reconnects: std.atomic.Value(u64) = .{ .raw = 0 }, 31 + last_seq: std.atomic.Value(u64) = .{ .raw = 0 }, 32 + connected: std.atomic.Value(bool) = .{ .raw = false }, 33 + last_event_time: std.atomic.Value(i64) = .{ .raw = 0 }, 34 + }; 35 + 36 + pub const Subscriber = struct { 37 + allocator: Allocator, 38 + relay_url: []const u8, 39 + relay_port: u16, 40 + use_tls: bool, 41 + collection_index: *collection_index_mod.CollectionIndex, 42 + shutdown: *std.atomic.Value(bool), 43 + stats: Stats = .{}, 44 + last_seq: ?u64 = null, 45 + last_cursor_flush: i64 = 0, 46 + 47 + pub fn init( 48 + allocator: Allocator, 49 + relay_url: []const u8, 50 + relay_port: u16, 51 + use_tls: bool, 52 + collection_index: *collection_index_mod.CollectionIndex, 53 + shutdown: *std.atomic.Value(bool), 54 + ) Subscriber { 55 + return .{ 56 + .allocator = allocator, 57 + .relay_url = relay_url, 58 + .relay_port = relay_port, 59 + .use_tls = use_tls, 60 + .collection_index = collection_index, 61 + .shutdown = shutdown, 62 + }; 63 + } 64 + 65 + /// load saved cursor from RocksDB default CF 66 + pub fn loadCursor(self: *Subscriber) void { 67 + const val = self.collection_index.getMeta("cursor") orelse return; 68 + defer val.deinit(); 69 + const seq = std.fmt.parseInt(u64, val.data, 10) catch return; 70 + if (seq > 0) { 71 + self.last_seq = seq; 72 + log.info("resuming from cursor {d}", .{seq}); 73 + } 74 + } 75 + 76 + /// save cursor to RocksDB default CF 77 + fn flushCursor(self: *Subscriber) void { 78 + const seq = self.last_seq orelse return; 79 + var buf: [20]u8 = undefined; 80 + const val = std.fmt.bufPrint(&buf, "{d}", .{seq}) catch return; 81 + self.collection_index.putMeta("cursor", val); 82 + } 83 + 84 + /// run the subscriber loop with reconnect + backoff 85 + pub fn run(self: *Subscriber) void { 86 + self.loadCursor(); 87 + 88 + var backoff: u64 = 1; 89 + const max_backoff: u64 = 60; 90 + 91 + while (!self.shutdown.load(.acquire)) { 92 + log.info("connecting to {s}:{d}...", .{ self.relay_url, self.relay_port }); 93 + 94 + if (self.connectAndRead()) { 95 + // clean disconnect (EOF) — reset backoff 96 + backoff = 1; 97 + } else |err| { 98 + if (self.shutdown.load(.acquire)) return; 99 + log.err("connection error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff }); 100 + _ = self.stats.reconnects.fetchAdd(1, .monotonic); 101 + } 102 + 103 + if (self.shutdown.load(.acquire)) return; 104 + 105 + // flush cursor before backoff 106 + self.flushCursor(); 107 + 108 + // backoff sleep in 1s chunks 109 + var remaining: u64 = backoff; 110 + while (remaining > 0 and !self.shutdown.load(.acquire)) { 111 + std.posix.nanosleep(1, 0); 112 + remaining -= 1; 113 + } 114 + backoff = @min(backoff * 2, max_backoff); 115 + } 116 + 117 + // final cursor flush 118 + self.flushCursor(); 119 + } 120 + 121 + fn connectAndRead(self: *Subscriber) !void { 122 + var path_buf: [256]u8 = undefined; 123 + var w: std.Io.Writer = .fixed(&path_buf); 124 + 125 + try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos"); 126 + if (self.last_seq) |cursor| { 127 + try w.print("?cursor={d}", .{cursor}); 128 + } 129 + const path = w.buffered(); 130 + 131 + var client = try websocket.Client.init(self.allocator, .{ 132 + .host = self.relay_url, 133 + .port = self.relay_port, 134 + .tls = self.use_tls, 135 + .max_size = 5 * 1024 * 1024, 136 + }); 137 + defer client.deinit(); 138 + 139 + var host_header_buf: [256]u8 = undefined; 140 + const host_header = std.fmt.bufPrint( 141 + &host_header_buf, 142 + "Host: {s}\r\n", 143 + .{self.relay_url}, 144 + ) catch self.relay_url; 145 + 146 + try client.handshake(path, .{ .headers = host_header }); 147 + log.info("connected to {s}", .{self.relay_url}); 148 + self.stats.connected.store(true, .release); 149 + defer self.stats.connected.store(false, .release); 150 + 151 + var handler = FrameHandler{ .sub = self }; 152 + try client.readLoop(&handler); 153 + } 154 + 155 + pub fn formatMetrics(self: *Subscriber, buf: []u8) []const u8 { 156 + var fbs = std.io.fixedBufferStream(buf); 157 + const w = fbs.writer(); 158 + 159 + const commits = self.stats.commits.load(.monotonic); 160 + const accounts = self.stats.accounts.load(.monotonic); 161 + const syncs = self.stats.syncs.load(.monotonic); 162 + const identities = self.stats.identities.load(.monotonic); 163 + const decode_errors = self.stats.decode_errors.load(.monotonic); 164 + const ops_tracked = self.stats.ops_tracked.load(.monotonic); 165 + const accounts_removed = self.stats.accounts_removed.load(.monotonic); 166 + const reconnects = self.stats.reconnects.load(.monotonic); 167 + const last_seq = self.stats.last_seq.load(.monotonic); 168 + const connected: u64 = if (self.stats.connected.load(.monotonic)) 1 else 0; 169 + const last_event_time = self.stats.last_event_time.load(.monotonic); 170 + 171 + std.fmt.format(w, 172 + \\# HELP collectiondir_events_total events processed by type 173 + \\# TYPE collectiondir_events_total counter 174 + \\collectiondir_events_total{{type="commit"}} {d} 175 + \\collectiondir_events_total{{type="account"}} {d} 176 + \\collectiondir_events_total{{type="sync"}} {d} 177 + \\collectiondir_events_total{{type="identity"}} {d} 178 + \\# HELP collectiondir_decode_errors_total CBOR decode failures 179 + \\# TYPE collectiondir_decode_errors_total counter 180 + \\collectiondir_decode_errors_total {d} 181 + \\# HELP collectiondir_ops_tracked_total collection index ops processed 182 + \\# TYPE collectiondir_ops_tracked_total counter 183 + \\collectiondir_ops_tracked_total {d} 184 + \\# HELP collectiondir_accounts_removed_total accounts removed from index (inactive) 185 + \\# TYPE collectiondir_accounts_removed_total counter 186 + \\collectiondir_accounts_removed_total {d} 187 + \\# HELP collectiondir_reconnects_total WebSocket reconnect count 188 + \\# TYPE collectiondir_reconnects_total counter 189 + \\collectiondir_reconnects_total {d} 190 + \\# HELP collectiondir_last_seq last processed firehose sequence number 191 + \\# TYPE collectiondir_last_seq gauge 192 + \\collectiondir_last_seq {d} 193 + \\# HELP collectiondir_connected whether firehose subscriber is connected 194 + \\# TYPE collectiondir_connected gauge 195 + \\collectiondir_connected {d} 196 + \\# HELP collectiondir_last_event_time_seconds unix timestamp of last processed event 197 + \\# TYPE collectiondir_last_event_time_seconds gauge 198 + \\collectiondir_last_event_time_seconds {d} 199 + \\ 200 + , .{ 201 + commits, 202 + accounts, 203 + syncs, 204 + identities, 205 + decode_errors, 206 + ops_tracked, 207 + accounts_removed, 208 + reconnects, 209 + last_seq, 210 + connected, 211 + last_event_time, 212 + }) catch {}; 213 + 214 + return fbs.getWritten(); 215 + } 216 + }; 217 + 218 + const FrameHandler = struct { 219 + sub: *Subscriber, 220 + 221 + pub fn serverMessage(self: *FrameHandler, data: []const u8) !void { 222 + const sub = self.sub; 223 + 224 + var arena = std.heap.ArenaAllocator.init(sub.allocator); 225 + defer arena.deinit(); 226 + const alloc = arena.allocator(); 227 + 228 + // decode header 229 + const header_result = zat.cbor.decode(alloc, data) catch |err| { 230 + log.debug("frame header decode failed: {s}", .{@errorName(err)}); 231 + _ = sub.stats.decode_errors.fetchAdd(1, .monotonic); 232 + return; 233 + }; 234 + const header = header_result.value; 235 + const payload_data = data[header_result.consumed..]; 236 + 237 + const op = header.getInt("op") orelse return; 238 + if (op != 1) return; // skip error frames 239 + 240 + const frame_type = header.getString("t") orelse return; 241 + const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| { 242 + log.debug("frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type }); 243 + _ = sub.stats.decode_errors.fetchAdd(1, .monotonic); 244 + return; 245 + }; 246 + 247 + const upstream_seq = payload.getUint("seq"); 248 + 249 + // route by frame type 250 + const is_commit = std.mem.eql(u8, frame_type, "#commit"); 251 + const is_account = std.mem.eql(u8, frame_type, "#account"); 252 + const is_sync = std.mem.eql(u8, frame_type, "#sync"); 253 + const is_identity = std.mem.eql(u8, frame_type, "#identity"); 254 + 255 + if (is_commit) { 256 + _ = sub.stats.commits.fetchAdd(1, .monotonic); 257 + 258 + const did = payload.getString("repo") orelse { 259 + if (upstream_seq) |s| sub.last_seq = s; 260 + return; 261 + }; 262 + 263 + // track collection ops — the core purpose of this service 264 + if (payload.get("ops")) |ops| { 265 + sub.collection_index.trackCommitOps(did, ops); 266 + 267 + // count individual ops for metrics 268 + switch (ops) { 269 + .array => |items| { 270 + _ = sub.stats.ops_tracked.fetchAdd(items.len, .monotonic); 271 + }, 272 + else => {}, 273 + } 274 + } 275 + } else if (is_account) { 276 + _ = sub.stats.accounts.fetchAdd(1, .monotonic); 277 + 278 + const did = payload.getString("did") orelse { 279 + if (upstream_seq) |s| sub.last_seq = s; 280 + return; 281 + }; 282 + 283 + const is_active = payload.getBool("active") orelse false; 284 + if (!is_active) { 285 + sub.collection_index.removeAll(did) catch |err| { 286 + log.debug("removeAll failed for {s}: {s}", .{ did, @errorName(err) }); 287 + }; 288 + _ = sub.stats.accounts_removed.fetchAdd(1, .monotonic); 289 + } 290 + } else if (is_sync) { 291 + _ = sub.stats.syncs.fetchAdd(1, .monotonic); 292 + // stage 1: count only — resync requires PDS hostname via DID doc resolution (stage 2) 293 + } else if (is_identity) { 294 + _ = sub.stats.identities.fetchAdd(1, .monotonic); 295 + } else { 296 + if (upstream_seq) |s| sub.last_seq = s; 297 + return; 298 + } 299 + 300 + // advance cursor 301 + if (upstream_seq) |s| { 302 + sub.last_seq = s; 303 + sub.stats.last_seq.store(s, .monotonic); 304 + sub.stats.last_event_time.store(std.time.timestamp(), .monotonic); 305 + } 306 + 307 + // periodic cursor flush 308 + const now = std.time.timestamp(); 309 + if (now - sub.last_cursor_flush >= cursor_flush_interval_sec) { 310 + sub.flushCursor(); 311 + sub.last_cursor_flush = now; 312 + } 313 + } 314 + };