// atproto relay implementation in zig
// zlay.waow.tech
1//! collection index — tracks which collections exist per repo
2//!
3//! inspired by lightrail (microcosm.blue/lightrail), uses RocksDB with two
4//! column families for bidirectional lookup:
5//! rbc: <collection>\0<did> → () — "repos by collection" (prefix scan by collection)
6//! cbr: <did>\0<collection> → () — "collections by repo" (per-repo lookups, deletion)
7//!
8//! live indexing: the subscriber calls trackCommitOps on each validated commit
9//! to maintain the index from firehose data. backfill via getRecord probing
10//! (lightrail's technique) is planned for phase 2.
11
12const std = @import("std");
13const rocksdb = @import("rocksdb");
14
15const Allocator = std.mem.Allocator;
16const log = std.log.scoped(.collection_index);
17
/// NUL byte joining the two segments of every index key (`<a>\0<b>`);
/// assumes DIDs and collection NSIDs never contain NUL — TODO confirm upstream validation.
const separator = '\x00';
19
pub const CollectionIndex = struct {
    db: rocksdb.DB,
    rbc: rocksdb.ColumnFamilyHandle,
    cbr: rocksdb.ColumnFamilyHandle,
    allocator: Allocator,
    /// backing storage for the spans handed out by `listReposByCollection`.
    /// lives in the struct rather than the call stack so `ListResult.offsets`
    /// stays valid after the call returns (a function-local array here produced
    /// a dangling slice). overwritten by the next list call — results must be
    /// consumed before calling again; not safe for concurrent listing.
    offsets_storage: [max_list_dids]Span = undefined,

    /// hard cap on DIDs returned by a single `listReposByCollection` call.
    pub const max_list_dids = 2000;

    /// open (or create) the index at `data_dir` with the rbc/cbr column
    /// families. returns error.RocksDBOpen if the db cannot be opened and
    /// error.MissingColumnFamily if a requested handle was not returned.
    pub fn open(allocator: Allocator, data_dir: []const u8) !CollectionIndex {
        var err_str: ?rocksdb.Data = null;

        const db, const families = rocksdb.DB.open(
            allocator,
            data_dir,
            .{
                .create_if_missing = true,
                .create_missing_column_families = true,
            },
            &.{
                .{ .name = "default" },
                .{ .name = "rbc" },
                .{ .name = "cbr" },
            },
            false,
            &err_str,
        ) catch {
            if (err_str) |e| {
                log.err("rocksdb open failed: {s}", .{e.data});
                e.deinit();
            }
            return error.RocksDBOpen;
        };
        defer allocator.free(families);

        // find column family handles by name
        var rbc: ?rocksdb.ColumnFamilyHandle = null;
        var cbr: ?rocksdb.ColumnFamilyHandle = null;
        for (families) |cf| {
            if (std.mem.eql(u8, cf.name, "rbc")) rbc = cf.handle;
            if (std.mem.eql(u8, cf.name, "cbr")) cbr = cf.handle;
        }

        log.info("collection index opened at {s}", .{data_dir});

        // NOTE(review): if a handle is missing the db object is not closed
        // before erroring; only reachable if rocksdb ignores the requested
        // CF list (we set create_missing_column_families) — low priority.
        return .{
            .db = db,
            .rbc = rbc orelse return error.MissingColumnFamily,
            .cbr = cbr orelse return error.MissingColumnFamily,
            .allocator = allocator,
        };
    }

    /// close the underlying rocksdb handle.
    pub fn deinit(self: *CollectionIndex) void {
        self.db.deinit();
    }

    /// add a (collection, did) entry to both indexes.
    /// idempotent — overwrites are no-ops for empty values.
    /// propagates OutOfMemory from key building and error.WriteFailed from the db.
    pub fn addCollection(self: *CollectionIndex, did: []const u8, collection: []const u8) !void {
        var err_str: ?rocksdb.Data = null;

        // propagate OOM instead of silently reporting success without writing
        // (previously `catch return` dropped the update)
        const rbc_key = try makeKey(self.allocator, collection, did);
        defer self.allocator.free(rbc_key);
        const cbr_key = try makeKey(self.allocator, did, collection);
        defer self.allocator.free(cbr_key);

        var batch = rocksdb.WriteBatch.init();
        defer batch.deinit();

        // the write batch copies key bytes (rocksdb C API semantics), so
        // freeing the keys after the write below is safe
        batch.put(self.rbc, rbc_key, "");
        batch.put(self.cbr, cbr_key, "");

        self.db.write(batch, &err_str) catch {
            if (err_str) |e| {
                log.warn("addCollection write failed: {s}", .{e.data});
                e.deinit();
            }
            return error.WriteFailed;
        };
    }

    /// remove a (collection, did) entry from both indexes.
    /// propagates OutOfMemory from key building and error.WriteFailed from the db.
    pub fn removeCollection(self: *CollectionIndex, did: []const u8, collection: []const u8) !void {
        var err_str: ?rocksdb.Data = null;

        // propagate OOM instead of silently reporting success without deleting
        const rbc_key = try makeKey(self.allocator, collection, did);
        defer self.allocator.free(rbc_key);
        const cbr_key = try makeKey(self.allocator, did, collection);
        defer self.allocator.free(cbr_key);

        var batch = rocksdb.WriteBatch.init();
        defer batch.deinit();

        batch.delete(self.rbc, rbc_key);
        batch.delete(self.cbr, cbr_key);

        self.db.write(batch, &err_str) catch {
            if (err_str) |e| {
                log.warn("removeCollection write failed: {s}", .{e.data});
                e.deinit();
            }
            return error.WriteFailed;
        };
    }

    /// remove all collection entries for a DID (e.g. account tombstone).
    /// scans cbr for all collections, then batch-deletes from both indexes.
    pub fn removeAll(self: *CollectionIndex, did: []const u8) !void {
        var err_str: ?rocksdb.Data = null;

        // build prefix: did\0
        var prefix_buf: [512]u8 = undefined;
        if (did.len + 1 > prefix_buf.len) return error.KeyTooLong;
        @memcpy(prefix_buf[0..did.len], did);
        prefix_buf[did.len] = separator;
        const prefix = prefix_buf[0 .. did.len + 1];

        var batch = rocksdb.WriteBatch.init();
        defer batch.deinit();

        // scan cbr for all collections belonging to this DID
        var iter = self.db.iterator(self.cbr, .forward, prefix);
        defer iter.deinit();

        while (iter.next(&err_str) catch |err| {
            // free the rocksdb error message before propagating the error
            if (err_str) |e| e.deinit();
            return err;
        }) |entry| {
            // iterator data points to internal rocksdb buffers — do NOT deinit
            const key_data = entry[0].data;

            // stop when prefix no longer matches
            if (!std.mem.startsWith(u8, key_data, prefix)) break;

            // extract collection name after the separator
            const collection = key_data[prefix.len..];

            // delete from rbc — copy, since the iterator buffer is reused.
            // propagate OOM instead of skipping the entry (previously
            // `catch continue` could leave the two indexes inconsistent)
            const rbc_key = try makeKey(self.allocator, collection, did);
            defer self.allocator.free(rbc_key);
            batch.delete(self.rbc, rbc_key);

            // delete from cbr — copy the full key for the same reason
            const cbr_key = try self.allocator.dupe(u8, key_data);
            defer self.allocator.free(cbr_key);
            batch.delete(self.cbr, cbr_key);
        }

        self.db.write(batch, &err_str) catch {
            if (err_str) |e| {
                log.warn("removeAll write failed: {s}", .{e.data});
                e.deinit();
            }
            return error.WriteFailed;
        };
    }

    /// process commit ops to update collection index.
    /// extracts collection from each op path (<collection>/<rkey>).
    /// for creates/updates: ensures collection is tracked. best-effort —
    /// per-op failures are logged at debug level, never propagated.
    ///
    /// `ops` is a cbor Value (expected to be an array of {action, path, cid?} maps).
    pub fn trackCommitOps(self: *CollectionIndex, did: []const u8, ops: anytype) void {
        // ops is a tagged union — anything but an array is ignored
        const items = switch (ops) {
            .array => |a| a,
            else => return,
        };
        for (items) |op| {
            const action = op.getString("action") orelse continue;
            const path = op.getString("path") orelse continue;

            // extract collection from path: "collection/rkey"
            const slash = std.mem.indexOfScalar(u8, path, '/') orelse continue;
            const collection = path[0..slash];
            if (collection.len == 0) continue;

            if (std.mem.eql(u8, action, "create") or std.mem.eql(u8, action, "update")) {
                // create or update — ensure collection is tracked (matches indigo semantics)
                self.addCollection(did, collection) catch |err| {
                    log.debug("trackCommitOps add failed: {s}", .{@errorName(err)});
                };
            }
            // note: we don't remove on "delete" in phase 1 — we'd need to know
            // if the collection still has other records (requires getRecord probing
            // or counting). lightrail handles this in phase 4.
        }
    }

    /// list DIDs that have records in a collection. returns up to `limit`
    /// DIDs (capped at `max_list_dids`) starting after `cursor_did` (or from
    /// the beginning if null). DID bytes are copied into `out_buf`; the
    /// returned `offsets` point into `self` and are invalidated by the next
    /// call on this index.
    pub fn listReposByCollection(
        self: *CollectionIndex,
        collection: []const u8,
        limit: usize,
        cursor_did: ?[]const u8,
        out_buf: []u8,
    ) !ListResult {
        var err_str: ?rocksdb.Data = null;

        // seek key: either collection\0cursor_did\x01 (after cursor) or collection\0
        var seek_buf: [1024]u8 = undefined;
        const seek_key = blk: {
            if (cursor_did) |cursor| {
                if (collection.len + 1 + cursor.len + 1 > seek_buf.len) return error.KeyTooLong;
                @memcpy(seek_buf[0..collection.len], collection);
                seek_buf[collection.len] = separator;
                @memcpy(seek_buf[collection.len + 1 ..][0..cursor.len], cursor);
                // \x01 is one byte past \0 — skips the cursor DID itself
                seek_buf[collection.len + 1 + cursor.len] = 0x01;
                break :blk seek_buf[0 .. collection.len + 1 + cursor.len + 1];
            } else {
                if (collection.len + 1 > seek_buf.len) return error.KeyTooLong;
                @memcpy(seek_buf[0..collection.len], collection);
                seek_buf[collection.len] = separator;
                break :blk seek_buf[0 .. collection.len + 1];
            }
        };

        const prefix = seek_key[0 .. collection.len + 1]; // collection\0

        var iter = self.db.iterator(self.rbc, .forward, seek_key);
        defer iter.deinit();

        // clamp the caller's limit so we can never index past the span
        // storage (an uncapped limit previously wrote out of bounds)
        const capped = @min(limit, self.offsets_storage.len);

        var count: usize = 0;
        var buf_pos: usize = 0;
        var last_did: ?[]const u8 = null;

        while (count < capped) {
            const entry = (iter.next(&err_str) catch |err| {
                // free the rocksdb error message before propagating
                if (err_str) |e| e.deinit();
                return err;
            }) orelse break;
            // iterator data points to internal rocksdb buffers — do NOT deinit
            const key_data = entry[0].data;

            // stop when prefix no longer matches
            if (!std.mem.startsWith(u8, key_data, prefix)) break;

            // extract DID after prefix
            const did = key_data[prefix.len..];
            if (did.len == 0) continue;

            // copy DID into output buffer; stop when the caller's buffer is full
            if (buf_pos + did.len > out_buf.len) break;
            @memcpy(out_buf[buf_pos..][0..did.len], did);
            // spans live in the struct, not this stack frame, so the returned
            // slice stays valid after we return
            self.offsets_storage[count] = .{ .start = buf_pos, .len = did.len };
            buf_pos += did.len;

            last_did = out_buf[buf_pos - did.len ..][0..did.len];
            count += 1;
        }

        return .{
            .count = count,
            .offsets = self.offsets_storage[0..count],
            .buf = out_buf[0..buf_pos],
            .last_did = last_did,
        };
    }

    /// scan the RBC column family for unique collection prefixes.
    /// returns a deduplicated list of collection names (caller owns the slice and each string).
    pub fn listKnownCollections(self: *CollectionIndex, allocator: Allocator) ![][]const u8 {
        var err_str: ?rocksdb.Data = null;

        var seen: std.StringHashMapUnmanaged(void) = .empty;
        defer seen.deinit(allocator);
        // on any error below, free the strings already duped into the map
        // (the map does not own its key memory). runs before the deinit above.
        errdefer {
            var it = seen.keyIterator();
            while (it.next()) |k| allocator.free(k.*);
        }

        // full scan of RBC — keys are collection\0did
        var iter = self.db.iterator(self.rbc, .forward, "");
        defer iter.deinit();

        while (iter.next(&err_str) catch |err| {
            // free the rocksdb error message before propagating
            if (err_str) |e| e.deinit();
            return err;
        }) |entry| {
            const key_data = entry[0].data;
            const sep_pos = std.mem.indexOfScalar(u8, key_data, separator) orelse continue;
            const collection = key_data[0..sep_pos];
            if (collection.len == 0) continue;

            if (!seen.contains(collection)) {
                const duped = try allocator.dupe(u8, collection);
                errdefer allocator.free(duped);
                try seen.put(allocator, duped, {});
            }
        }

        // collect into a slice; ownership of the key strings transfers to it
        const result = try allocator.alloc([]const u8, seen.count());
        var i: usize = 0;
        var key_iter = seen.keyIterator();
        while (key_iter.next()) |key| {
            result[i] = key.*;
            i += 1;
        }

        return result;
    }

    /// check if a DID has any collection entries.
    /// best-effort: iterator errors are treated as "no entries" (but the
    /// rocksdb error message is freed instead of leaked).
    pub fn hasCollections(self: *CollectionIndex, did: []const u8) bool {
        var err_str: ?rocksdb.Data = null;
        var prefix_buf: [512]u8 = undefined;
        if (did.len + 1 > prefix_buf.len) return false;
        @memcpy(prefix_buf[0..did.len], did);
        prefix_buf[did.len] = separator;
        const prefix = prefix_buf[0 .. did.len + 1];

        var iter = self.db.iterator(self.cbr, .forward, prefix);
        defer iter.deinit();

        const entry = iter.next(&err_str) catch {
            if (err_str) |e| e.deinit();
            return false;
        } orelse return false;
        // iterator data points to internal rocksdb buffers — do NOT deinit
        return std.mem.startsWith(u8, entry[0].data, prefix);
    }
};
330
/// byte range (offset + length) into a `ListResult`'s backing buffer.
pub const Span = struct { start: usize, len: usize };
332
/// result of a `listReposByCollection` call: `count` DIDs packed back-to-back
/// in `buf`, addressed by `offsets`. `last_did` (the final DID, or null when
/// empty) is the cursor for the next page.
pub const ListResult = struct {
    count: usize,
    offsets: []const Span,
    buf: []const u8,
    last_did: ?[]const u8,

    /// return the DID stored at position `i` (valid for 0 <= i < count).
    pub fn getDid(self: ListResult, i: usize) []const u8 {
        const span = self.offsets[i];
        return self.buf[span.start .. span.start + span.len];
    }
};
345
/// build the index key `a\0b`. caller owns (and must free) the returned slice.
fn makeKey(allocator: Allocator, a: []const u8, b: []const u8) ![]u8 {
    const total = a.len + 1 + b.len;
    const key = try allocator.alloc(u8, total);
    std.mem.copyForwards(u8, key[0..a.len], a);
    key[a.len] = separator;
    std.mem.copyForwards(u8, key[a.len + 1 ..], b);
    return key;
}
353
354// --- tests ---
355
test "collection index: open and close" {
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const db_path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(db_path);

    // smoke test: an index can be opened and torn down cleanly
    var index = CollectionIndex.open(allocator, db_path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    index.deinit();
}
371
test "collection index: basic put and get" {
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const db_path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(db_path);

    var index = CollectionIndex.open(allocator, db_path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    // insert one entry, then read it back through the raw rbc column family
    try index.addCollection("did:plc:alice", "app.bsky.feed.post");

    var err_str: ?rocksdb.Data = null;
    const key = try makeKey(allocator, "app.bsky.feed.post", "did:plc:alice");
    defer allocator.free(key);
    const value = try index.db.get(index.rbc, key, &err_str);
    try std.testing.expect(value != null);
    if (value) |v| v.deinit();
}
398
test "collection index: list and cursor" {
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const db_path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(db_path);

    var index = CollectionIndex.open(allocator, db_path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    try index.addCollection("did:plc:alice", "app.bsky.feed.post");
    try index.addCollection("did:plc:bob", "app.bsky.feed.post");
    try index.addCollection("did:plc:carol", "app.bsky.graph.follow");

    // first page: both post authors, in key order
    var out: [4096]u8 = undefined;
    const page1 = try index.listReposByCollection("app.bsky.feed.post", 100, null, &out);
    try std.testing.expectEqual(@as(usize, 2), page1.count);
    try std.testing.expectEqualStrings("did:plc:alice", page1.getDid(0));
    try std.testing.expectEqualStrings("did:plc:bob", page1.getDid(1));

    // second page: resume after alice via the cursor
    const page2 = try index.listReposByCollection("app.bsky.feed.post", 100, "did:plc:alice", &out);
    try std.testing.expectEqual(@as(usize, 1), page2.count);
    try std.testing.expectEqualStrings("did:plc:bob", page2.getDid(0));
}
430
test "collection index: removeAll" {
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const db_path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(db_path);

    var index = CollectionIndex.open(allocator, db_path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    try index.addCollection("did:plc:alice", "app.bsky.feed.post");
    try index.addCollection("did:plc:alice", "app.bsky.feed.like");
    try index.addCollection("did:plc:bob", "app.bsky.feed.post");

    // wiping alice must leave bob's entries untouched
    try index.removeAll("did:plc:alice");
    try std.testing.expect(!index.hasCollections("did:plc:alice"));

    var out: [4096]u8 = undefined;
    const remaining = try index.listReposByCollection("app.bsky.feed.post", 100, null, &out);
    try std.testing.expectEqual(@as(usize, 1), remaining.count);
    try std.testing.expectEqualStrings("did:plc:bob", remaining.getDid(0));
}
458
test "collection index: listKnownCollections" {
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const db_path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(db_path);

    var index = CollectionIndex.open(allocator, db_path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    try index.addCollection("did:plc:alice", "app.bsky.feed.post");
    try index.addCollection("did:plc:bob", "app.bsky.feed.post");
    try index.addCollection("did:plc:alice", "app.bsky.feed.like");
    try index.addCollection("did:plc:carol", "app.bsky.graph.follow");

    const collections = try index.listKnownCollections(allocator);
    defer {
        for (collections) |c| allocator.free(c);
        allocator.free(collections);
    }

    try std.testing.expectEqual(@as(usize, 3), collections.len);

    // each expected name must appear (order is unspecified; the length check
    // above rules out duplicates)
    const expected = [_][]const u8{ "app.bsky.feed.post", "app.bsky.feed.like", "app.bsky.graph.follow" };
    var found = [_]bool{false} ** 3;
    for (collections) |c| {
        for (expected, 0..) |name, i| {
            if (std.mem.eql(u8, c, name)) found[i] = true;
        }
    }
    for (found) |hit| try std.testing.expect(hit);
}
500
test "collection index: idempotent add" {
    const allocator = std.testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const db_path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(db_path);

    var index = CollectionIndex.open(allocator, db_path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    // inserting the same (did, collection) pair twice must yield one entry
    try index.addCollection("did:plc:alice", "app.bsky.feed.post");
    try index.addCollection("did:plc:alice", "app.bsky.feed.post");

    var out: [4096]u8 = undefined;
    const listed = try index.listReposByCollection("app.bsky.feed.post", 100, null, &out);
    try std.testing.expectEqual(@as(usize, 1), listed.count);
}