atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

Revert "fix: bound RocksDB memory with shared block cache and index caching"

This reverts commit a65e8e6842ea3b6c443cf0ce8fba7c2155f3d981.

zzstoatzz 14a34868 a65e8e68

+66 -131
-1
build.zig
··· 26 26 .{ .name = "websocket", .module = websocket.module("websocket") }, 27 27 .{ .name = "pg", .module = pg.module("pg") }, 28 28 .{ .name = "rocksdb", .module = rocksdb.module("bindings") }, 29 - .{ .name = "rocksdb_c", .module = rocksdb.module("rocksdb") }, 30 29 }; 31 30 32 31 // relay executable
+66 -130
src/collection_index.zig
··· 11 11 12 12 const std = @import("std"); 13 13 const rocksdb = @import("rocksdb"); 14 - const rdb = @import("rocksdb_c"); 15 14 16 15 const Allocator = std.mem.Allocator; 17 16 const log = std.log.scoped(.collection_index); 18 17 19 18 const separator = '\x00'; 20 - 21 - /// shared block cache size — bounds total read memory (indexes + data + filters) 22 - /// across all column families. with cache_index_and_filter_blocks=true, this is 23 - /// the hard cap on RocksDB's read-path memory. 256 MB is conservative for the 24 - /// collection index workload (mostly sequential writes + prefix scans). 25 - const block_cache_bytes: usize = 256 * 1024 * 1024; 26 19 27 20 pub const CollectionIndex = struct { 28 - db: *rdb.rocksdb_t, 29 - rbc: *rdb.rocksdb_column_family_handle_t, 30 - cbr: *rdb.rocksdb_column_family_handle_t, 31 - default_cf: *rdb.rocksdb_column_family_handle_t, 32 - cache: *rdb.rocksdb_cache_t, 21 + db: rocksdb.DB, 22 + rbc: rocksdb.ColumnFamilyHandle, 23 + cbr: rocksdb.ColumnFamilyHandle, 33 24 allocator: Allocator, 34 25 35 26 pub fn open(allocator: Allocator, data_dir: []const u8) !CollectionIndex { 36 - // shared LRU block cache — bounds total read memory across all CFs 37 - const cache = rdb.rocksdb_cache_create_lru(block_cache_bytes) orelse return error.RocksDBOpen; 38 - errdefer rdb.rocksdb_cache_destroy(cache); 39 - 40 - // block-based table options — force indexes/filters into the bounded cache 41 - const bbo = rdb.rocksdb_block_based_options_create() orelse return error.RocksDBOpen; 42 - defer rdb.rocksdb_block_based_options_destroy(bbo); 43 - rdb.rocksdb_block_based_options_set_block_cache(bbo, cache); 44 - rdb.rocksdb_block_based_options_set_block_size(bbo, 16 * 1024); // 16 KB (reduces index size vs 4 KB default) 45 - rdb.rocksdb_block_based_options_set_cache_index_and_filter_blocks(bbo, 1); 46 - rdb.rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(bbo, 1); 47 - 48 - // per-CF options: 32 MB write buffer, max 2 concurrent memtables 49 - const cf_opts = rdb.rocksdb_options_create() orelse return error.RocksDBOpen; 50 - defer rdb.rocksdb_options_destroy(cf_opts); 51 - rdb.rocksdb_options_set_block_based_table_factory(cf_opts, bbo); 52 - rdb.rocksdb_options_set_write_buffer_size(cf_opts, 32 * 1024 * 1024); 53 - rdb.rocksdb_options_set_max_write_buffer_number(cf_opts, 2); 54 - 55 - // DB-level options 56 - const db_opts = rdb.rocksdb_options_create() orelse return error.RocksDBOpen; 57 - defer rdb.rocksdb_options_destroy(db_opts); 58 - rdb.rocksdb_options_set_create_if_missing(db_opts, 1); 59 - rdb.rocksdb_options_set_create_missing_column_families(db_opts, 1); 60 - 61 - // null-terminate path for C API 62 - var path_buf: [4096]u8 = @splat(0); 63 - if (data_dir.len >= path_buf.len) return error.RocksDBOpen; 64 - @memcpy(path_buf[0..data_dir.len], data_dir); 65 - 66 - const cf_names: [3][*c]const u8 = .{ "default", "rbc", "cbr" }; 67 - const cf_options: [3]?*const rdb.rocksdb_options_t = .{ cf_opts, cf_opts, cf_opts }; 68 - var cf_handles: [3]?*rdb.rocksdb_column_family_handle_t = .{ null, null, null }; 69 - var err: ?[*:0]u8 = null; 27 + var err_str: ?rocksdb.Data = null; 70 28 71 - const db = rdb.rocksdb_open_column_families( 72 - db_opts, 73 - &path_buf, 74 - 3, 75 - &cf_names, 76 - @ptrCast(&cf_options), 77 - @ptrCast(&cf_handles), 78 - @ptrCast(&err), 79 - ) orelse { 80 - if (err) |e| { 81 - log.err("rocksdb open failed: {s}", .{std.mem.span(e)}); 82 - rdb.rocksdb_free(@ptrCast(e)); 29 + const db, const families = rocksdb.DB.open( 30 + allocator, 31 + data_dir, 32 + .{ 33 + .create_if_missing = true, 34 + .create_missing_column_families = true, 35 + }, 36 + &.{ 37 + .{ .name = "default" }, 38 + .{ .name = "rbc" }, 39 + .{ .name = "cbr" }, 40 + }, 41 + false, 42 + &err_str, 43 + ) catch { 44 + if (err_str) |e| { 45 + log.err("rocksdb open failed: {s}", .{e.data}); 46 + e.deinit(); 83 47 } 84 48 return error.RocksDBOpen; 85 49 }; 50 + defer allocator.free(families); 86 51 87 - log.info("collection index opened at {s} (block cache: {d} MB, indexes in cache)", .{ 88 - data_dir, 89 - block_cache_bytes / (1024 * 1024), 90 - }); 52 + // find column family handles by name 53 + var rbc: ?rocksdb.ColumnFamilyHandle = null; 54 + var cbr: ?rocksdb.ColumnFamilyHandle = null; 55 + for (families) |cf| { 56 + if (std.mem.eql(u8, cf.name, "rbc")) rbc = cf.handle; 57 + if (std.mem.eql(u8, cf.name, "cbr")) cbr = cf.handle; 58 + } 59 + 60 + log.info("collection index opened at {s}", .{data_dir}); 91 61 92 62 return .{ 93 63 .db = db, 94 - .default_cf = cf_handles[0] orelse return error.MissingColumnFamily, 95 - .rbc = cf_handles[1] orelse return error.MissingColumnFamily, 96 - .cbr = cf_handles[2] orelse return error.MissingColumnFamily, 97 - .cache = cache, 64 + .rbc = rbc orelse return error.MissingColumnFamily, 65 + .cbr = cbr orelse return error.MissingColumnFamily, 98 66 .allocator = allocator, 99 67 }; 100 68 } 101 69 102 70 pub fn deinit(self: *CollectionIndex) void { 103 - rdb.rocksdb_column_family_handle_destroy(self.default_cf); 104 - rdb.rocksdb_column_family_handle_destroy(self.rbc); 105 - rdb.rocksdb_column_family_handle_destroy(self.cbr); 106 - rdb.rocksdb_close(self.db); 107 - rdb.rocksdb_cache_destroy(self.cache); 108 - } 109 - 110 - // --- raw C API helpers (replacing rocksdb-zig wrapper methods) --- 111 - 112 - fn dbWrite(self: *CollectionIndex, batch: rocksdb.WriteBatch) !void { 113 - const opts = rdb.rocksdb_writeoptions_create() orelse return error.WriteFailed; 114 - defer rdb.rocksdb_writeoptions_destroy(opts); 115 - var err: ?[*:0]u8 = null; 116 - rdb.rocksdb_write(self.db, opts, @ptrCast(batch.inner), @ptrCast(&err)); 117 - if (err) |e| { 118 - defer rdb.rocksdb_free(@ptrCast(e)); 119 - return error.WriteFailed; 120 - } 121 - } 122 - 123 - fn dbIterator(self: *CollectionIndex, cf: *rdb.rocksdb_column_family_handle_t, direction: rocksdb.IteratorDirection, start: ?[]const u8) rocksdb.Iterator { 124 - const opts = rdb.rocksdb_readoptions_create(); 125 - defer if (opts) |o| rdb.rocksdb_readoptions_destroy(o); 126 - const it: *rdb.rocksdb_iterator_t = rdb.rocksdb_create_iterator_cf(self.db, opts, cf) orelse unreachable; 127 - if (start) |s| { 128 - switch (direction) { 129 - .forward => rdb.rocksdb_iter_seek(it, s.ptr, s.len), 130 - .reverse => rdb.rocksdb_iter_seek_for_prev(it, s.ptr, s.len), 131 - } 132 - } else { 133 - switch (direction) { 134 - .forward => rdb.rocksdb_iter_seek_to_first(it), 135 - .reverse => rdb.rocksdb_iter_seek_to_last(it), 136 - } 137 - } 138 - return .{ 139 - .raw = .{ .inner = @ptrCast(it) }, 140 - .direction = direction, 141 - .done = false, 142 - }; 143 - } 144 - 145 - fn dbGet(self: *CollectionIndex, cf: *rdb.rocksdb_column_family_handle_t, key: []const u8) ?rocksdb.Data { 146 - var val_len: usize = 0; 147 - const opts = rdb.rocksdb_readoptions_create(); 148 - defer if (opts) |o| rdb.rocksdb_readoptions_destroy(o); 149 - var err: ?[*:0]u8 = null; 150 - const val = rdb.rocksdb_get_cf(self.db, opts, cf, key.ptr, key.len, &val_len, @ptrCast(&err)); 151 - if (err) |e| { 152 - rdb.rocksdb_free(@ptrCast(e)); 153 - return null; 154 - } 155 - if (val) |v| { 156 - return .{ .data = v[0..val_len], .free = @ptrCast(&rdb.rocksdb_free) }; 157 - } 158 - return null; 71 + self.db.deinit(); 159 72 } 160 73 161 74 /// add a (collection, did) entry to both indexes. 162 75 /// idempotent — overwrites are no-ops for empty values. 163 76 pub fn addCollection(self: *CollectionIndex, did: []const u8, collection: []const u8) !void { 77 + var err_str: ?rocksdb.Data = null; 78 + 164 79 const rbc_key = makeKey(self.allocator, collection, did) catch return; 165 80 defer self.allocator.free(rbc_key); 166 81 const cbr_key = makeKey(self.allocator, did, collection) catch return; ··· 172 87 batch.put(self.rbc, rbc_key, ""); 173 88 batch.put(self.cbr, cbr_key, ""); 174 89 175 - self.dbWrite(batch) catch return error.WriteFailed; 90 + self.db.write(batch, &err_str) catch { 91 + if (err_str) |e| { 92 + log.warn("addCollection write failed: {s}", .{e.data}); 93 + e.deinit(); 94 + } 95 + return error.WriteFailed; 96 + }; 176 97 } 177 98 178 99 /// remove a (collection, did) entry from both indexes. 179 100 pub fn removeCollection(self: *CollectionIndex, did: []const u8, collection: []const u8) !void { 101 + var err_str: ?rocksdb.Data = null; 102 + 180 103 const rbc_key = makeKey(self.allocator, collection, did) catch return; 181 104 defer self.allocator.free(rbc_key); 182 105 const cbr_key = makeKey(self.allocator, did, collection) catch return; ··· 188 111 batch.delete(self.rbc, rbc_key); 189 112 batch.delete(self.cbr, cbr_key); 190 113 191 - self.dbWrite(batch) catch return error.WriteFailed; 114 + self.db.write(batch, &err_str) catch { 115 + if (err_str) |e| { 116 + log.warn("removeCollection write failed: {s}", .{e.data}); 117 + e.deinit(); 118 + } 119 + return error.WriteFailed; 120 + }; 192 121 } 193 122 194 123 /// remove all collection entries for a DID (e.g. account tombstone). ··· 207 136 defer batch.deinit(); 208 137 209 138 // scan cbr for all collections belonging to this DID 210 - var iter = self.dbIterator(self.cbr, .forward, prefix); 139 + var iter = self.db.iterator(self.cbr, .forward, prefix); 211 140 defer iter.deinit(); 212 141 213 142 while (try iter.next(&err_str)) |entry| { ··· 231 160 batch.delete(self.cbr, cbr_key); 232 161 } 233 162 234 - self.dbWrite(batch) catch return error.WriteFailed; 163 + self.db.write(batch, &err_str) catch { 164 + if (err_str) |e| { 165 + log.warn("removeAll write failed: {s}", .{e.data}); 166 + e.deinit(); 167 + } 168 + return error.WriteFailed; 169 + }; 235 170 } 236 171 237 172 /// process commit ops to update collection index. ··· 298 233 299 234 const prefix = seek_key[0 .. collection.len + 1]; // collection\0 300 235 301 - var iter = self.dbIterator(self.rbc, .forward, seek_key); 236 + var iter = self.db.iterator(self.rbc, .forward, seek_key); 302 237 defer iter.deinit(); 303 238 304 239 var count: usize = 0; ··· 345 280 defer seen.deinit(allocator); 346 281 347 282 // full scan of RBC — keys are collection\0did 348 - var iter = self.dbIterator(self.rbc, .forward, ""); 283 + var iter = self.db.iterator(self.rbc, .forward, ""); 349 284 defer iter.deinit(); 350 285 351 286 while (try iter.next(&err_str)) |entry| { ··· 382 317 prefix_buf[did.len] = separator; 383 318 const prefix = prefix_buf[0 .. did.len + 1]; 384 319 385 - var iter = self.dbIterator(self.cbr, .forward, prefix); 320 + var iter = self.db.iterator(self.cbr, .forward, prefix); 386 321 defer iter.deinit(); 387 322 388 323 if (iter.next(&err_str) catch null) |entry| { ··· 453 388 try ci.addCollection("did:plc:alice", "app.bsky.feed.post"); 454 389 455 390 // verify via direct rocksdb get 391 + var err_str: ?rocksdb.Data = null; 456 392 const rbc_key = try makeKey(allocator, "app.bsky.feed.post", "did:plc:alice"); 457 393 defer allocator.free(rbc_key); 458 - const val = ci.dbGet(ci.rbc, rbc_key); 394 + const val = try ci.db.get(ci.rbc, rbc_key, &err_str); 459 395 try testing.expect(val != null); 460 396 if (val) |v| v.deinit(); 461 397 }