atproto relay implementation in zig (zlay.waow.tech)

Revert "collectiondir: standalone collection directory service"

This reverts commit cbe6907c471897c8b2adeead524589fc60b0b375.

6 files changed, 1214 deletions

Dockerfile.runtime (-1)
```diff
 FROM debian:bookworm-slim
 RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/*
 COPY zig-out/bin/zlay /usr/local/bin/zlay
-COPY zig-out/bin/collectiondir /usr/local/bin/collectiondir
 RUN mkdir -p /data/events /data/collection-index
 ENV RELAY_DATA_DIR=/data/events
 EXPOSE 3000 3001
```
build.zig (-21)
```diff
     const run_step = b.step("run", "run the relay");
     run_step.dependOn(&run_relay.step);
 
-    // collectiondir executable (shadow collection directory service)
-    const collectiondir_mod = b.createModule(.{
-        .root_source_file = b.path("src/collectiondir.zig"),
-        .target = target,
-        .optimize = optimize,
-        .imports = imports,
-    });
-    collectiondir_mod.addImport("build_options", build_options.createModule());
-    const collectiondir = b.addExecutable(.{
-        .name = "collectiondir",
-        .root_module = collectiondir_mod,
-    });
-    collectiondir.linkLibC();
-    collectiondir.linkLibCpp();
-    b.installArtifact(collectiondir);
-
-    const run_collectiondir = b.addRunArtifact(collectiondir);
-    if (b.args) |args| run_collectiondir.addArgs(args);
-    const run_collectiondir_step = b.step("run-collectiondir", "run the collection directory service");
-    run_collectiondir_step.dependOn(&run_collectiondir.step);
-
     // tests
     const test_step = b.step("test", "run unit tests");
     const test_files = .{
```
src/collection_index.zig (-48)
```diff
 
 pub const CollectionIndex = struct {
     db: rocksdb.DB,
-    default: rocksdb.ColumnFamilyHandle,
     rbc: rocksdb.ColumnFamilyHandle,
     cbr: rocksdb.ColumnFamilyHandle,
     allocator: Allocator,
···
         defer allocator.free(families);
 
         // find column family handles by name
-        var default: ?rocksdb.ColumnFamilyHandle = null;
         var rbc: ?rocksdb.ColumnFamilyHandle = null;
         var cbr: ?rocksdb.ColumnFamilyHandle = null;
         for (families) |cf| {
-            if (std.mem.eql(u8, cf.name, "default")) default = cf.handle;
             if (std.mem.eql(u8, cf.name, "rbc")) rbc = cf.handle;
             if (std.mem.eql(u8, cf.name, "cbr")) cbr = cf.handle;
         }
···
 
         return .{
             .db = db,
-            .default = default orelse return error.MissingColumnFamily,
             .rbc = rbc orelse return error.MissingColumnFamily,
             .cbr = cbr orelse return error.MissingColumnFamily,
             .allocator = allocator,
-        };
-    }
-
-    // --- metadata helpers (default CF) ---
-
-    /// read a metadata value from the default column family.
-    /// caller must call .deinit() on the returned Data to free rocksdb memory.
-    pub fn getMeta(self: *CollectionIndex, key: []const u8) ?rocksdb.Data {
-        var err_str: ?rocksdb.Data = null;
-        const val = self.db.get(self.default, key, &err_str) catch {
-            if (err_str) |e| {
-                log.debug("getMeta({s}) failed: {s}", .{ key, e.data });
-                e.deinit();
-            }
-            return null;
-        };
-        return val;
-    }
-
-    /// write a metadata value to the default column family
-    pub fn putMeta(self: *CollectionIndex, key: []const u8, value: []const u8) void {
-        var err_str: ?rocksdb.Data = null;
-        var batch = rocksdb.WriteBatch.init();
-        defer batch.deinit();
-        batch.put(self.default, key, value);
-        self.db.write(batch, &err_str) catch {
-            if (err_str) |e| {
-                log.warn("putMeta({s}) failed: {s}", .{ key, e.data });
-                e.deinit();
-            }
-        };
-    }
-
-    /// delete a metadata key from the default column family
-    pub fn deleteMeta(self: *CollectionIndex, key: []const u8) void {
-        var err_str: ?rocksdb.Data = null;
-        var batch = rocksdb.WriteBatch.init();
-        defer batch.deinit();
-        batch.delete(self.default, key);
-        self.db.write(batch, &err_str) catch {
-            if (err_str) |e| {
-                log.warn("deleteMeta({s}) failed: {s}", .{ key, e.data });
-                e.deinit();
-            }
         };
     }
 
```
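For context on what is being dropped: these helpers let the collectiondir binaries keep small state blobs (firehose cursor, backfill progress) in the same RocksDB as the index. A minimal sketch of the round-trip, condensed from collectiondir_subscriber.zig below; the saveSeq/loadSeq names are mine, not from the codebase:

```zig
const std = @import("std");
const collection_index_mod = @import("collection_index.zig");

// hypothetical helper showing the removed putMeta pattern:
// store a firehose sequence number under the "cursor" key...
fn saveSeq(ci: *collection_index_mod.CollectionIndex, seq: u64) void {
    var buf: [20]u8 = undefined;
    const val = std.fmt.bufPrint(&buf, "{d}", .{seq}) catch return;
    ci.putMeta("cursor", val);
}

// ...and read it back on startup; the returned Data is rocksdb-owned,
// so it must be deinit()ed once parsed.
fn loadSeq(ci: *collection_index_mod.CollectionIndex) ?u64 {
    const saved = ci.getMeta("cursor") orelse return null;
    defer saved.deinit();
    return std.fmt.parseInt(u64, saved.data, 10) catch null;
}
```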
src/collectiondir.zig (-407, entire file deleted)
```zig
//! collectiondir — standalone collection directory service
//!
//! subscribes to the relay's firehose and maintains an independent RocksDB
//! collection index. no postgres — all state (cursor, backfill progress,
//! collection index) lives in RocksDB.
//!
//! reuses collection_index.zig (unchanged) and resync.zig (unchanged, stage 2).
//! backfill uses collectiondir_backfill.zig (RocksDB-backed progress).
//!
//! LISTEN_PORT (default 2510): HTTP API
//!   GET  /xrpc/com.atproto.sync.listReposByCollection
//!   GET  /_health  — readiness (subscriber connected + cursor fresh)
//!   GET  /_healthz — liveness
//!   GET  /metrics
//!   GET  /admin/backfill-collections  — backfill status
//!   POST /admin/backfill-collections  — trigger backfill
//!
//! env:
//!   RELAY_URL            — relay firehose host (e.g. "zlay" for in-cluster)
//!   RELAY_PORT           — relay firehose port (default 3000)
//!   RELAY_TLS            — "true" for wss:// (default "false")
//!   COLLECTION_INDEX_DIR — RocksDB data directory
//!   LISTEN_PORT          — HTTP listen port (default 2510)
//!   RELAY_ADMIN_PASSWORD — admin auth token

const std = @import("std");
const http = std.http;
const collection_index_mod = @import("collection_index.zig");
const backfill_mod = @import("collectiondir_backfill.zig");
const collectiondir_sub = @import("collectiondir_subscriber.zig");

const log = std.log.scoped(.collectiondir);

/// 8 MiB stacks — matches relay's default_stack_size for ReleaseSafe
pub const default_stack_size = 8 * 1024 * 1024;

var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false };

pub fn main() !void {
    const allocator = std.heap.c_allocator;

    // parse config from env
    const listen_port = parseEnvInt(u16, "LISTEN_PORT", 2510);
    const relay_url = std.posix.getenv("RELAY_URL") orelse "zlay";
    const relay_port = parseEnvInt(u16, "RELAY_PORT", 3000);
    const use_tls = if (std.posix.getenv("RELAY_TLS")) |v| std.mem.eql(u8, v, "true") else false;
    const ci_dir = std.posix.getenv("COLLECTION_INDEX_DIR") orelse "data/collection-index";

    // install signal handlers
    installSignalHandlers();

    // init RocksDB collection index (all state lives here)
    var ci = collection_index_mod.CollectionIndex.open(allocator, ci_dir) catch |err| {
        log.err("failed to open collection index at {s}: {s}", .{ ci_dir, @errorName(err) });
        return err;
    };
    defer ci.deinit();

    // init backfiller (RocksDB-backed progress)
    var backfiller = backfill_mod.Backfiller.init(allocator, &ci);

    // init firehose subscriber (cursor in RocksDB)
    var subscriber = collectiondir_sub.Subscriber.init(
        allocator,
        relay_url,
        relay_port,
        use_tls,
        &ci,
        &shutdown_flag,
    );

    // start subscriber thread
    const sub_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, collectiondir_sub.Subscriber.run, .{&subscriber});

    // start HTTP server
    log.info("collectiondir listening on :{d}", .{listen_port});
    log.info("relay upstream: {s}:{d} (tls={s})", .{ relay_url, relay_port, if (use_tls) "true" else "false" });
    log.info("collection index: {s}", .{ci_dir});

    var http_ctx = HttpContext{
        .collection_index = &ci,
        .backfiller = &backfiller,
        .subscriber = &subscriber,
    };

    const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, listen_port);
    var server = address.listen(.{ .reuse_address = true }) catch |err| {
        log.err("failed to listen on :{d}: {s}", .{ listen_port, @errorName(err) });
        return err;
    };

    while (!shutdown_flag.load(.acquire)) {
        const conn = server.accept() catch |err| {
            if (shutdown_flag.load(.acquire)) break;
            log.debug("accept error: {s}", .{@errorName(err)});
            continue;
        };
        const timeout = std.posix.timeval{ .sec = 5, .usec = 0 };
        std.posix.setsockopt(conn.stream.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, std.mem.asBytes(&timeout)) catch {};
        handleConnection(conn.stream, &http_ctx);
    }

    log.info("shutdown signal received, stopping...", .{});
    server.stream.close();
    sub_thread.join();
    log.info("collectiondir stopped cleanly", .{});
}

const HttpContext = struct {
    collection_index: *collection_index_mod.CollectionIndex,
    backfiller: *backfill_mod.Backfiller,
    subscriber: *collectiondir_sub.Subscriber,
};

fn handleConnection(stream: std.net.Stream, ctx: *HttpContext) void {
    defer stream.close();

    var recv_buf: [4096]u8 = undefined;
    var send_buf: [4096]u8 = undefined;
    var connection_reader = stream.reader(&recv_buf);
    var connection_writer = stream.writer(&send_buf);
    var server = http.Server.init(connection_reader.interface(), &connection_writer.interface);

    var request = server.receiveHead() catch return;
    const target = request.head.target;
    const method = request.head.method;

    const qmark = std.mem.indexOfScalar(u8, target, '?');
    const path = target[0..(qmark orelse target.len)];
    const query = if (qmark) |q| target[q + 1 ..] else "";

    if (method == .GET) {
        if (std.mem.eql(u8, path, "/_healthz")) {
            respond(&request, .ok, "application/json", "{\"status\":\"ok\"}");
        } else if (std.mem.eql(u8, path, "/_health")) {
            handleReadiness(&request, ctx);
        } else if (std.mem.eql(u8, path, "/metrics")) {
            var metrics_buf: [8192]u8 = undefined;
            const body = ctx.subscriber.formatMetrics(&metrics_buf);
            respond(&request, .ok, "text/plain; version=0.0.4; charset=utf-8", body);
        } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) {
            handleListReposByCollection(&request, query, ctx.collection_index);
        } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) {
            if (!checkAdmin(&request)) return;
            handleBackfillStatus(&request, ctx.backfiller);
        } else {
            respond(&request, .not_found, "text/plain", "not found");
        }
    } else if (method == .POST) {
        if (std.mem.eql(u8, path, "/admin/backfill-collections")) {
            if (!checkAdmin(&request)) return;
            handleBackfillTrigger(&request, query, ctx.backfiller);
        } else {
            respond(&request, .not_found, "text/plain", "not found");
        }
    } else {
        respond(&request, .method_not_allowed, "text/plain", "method not allowed");
    }
}

/// readiness: subscriber connected + cursor advancing
fn handleReadiness(request: *http.Server.Request, ctx: *HttpContext) void {
    const stats = &ctx.subscriber.stats;
    const connected = stats.connected.load(.monotonic);
    const last_event_time = stats.last_event_time.load(.monotonic);
    const now = std.time.timestamp();

    // subscriber must be connected and have received an event within the last 60s
    // (allows for startup grace — last_event_time == 0 means never received)
    const stale_threshold = 60;
    const cursor_fresh = connected and (last_event_time == 0 or (now - last_event_time) < stale_threshold);

    if (cursor_fresh) {
        var buf: [256]u8 = undefined;
        const body = std.fmt.bufPrint(&buf, "{{\"status\":\"ok\",\"connected\":{},\"last_seq\":{d}}}", .{
            connected, stats.last_seq.load(.monotonic),
        }) catch "{\"status\":\"ok\"}";
        respond(request, .ok, "application/json", body);
    } else {
        var buf: [256]u8 = undefined;
        const body = std.fmt.bufPrint(&buf, "{{\"status\":\"unhealthy\",\"connected\":{},\"cursor_fresh\":{}}}", .{
            connected, cursor_fresh,
        }) catch "{\"status\":\"unhealthy\"}";
        respond(request, .service_unavailable, "application/json", body);
    }
}

fn handleListReposByCollection(
    request: *http.Server.Request,
    query: []const u8,
    ci: *collection_index_mod.CollectionIndex,
) void {
    const collection = queryParam(query, "collection") orelse {
        respond(request, .bad_request, "application/json", "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}");
        return;
    };

    if (collection.len == 0 or !std.mem.containsAtLeast(u8, collection, 1, ".")) {
        respond(request, .bad_request, "application/json", "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}");
        return;
    }

    const limit_str = queryParam(query, "limit") orelse "500";
    const limit = std.fmt.parseInt(usize, limit_str, 10) catch {
        respond(request, .bad_request, "application/json", "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}");
        return;
    };
    if (limit < 1 or limit > 2000) {
        respond(request, .bad_request, "application/json", "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..2000\"}");
        return;
    }

    var cursor_buf: [256]u8 = undefined;
    const cursor_did = queryParamDecoded(query, "cursor", &cursor_buf);

    var did_buf: [65536]u8 = undefined;
    const ci_result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch {
        respond(request, .internal_server_error, "application/json", "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}");
        return;
    };

    var buf: [65536]u8 = undefined;
    var fbs = std.io.fixedBufferStream(&buf);
    const w = fbs.writer();

    w.writeAll("{\"repos\":[") catch return;
    for (0..ci_result.count) |i| {
        if (i > 0) w.writeByte(',') catch return;
        w.writeAll("{\"did\":\"") catch return;
        w.writeAll(ci_result.getDid(i)) catch return;
        w.writeAll("\"}") catch return;
    }
    w.writeByte(']') catch return;

    if (ci_result.last_did) |last| {
        if (ci_result.count >= limit) {
            w.writeAll(",\"cursor\":\"") catch return;
            w.writeAll(last) catch return;
            w.writeAll("\"") catch return;
        }
    }

    w.writeByte('}') catch return;
    respond(request, .ok, "application/json", fbs.getWritten());
}

fn handleBackfillStatus(request: *http.Server.Request, backfiller: *backfill_mod.Backfiller) void {
    const body = backfiller.getStatus(backfiller.allocator) catch {
        respond(request, .internal_server_error, "application/json", "{\"error\":\"failed to query backfill status\"}");
        return;
    };
    defer backfiller.allocator.free(body);
    respond(request, .ok, "application/json", body);
}

fn handleBackfillTrigger(request: *http.Server.Request, query: []const u8, backfiller: *backfill_mod.Backfiller) void {
    const source = queryParam(query, "source") orelse "bsky.network";
    backfiller.start(source) catch |err| {
        switch (err) {
            error.AlreadyRunning => {
                respond(request, .conflict, "application/json", "{\"error\":\"backfill already in progress\"}");
            },
            else => {
                respond(request, .internal_server_error, "application/json", "{\"error\":\"failed to start backfill\"}");
            },
        }
        return;
    };
    var buf: [256]u8 = undefined;
    const resp_body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch {
        respond(request, .ok, "application/json", "{\"status\":\"started\"}");
        return;
    };
    respond(request, .ok, "application/json", resp_body);
}

// --- auth ---

fn checkAdmin(request: *http.Server.Request) bool {
    const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse {
        respond(request, .forbidden, "application/json", "{\"error\":\"admin endpoint not configured\"}");
        return false;
    };

    var it = request.iterateHeaders();
    while (it.next()) |header| {
        if (std.ascii.eqlIgnoreCase(header.name, "authorization")) {
            const bearer_prefix = "Bearer ";
            if (!std.mem.startsWith(u8, header.value, bearer_prefix)) {
                respond(request, .unauthorized, "application/json", "{\"error\":\"invalid authorization scheme\"}");
                return false;
            }
            const token = header.value[bearer_prefix.len..];
            if (!std.mem.eql(u8, token, admin_pw)) {
                respond(request, .unauthorized, "application/json", "{\"error\":\"invalid token\"}");
                return false;
            }
            return true;
        }
    }

    respond(request, .unauthorized, "application/json", "{\"error\":\"missing authorization header\"}");
    return false;
}

// --- response helpers ---

fn respond(request: *http.Server.Request, status: http.Status, content_type: []const u8, body: []const u8) void {
    request.respond(body, .{
        .status = status,
        .keep_alive = false,
        .extra_headers = &.{
            .{ .name = "content-type", .value = content_type },
            .{ .name = "server", .value = "collectiondir (zlay)" },
        },
    }) catch {};
}

// --- query string helpers ---

fn queryParam(query: []const u8, name: []const u8) ?[]const u8 {
    if (query.len == 0) return null;
    var iter = std.mem.splitScalar(u8, query, '&');
    while (iter.next()) |pair| {
        const eq = std.mem.indexOfScalar(u8, pair, '=') orelse continue;
        if (std.mem.eql(u8, pair[0..eq], name)) {
            return pair[eq + 1 ..];
        }
    }
    return null;
}

fn queryParamDecoded(query: []const u8, name: []const u8, buf: []u8) ?[]const u8 {
    const raw = queryParam(query, name) orelse return null;
    var i: usize = 0;
    var out: usize = 0;
    while (i < raw.len) {
        if (raw[i] == '%' and i + 2 < raw.len) {
            const hi = hexVal(raw[i + 1]) orelse {
                if (out >= buf.len) return null;
                buf[out] = raw[i];
                out += 1;
                i += 1;
                continue;
            };
            const lo = hexVal(raw[i + 2]) orelse {
                if (out >= buf.len) return null;
                buf[out] = raw[i];
                out += 1;
                i += 1;
                continue;
            };
            if (out >= buf.len) return null;
            buf[out] = (@as(u8, hi) << 4) | @as(u8, lo);
            out += 1;
            i += 3;
        } else if (raw[i] == '+') {
            if (out >= buf.len) return null;
            buf[out] = ' ';
            out += 1;
            i += 1;
        } else {
            if (out >= buf.len) return null;
            buf[out] = raw[i];
            out += 1;
            i += 1;
        }
    }
    return buf[0..out];
}

fn hexVal(c: u8) ?u4 {
    return switch (c) {
        '0'...'9' => @intCast(c - '0'),
        'a'...'f' => @intCast(c - 'a' + 10),
        'A'...'F' => @intCast(c - 'A' + 10),
        else => null,
    };
}

// --- helpers ---

fn signalHandler(_: c_int) callconv(.c) void {
    shutdown_flag.store(true, .release);
}

fn installSignalHandlers() void {
    const act: std.posix.Sigaction = .{
        .handler = .{ .handler = signalHandler },
        .mask = std.posix.sigemptyset(),
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.INT, &act, null);
    std.posix.sigaction(std.posix.SIG.TERM, &act, null);

    const ignore_act: std.posix.Sigaction = .{
        .handler = .{ .handler = std.posix.SIG.IGN },
        .mask = std.posix.sigemptyset(),
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null);
}

fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {
    const val = std.posix.getenv(key) orelse return default;
    return std.fmt.parseInt(T, val, 10) catch default;
}
```
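Since the whole HTTP surface disappears with this revert, a sketch of how the listReposByCollection endpoint was queried may be useful. This mirrors the fetch pattern fetchPage uses in collectiondir_backfill.zig below; the localhost URL, the 2510 default port, and the app.bsky.feed.post collection are illustrative values:

```zig
const std = @import("std");

pub fn main() !void {
    const allocator = std.heap.c_allocator;

    var client: std.http.Client = .{ .allocator = allocator };
    defer client.deinit();

    // collect the response body into an allocating writer,
    // the same pattern the backfiller uses
    var aw: std.Io.Writer.Allocating = .init(allocator);
    defer aw.deinit();

    const result = try client.fetch(.{
        .location = .{ .url = "http://localhost:2510/xrpc/com.atproto.sync.listReposByCollection?collection=app.bsky.feed.post&limit=10" },
        .response_writer = &aw.writer,
        .method = .GET,
    });
    if (result.status != .ok) return error.RequestFailed;

    // body: {"repos":[{"did":"..."},...]} with a trailing "cursor"
    // field only when the page was full
    std.debug.print("{s}\n", .{aw.written()});
}
```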
src/collectiondir_backfill.zig (-423, entire file deleted)
```zig
//! collectiondir backfill — discovers collections and imports DIDs from a source relay
//!
//! same logic as backfill.zig but stores progress in RocksDB default CF instead
//! of postgres. no external database dependency.
//!
//! progress keys: "bf:{collection}\0{source}" → "{cursor}\0{count}\0{0|1}"
//! (cursor string, imported count, completed flag)

const std = @import("std");
const http = std.http;
const collection_index_mod = @import("collection_index.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.backfill);

pub const Backfiller = struct {
    allocator: Allocator,
    collection_index: *collection_index_mod.CollectionIndex,
    running: std.atomic.Value(bool),
    thread: ?std.Thread,
    source: []const u8,

    pub fn init(
        allocator: Allocator,
        collection_index: *collection_index_mod.CollectionIndex,
    ) Backfiller {
        return .{
            .allocator = allocator,
            .collection_index = collection_index,
            .running = .{ .raw = false },
            .thread = null,
            .source = "",
        };
    }

    pub fn isRunning(self: *Backfiller) bool {
        return self.running.load(.acquire);
    }

    pub fn start(self: *Backfiller, source: []const u8) !void {
        if (self.running.cmpxchgStrong(false, true, .acq_rel, .acquire) != null) {
            return error.AlreadyRunning;
        }
        errdefer self.running.store(false, .release);

        self.source = try self.allocator.dupe(u8, source);
        self.thread = try std.Thread.spawn(.{ .stack_size = 8 * 1024 * 1024 }, run, .{self});
    }

    fn run(self: *Backfiller) void {
        defer {
            self.allocator.free(self.source);
            self.source = "";
            self.thread = null;
            self.running.store(false, .release);
        }

        const collections = self.discoverCollections() catch |err| {
            log.err("collection discovery failed: {s}", .{@errorName(err)});
            return;
        };
        defer {
            for (collections) |c| self.allocator.free(c);
            self.allocator.free(collections);
        }

        log.info("discovered {d} collections to backfill from {s}", .{ collections.len, self.source });

        for (collections) |collection| {
            self.backfillCollection(collection) catch |err| {
                log.warn("backfill failed for {s}: {s}", .{ collection, @errorName(err) });
            };
        }

        log.info("backfill complete", .{});
    }

    fn discoverCollections(self: *Backfiller) ![][]const u8 {
        var seen: std.StringHashMapUnmanaged(void) = .{};
        defer seen.deinit(self.allocator);

        // source 1: lexicon garden
        const garden = self.fetchLexiconGarden() catch |err| blk: {
            log.warn("lexicon garden fetch failed: {s}", .{@errorName(err)});
            break :blk &[_][]const u8{};
        };
        defer {
            for (garden) |c| self.allocator.free(c);
            self.allocator.free(garden);
        }
        for (garden) |c| {
            if (!seen.contains(c)) {
                const duped = try self.allocator.dupe(u8, c);
                errdefer self.allocator.free(duped);
                try seen.put(self.allocator, duped, {});
            }
        }

        // source 2: observed collections from RBC scan
        const observed = self.collection_index.listKnownCollections(self.allocator) catch |err| blk: {
            log.warn("RBC scan failed: {s}", .{@errorName(err)});
            break :blk &[_][]const u8{};
        };
        defer {
            for (observed) |c| self.allocator.free(c);
            self.allocator.free(observed);
        }
        for (observed) |c| {
            if (!seen.contains(c)) {
                const duped = try self.allocator.dupe(u8, c);
                errdefer self.allocator.free(duped);
                try seen.put(self.allocator, duped, {});
            }
        }

        const result = try self.allocator.alloc([]const u8, seen.count());
        var i: usize = 0;
        var key_iter = seen.keyIterator();
        while (key_iter.next()) |key| {
            result[i] = key.*;
            i += 1;
        }
        return result;
    }

    fn fetchLexiconGarden(self: *Backfiller) ![][]const u8 {
        var client: http.Client = .{ .allocator = self.allocator };
        defer client.deinit();

        var aw: std.Io.Writer.Allocating = .init(self.allocator);
        defer aw.deinit();

        const result = client.fetch(.{
            .location = .{ .url = "https://lexicon.garden/llms.txt" },
            .response_writer = &aw.writer,
            .method = .GET,
        }) catch return error.FetchFailed;

        if (result.status != .ok) return error.FetchFailed;

        const body = aw.written();

        var nsids: std.ArrayListUnmanaged([]const u8) = .{};
        defer nsids.deinit(self.allocator);

        var lines = std.mem.splitScalar(u8, body, '\n');
        while (lines.next()) |line| {
            const backtick_start = std.mem.indexOf(u8, line, "- [`") orelse continue;
            const nsid_start = backtick_start + 4;
            const rest = line[nsid_start..];
            const backtick_end = std.mem.indexOf(u8, rest, "`](") orelse continue;
            const nsid = rest[0..backtick_end];
            if (!std.mem.containsAtLeast(u8, nsid, 1, ".")) continue;

            const duped = try self.allocator.dupe(u8, nsid);
            errdefer self.allocator.free(duped);
            try nsids.append(self.allocator, duped);
        }

        log.info("lexicon garden: found {d} NSIDs", .{nsids.items.len});
        return try nsids.toOwnedSlice(self.allocator);
    }

    // --- progress storage in RocksDB default CF ---
    // key: "bf:{collection}\0{source}"
    // value: "{cursor}\0{count}\0{0|1}" (cursor, imported count, completed)

    fn progressKey(self: *const Backfiller, collection: []const u8, buf: []u8) ?[]const u8 {
        const prefix = "bf:";
        const needed = prefix.len + collection.len + 1 + self.source.len;
        if (needed > buf.len) return null;
        @memcpy(buf[0..prefix.len], prefix);
        @memcpy(buf[prefix.len..][0..collection.len], collection);
        buf[prefix.len + collection.len] = 0;
        @memcpy(buf[prefix.len + collection.len + 1 ..][0..self.source.len], self.source);
        return buf[0..needed];
    }

    const Progress = struct {
        cursor: []const u8,
        imported: i64,
        completed: bool,
        /// rocksdb-owned memory — caller must call deinit() when done
        _data: @import("rocksdb").Data,

        pub fn deinit(self: Progress) void {
            self._data.deinit();
        }
    };

    fn loadProgress(self: *Backfiller, collection: []const u8) ?Progress {
        var key_buf: [512]u8 = undefined;
        const key = self.progressKey(collection, &key_buf) orelse return null;
        const data = self.collection_index.getMeta(key) orelse return null;
        const val = data.data;

        // parse: "{cursor}\0{count}\0{0|1}"
        const sep1 = std.mem.indexOfScalar(u8, val, 0) orelse {
            data.deinit();
            return null;
        };
        const rest = val[sep1 + 1 ..];
        const sep2 = std.mem.indexOfScalar(u8, rest, 0) orelse {
            data.deinit();
            return null;
        };

        return .{
            .cursor = val[0..sep1],
            .imported = std.fmt.parseInt(i64, rest[0..sep2], 10) catch 0,
            .completed = rest.len > sep2 + 1 and rest[sep2 + 1] == '1',
            ._data = data,
        };
    }

    fn saveProgress(self: *Backfiller, collection: []const u8, cursor: []const u8, imported: i64, completed: bool) void {
        var key_buf: [512]u8 = undefined;
        const key = self.progressKey(collection, &key_buf) orelse return;

        var val_buf: [512]u8 = undefined;
        var pos: usize = 0;
        if (cursor.len > 0) {
            @memcpy(val_buf[0..cursor.len], cursor);
            pos = cursor.len;
        }
        val_buf[pos] = 0;
        pos += 1;
        const count_str = std.fmt.bufPrint(val_buf[pos..], "{d}", .{imported}) catch return;
        pos += count_str.len;
        val_buf[pos] = 0;
        pos += 1;
        val_buf[pos] = if (completed) '1' else '0';
        pos += 1;

        self.collection_index.putMeta(key, val_buf[0..pos]);
    }

    fn backfillCollection(self: *Backfiller, collection: []const u8) !void {
        // check saved progress
        var cursor: ?[]const u8 = null;
        defer if (cursor) |c| self.allocator.free(c);
        var imported: i64 = 0;

        if (self.loadProgress(collection)) |progress| {
            defer progress.deinit();
            if (progress.completed) return; // already done
            imported = progress.imported;
            if (progress.cursor.len > 0) {
                cursor = try self.allocator.dupe(u8, progress.cursor);
                log.info("{s}: resuming from cursor (imported {d} so far)", .{ collection, imported });
            }
        }

        var client: http.Client = .{ .allocator = self.allocator };
        defer client.deinit();

        var page_count: usize = 0;
        while (true) {
            const fetch_result = self.fetchPage(&client, collection, cursor) catch |err| {
                log.warn("{s}: fetch page failed: {s}", .{ collection, @errorName(err) });
                break;
            };
            defer {
                for (fetch_result.dids) |d| self.allocator.free(d);
                self.allocator.free(fetch_result.dids);
                if (fetch_result.next_cursor) |nc| self.allocator.free(nc);
            }

            for (fetch_result.dids) |did| {
                self.collection_index.addCollection(did, collection) catch {};
                imported += 1;
            }

            page_count += 1;

            const new_cursor = fetch_result.next_cursor orelse "";
            self.saveProgress(collection, new_cursor, imported, false);

            if (fetch_result.next_cursor) |nc| {
                if (cursor) |old| self.allocator.free(old);
                cursor = self.allocator.dupe(u8, nc) catch break;
                std.posix.nanosleep(0, 100 * std.time.ns_per_ms);
            } else {
                self.saveProgress(collection, "", imported, true);
                log.info("{s}: complete ({d} DIDs, {d} pages)", .{ collection, imported, page_count });
                break;
            }
        }
    }

    fn fetchPage(self: *Backfiller, client: *http.Client, collection: []const u8, cursor: ?[]const u8) !FetchResult {
        var url_buf: [1024]u8 = undefined;
        const url = if (cursor) |c|
            std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000&cursor={s}", .{ self.source, collection, c }) catch return error.UrlTooLong
        else
            std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000", .{ self.source, collection }) catch return error.UrlTooLong;

        var aw: std.Io.Writer.Allocating = .init(self.allocator);
        defer aw.deinit();

        const result = client.fetch(.{
            .location = .{ .url = url },
            .response_writer = &aw.writer,
            .method = .GET,
        }) catch return error.FetchFailed;

        if (result.status != .ok) return error.FetchFailed;

        const body = aw.written();

        const parsed = std.json.parseFromSlice(ListReposResponse, self.allocator, body, .{ .ignore_unknown_fields = true }) catch return error.ParseFailed;
        defer parsed.deinit();

        const repos = parsed.value.repos orelse return .{
            .dids = try self.allocator.alloc([]const u8, 0),
            .next_cursor = null,
        };

        var dids: std.ArrayListUnmanaged([]const u8) = .{};
        defer dids.deinit(self.allocator);

        for (repos) |repo| {
            const duped = self.allocator.dupe(u8, repo.did) catch continue;
            dids.append(self.allocator, duped) catch {
                self.allocator.free(duped);
                continue;
            };
        }

        const next_cursor = if (parsed.value.cursor) |c|
            self.allocator.dupe(u8, c) catch null
        else
            null;

        return .{
            .dids = dids.toOwnedSlice(self.allocator) catch return error.OutOfMemory,
            .next_cursor = next_cursor,
        };
    }

    const FetchResult = struct {
        dids: [][]const u8,
        next_cursor: ?[]const u8,
    };

    const ListReposResponse = struct {
        repos: ?[]const RepoEntry = null,
        cursor: ?[]const u8 = null,
    };

    const RepoEntry = struct {
        did: []const u8,
    };

    /// return status summary for the admin endpoint
    pub fn getStatus(self: *Backfiller, allocator: Allocator) ![]u8 {
        var list: std.ArrayListUnmanaged(u8) = .{};
        defer list.deinit(allocator);
        const w = list.writer(allocator);

        var total: usize = 0;
        var completed: usize = 0;
        var total_imported: i64 = 0;

        // scan all bf: keys in the default CF
        const ci = self.collection_index;
        var err_str: ?@import("rocksdb").Data = null;
        var iter = ci.db.iterator(ci.default, .forward, "bf:");
        defer iter.deinit();

        var first_detail = true;
        var details: std.ArrayListUnmanaged(u8) = .{};
        defer details.deinit(allocator);
        const dw = details.writer(allocator);

        while (iter.next(&err_str) catch null) |entry| {
            const key_data = entry[0].data;
            if (!std.mem.startsWith(u8, key_data, "bf:")) break;

            const val_data = entry[1].data;

            // parse key: "bf:{collection}\0{source}"
            const key_rest = key_data[3..]; // skip "bf:"
            const sep = std.mem.indexOfScalar(u8, key_rest, 0) orelse continue;
            const collection = key_rest[0..sep];
            const source = key_rest[sep + 1 ..];

            // parse value: "{cursor}\0{count}\0{0|1}"
            const val_sep1 = std.mem.indexOfScalar(u8, val_data, 0) orelse continue;
            const val_rest = val_data[val_sep1 + 1 ..];
            const val_sep2 = std.mem.indexOfScalar(u8, val_rest, 0) orelse continue;
            const count = std.fmt.parseInt(i64, val_rest[0..val_sep2], 10) catch 0;
            const is_completed = val_rest.len > val_sep2 + 1 and val_rest[val_sep2 + 1] == '1';

            total += 1;
            if (is_completed) completed += 1;
            total_imported += count;

            if (!first_detail) dw.writeByte(',') catch {};
            first_detail = false;

            std.fmt.format(dw, "{{\"collection\":\"{s}\",\"source\":\"{s}\",\"imported\":{d},\"completed\":{}}}", .{
                collection,
                source,
                count,
                is_completed,
            }) catch {};
        }

        std.fmt.format(w, "{{\"running\":{},\"total\":{d},\"completed\":{d},\"in_progress\":{d},\"total_imported\":{d},\"collections\":[", .{
            self.isRunning(),
            total,
            completed,
            total - completed,
            total_imported,
        }) catch return error.FormatError;

        w.writeAll(details.items) catch {};
        w.writeAll("]}") catch {};

        return try list.toOwnedSlice(allocator);
    }
};
```
src/collectiondir_subscriber.zig (-314, entire file deleted)
```zig
//! collectiondir firehose subscriber — consumes relay's subscribeRepos stream
//!
//! simpler than the relay's subscriber.zig — no validation, no host management,
//! no rate limiting. just CBOR frame decode → collection index updates.
//!
//! on #commit: extract ops, call trackCommitOps
//! on #account: if inactive, removeAll from index
//! on #sync: count only (resync needs PDS hostname — requires DID doc resolution, stage 2)
//!
//! cursor persisted to RocksDB default CF (key "cursor"), not postgres.

const std = @import("std");
const websocket = @import("websocket");
const zat = @import("zat");
const collection_index_mod = @import("collection_index.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.collectiondir);

const cursor_flush_interval_sec = 4;

pub const Stats = struct {
    commits: std.atomic.Value(u64) = .{ .raw = 0 },
    accounts: std.atomic.Value(u64) = .{ .raw = 0 },
    syncs: std.atomic.Value(u64) = .{ .raw = 0 },
    identities: std.atomic.Value(u64) = .{ .raw = 0 },
    decode_errors: std.atomic.Value(u64) = .{ .raw = 0 },
    ops_tracked: std.atomic.Value(u64) = .{ .raw = 0 },
    accounts_removed: std.atomic.Value(u64) = .{ .raw = 0 },
    reconnects: std.atomic.Value(u64) = .{ .raw = 0 },
    last_seq: std.atomic.Value(u64) = .{ .raw = 0 },
    connected: std.atomic.Value(bool) = .{ .raw = false },
    last_event_time: std.atomic.Value(i64) = .{ .raw = 0 },
};

pub const Subscriber = struct {
    allocator: Allocator,
    relay_url: []const u8,
    relay_port: u16,
    use_tls: bool,
    collection_index: *collection_index_mod.CollectionIndex,
    shutdown: *std.atomic.Value(bool),
    stats: Stats = .{},
    last_seq: ?u64 = null,
    last_cursor_flush: i64 = 0,

    pub fn init(
        allocator: Allocator,
        relay_url: []const u8,
        relay_port: u16,
        use_tls: bool,
        collection_index: *collection_index_mod.CollectionIndex,
        shutdown: *std.atomic.Value(bool),
    ) Subscriber {
        return .{
            .allocator = allocator,
            .relay_url = relay_url,
            .relay_port = relay_port,
            .use_tls = use_tls,
            .collection_index = collection_index,
            .shutdown = shutdown,
        };
    }

    /// load saved cursor from RocksDB default CF
    pub fn loadCursor(self: *Subscriber) void {
        const val = self.collection_index.getMeta("cursor") orelse return;
        defer val.deinit();
        const seq = std.fmt.parseInt(u64, val.data, 10) catch return;
        if (seq > 0) {
            self.last_seq = seq;
            log.info("resuming from cursor {d}", .{seq});
        }
    }

    /// save cursor to RocksDB default CF
    fn flushCursor(self: *Subscriber) void {
        const seq = self.last_seq orelse return;
        var buf: [20]u8 = undefined;
        const val = std.fmt.bufPrint(&buf, "{d}", .{seq}) catch return;
        self.collection_index.putMeta("cursor", val);
    }

    /// run the subscriber loop with reconnect + backoff
    pub fn run(self: *Subscriber) void {
        self.loadCursor();

        var backoff: u64 = 1;
        const max_backoff: u64 = 60;

        while (!self.shutdown.load(.acquire)) {
            log.info("connecting to {s}:{d}...", .{ self.relay_url, self.relay_port });

            if (self.connectAndRead()) {
                // clean disconnect (EOF) — reset backoff
                backoff = 1;
            } else |err| {
                if (self.shutdown.load(.acquire)) return;
                log.err("connection error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
                _ = self.stats.reconnects.fetchAdd(1, .monotonic);
            }

            if (self.shutdown.load(.acquire)) return;

            // flush cursor before backoff
            self.flushCursor();

            // backoff sleep in 1s chunks
            var remaining: u64 = backoff;
            while (remaining > 0 and !self.shutdown.load(.acquire)) {
                std.posix.nanosleep(1, 0);
                remaining -= 1;
            }
            backoff = @min(backoff * 2, max_backoff);
        }

        // final cursor flush
        self.flushCursor();
    }

    fn connectAndRead(self: *Subscriber) !void {
        var path_buf: [256]u8 = undefined;
        var w: std.Io.Writer = .fixed(&path_buf);

        try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos");
        if (self.last_seq) |cursor| {
            try w.print("?cursor={d}", .{cursor});
        }
        const path = w.buffered();

        var client = try websocket.Client.init(self.allocator, .{
            .host = self.relay_url,
            .port = self.relay_port,
            .tls = self.use_tls,
            .max_size = 5 * 1024 * 1024,
        });
        defer client.deinit();

        var host_header_buf: [256]u8 = undefined;
        const host_header = std.fmt.bufPrint(
            &host_header_buf,
            "Host: {s}\r\n",
            .{self.relay_url},
        ) catch self.relay_url;

        try client.handshake(path, .{ .headers = host_header });
        log.info("connected to {s}", .{self.relay_url});
        self.stats.connected.store(true, .release);
        defer self.stats.connected.store(false, .release);

        var handler = FrameHandler{ .sub = self };
        try client.readLoop(&handler);
    }

    pub fn formatMetrics(self: *Subscriber, buf: []u8) []const u8 {
        var fbs = std.io.fixedBufferStream(buf);
        const w = fbs.writer();

        const commits = self.stats.commits.load(.monotonic);
        const accounts = self.stats.accounts.load(.monotonic);
        const syncs = self.stats.syncs.load(.monotonic);
        const identities = self.stats.identities.load(.monotonic);
        const decode_errors = self.stats.decode_errors.load(.monotonic);
        const ops_tracked = self.stats.ops_tracked.load(.monotonic);
        const accounts_removed = self.stats.accounts_removed.load(.monotonic);
        const reconnects = self.stats.reconnects.load(.monotonic);
        const last_seq = self.stats.last_seq.load(.monotonic);
        const connected: u64 = if (self.stats.connected.load(.monotonic)) 1 else 0;
        const last_event_time = self.stats.last_event_time.load(.monotonic);

        std.fmt.format(w,
            \\# HELP collectiondir_events_total events processed by type
            \\# TYPE collectiondir_events_total counter
            \\collectiondir_events_total{{type="commit"}} {d}
            \\collectiondir_events_total{{type="account"}} {d}
            \\collectiondir_events_total{{type="sync"}} {d}
            \\collectiondir_events_total{{type="identity"}} {d}
            \\# HELP collectiondir_decode_errors_total CBOR decode failures
            \\# TYPE collectiondir_decode_errors_total counter
            \\collectiondir_decode_errors_total {d}
            \\# HELP collectiondir_ops_tracked_total collection index ops processed
            \\# TYPE collectiondir_ops_tracked_total counter
            \\collectiondir_ops_tracked_total {d}
            \\# HELP collectiondir_accounts_removed_total accounts removed from index (inactive)
            \\# TYPE collectiondir_accounts_removed_total counter
            \\collectiondir_accounts_removed_total {d}
            \\# HELP collectiondir_reconnects_total WebSocket reconnect count
            \\# TYPE collectiondir_reconnects_total counter
            \\collectiondir_reconnects_total {d}
            \\# HELP collectiondir_last_seq last processed firehose sequence number
            \\# TYPE collectiondir_last_seq gauge
            \\collectiondir_last_seq {d}
            \\# HELP collectiondir_connected whether firehose subscriber is connected
            \\# TYPE collectiondir_connected gauge
            \\collectiondir_connected {d}
            \\# HELP collectiondir_last_event_time_seconds unix timestamp of last processed event
            \\# TYPE collectiondir_last_event_time_seconds gauge
            \\collectiondir_last_event_time_seconds {d}
            \\
        , .{
            commits,
            accounts,
            syncs,
            identities,
            decode_errors,
            ops_tracked,
            accounts_removed,
            reconnects,
            last_seq,
            connected,
            last_event_time,
        }) catch {};

        return fbs.getWritten();
    }
};

const FrameHandler = struct {
    sub: *Subscriber,

    pub fn serverMessage(self: *FrameHandler, data: []const u8) !void {
        const sub = self.sub;

        var arena = std.heap.ArenaAllocator.init(sub.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        // decode header
        const header_result = zat.cbor.decode(alloc, data) catch |err| {
            log.debug("frame header decode failed: {s}", .{@errorName(err)});
            _ = sub.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };
        const header = header_result.value;
        const payload_data = data[header_result.consumed..];

        const op = header.getInt("op") orelse return;
        if (op != 1) return; // skip error frames

        const frame_type = header.getString("t") orelse return;
        const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| {
            log.debug("frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type });
            _ = sub.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };

        const upstream_seq = payload.getUint("seq");

        // route by frame type
        const is_commit = std.mem.eql(u8, frame_type, "#commit");
        const is_account = std.mem.eql(u8, frame_type, "#account");
        const is_sync = std.mem.eql(u8, frame_type, "#sync");
        const is_identity = std.mem.eql(u8, frame_type, "#identity");

        if (is_commit) {
            _ = sub.stats.commits.fetchAdd(1, .monotonic);

            const did = payload.getString("repo") orelse {
                if (upstream_seq) |s| sub.last_seq = s;
                return;
            };

            // track collection ops — the core purpose of this service
            if (payload.get("ops")) |ops| {
                sub.collection_index.trackCommitOps(did, ops);

                // count individual ops for metrics
                switch (ops) {
                    .array => |items| {
                        _ = sub.stats.ops_tracked.fetchAdd(items.len, .monotonic);
                    },
                    else => {},
                }
            }
        } else if (is_account) {
            _ = sub.stats.accounts.fetchAdd(1, .monotonic);

            const did = payload.getString("did") orelse {
                if (upstream_seq) |s| sub.last_seq = s;
                return;
            };

            const is_active = payload.getBool("active") orelse false;
            if (!is_active) {
                sub.collection_index.removeAll(did) catch |err| {
                    log.debug("removeAll failed for {s}: {s}", .{ did, @errorName(err) });
                };
                _ = sub.stats.accounts_removed.fetchAdd(1, .monotonic);
            }
        } else if (is_sync) {
            _ = sub.stats.syncs.fetchAdd(1, .monotonic);
            // stage 1: count only — resync requires PDS hostname via DID doc resolution (stage 2)
        } else if (is_identity) {
            _ = sub.stats.identities.fetchAdd(1, .monotonic);
        } else {
            if (upstream_seq) |s| sub.last_seq = s;
            return;
        }

        // advance cursor
        if (upstream_seq) |s| {
            sub.last_seq = s;
            sub.stats.last_seq.store(s, .monotonic);
            sub.stats.last_event_time.store(std.time.timestamp(), .monotonic);
        }

        // periodic cursor flush
        const now = std.time.timestamp();
        if (now - sub.last_cursor_flush >= cursor_flush_interval_sec) {
            sub.flushCursor();
            sub.last_cursor_flush = now;
        }
    }
};
```
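For reference, a scrape of the removed /metrics endpoint, as rendered by formatMetrics above, would have looked roughly like this (HELP/TYPE lines elided, all counts invented):

```
collectiondir_events_total{type="commit"} 1284933
collectiondir_events_total{type="account"} 2117
collectiondir_events_total{type="sync"} 58
collectiondir_events_total{type="identity"} 19021
collectiondir_decode_errors_total 0
collectiondir_ops_tracked_total 1893002
collectiondir_accounts_removed_total 311
collectiondir_reconnects_total 2
collectiondir_last_seq 88412345
collectiondir_connected 1
collectiondir_last_event_time_seconds 1718040000
```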