//! atproto relay implementation in zig
//! zlay.waow.tech
1//! collection index backfill — discovers collections and imports DIDs from a source relay
2//!
3//! two collection discovery sources (unioned):
4//! 1. lexicon garden llms.txt — ~700 known NSIDs
5//! 2. RBC column family scan — firehose-observed collections already in the index
6//!
7//! progress tracked in postgres (backfill_progress table) for resumability.
8//! triggered via POST /admin/backfill-collections, status via GET.
9
10const std = @import("std");
11const Io = std.Io;
12const http = std.http;
13const pg = @import("pg");
14const collection_index_mod = @import("collection_index.zig");
15const event_log_mod = @import("event_log.zig");
16
17const Allocator = std.mem.Allocator;
18const log = std.log.scoped(.backfill);
19
/// one page of results from com.atproto.sync.listReposByCollection.
const FetchResult = struct {
    /// DIDs on this page. caller owns the slice and each string.
    dids: [][]const u8,
    /// cursor for the next page, or null when the listing is exhausted.
    /// caller owns the string when non-null.
    next_cursor: ?[]const u8,
};
24
pub const Backfiller = struct {
    allocator: Allocator,
    collection_index: *collection_index_mod.CollectionIndex,
    persist: *event_log_mod.DiskPersist,
    /// true while a backfill worker thread is active; gates start().
    running: std.atomic.Value(bool),
    /// worker thread handle; joined by waitForCompletion() before teardown.
    thread: ?std.Thread,
    /// hostname of the source relay; heap-duped for the duration of a run.
    source: []const u8,
    io: Io,
    /// process-wide shutdown flag, polled between collections/pages so a
    /// long run can be interrupted.
    shutdown: *std.atomic.Value(bool),

    /// convenience accessor for the postgres pool owned by DiskPersist.
    fn db(self: *Backfiller) *pg.Pool {
        return self.persist.db;
    }

    /// construct an idle Backfiller. no thread is started until start().
    pub fn init(
        allocator: Allocator,
        collection_index: *collection_index_mod.CollectionIndex,
        persist: *event_log_mod.DiskPersist,
        io: Io,
        shutdown: *std.atomic.Value(bool),
    ) Backfiller {
        return .{
            .allocator = allocator,
            .collection_index = collection_index,
            .persist = persist,
            .running = .{ .raw = false },
            .thread = null,
            .source = "",
            .io = io,
            .shutdown = shutdown,
        };
    }

    /// whether a backfill run is currently in progress.
    pub fn isRunning(self: *Backfiller) bool {
        return self.running.load(.acquire);
    }

    /// block until any in-progress backfill thread completes.
    /// must be called before tearing down DiskPersist or CollectionIndex.
    pub fn waitForCompletion(self: *Backfiller) void {
        if (self.thread) |t| {
            t.join();
            self.thread = null;
        }
    }

    /// start a backfill from the given source relay (hostname).
    /// returns error.AlreadyRunning if a run is in progress,
    /// error.SpawnFailed if the worker thread cannot be created.
    pub fn start(self: *Backfiller, source: []const u8) !void {
        // claim the running flag atomically so concurrent start() calls
        // cannot both spawn a thread.
        if (self.running.cmpxchgStrong(false, true, .acq_rel, .acquire) != null) {
            return error.AlreadyRunning;
        }
        errdefer self.running.store(false, .release);

        self.source = try self.allocator.dupe(u8, source);
        errdefer {
            self.allocator.free(self.source);
            self.source = "";
        }
        self.thread = std.Thread.spawn(.{}, run, .{self}) catch return error.SpawnFailed;
    }

    /// worker thread entry point: discover collections, seed progress rows,
    /// then backfill each collection in turn. failures on individual
    /// collections are logged and skipped so one bad NSID cannot abort the run.
    fn run(self: *Backfiller) void {
        defer {
            self.allocator.free(self.source);
            self.source = "";
            // note: do NOT clear self.thread here — waitForCompletion() needs
            // the handle to join this thread before dp/ci teardown. the running
            // flag gates start(), so a stale thread handle is harmless.
            self.running.store(false, .release);
        }

        const pool = self.db();

        // discover collections
        const collections = self.discoverCollections() catch |err| {
            log.err("collection discovery failed: {s}", .{@errorName(err)});
            return;
        };
        defer {
            for (collections) |c| self.allocator.free(c);
            self.allocator.free(collections);
        }

        log.info("discovered {d} collections to backfill from {s}", .{ collections.len, self.source });

        // insert progress rows (skip existing so resumed runs keep their cursor)
        for (collections) |collection| {
            _ = pool.exec(
                "INSERT INTO backfill_progress (collection, source) VALUES ($1, $2) ON CONFLICT (collection, source) DO NOTHING",
                .{ collection, self.source },
            ) catch |err| {
                log.warn("failed to insert progress for {s}: {s}", .{ collection, @errorName(err) });
            };
        }

        // backfill each collection
        for (collections) |collection| {
            if (self.shutdown.load(.acquire)) {
                log.info("backfill interrupted by shutdown", .{});
                return;
            }
            self.backfillCollection(collection) catch |err| {
                log.warn("backfill failed for {s}: {s}", .{ collection, @errorName(err) });
            };
        }

        log.info("backfill complete", .{});
    }

    /// union the two discovery sources (lexicon garden + RBC scan) into a
    /// deduplicated list. each source is best-effort: a failure narrows the
    /// set instead of failing discovery. caller owns the returned slice and
    /// every string in it.
    fn discoverCollections(self: *Backfiller) ![][]const u8 {
        var seen: std.StringHashMapUnmanaged(void) = .empty;
        defer seen.deinit(self.allocator);
        // keys are heap dupes owned by the map until they are moved into the
        // result slice below; on an error exit they are still in the map and
        // must be freed here (seen.deinit frees only the table, not the keys).
        errdefer {
            var it = seen.keyIterator();
            while (it.next()) |key| self.allocator.free(key.*);
        }

        // source 1: lexicon garden
        const garden = self.fetchLexiconGarden() catch |err| blk: {
            log.warn("lexicon garden fetch failed: {s}", .{@errorName(err)});
            break :blk &[_][]const u8{};
        };
        defer {
            for (garden) |c| self.allocator.free(c);
            self.allocator.free(garden);
        }
        for (garden) |c| {
            if (!seen.contains(c)) {
                const duped = try self.allocator.dupe(u8, c);
                errdefer self.allocator.free(duped);
                try seen.put(self.allocator, duped, {});
            }
        }

        // source 2: observed collections from RBC scan
        const observed = self.collection_index.listKnownCollections(self.allocator) catch |err| blk: {
            log.warn("RBC scan failed: {s}", .{@errorName(err)});
            break :blk &[_][]const u8{};
        };
        defer {
            for (observed) |c| self.allocator.free(c);
            self.allocator.free(observed);
        }
        for (observed) |c| {
            if (!seen.contains(c)) {
                const duped = try self.allocator.dupe(u8, c);
                errdefer self.allocator.free(duped);
                try seen.put(self.allocator, duped, {});
            }
        }

        // move key ownership out of the map into the result slice
        const result = try self.allocator.alloc([]const u8, seen.count());
        var i: usize = 0;
        var key_iter = seen.keyIterator();
        while (key_iter.next()) |key| {
            result[i] = key.*;
            i += 1;
        }

        return result;
    }

    /// fetch NSIDs from https://lexicon.garden/llms.txt
    /// parses lines matching `- [`<nsid>`](`
    /// caller owns the returned slice and every string in it.
    fn fetchLexiconGarden(self: *Backfiller) ![][]const u8 {
        var client: http.Client = .{ .allocator = self.allocator, .io = self.io };
        defer client.deinit();

        var aw: std.Io.Writer.Allocating = .init(self.allocator);
        defer aw.deinit();

        const result = client.fetch(.{
            .location = .{ .url = "https://lexicon.garden/llms.txt" },
            .response_writer = &aw.writer,
            .method = .GET,
        }) catch return error.FetchFailed;

        if (result.status != .ok) return error.FetchFailed;

        // borrow the buffered body without taking ownership — toArrayList()
        // would detach the buffer from `aw`, making the deferred aw.deinit()
        // a no-op and leaking the response body on every fetch.
        const body = aw.written();

        var nsids: std.ArrayListUnmanaged([]const u8) = .empty;
        defer nsids.deinit(self.allocator);
        // the list owns its strings until toOwnedSlice() hands them to the
        // caller; free them if we error out partway through parsing.
        errdefer {
            for (nsids.items) |s| self.allocator.free(s);
        }

        // parse lines like: - [`app.bsky.feed.post`](
        var lines = std.mem.splitScalar(u8, body, '\n');
        while (lines.next()) |line| {
            // look for pattern: - [`<nsid>`](
            const backtick_start = std.mem.indexOf(u8, line, "- [`") orelse continue;
            const nsid_start = backtick_start + 4; // skip "- [`"
            const rest = line[nsid_start..];
            const backtick_end = std.mem.indexOf(u8, rest, "`](") orelse continue;
            const nsid = rest[0..backtick_end];

            // basic NSID validation: must contain at least one dot
            if (!std.mem.containsAtLeast(u8, nsid, 1, ".")) continue;
            // skip obviously non-record types (but include them anyway — they'll return empty)

            const duped = try self.allocator.dupe(u8, nsid);
            errdefer self.allocator.free(duped);
            try nsids.append(self.allocator, duped);
        }

        log.info("lexicon garden: found {d} NSIDs", .{nsids.items.len});
        return try nsids.toOwnedSlice(self.allocator);
    }

    /// backfill one collection: resume from any cursor saved in
    /// backfill_progress, then page through listReposByCollection until the
    /// listing is exhausted or shutdown is requested. progress (cursor +
    /// imported count) is persisted after every page for resumability.
    fn backfillCollection(self: *Backfiller, collection: []const u8) !void {
        const pool = self.db();

        // single query: check completion, get cursor + count for resume
        var cursor: ?[]const u8 = null;
        defer if (cursor) |c| self.allocator.free(c);
        var imported: i64 = 0;
        {
            var row = (pool.rowUnsafe(
                "SELECT completed_at IS NOT NULL, cursor, imported_count FROM backfill_progress WHERE collection = $1 AND source = $2",
                .{ collection, self.source },
            ) catch return error.DatabaseError) orelse return;
            defer row.deinit() catch {};

            if (row.get(bool, 0)) return; // already done

            // cursor is NULL on freshly-inserted rows (the INSERT in run()
            // sets only collection/source), so read it as an optional.
            const saved_cursor = row.get(?[]const u8, 1) orelse "";
            imported = row.get(i64, 2);

            if (saved_cursor.len > 0) {
                cursor = try self.allocator.dupe(u8, saved_cursor);
                log.info("{s}: resuming from cursor (imported {d} so far)", .{ collection, imported });
            }
        }

        // reuse one HTTP client across all pages for this collection
        var client: http.Client = .{ .allocator = self.allocator, .io = self.io };
        defer client.deinit();

        var page_count: usize = 0;
        while (!self.shutdown.load(.acquire)) {
            const fetch_result = self.fetchPage(&client, collection, cursor) catch |err| {
                // leave the saved cursor in place so a later run can resume here
                log.warn("{s}: fetch page failed: {s}", .{ collection, @errorName(err) });
                break;
            };
            defer {
                for (fetch_result.dids) |d| self.allocator.free(d);
                self.allocator.free(fetch_result.dids);
                if (fetch_result.next_cursor) |nc| self.allocator.free(nc);
            }

            // add each DID to collection index
            for (fetch_result.dids) |did| {
                self.collection_index.addCollection(did, collection) catch {};
                imported += 1;
            }

            page_count += 1;

            // update cursor in progress table
            const new_cursor = fetch_result.next_cursor orelse "";
            _ = pool.exec(
                "UPDATE backfill_progress SET cursor = $1, imported_count = $2 WHERE collection = $3 AND source = $4",
                .{ new_cursor, imported, collection, self.source },
            ) catch {};

            if (fetch_result.next_cursor) |nc| {
                // advance cursor
                if (cursor) |old| self.allocator.free(old);
                cursor = self.allocator.dupe(u8, nc) catch break;

                // brief pause between pages
                self.io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch {};
            } else {
                // no more pages — mark complete
                _ = pool.exec(
                    "UPDATE backfill_progress SET completed_at = now(), cursor = '', imported_count = $1 WHERE collection = $2 AND source = $3",
                    .{ imported, collection, self.source },
                ) catch {};
                log.info("{s}: complete ({d} DIDs, {d} pages)", .{ collection, imported, page_count });
                break;
            }
        }
    }

    /// fetch one page of DIDs for `collection` from the source relay.
    /// caller owns result.dids (and each string) and result.next_cursor.
    fn fetchPage(self: *Backfiller, client: *http.Client, collection: []const u8, cursor: ?[]const u8) !FetchResult {
        // build URL
        var url_buf: [1024]u8 = undefined;
        const url = if (cursor) |c|
            std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000&cursor={s}", .{ self.source, collection, c }) catch return error.UrlTooLong
        else
            std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.listReposByCollection?collection={s}&limit=1000", .{ self.source, collection }) catch return error.UrlTooLong;

        var aw: std.Io.Writer.Allocating = .init(self.allocator);
        defer aw.deinit();

        const result = client.fetch(.{
            .location = .{ .url = url },
            .response_writer = &aw.writer,
            .method = .GET,
        }) catch return error.FetchFailed;

        if (result.status != .ok) return error.FetchFailed;

        // borrow the body; the deferred aw.deinit() frees it. (toArrayList()
        // would detach the buffer from `aw` and leak it.)
        const body = aw.written();

        // parse: {"repos": [{"did": "..."}, ...], "cursor": "..."}
        const parsed = std.json.parseFromSlice(ListReposResponse, self.allocator, body, .{ .ignore_unknown_fields = true }) catch return error.ParseFailed;
        defer parsed.deinit();

        const repos = parsed.value.repos orelse return .{
            .dids = try self.allocator.alloc([]const u8, 0),
            .next_cursor = null,
        };

        var dids: std.ArrayListUnmanaged([]const u8) = .empty;
        defer dids.deinit(self.allocator);

        // dupe DIDs out of the parsed arena; skip individual failures rather
        // than dropping the whole page.
        for (repos) |repo| {
            const duped = self.allocator.dupe(u8, repo.did) catch continue;
            dids.append(self.allocator, duped) catch {
                self.allocator.free(duped);
                continue;
            };
        }

        const next_cursor = if (parsed.value.cursor) |c|
            self.allocator.dupe(u8, c) catch null
        else
            null;

        const owned_dids = dids.toOwnedSlice(self.allocator) catch {
            // ownership handoff failed — free everything duped above so the
            // error path does not leak the page.
            for (dids.items) |d| self.allocator.free(d);
            if (next_cursor) |nc| self.allocator.free(nc);
            return error.OutOfMemory;
        };

        return .{
            .dids = owned_dids,
            .next_cursor = next_cursor,
        };
    }

    /// wire shape of com.atproto.sync.listReposByCollection; unknown fields ignored.
    const ListReposResponse = struct {
        repos: ?[]const RepoEntry = null,
        cursor: ?[]const u8 = null,
    };

    const RepoEntry = struct {
        did: []const u8,
    };

    /// write `s` as the body of a JSON string (without surrounding quotes),
    /// escaping quotes, backslashes, and control characters. write errors are
    /// swallowed to match the best-effort style of getStatus().
    fn writeJsonEscaped(w: *Io.Writer, s: []const u8) void {
        for (s) |c| {
            switch (c) {
                '"' => w.writeAll("\\\"") catch {},
                '\\' => w.writeAll("\\\\") catch {},
                '\n' => w.writeAll("\\n") catch {},
                '\r' => w.writeAll("\\r") catch {},
                '\t' => w.writeAll("\\t") catch {},
                else => {
                    if (c < 0x20) {
                        w.print("\\u{x:0>4}", .{c}) catch {};
                    } else {
                        w.writeByte(c) catch {};
                    }
                },
            }
        }
    }

    /// return status summary for the admin endpoint as a JSON document.
    /// caller owns the returned buffer (allocated with `allocator`).
    pub fn getStatus(self: *Backfiller, allocator: Allocator) ![]u8 {
        const pool = self.db();

        var aw: Io.Writer.Allocating = .init(allocator);
        defer aw.deinit();
        const w = &aw.writer;

        // query aggregate stats (best-effort: fall back to zeros on failure)
        var total: i64 = 0;
        var completed: i64 = 0;
        var total_imported: i64 = 0;
        {
            var row = pool.rowUnsafe(
                "SELECT COUNT(*)::bigint, COUNT(completed_at)::bigint, COALESCE(SUM(imported_count), 0)::bigint FROM backfill_progress",
                .{},
            ) catch null;
            if (row) |*r| {
                total = r.get(i64, 0);
                completed = r.get(i64, 1);
                total_imported = r.get(i64, 2);
                r.deinit() catch {};
            }
        }

        w.print("{{\"running\":{},\"total\":{d},\"completed\":{d},\"in_progress\":{d},\"total_imported\":{d},\"collections\":[", .{
            self.isRunning(),
            total,
            completed,
            total - completed,
            total_imported,
        }) catch return error.FormatError;

        // per-collection detail
        var result = pool.query(
            "SELECT collection, source, cursor, imported_count, completed_at IS NOT NULL FROM backfill_progress ORDER BY collection, source",
            .{},
        ) catch return error.DatabaseError;
        defer result.deinit();

        var first = true;
        while (result.nextUnsafe() catch null) |dbrow| {
            if (!first) w.writeByte(',') catch {};
            first = false;

            const collection = dbrow.get([]const u8, 0);
            const source = dbrow.get([]const u8, 1);
            // cursor may be NULL for rows that have never advanced a page
            const cursor_val = dbrow.get(?[]const u8, 2) orelse "";
            const count = dbrow.get(i64, 3);
            const is_completed = dbrow.get(bool, 4);

            // string fields come from the database — escape them rather than
            // interpolating raw bytes into the JSON output
            w.writeAll("{\"collection\":\"") catch {};
            writeJsonEscaped(w, collection);
            w.writeAll("\",\"source\":\"") catch {};
            writeJsonEscaped(w, source);
            w.print("\",\"imported\":{d},\"completed\":{}", .{ count, is_completed }) catch {};

            if (cursor_val.len > 0 and !is_completed) {
                w.writeAll(",\"cursor\":\"") catch {};
                writeJsonEscaped(w, cursor_val);
                w.writeByte('"') catch {};
            }

            w.writeByte('}') catch {};
        }

        w.writeAll("]}") catch {};
        return try aw.toOwnedSlice();
    }
};