atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: bound RocksDB memory with shared block cache and index caching

Open collection index with tuned options via raw C API:
- 256 MB shared LRU block cache across all column families
- cache_index_and_filter_blocks=true (index and filter blocks are stored in the bounded block cache instead of growing without limit outside it)
- pin_l0_filter_and_index_blocks_in_cache=true
- 16 KB block size (4x the 4 KB default; reduces index size)
- 32 MB write buffers, at most 2 memtables per column family (max_write_buffer_number=2)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz a65e8e68 c27db6a0

+131 -66
+1
build.zig
··· 26 26 .{ .name = "websocket", .module = websocket.module("websocket") }, 27 27 .{ .name = "pg", .module = pg.module("pg") }, 28 28 .{ .name = "rocksdb", .module = rocksdb.module("bindings") }, 29 + .{ .name = "rocksdb_c", .module = rocksdb.module("rocksdb") }, 29 30 }; 30 31 31 32 // relay executable
+130 -66
src/collection_index.zig
··· 11 11 12 12 const std = @import("std"); 13 13 const rocksdb = @import("rocksdb"); 14 + const rdb = @import("rocksdb_c"); 14 15 15 16 const Allocator = std.mem.Allocator; 16 17 const log = std.log.scoped(.collection_index); 17 18 18 19 const separator = '\x00'; 20 + 21 + /// shared block cache size — bounds total read memory (indexes + data + filters) 22 + /// across all column families. with cache_index_and_filter_blocks=true, this is 23 + /// the hard cap on RocksDB's read-path memory. 256 MB is conservative for the 24 + /// collection index workload (mostly sequential writes + prefix scans). 25 + const block_cache_bytes: usize = 256 * 1024 * 1024; 19 26 20 27 pub const CollectionIndex = struct { 21 - db: rocksdb.DB, 22 - rbc: rocksdb.ColumnFamilyHandle, 23 - cbr: rocksdb.ColumnFamilyHandle, 28 + db: *rdb.rocksdb_t, 29 + rbc: *rdb.rocksdb_column_family_handle_t, 30 + cbr: *rdb.rocksdb_column_family_handle_t, 31 + default_cf: *rdb.rocksdb_column_family_handle_t, 32 + cache: *rdb.rocksdb_cache_t, 24 33 allocator: Allocator, 25 34 26 35 pub fn open(allocator: Allocator, data_dir: []const u8) !CollectionIndex { 27 - var err_str: ?rocksdb.Data = null; 36 + // shared LRU block cache — bounds total read memory across all CFs 37 + const cache = rdb.rocksdb_cache_create_lru(block_cache_bytes) orelse return error.RocksDBOpen; 38 + errdefer rdb.rocksdb_cache_destroy(cache); 39 + 40 + // block-based table options — force indexes/filters into the bounded cache 41 + const bbo = rdb.rocksdb_block_based_options_create() orelse return error.RocksDBOpen; 42 + defer rdb.rocksdb_block_based_options_destroy(bbo); 43 + rdb.rocksdb_block_based_options_set_block_cache(bbo, cache); 44 + rdb.rocksdb_block_based_options_set_block_size(bbo, 16 * 1024); // 16 KB (reduces index size vs 4 KB default) 45 + rdb.rocksdb_block_based_options_set_cache_index_and_filter_blocks(bbo, 1); 46 + rdb.rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(bbo, 1); 28 47 29 - const db, 
const families = rocksdb.DB.open( 30 - allocator, 31 - data_dir, 32 - .{ 33 - .create_if_missing = true, 34 - .create_missing_column_families = true, 35 - }, 36 - &.{ 37 - .{ .name = "default" }, 38 - .{ .name = "rbc" }, 39 - .{ .name = "cbr" }, 40 - }, 41 - false, 42 - &err_str, 43 - ) catch { 44 - if (err_str) |e| { 45 - log.err("rocksdb open failed: {s}", .{e.data}); 46 - e.deinit(); 48 + // per-CF options: 32 MB write buffer, max 2 concurrent memtables 49 + const cf_opts = rdb.rocksdb_options_create() orelse return error.RocksDBOpen; 50 + defer rdb.rocksdb_options_destroy(cf_opts); 51 + rdb.rocksdb_options_set_block_based_table_factory(cf_opts, bbo); 52 + rdb.rocksdb_options_set_write_buffer_size(cf_opts, 32 * 1024 * 1024); 53 + rdb.rocksdb_options_set_max_write_buffer_number(cf_opts, 2); 54 + 55 + // DB-level options 56 + const db_opts = rdb.rocksdb_options_create() orelse return error.RocksDBOpen; 57 + defer rdb.rocksdb_options_destroy(db_opts); 58 + rdb.rocksdb_options_set_create_if_missing(db_opts, 1); 59 + rdb.rocksdb_options_set_create_missing_column_families(db_opts, 1); 60 + 61 + // null-terminate path for C API 62 + var path_buf: [4096]u8 = @splat(0); 63 + if (data_dir.len >= path_buf.len) return error.RocksDBOpen; 64 + @memcpy(path_buf[0..data_dir.len], data_dir); 65 + 66 + const cf_names: [3][*c]const u8 = .{ "default", "rbc", "cbr" }; 67 + const cf_options: [3]?*const rdb.rocksdb_options_t = .{ cf_opts, cf_opts, cf_opts }; 68 + var cf_handles: [3]?*rdb.rocksdb_column_family_handle_t = .{ null, null, null }; 69 + var err: ?[*:0]u8 = null; 70 + 71 + const db = rdb.rocksdb_open_column_families( 72 + db_opts, 73 + &path_buf, 74 + 3, 75 + &cf_names, 76 + @ptrCast(&cf_options), 77 + @ptrCast(&cf_handles), 78 + @ptrCast(&err), 79 + ) orelse { 80 + if (err) |e| { 81 + log.err("rocksdb open failed: {s}", .{std.mem.span(e)}); 82 + rdb.rocksdb_free(@ptrCast(e)); 47 83 } 48 84 return error.RocksDBOpen; 49 85 }; 50 - defer allocator.free(families); 51 86 52 - // 
find column family handles by name 53 - var rbc: ?rocksdb.ColumnFamilyHandle = null; 54 - var cbr: ?rocksdb.ColumnFamilyHandle = null; 55 - for (families) |cf| { 56 - if (std.mem.eql(u8, cf.name, "rbc")) rbc = cf.handle; 57 - if (std.mem.eql(u8, cf.name, "cbr")) cbr = cf.handle; 58 - } 59 - 60 - log.info("collection index opened at {s}", .{data_dir}); 87 + log.info("collection index opened at {s} (block cache: {d} MB, indexes in cache)", .{ 88 + data_dir, 89 + block_cache_bytes / (1024 * 1024), 90 + }); 61 91 62 92 return .{ 63 93 .db = db, 64 - .rbc = rbc orelse return error.MissingColumnFamily, 65 - .cbr = cbr orelse return error.MissingColumnFamily, 94 + .default_cf = cf_handles[0] orelse return error.MissingColumnFamily, 95 + .rbc = cf_handles[1] orelse return error.MissingColumnFamily, 96 + .cbr = cf_handles[2] orelse return error.MissingColumnFamily, 97 + .cache = cache, 66 98 .allocator = allocator, 67 99 }; 68 100 } 69 101 70 102 pub fn deinit(self: *CollectionIndex) void { 71 - self.db.deinit(); 103 + rdb.rocksdb_column_family_handle_destroy(self.default_cf); 104 + rdb.rocksdb_column_family_handle_destroy(self.rbc); 105 + rdb.rocksdb_column_family_handle_destroy(self.cbr); 106 + rdb.rocksdb_close(self.db); 107 + rdb.rocksdb_cache_destroy(self.cache); 108 + } 109 + 110 + // --- raw C API helpers (replacing rocksdb-zig wrapper methods) --- 111 + 112 + fn dbWrite(self: *CollectionIndex, batch: rocksdb.WriteBatch) !void { 113 + const opts = rdb.rocksdb_writeoptions_create() orelse return error.WriteFailed; 114 + defer rdb.rocksdb_writeoptions_destroy(opts); 115 + var err: ?[*:0]u8 = null; 116 + rdb.rocksdb_write(self.db, opts, @ptrCast(batch.inner), @ptrCast(&err)); 117 + if (err) |e| { 118 + defer rdb.rocksdb_free(@ptrCast(e)); 119 + return error.WriteFailed; 120 + } 121 + } 122 + 123 + fn dbIterator(self: *CollectionIndex, cf: *rdb.rocksdb_column_family_handle_t, direction: rocksdb.IteratorDirection, start: ?[]const u8) rocksdb.Iterator { 124 + const opts = 
rdb.rocksdb_readoptions_create(); 125 + defer if (opts) |o| rdb.rocksdb_readoptions_destroy(o); 126 + const it: *rdb.rocksdb_iterator_t = rdb.rocksdb_create_iterator_cf(self.db, opts, cf) orelse unreachable; 127 + if (start) |s| { 128 + switch (direction) { 129 + .forward => rdb.rocksdb_iter_seek(it, s.ptr, s.len), 130 + .reverse => rdb.rocksdb_iter_seek_for_prev(it, s.ptr, s.len), 131 + } 132 + } else { 133 + switch (direction) { 134 + .forward => rdb.rocksdb_iter_seek_to_first(it), 135 + .reverse => rdb.rocksdb_iter_seek_to_last(it), 136 + } 137 + } 138 + return .{ 139 + .raw = .{ .inner = @ptrCast(it) }, 140 + .direction = direction, 141 + .done = false, 142 + }; 143 + } 144 + 145 + fn dbGet(self: *CollectionIndex, cf: *rdb.rocksdb_column_family_handle_t, key: []const u8) ?rocksdb.Data { 146 + var val_len: usize = 0; 147 + const opts = rdb.rocksdb_readoptions_create(); 148 + defer if (opts) |o| rdb.rocksdb_readoptions_destroy(o); 149 + var err: ?[*:0]u8 = null; 150 + const val = rdb.rocksdb_get_cf(self.db, opts, cf, key.ptr, key.len, &val_len, @ptrCast(&err)); 151 + if (err) |e| { 152 + rdb.rocksdb_free(@ptrCast(e)); 153 + return null; 154 + } 155 + if (val) |v| { 156 + return .{ .data = v[0..val_len], .free = @ptrCast(&rdb.rocksdb_free) }; 157 + } 158 + return null; 72 159 } 73 160 74 161 /// add a (collection, did) entry to both indexes. 75 162 /// idempotent — overwrites are no-ops for empty values. 
76 163 pub fn addCollection(self: *CollectionIndex, did: []const u8, collection: []const u8) !void { 77 - var err_str: ?rocksdb.Data = null; 78 - 79 164 const rbc_key = makeKey(self.allocator, collection, did) catch return; 80 165 defer self.allocator.free(rbc_key); 81 166 const cbr_key = makeKey(self.allocator, did, collection) catch return; ··· 87 172 batch.put(self.rbc, rbc_key, ""); 88 173 batch.put(self.cbr, cbr_key, ""); 89 174 90 - self.db.write(batch, &err_str) catch { 91 - if (err_str) |e| { 92 - log.warn("addCollection write failed: {s}", .{e.data}); 93 - e.deinit(); 94 - } 95 - return error.WriteFailed; 96 - }; 175 + self.dbWrite(batch) catch return error.WriteFailed; 97 176 } 98 177 99 178 /// remove a (collection, did) entry from both indexes. 100 179 pub fn removeCollection(self: *CollectionIndex, did: []const u8, collection: []const u8) !void { 101 - var err_str: ?rocksdb.Data = null; 102 - 103 180 const rbc_key = makeKey(self.allocator, collection, did) catch return; 104 181 defer self.allocator.free(rbc_key); 105 182 const cbr_key = makeKey(self.allocator, did, collection) catch return; ··· 111 188 batch.delete(self.rbc, rbc_key); 112 189 batch.delete(self.cbr, cbr_key); 113 190 114 - self.db.write(batch, &err_str) catch { 115 - if (err_str) |e| { 116 - log.warn("removeCollection write failed: {s}", .{e.data}); 117 - e.deinit(); 118 - } 119 - return error.WriteFailed; 120 - }; 191 + self.dbWrite(batch) catch return error.WriteFailed; 121 192 } 122 193 123 194 /// remove all collection entries for a DID (e.g. account tombstone). 
··· 136 207 defer batch.deinit(); 137 208 138 209 // scan cbr for all collections belonging to this DID 139 - var iter = self.db.iterator(self.cbr, .forward, prefix); 210 + var iter = self.dbIterator(self.cbr, .forward, prefix); 140 211 defer iter.deinit(); 141 212 142 213 while (try iter.next(&err_str)) |entry| { ··· 160 231 batch.delete(self.cbr, cbr_key); 161 232 } 162 233 163 - self.db.write(batch, &err_str) catch { 164 - if (err_str) |e| { 165 - log.warn("removeAll write failed: {s}", .{e.data}); 166 - e.deinit(); 167 - } 168 - return error.WriteFailed; 169 - }; 234 + self.dbWrite(batch) catch return error.WriteFailed; 170 235 } 171 236 172 237 /// process commit ops to update collection index. ··· 233 298 234 299 const prefix = seek_key[0 .. collection.len + 1]; // collection\0 235 300 236 - var iter = self.db.iterator(self.rbc, .forward, seek_key); 301 + var iter = self.dbIterator(self.rbc, .forward, seek_key); 237 302 defer iter.deinit(); 238 303 239 304 var count: usize = 0; ··· 280 345 defer seen.deinit(allocator); 281 346 282 347 // full scan of RBC — keys are collection\0did 283 - var iter = self.db.iterator(self.rbc, .forward, ""); 348 + var iter = self.dbIterator(self.rbc, .forward, ""); 284 349 defer iter.deinit(); 285 350 286 351 while (try iter.next(&err_str)) |entry| { ··· 317 382 prefix_buf[did.len] = separator; 318 383 const prefix = prefix_buf[0 .. 
did.len + 1]; 319 384 320 - var iter = self.db.iterator(self.cbr, .forward, prefix); 385 + var iter = self.dbIterator(self.cbr, .forward, prefix); 321 386 defer iter.deinit(); 322 387 323 388 if (iter.next(&err_str) catch null) |entry| { ··· 388 453 try ci.addCollection("did:plc:alice", "app.bsky.feed.post"); 389 454 390 455 // verify via direct rocksdb get 391 - var err_str: ?rocksdb.Data = null; 392 456 const rbc_key = try makeKey(allocator, "app.bsky.feed.post", "did:plc:alice"); 393 457 defer allocator.free(rbc_key); 394 - const val = try ci.db.get(ci.rbc, rbc_key, &err_str); 458 + const val = ci.dbGet(ci.rbc, rbc_key); 395 459 try testing.expect(val != null); 396 460 if (val) |v| v.deinit(); 397 461 }