ken.waow.tech: fuzzy find my records (embeddings pds search)

stream getRepo body to /tmp and mmap it for the CAR walk

ken's old walker buffered the entire sync.getRepo response body on the heap
via zat.HttpTransport.fetch (which always dupes into std.Io.Writer.Allocating),
then handed it to zat.car.readWithOptions, which eagerly materialized a
StringHashMap of every CID → block content. that combination capped ken at
repos under 200k blocks (the configured max_blocks safety limit) and kept the
whole CAR resident for the duration of the walk. pfrazee.com (196k records,
72 MB CAR, 248k blocks counting MST internals) sat just past the cliff.

this path now:
1. talks to std.http.Client directly for the one call that needs it,
streaming the response body straight into /tmp/ken-car-{seq}-{did}.car
via std.Io.File.Writer.initStreaming — no heap staging
2. mmaps the temp file read-only via std.Io.File.MemoryMap (kernel pages
in what we touch, evicts what we don't)
3. feeds the mmap slice to zat.car.streamBlocks (v0.3.0-alpha.24) and
builds a CID → {offset, len} index into the buffer via pointer
arithmetic — no block content duplication, ~16 bytes of value per
entry instead of 48 (see the sketch after this list)
4. walks the MST through that index, delete-on-destroy cleans the
temp file whether the walk succeeds or errors out
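
the index in step 3 is the heart of the memory win. here is a minimal
standalone sketch of the trick, assuming only that each block slice aliases
the mapped buffer (which is what zat.car.streamBlocks guarantees in the real
code); BlockRange mirrors the struct in the diff below, rangeOf/sliceOf are
illustrative names:

    const std = @import("std");

    // 16 bytes of value per block: where the block lives, not a copy of it
    const BlockRange = packed struct { off: u64, len: u32, _pad: u32 = 0 };

    fn rangeOf(buf: []const u8, block: []const u8) BlockRange {
        // `block` must alias `buf`, so the offset is just the pointer delta
        return .{
            .off = @intFromPtr(block.ptr) - @intFromPtr(buf.ptr),
            .len = @intCast(block.len),
        };
    }

    fn sliceOf(buf: []const u8, r: BlockRange) []const u8 {
        return buf[r.off..][0..r.len];
    }

    test "a block round-trips through its range entry" {
        const buf = "header|block-bytes|trailer";
        const block = buf[7..18]; // "block-bytes"
        const r = rangeOf(buf, block);
        try std.testing.expectEqualStrings(block, sliceOf(buf, r));
    }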

every other ken call still uses zat.HttpTransport — only this one endpoint
needs streaming. bumps zat to v0.3.0-alpha.24 for car.streamBlocks.

smoke tested against two real repos via a standalone /tmp/ken_smoke.zig that
imports repo_walk.zig directly (a sketch of the harness shape follows the
timing notes):

zzstoatzz.io: 17,348 records, 200 collections, 8.2 MB CAR
pfrazee.com: 195,904 records, 39 collections, 72.0 MB CAR

pfrazee walks end-to-end in ~11.5s on my laptop. 0 lingering /tmp/ken-car-*
files after either run. verified fly's /tmp is on the rootfs overlay (7.4G
free on the current machine), not tmpfs, so streaming to disk does not
compete with the 4 GB memory budget.
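
for reference, a minimal sketch of what such a harness can look like.
walkRepo's (arena, io, pds_url, did) parameter order is inferred from the
diff below, the std.Io.Threaded setup assumes Zig 0.16's Io interface, and
the PDS URL / DID are placeholders rather than the repos tested above:

    const std = @import("std");
    const repo_walk = @import("repo_walk.zig");

    pub fn main() !void {
        // thread-pool-backed Io implementation (Zig 0.16 std.Io, assumed API)
        var threaded: std.Io.Threaded = .init(std.heap.page_allocator);
        defer threaded.deinit();
        const io = threaded.io();

        // walkRepo allocates every result in the arena; deinit frees it all
        var arena_state = std.heap.ArenaAllocator.init(std.heap.page_allocator);
        defer arena_state.deinit();

        const result = try repo_walk.walkRepo(
            arena_state.allocator(),
            io,
            "https://pds.example.com", // placeholder PDS host
            "did:plc:example",         // placeholder DID
        );
        std.debug.print("{d} records\n", .{result.records.len});
    }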

not yet addressed: indexer.zig still holds the full records[] + extracted
text + 384-dim vectors for every record simultaneously during embedding,
which for pfrazee would be ~300 MB of vectors alone (195,904 records × 384
dims × 4 bytes ≈ 301 MB) on top of the mmap. walking pfrazee is unblocked;
embedding pfrazee needs a separate, record-at-a-time pipeline that's planned
as a follow-up (one possible shape sketched below).
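
purely illustrative sketch of that record-at-a-time shape: nothing below
exists in ken yet, embedText and persist are hypothetical stand-ins for the
real embedding call and storage write, and repo_walk.Record is assumed pub:

    const std = @import("std");
    const repo_walk = @import("repo_walk.zig"); // for Record (assumed pub)

    // hypothetical stand-in: text → 384-dim vector
    fn embedText(scratch: std.mem.Allocator, text: []const u8) ![384]f32 {
        _ = scratch;
        _ = text;
        return [_]f32{0} ** 384;
    }

    // hypothetical stand-in: persist one (uri, vector) pair
    fn persist(uri: []const u8, vec: *const [384]f32) !void {
        _ = uri;
        _ = vec;
    }

    /// peak memory per iteration: one record's scratch + one 1.5 KB vector,
    /// instead of records.len × (text + vector) held simultaneously.
    fn embedOneAtATime(gpa: std.mem.Allocator, records: []const repo_walk.Record) !void {
        var scratch = std.heap.ArenaAllocator.init(gpa);
        defer scratch.deinit();
        for (records) |rec| {
            // free this record's scratch before the next iteration
            defer _ = scratch.reset(.retain_capacity);
            // rec.value (the record JSON) stands in for real text extraction
            const vec = try embedText(scratch.allocator(), rec.value);
            try persist(rec.uri, &vec);
        }
    }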

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+193 -26 across 2 files

backend/build.zig.zon  +2 -2

···
     .minimum_zig_version = "0.16.0",
     .dependencies = .{
         .zat = .{
-            .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.22.tar.gz",
-            .hash = "zat-0.3.0-alpha.22-5PuC7p9OCAAz4jpPNPoM4alGgqehPnpZCzIUy4LBajPh",
+            .url = "https://tangled.org/zat.dev/zat/archive/v0.3.0-alpha.24.tar.gz",
+            .hash = "zat-0.3.0-alpha.24-5PuC7vB8CACd9OTAS8jDuwVN4hQBYiAdfArjFjo_sWyD",
         },
     },
     .paths = .{
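
(aside: a url/hash pair like this is what zig fetch --save=zat <url> writes
into build.zig.zon; regenerating it that way avoids hand-typing the hash.)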
backend/src/repo_walk.zig  +191 -24

···
 //! pulls the entire repo as a CAR file (DAG-CBOR blocks + MST structure),
 //! and yields records by walking the MST locally.
 //!
+//! three-stage, streaming-friendly fetch + parse:
+//!   1. HTTP GET /xrpc/com.atproto.sync.getRepo, draining the response body
+//!      directly into a temp file in /tmp. no intermediate heap buffer, so
+//!      peak RSS is bounded regardless of repo size.
+//!   2. mmap the temp file read-only and feed the slice to
+//!      `zat.car.streamBlocks`, building a `CID → byte range` index.
+//!   3. MST walk starts at the commit root, looks up each child CID by
+//!      range, decodes the MST/record block on demand, and yields records.
+//!
+//! the mmap + the index are the only things resident beyond the arena's
+//! per-record decode churn. kernel handles paging for the CAR bytes. temp
+//! file is deleted before walkRepo returns (success or failure).
+//!
+//! this unblocks very large repos (pfrazee.com sits at ~248k blocks / 72 MB,
+//! well past the heap-based approach on a 4 GB fly machine running two
+//! concurrent indexes).
+//!
+//! we talk to std.http.Client directly for this one call instead of going
+//! through zat.HttpTransport.fetch — the latter always buffers the whole
+//! body via std.Io.Writer.Allocating, and for multi-hundred-MB repos that
+//! defeats the point. every other ken call still uses the zat helper.
+//!
 //! relies on zat for the protocol primitives:
-//! - zat.car.read / findBlock for CAR parsing
+//! - zat.car.streamBlocks for CAR parsing (zero-alloc iterator)
 //! - zat.mst.decodeMstNode for MST node decoding
-//! - zat.cbor.decodeAll for per-record DAG-CBOR decoding
+//! - zat.cbor.decodeAll for per-block DAG-CBOR decoding
 //! - zat.multibase.base32lower.encode for CID → "bafy..." stringification
 //!
 //! records come out with the same (uri, cid, value) shape as pds.listRecords

···
     OutOfMemory,
 };
 
+/// entry in the CID → byte-range index. `off` / `len` point into the CAR
+/// response buffer. 16 bytes per block instead of 48+ in a StringHashMap
+/// of slices.
+const BlockRange = packed struct {
+    off: u64,
+    len: u32,
+    _pad: u32 = 0,
+};
+
+/// CID bytes → byte range inside `car_bytes`. keys borrow from the CAR
+/// buffer (zat's BlockIterator yields zero-copy cid slices), so this is
+/// ~16 bytes of value + a few bytes of hashmap overhead per block.
+const BlockIndex = std.StringHashMapUnmanaged(BlockRange);
+
+/// process-local counter used to generate unique temp-file names for
+/// concurrent `fetchRepoToTempFile` calls.
+var fetch_seq: std.atomic.Value(u64) = .init(0);
+
 /// Fetch the repo CAR for `did` from `pds_url`, parse it, walk the MST, and
 /// return every record. All results live in `arena`.
 pub fn walkRepo(

···
     pds_url: []const u8,
     did: []const u8,
 ) WalkError!WalkResult {
-    // 1. HTTP GET /xrpc/com.atproto.sync.getRepo?did=...
-    var transport = zat.HttpTransport.init(io, arena);
-    defer transport.deinit();
-    const url = std.fmt.allocPrint(arena, "{s}/xrpc/com.atproto.sync.getRepo?did={s}", .{ pds_url, did }) catch return error.OutOfMemory;
-    const result = transport.fetch(.{ .url = url }) catch return error.FetchFailed;
-    if (result.status != .ok) return error.FetchFailed;
-    const car_bytes = result.body;
+    // 1. stream the HTTP response body straight into a temp file in /tmp,
+    //    then mmap it read-only. peak RSS stays bounded no matter how big
+    //    the repo is (kernel pages in what we touch, evicts what we don't).
+    var fetched = fetchRepoToTempFile(arena, io, pds_url, did) catch |err| switch (err) {
+        error.OutOfMemory => return error.OutOfMemory,
+        else => return error.FetchFailed,
+    };
+    defer fetched.destroy(io);
+
+    const car_bytes: []const u8 = fetched.bytes;
     if (car_bytes.len == 0) return error.EmptyCar;
 
-    // 2. parse CAR → blocks keyed by CID. bump the safety limits well above
-    //    zat's defaults (2 MB / 10k blocks) — a 17k-record repo lands around
-    //    20 MB with ~18k blocks, and we want headroom for larger repos.
-    const repo_car = zat.car.readWithOptions(arena, car_bytes, .{
-        .max_size = 256 * 1024 * 1024,
-        .max_blocks = 200_000,
+    // 2. stream the CAR once to extract roots + build a CID → byte-range
+    //    index. passing max_size = car_bytes.len disables zat's 2 MB guard
+    //    for trusted input we fetched ourselves. no max_blocks cap — the
+    //    iterator is O(1) space per block, so block count is bounded only
+    //    by CAR size (max_block_size still protects us from malformed
+    //    over-long blocks).
+    var stream = zat.car.streamBlocks(arena, car_bytes, .{
+        .max_size = car_bytes.len,
     }) catch return error.EmptyCar;
-    if (repo_car.roots.len == 0) return error.EmptyCar;
+    if (stream.roots.len == 0) return error.EmptyCar;
+
+    var index: BlockIndex = .empty;
+    // reserve in proportion to the CAR size — avg block is ~300 bytes in
+    // practice, so car_bytes.len / 256 is a decent upper-bound estimate
+    // that keeps us in a single rehash on all but the densest repos.
+    try index.ensureTotalCapacity(arena, @intCast(@min(car_bytes.len / 256, std.math.maxInt(u32))));
+
+    while (true) {
+        const block = stream.iter.next() catch |err| switch (err) {
+            error.OutOfMemory => return error.OutOfMemory,
+            else => return error.EmptyCar,
+        } orelse break;
+        // block.data is a zero-copy slice into car_bytes — recover its
+        // byte range directly from pointer arithmetic, no extra state.
+        const data_off: u64 = @intCast(@intFromPtr(block.data.ptr) - @intFromPtr(car_bytes.ptr));
+        try index.put(arena, block.cid_raw, .{
+            .off = data_off,
+            .len = @intCast(block.data.len),
+        });
+    }
 
     // 3. commit block → data CID (MST root)
-    const commit_cid_raw = repo_car.roots[0].raw;
-    const commit_data = zat.car.findBlock(repo_car, commit_cid_raw) orelse return error.NoCommitBlock;
+    const commit_cid_raw = stream.roots[0].raw;
+    const commit_data = indexLookup(car_bytes, index, commit_cid_raw) orelse return error.NoCommitBlock;
     const commit_value = zat.cbor.decodeAll(arena, commit_data) catch return error.InvalidCommit;
     const data_cbor = commit_value.get("data") orelse return error.NoDataField;
     const data_cid_raw = switch (data_cbor) {

···
     // 4. walk the MST from the data root, yielding records in order
     var records: std.ArrayList(Record) = .empty;
-    try walkNode(arena, repo_car, data_cid_raw, did, &records);
+    try walkNode(arena, car_bytes, index, data_cid_raw, did, &records);
 
     return .{
         .records = try records.toOwnedSlice(arena),

···
     };
 }
 
+/// owns a temp file + its mmap. deletes the temp file on destroy.
+const FetchedCar = struct {
+    /// absolute path in /tmp, so we can unlink on destroy
+    path: []const u8,
+    file: std.Io.File,
+    mmap: std.Io.File.MemoryMap,
+    bytes: []const u8,
+
+    fn destroy(self: *FetchedCar, io: Io) void {
+        self.mmap.destroy(io);
+        self.file.close(io);
+        // best-effort delete; leaking a /tmp file on shutdown is fine
+        std.Io.Dir.deleteFileAbsolute(io, self.path) catch {};
+    }
+};
+
+/// GET /xrpc/com.atproto.sync.getRepo and drain the response body directly
+/// into a temp file, then mmap the file read-only. never holds the full
+/// CAR in heap. `path` is owned by `arena`.
+fn fetchRepoToTempFile(
+    arena: Allocator,
+    io: Io,
+    pds_url: []const u8,
+    did: []const u8,
+) !FetchedCar {
+    const url = try std.fmt.allocPrint(arena, "{s}/xrpc/com.atproto.sync.getRepo?did={s}", .{ pds_url, did });
+
+    // path: /tmp/ken-car-{seq}-{did-tail}.car — a process-local monotonic
+    // counter is enough to disambiguate concurrent indexes (ken is a
+    // single-process server); did-tail is purely for human debugging
+    // when poking at /tmp by hand.
+    const did_tail_start = if (std.mem.lastIndexOfScalar(u8, did, ':')) |i| i + 1 else 0;
+    const did_tail = did[did_tail_start..@min(did_tail_start + 16, did.len)];
+    const seq = fetch_seq.fetchAdd(1, .monotonic);
+    const path = try std.fmt.allocPrint(arena, "/tmp/ken-car-{d}-{s}.car", .{ seq, did_tail });
+
+    // --- create the temp file + wrap it in a buffered writer ---
+    var out_file = try std.Io.Dir.createFileAbsolute(io, path, .{ .read = true });
+    errdefer {
+        out_file.close(io);
+        std.Io.Dir.deleteFileAbsolute(io, path) catch {};
+    }
+
+    var write_buf: [64 * 1024]u8 = undefined;
+    var out_writer = std.Io.File.Writer.initStreaming(out_file, io, &write_buf);
+
+    // --- issue the HTTP request ---
+    var client: std.http.Client = .{ .allocator = arena, .io = io };
+    defer client.deinit();
+
+    var req = try client.request(.GET, try std.Uri.parse(url), .{
+        // see notes in oauth.zig:pdsAuthedRequest — std.http.Client low-level
+        // path doesn't transparently decompress, so force identity encoding
+        // via the typed header slot (extra_headers is additive and produces
+        // conflicting Accept-Encoding values).
+        .headers = .{ .accept_encoding = .{ .override = "identity" } },
+    });
+    defer req.deinit();
+    try req.sendBodiless();
+
+    // 2 KB is enough for any reasonable redirect target — bsky.network's
+    // sync.getRepo redirects to the user's actual PDS hostname, which is
+    // at most a couple hundred bytes; matching oauth.zig's 4096-byte
+    // auth_hdr_buf is overkill but cheap and future-proof.
+    var redirect_buf: [2048]u8 = undefined;
+    var response = try req.receiveHead(&redirect_buf);
+    if (response.head.status != .ok) return error.FetchFailed;
+
+    // --- drain body into the file ---
+    const reader = response.reader(&.{});
+    _ = try reader.streamRemaining(&out_writer.interface);
+    try out_writer.end();
+
+    const file_len = try out_file.length(io);
+    if (file_len == 0) {
+        out_file.close(io);
+        std.Io.Dir.deleteFileAbsolute(io, path) catch {};
+        return error.EmptyCar;
+    }
+
+    // --- mmap the file read-only ---
+    var mmap = try std.Io.File.MemoryMap.create(io, out_file, .{
+        .len = @intCast(file_len),
+        .protection = .{ .read = true, .write = false },
+        .offset = 0,
+    });
+    errdefer mmap.destroy(io);
+
+    return .{
+        .path = path,
+        .file = out_file,
+        .mmap = mmap,
+        .bytes = mmap.memory[0..@intCast(file_len)],
+    };
+}
+
+fn indexLookup(car_bytes: []const u8, index: BlockIndex, cid: []const u8) ?[]const u8 {
+    const range = index.get(cid) orelse return null;
+    return car_bytes[range.off..][0..range.len];
+}
+
 fn walkNode(
     arena: Allocator,
-    repo_car: zat.car.Car,
+    car_bytes: []const u8,
+    index: BlockIndex,
     node_cid: []const u8,
     did: []const u8,
     out: *std.ArrayList(Record),
 ) WalkError!void {
-    const node_data = zat.car.findBlock(repo_car, node_cid) orelse return error.BlockNotFound;
+    const node_data = indexLookup(car_bytes, index, node_cid) orelse return error.BlockNotFound;
     const node = zat.mst.decodeMstNode(arena, node_data) catch return error.InvalidMstNode;
 
     // MST invariant: left subtree holds keys strictly less than every key
     // in this node. walk it first so the output ends up in lexicographic
     // order (same contract as pds.listRecords, alphabetical by collection).
-    if (node.left) |left_cid| try walkNode(arena, repo_car, left_cid, did, out);
+    if (node.left) |left_cid| try walkNode(arena, car_bytes, index, left_cid, did, out);
 
     // prefix-compressed keys: entries[i].key = entries[i-1].key[0..prefix_len] ++ suffix.
     // a 512-byte reconstruction buffer is plenty — atproto MST keys are

···
         const rkey = key[slash + 1 ..];
 
         // record block — raw DAG-CBOR bytes
-        const value_data = zat.car.findBlock(repo_car, entry.value) orelse continue;
+        const value_data = indexLookup(car_bytes, index, entry.value) orelse continue;
         const value_cbor = zat.cbor.decodeAll(arena, value_data) catch continue;
         const value_json = try cborToJson(arena, value_cbor);

···
         });
 
         // right subtree of THIS entry (keys between this one and the next)
-        if (entry.tree) |tree_cid| try walkNode(arena, repo_car, tree_cid, did, out);
+        if (entry.tree) |tree_cid| try walkNode(arena, car_bytes, index, tree_cid, did, out);
     }
 }