atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 433 lines 16 kB view raw
//! collection index backfill — discovers collections and imports DIDs from a source relay
//!
//! two collection discovery sources (unioned):
//! 1. lexicon garden llms.txt — ~700 known NSIDs
//! 2. RBC column family scan — firehose-observed collections already in the index
//!
//! progress tracked in postgres (backfill_progress table) for resumability.
//! triggered via POST /admin/backfill-collections, status via GET.

const std = @import("std");
const Io = std.Io;
const http = std.http;
const pg = @import("pg");
const collection_index_mod = @import("collection_index.zig");
const event_log_mod = @import("event_log.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.backfill);

/// one page of listReposByCollection results.
/// caller owns `dids`, each string in it, and `next_cursor` (if non-null).
const FetchResult = struct {
    dids: [][]const u8,
    next_cursor: ?[]const u8,
};

pub const Backfiller = struct {
    allocator: Allocator,
    collection_index: *collection_index_mod.CollectionIndex,
    persist: *event_log_mod.DiskPersist,
    /// true while a backfill thread is active; gates start().
    running: std.atomic.Value(bool),
    /// handle of the most recently spawned backfill thread; joined in waitForCompletion().
    thread: ?std.Thread,
    /// hostname of the source relay, heap-duped for the lifetime of one run.
    source: []const u8,
    io: Io,
    /// process-wide shutdown flag; checked between collections and between pages.
    shutdown: *std.atomic.Value(bool),

    fn db(self: *Backfiller) *pg.Pool {
        return self.persist.db;
    }

    pub fn init(
        allocator: Allocator,
        collection_index: *collection_index_mod.CollectionIndex,
        persist: *event_log_mod.DiskPersist,
        io: Io,
        shutdown: *std.atomic.Value(bool),
    ) Backfiller {
        return .{
            .allocator = allocator,
            .collection_index = collection_index,
            .persist = persist,
            .running = .{ .raw = false },
            .thread = null,
            .source = "",
            .io = io,
            .shutdown = shutdown,
        };
    }

    pub fn isRunning(self: *Backfiller) bool {
        return self.running.load(.acquire);
    }

    /// block until any in-progress backfill thread completes.
    /// must be called before tearing down DiskPersist or CollectionIndex.
    pub fn waitForCompletion(self: *Backfiller) void {
        if (self.thread) |t| {
            t.join();
            self.thread = null;
        }
    }

    /// start a backfill from the given source relay. returns error if already running.
    pub fn start(self: *Backfiller, source: []const u8) !void {
        // cmpxchg returns null on success, so non-null means another run owns the flag.
        if (self.running.cmpxchgStrong(false, true, .acq_rel, .acquire) != null) {
            return error.AlreadyRunning;
        }
        errdefer self.running.store(false, .release);

        self.source = try self.allocator.dupe(u8, source);
        errdefer {
            self.allocator.free(self.source);
            self.source = "";
        }
        self.thread = std.Thread.spawn(.{}, run, .{self}) catch return error.SpawnFailed;
    }

    /// thread entry point: discover collections, seed progress rows, then
    /// backfill each collection sequentially. per-collection errors are
    /// logged and skipped, never fatal to the run.
    fn run(self: *Backfiller) void {
        defer {
            self.allocator.free(self.source);
            self.source = "";
            // note: do NOT clear self.thread here — waitForCompletion() needs
            // the handle to join this thread before dp/ci teardown. the running
            // flag gates start(), so a stale thread handle is harmless.
            self.running.store(false, .release);
        }

        const pool = self.db();

        // discover collections
        const collections = self.discoverCollections() catch |err| {
            log.err("collection discovery failed: {s}", .{@errorName(err)});
            return;
        };
        defer {
            for (collections) |c| self.allocator.free(c);
            self.allocator.free(collections);
        }

        log.info("discovered {d} collections to backfill from {s}", .{ collections.len, self.source });

        // insert progress rows (skip existing)
        for (collections) |collection| {
            _ = pool.exec(
                "INSERT INTO backfill_progress (collection, source) VALUES ($1, $2) ON CONFLICT (collection, source) DO NOTHING",
                .{ collection, self.source },
            ) catch |err| {
                log.warn("failed to insert progress for {s}: {s}", .{ collection, @errorName(err) });
            };
        }

        // backfill each collection
        for (collections) |collection| {
            if (self.shutdown.load(.acquire)) {
                log.info("backfill interrupted by shutdown", .{});
                return;
            }
            self.backfillCollection(collection) catch |err| {
                log.warn("backfill failed for {s}: {s}", .{ collection, @errorName(err) });
            };
        }

        log.info("backfill complete", .{});
    }

    /// union the two discovery sources into one deduplicated list.
    /// caller owns the returned slice and every string in it.
    fn discoverCollections(self: *Backfiller) ![][]const u8 {
        var seen: std.StringHashMapUnmanaged(void) = .empty;
        defer seen.deinit(self.allocator);
        // fix: keys in `seen` are owned dupes; deinit() frees only the table,
        // so on an error exit the keys would leak. on success, ownership
        // transfers to `result` below and this errdefer does not run.
        errdefer {
            var it = seen.keyIterator();
            while (it.next()) |key| self.allocator.free(key.*);
        }

        // source 1: lexicon garden (best-effort — empty on failure)
        const garden = self.fetchLexiconGarden() catch |err| blk: {
            log.warn("lexicon garden fetch failed: {s}", .{@errorName(err)});
            break :blk &[_][]const u8{};
        };
        defer {
            for (garden) |c| self.allocator.free(c);
            // freeing the zero-length fallback slice is a no-op, so this is safe.
            self.allocator.free(garden);
        }
        for (garden) |c| {
            if (!seen.contains(c)) {
                const duped = try self.allocator.dupe(u8, c);
                errdefer self.allocator.free(duped);
                try seen.put(self.allocator, duped, {});
            }
        }

        // source 2: observed collections from RBC scan (best-effort)
        const observed = self.collection_index.listKnownCollections(self.allocator) catch |err| blk: {
            log.warn("RBC scan failed: {s}", .{@errorName(err)});
            break :blk &[_][]const u8{};
        };
        defer {
            for (observed) |c| self.allocator.free(c);
            self.allocator.free(observed);
        }
        for (observed) |c| {
            if (!seen.contains(c)) {
                const duped = try self.allocator.dupe(u8, c);
                errdefer self.allocator.free(duped);
                try seen.put(self.allocator, duped, {});
            }
        }

        // collect into result — key ownership moves from `seen` to `result`.
        const result = try self.allocator.alloc([]const u8, seen.count());
        var i: usize = 0;
        var key_iter = seen.keyIterator();
        while (key_iter.next()) |key| {
            result[i] = key.*;
            i += 1;
        }

        return result;
    }

    /// fetch NSIDs from https://lexicon.garden/llms.txt
    /// parses lines matching `- [`<nsid>`](`
    /// caller owns the returned slice and every string in it.
    fn fetchLexiconGarden(self: *Backfiller) ![][]const u8 {
        var client: http.Client = .{ .allocator = self.allocator, .io = self.io };
        defer client.deinit();

        var aw: std.Io.Writer.Allocating = .init(self.allocator);
        defer aw.deinit();

        const result = client.fetch(.{
            .location = .{ .url = "https://lexicon.garden/llms.txt" },
            .response_writer = &aw.writer,
            .method = .GET,
        }) catch return error.FetchFailed;

        if (result.status != .ok) return error.FetchFailed;

        // fix: borrow the buffered bytes. the previous `aw.toArrayList().items`
        // transferred buffer ownership out of `aw`, so the deferred aw.deinit()
        // freed nothing and the response body leaked on every call.
        const body = aw.written();

        var nsids: std.ArrayListUnmanaged([]const u8) = .empty;
        defer nsids.deinit(self.allocator);
        // fix: free already-appended dupes if a later append fails; on success
        // toOwnedSlice() empties the list and this errdefer frees nothing live.
        errdefer for (nsids.items) |s| self.allocator.free(s);

        // parse lines like: - [`app.bsky.feed.post`](
        var lines = std.mem.splitScalar(u8, body, '\n');
        while (lines.next()) |line| {
            // look for pattern: - [`<nsid>`](
            const backtick_start = std.mem.indexOf(u8, line, "- [`") orelse continue;
            const nsid_start = backtick_start + 4; // skip "- [`"
            const rest = line[nsid_start..];
            const backtick_end = std.mem.indexOf(u8, rest, "`](") orelse continue;
            const nsid = rest[0..backtick_end];

            // basic NSID validation: must contain at least one dot
            if (!std.mem.containsAtLeast(u8, nsid, 1, ".")) continue;
            // skip obviously non-record types (but include them anyway — they'll return empty)

            const duped = try self.allocator.dupe(u8, nsid);
            errdefer self.allocator.free(duped);
            try nsids.append(self.allocator, duped);
        }

        log.info("lexicon garden: found {d} NSIDs", .{nsids.items.len});
        return try nsids.toOwnedSlice(self.allocator);
    }

    /// backfill one collection: resume from the saved cursor, page through the
    /// source relay, and persist progress after every page so a crash or
    /// shutdown can resume where it left off. DB write failures are best-effort.
    fn backfillCollection(self: *Backfiller, collection: []const u8) !void {
        const pool = self.db();

        // single query: check completion, get cursor + count for resume
        var cursor: ?[]const u8 = null;
        defer if (cursor) |c| self.allocator.free(c);
        var imported: i64 = 0;
        {
            var row = (pool.rowUnsafe(
                "SELECT completed_at IS NOT NULL, cursor, imported_count FROM backfill_progress WHERE collection = $1 AND source = $2",
                .{ collection, self.source },
            ) catch return error.DatabaseError) orelse return;
            defer row.deinit() catch {};

            if (row.get(bool, 0)) return; // already done

            // NOTE(review): assumes the cursor column is non-NULL (e.g. DEFAULT '');
            // a NULL would not decode as []const u8 — confirm against the schema.
            const saved_cursor = row.get([]const u8, 1);
            imported = row.get(i64, 2);

            if (saved_cursor.len > 0) {
                cursor = try self.allocator.dupe(u8, saved_cursor);
                log.info("{s}: resuming from cursor (imported {d} so far)", .{ collection, imported });
            }
        }

        // reuse one HTTP client across all pages for this collection
        var client: http.Client = .{ .allocator = self.allocator, .io = self.io };
        defer client.deinit();

        var page_count: usize = 0;
        while (!self.shutdown.load(.acquire)) {
            const fetch_result = self.fetchPage(&client, collection, cursor) catch |err| {
                log.warn("{s}: fetch page failed: {s}", .{ collection, @errorName(err) });
                break;
            };
            defer {
                for (fetch_result.dids) |d| self.allocator.free(d);
                self.allocator.free(fetch_result.dids);
                if (fetch_result.next_cursor) |nc| self.allocator.free(nc);
            }

            // add each DID to collection index
            for (fetch_result.dids) |did| {
                self.collection_index.addCollection(did, collection) catch {};
                imported += 1;
            }

            page_count += 1;

            // update cursor in progress table
            const new_cursor = fetch_result.next_cursor orelse "";
            _ = pool.exec(
                "UPDATE backfill_progress SET cursor = $1, imported_count = $2 WHERE collection = $3 AND source = $4",
                .{ new_cursor, imported, collection, self.source },
            ) catch {};

            if (fetch_result.next_cursor) |nc| {
                // advance cursor
                if (cursor) |old| self.allocator.free(old);
                // fix: clear before re-duping — if dupe fails and we break,
                // the function-level defer would otherwise double-free `old`
                // through the stale pointer.
                cursor = null;
                cursor = self.allocator.dupe(u8, nc) catch break;

                // brief pause between pages
                self.io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch {};
            } else {
                // no more pages — mark complete
                _ = pool.exec(
                    "UPDATE backfill_progress SET completed_at = now(), cursor = '', imported_count = $1 WHERE collection = $2 AND source = $3",
                    .{ imported, collection, self.source },
                ) catch {};
                log.info("{s}: complete ({d} DIDs, {d} pages)", .{ collection, imported, page_count });
                break;
            }
        }
    }

    /// fetch one page of com.atproto.sync.listReposByCollection from the
    /// source relay. caller owns everything inside the returned FetchResult.
    fn fetchPage(self: *Backfiller, client: *http.Client, collection: []const u8, cursor: ?[]const u8) !FetchResult {
        // build URL
        // NOTE(review): collection/cursor are interpolated without percent-encoding.
        // NSIDs are URL-safe by grammar, but verify relay cursors never need escaping.
        var url_buf: [1024]u8 = undefined;
        const url = if (cursor) |c|
            std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000&cursor={s}", .{ self.source, collection, c }) catch return error.UrlTooLong
        else
            std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000", .{ self.source, collection }) catch return error.UrlTooLong;

        var aw: std.Io.Writer.Allocating = .init(self.allocator);
        defer aw.deinit();

        const result = client.fetch(.{
            .location = .{ .url = url },
            .response_writer = &aw.writer,
            .method = .GET,
        }) catch return error.FetchFailed;

        if (result.status != .ok) return error.FetchFailed;

        // fix: borrow the buffered bytes. the previous `aw.toArrayList().items`
        // moved ownership out of `aw`, so the deferred deinit freed nothing —
        // the response body leaked on every page.
        const body = aw.written();

        // parse: {"repos": [{"did": "..."}, ...], "cursor": "..."}
        const parsed = std.json.parseFromSlice(ListReposResponse, self.allocator, body, .{ .ignore_unknown_fields = true }) catch return error.ParseFailed;
        defer parsed.deinit();

        const repos = parsed.value.repos orelse return .{
            .dids = try self.allocator.alloc([]const u8, 0),
            .next_cursor = null,
        };

        var dids: std.ArrayListUnmanaged([]const u8) = .empty;
        defer dids.deinit(self.allocator);
        // fix: free accumulated dupes on an error exit (e.g. toOwnedSlice OOM);
        // on success the list is emptied and this frees nothing live.
        errdefer for (dids.items) |d| self.allocator.free(d);

        for (repos) |repo| {
            // best-effort: a single OOM'd DID is skipped rather than failing the page
            const duped = self.allocator.dupe(u8, repo.did) catch continue;
            dids.append(self.allocator, duped) catch {
                self.allocator.free(duped);
                continue;
            };
        }

        const next_cursor = if (parsed.value.cursor) |c|
            self.allocator.dupe(u8, c) catch null
        else
            null;
        // fix: don't leak the duped cursor if toOwnedSlice fails below.
        errdefer if (next_cursor) |nc| self.allocator.free(nc);

        return .{
            .dids = dids.toOwnedSlice(self.allocator) catch return error.OutOfMemory,
            .next_cursor = next_cursor,
        };
    }

    /// wire shape of listReposByCollection; unknown fields are ignored at parse time.
    const ListReposResponse = struct {
        repos: ?[]const RepoEntry = null,
        cursor: ?[]const u8 = null,
    };

    const RepoEntry = struct {
        did: []const u8,
    };

    /// return status summary for the admin endpoint as a JSON document.
    /// caller owns the returned buffer (allocated with `allocator`).
    pub fn getStatus(self: *Backfiller, allocator: Allocator) ![]u8 {
        const pool = self.db();

        var aw: Io.Writer.Allocating = .init(allocator);
        defer aw.deinit();
        const w = &aw.writer;

        // query aggregate stats (best-effort: zeros on DB failure)
        var total: i64 = 0;
        var completed: i64 = 0;
        var total_imported: i64 = 0;
        {
            // fix: dropped the redundant `orelse null` — `catch null` already
            // yields the optional row.
            var row = pool.rowUnsafe(
                "SELECT COUNT(*)::bigint, COUNT(completed_at)::bigint, COALESCE(SUM(imported_count), 0)::bigint FROM backfill_progress",
                .{},
            ) catch null;
            if (row) |*r| {
                total = r.get(i64, 0);
                completed = r.get(i64, 1);
                total_imported = r.get(i64, 2);
                r.deinit() catch {};
            }
        }

        w.print("{{\"running\":{},\"total\":{d},\"completed\":{d},\"in_progress\":{d},\"total_imported\":{d},\"collections\":[", .{
            self.isRunning(),
            total,
            completed,
            total - completed,
            total_imported,
        }) catch return error.FormatError;

        // per-collection detail
        var result = pool.query(
            "SELECT collection, source, cursor, imported_count, completed_at IS NOT NULL FROM backfill_progress ORDER BY collection, source",
            .{},
        ) catch return error.DatabaseError;
        defer result.deinit();

        var first = true;
        while (result.nextUnsafe() catch null) |dbrow| {
            if (!first) w.writeByte(',') catch {};
            first = false;

            const collection = dbrow.get([]const u8, 0);
            const source = dbrow.get([]const u8, 1);
            const cursor_val = dbrow.get([]const u8, 2);
            const count = dbrow.get(i64, 3);
            const is_completed = dbrow.get(bool, 4);

            // NOTE(review): values are interpolated without JSON string escaping.
            // NSIDs and hostnames are quote-free, but confirm cursors can never
            // contain `"` or `\` before trusting this output as strict JSON.
            w.print("{{\"collection\":\"{s}\",\"source\":\"{s}\",\"imported\":{d},\"completed\":{}", .{
                collection,
                source,
                count,
                is_completed,
            }) catch {};

            if (cursor_val.len > 0 and !is_completed) {
                w.print(",\"cursor\":\"{s}\"", .{cursor_val}) catch {};
            }

            w.writeByte('}') catch {};
        }

        w.writeAll("]}") catch {};
        return try aw.toOwnedSlice();
    }
};