atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 523 lines 20 kB view raw
//! collection index — tracks which collections exist per repo
//!
//! inspired by lightrail (microcosm.blue/lightrail), uses RocksDB with two
//! column families for bidirectional lookup:
//!   rbc: <collection>\0<did> → () — "repos by collection" (prefix scan by collection)
//!   cbr: <did>\0<collection> → () — "collections by repo" (per-repo lookups, deletion)
//!
//! live indexing: the subscriber calls trackCommitOps on each validated commit
//! to maintain the index from firehose data. backfill via getRecord probing
//! (lightrail's technique) is planned for phase 2.

const std = @import("std");
const rocksdb = @import("rocksdb");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.collection_index);

/// byte joining the two key components; NUL cannot appear in a DID or NSID,
/// so keys split unambiguously.
const separator = '\x00';

pub const CollectionIndex = struct {
    db: rocksdb.DB,
    rbc: rocksdb.ColumnFamilyHandle,
    cbr: rocksdb.ColumnFamilyHandle,
    allocator: Allocator,

    /// open (or create) the index at `data_dir` with the two column families.
    /// errors: RocksDBOpen if the db cannot be opened, MissingColumnFamily if
    /// a required family is absent after open.
    pub fn open(allocator: Allocator, data_dir: []const u8) !CollectionIndex {
        var err_str: ?rocksdb.Data = null;

        const db, const families = rocksdb.DB.open(
            allocator,
            data_dir,
            .{
                .create_if_missing = true,
                .create_missing_column_families = true,
            },
            &.{
                .{ .name = "default" },
                .{ .name = "rbc" },
                .{ .name = "cbr" },
            },
            false,
            &err_str,
        ) catch {
            if (err_str) |e| {
                log.err("rocksdb open failed: {s}", .{e.data});
                e.deinit();
            }
            return error.RocksDBOpen;
        };
        defer allocator.free(families);

        // FIX: wrap the db immediately so the MissingColumnFamily error path
        // below releases it through the same deinit() used everywhere else —
        // the original returned without closing, leaking the handle.
        var result: CollectionIndex = .{
            .db = db,
            .rbc = undefined,
            .cbr = undefined,
            .allocator = allocator,
        };
        errdefer result.deinit();

        // find column family handles by name
        var rbc: ?rocksdb.ColumnFamilyHandle = null;
        var cbr: ?rocksdb.ColumnFamilyHandle = null;
        for (families) |cf| {
            if (std.mem.eql(u8, cf.name, "rbc")) rbc = cf.handle;
            if (std.mem.eql(u8, cf.name, "cbr")) cbr = cf.handle;
        }
        result.rbc = rbc orelse return error.MissingColumnFamily;
        result.cbr = cbr orelse return error.MissingColumnFamily;

        log.info("collection index opened at {s}", .{data_dir});
        return result;
    }

    /// close the database. must be called exactly once.
    pub fn deinit(self: *CollectionIndex) void {
        self.db.deinit();
    }

    /// add a (collection, did) entry to both indexes.
    /// idempotent — overwrites are no-ops for empty values.
    /// errors: OutOfMemory on key allocation, WriteFailed on rocksdb write.
    pub fn addCollection(self: *CollectionIndex, did: []const u8, collection: []const u8) !void {
        var err_str: ?rocksdb.Data = null;

        // FIX: propagate allocation failure — the original `catch return`
        // silently reported success without writing anything.
        const rbc_key = try makeKey(self.allocator, collection, did);
        defer self.allocator.free(rbc_key);
        const cbr_key = try makeKey(self.allocator, did, collection);
        defer self.allocator.free(cbr_key);

        var batch = rocksdb.WriteBatch.init();
        defer batch.deinit();

        batch.put(self.rbc, rbc_key, "");
        batch.put(self.cbr, cbr_key, "");

        self.db.write(batch, &err_str) catch {
            if (err_str) |e| {
                log.warn("addCollection write failed: {s}", .{e.data});
                e.deinit();
            }
            return error.WriteFailed;
        };
    }

    /// remove a (collection, did) entry from both indexes.
    /// errors: OutOfMemory on key allocation, WriteFailed on rocksdb write.
    pub fn removeCollection(self: *CollectionIndex, did: []const u8, collection: []const u8) !void {
        var err_str: ?rocksdb.Data = null;

        // FIX: propagate allocation failure — the original `catch return`
        // silently reported success without deleting anything.
        const rbc_key = try makeKey(self.allocator, collection, did);
        defer self.allocator.free(rbc_key);
        const cbr_key = try makeKey(self.allocator, did, collection);
        defer self.allocator.free(cbr_key);

        var batch = rocksdb.WriteBatch.init();
        defer batch.deinit();

        batch.delete(self.rbc, rbc_key);
        batch.delete(self.cbr, cbr_key);

        self.db.write(batch, &err_str) catch {
            if (err_str) |e| {
                log.warn("removeCollection write failed: {s}", .{e.data});
                e.deinit();
            }
            return error.WriteFailed;
        };
    }
125 pub fn removeAll(self: *CollectionIndex, did: []const u8) !void { 126 var err_str: ?rocksdb.Data = null; 127 128 // build prefix: did\0 129 var prefix_buf: [512]u8 = undefined; 130 if (did.len + 1 > prefix_buf.len) return error.KeyTooLong; 131 @memcpy(prefix_buf[0..did.len], did); 132 prefix_buf[did.len] = separator; 133 const prefix = prefix_buf[0 .. did.len + 1]; 134 135 var batch = rocksdb.WriteBatch.init(); 136 defer batch.deinit(); 137 138 // scan cbr for all collections belonging to this DID 139 var iter = self.db.iterator(self.cbr, .forward, prefix); 140 defer iter.deinit(); 141 142 while (try iter.next(&err_str)) |entry| { 143 // iterator data points to internal rocksdb buffers — do NOT deinit 144 const key_data = entry[0].data; 145 146 // stop when prefix no longer matches 147 if (!std.mem.startsWith(u8, key_data, prefix)) break; 148 149 // extract collection name after the separator 150 const collection = key_data[prefix.len..]; 151 152 // delete from rbc — must copy collection since iterator advances 153 const rbc_key = makeKey(self.allocator, collection, did) catch continue; 154 defer self.allocator.free(rbc_key); 155 batch.delete(self.rbc, rbc_key); 156 157 // delete from cbr — must copy key since iterator advances 158 const cbr_key = self.allocator.dupe(u8, key_data) catch continue; 159 defer self.allocator.free(cbr_key); 160 batch.delete(self.cbr, cbr_key); 161 } 162 163 self.db.write(batch, &err_str) catch { 164 if (err_str) |e| { 165 log.warn("removeAll write failed: {s}", .{e.data}); 166 e.deinit(); 167 } 168 return error.WriteFailed; 169 }; 170 } 171 172 /// process commit ops to update collection index. 173 /// extracts collection from each op path (<collection>/<rkey>). 174 /// for creates: ensures collection is tracked. 175 /// 176 /// `ops` is a cbor Value (expected to be an array of {action, path, cid?} maps). 
177 pub fn trackCommitOps(self: *CollectionIndex, did: []const u8, ops: anytype) void { 178 // ops is a tagged union — check it's an array 179 const items = switch (ops) { 180 .array => |a| a, 181 else => return, 182 }; 183 for (items) |op| { 184 const action = op.getString("action") orelse continue; 185 const path = op.getString("path") orelse continue; 186 187 // extract collection from path: "collection/rkey" 188 const slash = std.mem.indexOfScalar(u8, path, '/') orelse continue; 189 const collection = path[0..slash]; 190 if (collection.len == 0) continue; 191 192 if (std.mem.eql(u8, action, "create") or std.mem.eql(u8, action, "update")) { 193 // create or update — ensure collection is tracked (matches indigo semantics) 194 self.addCollection(did, collection) catch |err| { 195 log.debug("trackCommitOps add failed: {s}", .{@errorName(err)}); 196 }; 197 } 198 // note: we don't remove on "delete" in phase 1 — we'd need to know 199 // if the collection still has other records (requires getRecord probing 200 // or counting). lightrail handles this in phase 4. 201 } 202 } 203 204 /// list DIDs that have records in a collection. returns up to `limit` DIDs 205 /// starting after `cursor_did` (or from the beginning if null). 
206 pub fn listReposByCollection( 207 self: *CollectionIndex, 208 collection: []const u8, 209 limit: usize, 210 cursor_did: ?[]const u8, 211 out_buf: []u8, 212 ) !ListResult { 213 var err_str: ?rocksdb.Data = null; 214 215 // seek key: either collection\0cursor_did\x01 (after cursor) or collection\0 216 var seek_buf: [1024]u8 = undefined; 217 const seek_key = blk: { 218 if (cursor_did) |cursor| { 219 if (collection.len + 1 + cursor.len + 1 > seek_buf.len) return error.KeyTooLong; 220 @memcpy(seek_buf[0..collection.len], collection); 221 seek_buf[collection.len] = separator; 222 @memcpy(seek_buf[collection.len + 1 ..][0..cursor.len], cursor); 223 // \x01 is one byte past \0 — skips the cursor DID itself 224 seek_buf[collection.len + 1 + cursor.len] = 0x01; 225 break :blk seek_buf[0 .. collection.len + 1 + cursor.len + 1]; 226 } else { 227 if (collection.len + 1 > seek_buf.len) return error.KeyTooLong; 228 @memcpy(seek_buf[0..collection.len], collection); 229 seek_buf[collection.len] = separator; 230 break :blk seek_buf[0 .. collection.len + 1]; 231 } 232 }; 233 234 const prefix = seek_key[0 .. 
collection.len + 1]; // collection\0 235 236 var iter = self.db.iterator(self.rbc, .forward, seek_key); 237 defer iter.deinit(); 238 239 var count: usize = 0; 240 var buf_pos: usize = 0; 241 var did_offsets: [2000]Span = undefined; 242 var last_did: ?[]const u8 = null; 243 244 while (count < limit) { 245 const entry = (try iter.next(&err_str)) orelse break; 246 // iterator data points to internal rocksdb buffers — do NOT deinit 247 const key_data = entry[0].data; 248 249 // stop when prefix no longer matches 250 if (!std.mem.startsWith(u8, key_data, prefix)) break; 251 252 // extract DID after prefix 253 const did = key_data[prefix.len..]; 254 if (did.len == 0) continue; 255 256 // copy DID into output buffer 257 if (buf_pos + did.len > out_buf.len) break; 258 @memcpy(out_buf[buf_pos..][0..did.len], did); 259 did_offsets[count] = .{ .start = buf_pos, .len = did.len }; 260 buf_pos += did.len; 261 262 last_did = out_buf[did_offsets[count].start..][0..did_offsets[count].len]; 263 count += 1; 264 } 265 266 return .{ 267 .count = count, 268 .offsets = did_offsets[0..count], 269 .buf = out_buf[0..buf_pos], 270 .last_did = last_did, 271 }; 272 } 273 274 /// scan the RBC column family for unique collection prefixes. 275 /// returns a deduplicated list of collection names (caller owns the slice and each string). 
276 pub fn listKnownCollections(self: *CollectionIndex, allocator: Allocator) ![][]const u8 { 277 var err_str: ?rocksdb.Data = null; 278 279 var seen: std.StringHashMapUnmanaged(void) = .empty; 280 defer seen.deinit(allocator); 281 282 // full scan of RBC — keys are collection\0did 283 var iter = self.db.iterator(self.rbc, .forward, ""); 284 defer iter.deinit(); 285 286 while (try iter.next(&err_str)) |entry| { 287 const key_data = entry[0].data; 288 const sep_pos = std.mem.indexOfScalar(u8, key_data, separator) orelse continue; 289 const collection = key_data[0..sep_pos]; 290 if (collection.len == 0) continue; 291 292 if (!seen.contains(collection)) { 293 const duped = try allocator.dupe(u8, collection); 294 errdefer allocator.free(duped); 295 try seen.put(allocator, duped, {}); 296 } 297 } 298 299 // collect into a slice 300 const result = try allocator.alloc([]const u8, seen.count()); 301 var i: usize = 0; 302 var key_iter = seen.keyIterator(); 303 while (key_iter.next()) |key| { 304 result[i] = key.*; 305 i += 1; 306 } 307 308 return result; 309 } 310 311 /// check if a DID has any collection entries 312 pub fn hasCollections(self: *CollectionIndex, did: []const u8) bool { 313 var err_str: ?rocksdb.Data = null; 314 var prefix_buf: [512]u8 = undefined; 315 if (did.len + 1 > prefix_buf.len) return false; 316 @memcpy(prefix_buf[0..did.len], did); 317 prefix_buf[did.len] = separator; 318 const prefix = prefix_buf[0 .. 
did.len + 1]; 319 320 var iter = self.db.iterator(self.cbr, .forward, prefix); 321 defer iter.deinit(); 322 323 if (iter.next(&err_str) catch null) |entry| { 324 // iterator data points to internal rocksdb buffers — do NOT deinit 325 return std.mem.startsWith(u8, entry[0].data, prefix); 326 } 327 return false; 328 } 329}; 330 331pub const Span = struct { start: usize, len: usize }; 332 333pub const ListResult = struct { 334 count: usize, 335 offsets: []const Span, 336 buf: []const u8, 337 last_did: ?[]const u8, 338 339 /// get the DID at index i 340 pub fn getDid(self: ListResult, i: usize) []const u8 { 341 const off = self.offsets[i]; 342 return self.buf[off.start..][0..off.len]; 343 } 344}; 345 346fn makeKey(allocator: Allocator, a: []const u8, b: []const u8) ![]u8 { 347 const key = try allocator.alloc(u8, a.len + 1 + b.len); 348 @memcpy(key[0..a.len], a); 349 key[a.len] = separator; 350 @memcpy(key[a.len + 1 ..][0..b.len], b); 351 return key; 352} 353 354// --- tests --- 355 356test "collection index: open and close" { 357 const testing = std.testing; 358 const allocator = testing.allocator; 359 360 var dir = std.testing.tmpDir(.{}); 361 defer dir.cleanup(); 362 const path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &dir.sub_path)}); 363 defer allocator.free(path); 364 365 var ci = CollectionIndex.open(allocator, path) catch |err| { 366 log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)}); 367 return error.SkipZigTest; 368 }; 369 ci.deinit(); 370} 371 372test "collection index: basic put and get" { 373 const testing = std.testing; 374 const allocator = testing.allocator; 375 376 var dir = std.testing.tmpDir(.{}); 377 defer dir.cleanup(); 378 const path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &dir.sub_path)}); 379 defer allocator.free(path); 380 381 var ci = CollectionIndex.open(allocator, path) catch |err| { 382 log.warn("skipping test (rocksdb open failed): {s}", 
test "collection index: list and cursor" {
    const testing = std.testing;
    const allocator = testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(path);

    var index = CollectionIndex.open(allocator, path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    try index.addCollection("did:plc:alice", "app.bsky.feed.post");
    try index.addCollection("did:plc:bob", "app.bsky.feed.post");
    try index.addCollection("did:plc:carol", "app.bsky.graph.follow");

    // first page: both post authors, in key order
    var out: [4096]u8 = undefined;
    const page1 = try index.listReposByCollection("app.bsky.feed.post", 100, null, &out);
    try testing.expectEqual(@as(usize, 2), page1.count);
    try testing.expectEqualStrings("did:plc:alice", page1.getDid(0));
    try testing.expectEqualStrings("did:plc:bob", page1.getDid(1));

    // second page: cursor skips alice, leaving only bob
    const page2 = try index.listReposByCollection("app.bsky.feed.post", 100, "did:plc:alice", &out);
    try testing.expectEqual(@as(usize, 1), page2.count);
    try testing.expectEqualStrings("did:plc:bob", page2.getDid(0));
}

test "collection index: removeAll" {
    const testing = std.testing;
    const allocator = testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(path);

    var index = CollectionIndex.open(allocator, path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    try index.addCollection("did:plc:alice", "app.bsky.feed.post");
    try index.addCollection("did:plc:alice", "app.bsky.feed.like");
    try index.addCollection("did:plc:bob", "app.bsky.feed.post");

    // tombstone alice: all of her entries disappear from both indexes
    try index.removeAll("did:plc:alice");
    try testing.expect(!index.hasCollections("did:plc:alice"));

    var out: [4096]u8 = undefined;
    const remaining = try index.listReposByCollection("app.bsky.feed.post", 100, null, &out);
    try testing.expectEqual(@as(usize, 1), remaining.count);
    try testing.expectEqualStrings("did:plc:bob", remaining.getDid(0));
}

test "collection index: listKnownCollections" {
    const testing = std.testing;
    const allocator = testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(path);

    var index = CollectionIndex.open(allocator, path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    try index.addCollection("did:plc:alice", "app.bsky.feed.post");
    try index.addCollection("did:plc:bob", "app.bsky.feed.post");
    try index.addCollection("did:plc:alice", "app.bsky.feed.like");
    try index.addCollection("did:plc:carol", "app.bsky.graph.follow");

    const collections = try index.listKnownCollections(allocator);
    defer {
        for (collections) |c| allocator.free(c);
        allocator.free(collections);
    }

    try testing.expectEqual(@as(usize, 3), collections.len);

    // verify all three collections are present (order not guaranteed)
    const expected = [_][]const u8{
        "app.bsky.feed.post",
        "app.bsky.feed.like",
        "app.bsky.graph.follow",
    };
    for (expected) |want| {
        var found = false;
        for (collections) |c| {
            if (std.mem.eql(u8, c, want)) found = true;
        }
        try testing.expect(found);
    }
}

test "collection index: idempotent add" {
    const testing = std.testing;
    const allocator = testing.allocator;

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const path = try std.fmt.allocPrint(allocator, ".zig-cache/tmp/{s}", .{@as([]const u8, &tmp.sub_path)});
    defer allocator.free(path);

    var index = CollectionIndex.open(allocator, path) catch |err| {
        log.warn("skipping test (rocksdb open failed): {s}", .{@errorName(err)});
        return error.SkipZigTest;
    };
    defer index.deinit();

    // add same entry twice — should not duplicate
    try index.addCollection("did:plc:alice", "app.bsky.feed.post");
    try index.addCollection("did:plc:alice", "app.bsky.feed.post");

    var out: [4096]u8 = undefined;
    const result = try index.listReposByCollection("app.bsky.feed.post", 100, null, &out);
    try testing.expectEqual(@as(usize, 1), result.count);
}