fuzzy find my records ken.waow.tech
embeddings pds search

filter noise collections and auto time-cutoff large repos

the streaming CAR walker unblocked very large repos at fetch/parse time,
but the embed pipeline still choked: pfrazee's 196k records (mostly
likes/follows/reposts) burned transient memory + embed time on records
with no semantic text. this was never going to scale beyond me.

two transparent concessions, surfaced honestly in the UI:

1. collection-level filter. records in DEFAULT_SKIP_COLLECTIONS (likes,
follows, reposts, blocks, listitems, threadgate, postgate, actor
status, chat declaration, sh.tangled graph follow/star) are dropped
before CBOR value decode — skipped records cost only the MST entry
iteration. applied in both the CAR walker and the listRecords
fallback for consistency.

2. auto time cutoff. if post-collection-filter count still exceeds
LARGE_REPO_THRESHOLD (30k), enable a 2-year TID cutoff. implemented
as a cheap count-only MST walk before the full walk — we learn the
post-filter size without decoding record values, then decide. TIDs
decode from base32-sortable rkeys in ~15 lines; non-TID rkeys (self,
etc.) are always kept.

pipeline shape becomes: openRepo → countOpened → decide filter →
walkOpened → close. the open/walk split keeps the mmap alive across
both passes so the count pass is essentially free.
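
the count → decide → walk shape, compressed into a runnable Python sketch. the "repo" here is a toy list of (collection, tid_us) pairs standing in for MST entries, and the function name is a stand-in, not the real Zig API:

```python
# constants match the values described above
LARGE_REPO_THRESHOLD = 30_000
AUTO_CUTOFF_WINDOW_US = 2 * 365 * 24 * 60 * 60 * 1_000_000

def plan_walk(entries, now_us, skip_collections):
    # pass 1: count with the collection filter only (no value decode)
    kept = sum(1 for coll, _ in entries if coll not in skip_collections)
    # decide: still too big → add the 2-year time cutoff
    cutoff = now_us - AUTO_CUTOFF_WINDOW_US if kept > LARGE_REPO_THRESHOLD else None
    # pass 2: full walk applies both filters; tid_us None = non-TID rkey, keep
    def keep(coll, tid_us):
        if coll in skip_collections:
            return False
        return cutoff is None or tid_us is None or tid_us >= cutoff
    return [e for e in entries if keep(*e)]
```

both passes iterate the same already-opened structure, which is the point of the open/walk split: the second look at the repo costs no fetch.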

pfrazee smoke: 195,908 total → 37,611 kept post collection filter →
cutoff kicks in → 35,682 final. zzstoatzz.io regression-clean: 17,350
total → 5,145 kept, 12,205 skipped, no cutoff.
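
the smoke numbers cross-check: skips and keeps re-add to the reported totals for both repos (the 158,297 / 1,929 figures are the ones surfaced in the pfrazee pack-meta line).

```python
# arithmetic sanity check on the numbers quoted in this message
pfrazee_total, pfrazee_kept, pfrazee_final = 195_908, 37_611, 35_682
pfrazee_skipped_by_collection = pfrazee_total - pfrazee_kept   # 158,297
pfrazee_skipped_by_time = pfrazee_kept - pfrazee_final         # 1,929

zz_total, zz_kept = 17_350, 5_145
zz_skipped_by_collection = zz_total - zz_kept                  # 12,205
```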

status response gains skipped_by_collection, skipped_by_time,
applied_tid_cutoff_ms. pack-meta line in the UI shows the honest
breakdown: "5,145 records · 190 collections · skipped 12,205
likes/follows/reposts" for normal repos; "35,682 records · 30
collections · skipped 158,297 likes/follows/reposts · indexed records
after 2023-04-01 (1,929 older records skipped)" for pfrazee.
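
how that line assembles from the three new status fields, paraphrased in Python (the real implementation is the frontend JS in the diff below; the function name here is made up):

```python
from datetime import datetime, timezone

def pack_meta(status: dict) -> str:
    # base: record + collection counts (always present)
    parts = [f"{status['count']:,} records · {status['collections']} collections"]
    # collection-filter disclosure
    if status.get("skipped_by_collection"):
        parts.append(f"skipped {status['skipped_by_collection']:,} likes/follows/reposts")
    # time-cutoff disclosure; UTC date, like the JS toISOString().slice(0, 10)
    if status.get("applied_tid_cutoff_ms"):
        cutoff = datetime.fromtimestamp(
            status["applied_tid_cutoff_ms"] / 1000, tz=timezone.utc
        )
        parts.append(
            f"indexed records after {cutoff.date().isoformat()} "
            f"({status.get('skipped_by_time', 0):,} older records skipped)"
        )
    return " · ".join(parts)
```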

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+372 -32
+16 -1
backend/src/assets/main.js
···
290 290       const elapsed = indexStartMs ? ((Date.now() - indexStartMs) / 1000).toFixed(1) : null;
291 291       // hide the ready-status banner — this info lives in the pack-meta line now
292 292       hideStatus();
293     -     packStatsEl.textContent = elapsed && elapsed > 0
    293 +     const base = elapsed && elapsed > 0
294 294         ? `${rec.toLocaleString()} records · ${colCount} collections · ${elapsed}s`
295 295         : `${rec.toLocaleString()} records · ${colCount} collections`;
    296 +
    297 +     // honest disclosure of what got dropped and why. the filter is part of
    298 +     // the indexing contract — the user should be able to see it at a glance.
    299 +     const parts = [base];
    300 +     const skippedColl = j.skipped_by_collection || 0;
    301 +     const skippedTime = j.skipped_by_time || 0;
    302 +     const cutoffMs = j.applied_tid_cutoff_ms || 0;
    303 +     if (skippedColl > 0) {
    304 +       parts.push(`skipped ${skippedColl.toLocaleString()} likes/follows/reposts`);
    305 +     }
    306 +     if (cutoffMs > 0) {
    307 +       const cutoffDate = new Date(cutoffMs).toISOString().slice(0, 10);
    308 +       parts.push(`indexed records after ${cutoffDate} (${skippedTime.toLocaleString()} older records skipped)`);
    309 +     }
    310 +     packStatsEl.textContent = parts.join(" · ");
296 311       searchForm.classList.remove("hidden");
297 312       searchInput.focus();
298 313       renderPackActions(j);
+127 -2
backend/src/indexer.zig
···
 27  27   /// doesn't push us over. at 384 dims × 4 bytes, this is ~2600 vectors/chunk.
 28  28   const BLOB_CHUNK_BYTES: usize = 4 * 1024 * 1024;
 29  29
     30 + /// collections to drop before embedding: membership, reactions, gating.
     31 + /// everything here produces no semantic text — a like record is `{subject:
     32 + /// <at-uri>}`, a follow is `{subject: <did>}`. embedding them is pure noise
     33 + /// and burns the majority of a large repo's record count on nothing.
     34 + /// kept by omission: app.bsky.feed.post, actor.profile, graph.list (name +
     35 + /// description), feed.generator, graph.starterpack, whitewind, leaflet,
     36 + /// tangled, cosmik, pinksea, anything else with actual prose.
     37 + const DEFAULT_SKIP_COLLECTIONS = [_][]const u8{
     38 +     "app.bsky.feed.like",
     39 +     "app.bsky.feed.repost",
     40 +     "app.bsky.feed.threadgate",
     41 +     "app.bsky.feed.postgate",
     42 +     "app.bsky.graph.follow",
     43 +     "app.bsky.graph.block",
     44 +     "app.bsky.graph.listblock",
     45 +     "app.bsky.graph.listitem",
     46 +     "app.bsky.graph.verification",
     47 +     "app.bsky.actor.status",
     48 +     "chat.bsky.actor.declaration",
     49 +     "sh.tangled.graph.follow",
     50 +     "sh.tangled.feed.star",
     51 + };
     52 +
     53 + /// if a repo still has more than this many records after collection
     54 + /// filtering, auto-enable the time cutoff to keep transient memory and
     55 + /// embed time bounded. chosen so a single large repo fits comfortably
     56 + /// alongside one concurrent medium repo on ken's 4 GB fly machine:
     57 + /// 30k records × (384 × 4B vector + arena overhead) ≈ 150 MB transient,
     58 + /// ~46 MB steady-state vectors.
     59 + const LARGE_REPO_THRESHOLD: usize = 30_000;
     60 +
     61 + /// window applied by the auto cutoff. 2 years is long enough to cover
     62 + /// "the stuff a returning user actually remembers writing" while being
     63 + /// short enough that even the most prolific accounts stay under the
     64 + /// threshold post-filter. exposed in the status response so the UI can
     65 + /// tell the user exactly what got cut.
     66 + const AUTO_CUTOFF_WINDOW_US: i64 = 2 * 365 * 24 * 60 * 60 * 1_000_000;
     67 +
 30  68   pub const Status = enum { indexing, ready, @"error" };
 31  69
 32  70   pub const Entry = struct {
···
 89 127   /// ETA against only the fresh work.
 90 128   records_reused: usize,
 91 129   indexed_at_ms: i64,
    130 + /// dropped because their NSID is in DEFAULT_SKIP_COLLECTIONS (likes,
    131 + /// follows, reposts, etc — no semantic text). surfaced in the status
    132 + /// response so the UI can be honest about what didn't get indexed.
    133 + skipped_by_collection: usize,
    134 + /// dropped because the auto time cutoff kicked in on a large repo.
    135 + /// 0 unless records_fetched (post-collection-filter) would have
    136 + /// exceeded LARGE_REPO_THRESHOLD.
    137 + skipped_by_time: usize,
    138 + /// unix ms of the applied time cutoff, or 0 if no cutoff was applied.
    139 + /// UI displays this as "indexed records newer than <date>".
    140 + applied_tid_cutoff_ms: i64,
 92 141
 93 142   pub fn count(self: *const IndexedPack) usize {
 94 143       if (self.dim == 0) return 0;
···
179 228       .records_embedded = 0,
180 229       .records_reused = 0,
181 230       .indexed_at_ms = 0,
    231 +     .skipped_by_collection = 0,
    232 +     .skipped_by_time = 0,
    233 +     .applied_tid_cutoff_ms = 0,
182 234   };
183 235
184 236   // duplicate the DID for the hash map key so the cache owns it
···
215 267   };
216 268   }
217 269
    270 + /// fetch + walk the repo with the transparent filtering pipeline:
    271 + ///   1. open (stream CAR to /tmp, mmap, build block index, locate MST root)
    272 + ///   2. count pass with collection filter only → learn post-filter size
    273 + ///   3. if > LARGE_REPO_THRESHOLD, auto-enable a 2-year time cutoff
    274 + ///   4. full walk with the chosen filter → decoded records
    275 + ///
    276 + /// the count pass is cheap: it walks the MST but skips CBOR value decode
    277 + /// and record materialization. the tradeoff is one extra MST traversal in
    278 + /// exchange for deciding the cutoff before we spend any memory on
    279 + /// filtered-out records.
    280 + ///
    281 + /// `pack.applied_tid_cutoff_ms` is written here so the status endpoint
    282 + /// can surface exactly when the cutoff fell.
    283 + fn openAndWalkRepo(
    284 +     arena: std.mem.Allocator,
    285 +     io: Io,
    286 +     pack: *IndexedPack,
    287 + ) repo_walk.WalkError!repo_walk.WalkResult {
    288 +     var opened = try repo_walk.openRepo(arena, io, pack.pds_url, pack.did);
    289 +     defer opened.close(io);
    290 +
    291 +     // pass 1: count-only with collection filter. this tells us the
    292 +     // post-filter size so we can decide whether to activate the time
    293 +     // cutoff. we don't decode record values in this pass, so it's cheap
    294 +     // relative to the full walk.
    295 +     const base_filter: repo_walk.WalkFilter = .{
    296 +         .skip_collections = &DEFAULT_SKIP_COLLECTIONS,
    297 +     };
    298 +     const count = try repo_walk.countOpened(arena, &opened, base_filter);
    299 +     std.log.info(
    300 +         " car count: {d} kept (would skip {d} by collection)",
    301 +         .{ count.kept, count.skipped_by_collection },
    302 +     );
    303 +
    304 +     // pass 2: full walk with the chosen filter (collection + optional time)
    305 +     var filter = base_filter;
    306 +     if (count.kept > LARGE_REPO_THRESHOLD) {
    307 +         const now_ns = Io.Timestamp.now(io, .real).nanoseconds;
    308 +         const now_us: i64 = @intCast(@divTrunc(now_ns, 1000));
    309 +         const cutoff_us = now_us - AUTO_CUTOFF_WINDOW_US;
    310 +         filter.min_rkey_tid_us = cutoff_us;
    311 +         pack.applied_tid_cutoff_ms = @divTrunc(cutoff_us, 1000);
    312 +         std.log.info(
    313 +             " large repo ({d} > {d}) — applying time cutoff at {d} ms",
    314 +             .{ count.kept, LARGE_REPO_THRESHOLD, pack.applied_tid_cutoff_ms },
    315 +         );
    316 +     }
    317 +
    318 +     return try repo_walk.walkOpened(arena, &opened, pack.did, filter);
    319 + }
    320 +
218 321   fn doIndex(job: *Job) !void {
219 322       const pack = job.pack;
220 323       const arena = pack.arena.allocator();
···
243 346       var collection_of: std.ArrayList([]const u8) = .empty;
244 347       var per_collection: std.ArrayList(CollectionCount) = .empty;
245 348
246     -     if (repo_walk.walkRepo(scratch_alloc, job.io, pack.pds_url, pack.did)) |walked| {
247     -         std.log.info(" car walk: {d} records, {d} KB", .{ walked.records.len, walked.car_bytes / 1024 });
    349 +     if (openAndWalkRepo(scratch_alloc, job.io, pack)) |walked| {
    350 +         std.log.info(
    351 +             " car walk: {d} records kept, {d} KB, skipped {d} by collection, {d} by time",
    352 +             .{
    353 +                 walked.records.len,
    354 +                 walked.car_bytes / 1024,
    355 +                 walked.skipped_by_collection,
    356 +                 walked.skipped_by_time,
    357 +             },
    358 +         );
248 359           for (walked.records) |r| {
249 360               try all_records.append(scratch_alloc, .{
250 361                   .uri = r.uri,
···
254 365               try collection_of.append(scratch_alloc, r.collection);
255 366           }
256 367           pack.records_fetched = all_records.items.len;
    368 +         pack.skipped_by_collection = walked.skipped_by_collection;
    369 +         pack.skipped_by_time = walked.skipped_by_time;
257 370
258 371           // derive per-collection counts from the flat walk output. records
259 372           // come out lex-sorted so same-collection runs are contiguous.
···
278 391       std.log.info(" {d} collections (listRecords fallback)", .{collections.len});
279 392
280 393       for (collections) |collection| {
    394 +         // apply the same collection allow-list as the CAR walker.
    395 +         // listRecords can't filter server-side, so we just skip the
    396 +         // entire pagination loop for collections in the deny list.
    397 +         var skip = false;
    398 +         for (DEFAULT_SKIP_COLLECTIONS) |sc| {
    399 +             if (std.mem.eql(u8, collection, sc)) {
    400 +                 skip = true;
    401 +                 break;
    402 +             }
    403 +         }
    404 +         if (skip) continue;
    405 +
281 406           var cursor: ?[]const u8 = null;
282 407           var count: usize = 0;
283 408           while (true) {
+226 -29
backend/src/repo_walk.zig
···
 59  59   records: []Record,
 60  60   /// total CAR size in bytes (for logging / diagnostics)
 61  61   car_bytes: usize,
     62 + skipped_by_collection: usize,
     63 + skipped_by_time: usize,
     64 + };
     65 +
     66 + /// skip predicate applied per-record during the MST walk. filtering happens
     67 + /// before value decode/dupe, so skipped records cost roughly the MST entry
     68 + /// iteration and nothing else.
     69 + pub const WalkFilter = struct {
     70 +     /// exact NSID matches to drop (e.g. "app.bsky.feed.like"). comparison
     71 +     /// is O(n*m) on entry count × skip list size — both are small in
     72 +     /// practice, not worth hashing.
     73 +     skip_collections: []const []const u8 = &.{},
     74 +     /// drop records whose rkey decodes to a TID timestamp older than this
     75 +     /// cutoff (unix microseconds). records with non-TID rkeys (e.g.
     76 +     /// `self`, profile) are always kept — time cutoff is a best-effort
     77 +     /// filter, not a hard wall.
     78 +     min_rkey_tid_us: ?i64 = null,
     79 + };
     80 +
     81 + pub const CountResult = struct {
     82 +     /// records that would be kept under the filter
     83 +     kept: usize,
     84 +     skipped_by_collection: usize,
     85 +     skipped_by_time: usize,
 62  86   };
 63  87
 64  88   pub const WalkError = error{
···
 92 116   /// concurrent `fetchRepoToTempFile` calls.
 93 117   var fetch_seq: std.atomic.Value(u64) = .init(0);
 94 118
 95      - /// Fetch the repo CAR for `did` from `pds_url`, parse it, walk the MST, and
 96      - /// return every record. All results live in `arena`.
 97      - pub fn walkRepo(
    119 + /// A repo that has been fetched + mmap'd and had its block index built.
    120 + /// Holds resources (temp file, mmap) that must be released via `close`.
    121 + /// Callers typically do `openRepo` → zero or more `walkOpened`/`countOpened`
    122 + /// passes → `close`. The two-pass pattern (count → decide filter → walk)
    123 + /// avoids re-fetching the CAR.
    124 + pub const OpenedRepo = struct {
    125 +     fetched: FetchedCar,
    126 +     car_bytes: []const u8,
    127 +     index: BlockIndex,
    128 +     /// raw bytes of the MST root CID (entry point for walks). borrows
    129 +     /// from the arena the repo was opened with.
    130 +     data_cid: []const u8,
    131 +
    132 +     pub fn close(self: *OpenedRepo, io: Io) void {
    133 +         self.fetched.destroy(io);
    134 +     }
    135 + };
    136 +
    137 + /// Fetch the repo CAR, stream-index its blocks, and locate the MST root.
    138 + /// Returns an `OpenedRepo` that can be walked (or counted) any number of
    139 + /// times without re-fetching. Caller must `close` when done.
    140 + pub fn openRepo(
 98 141       arena: Allocator,
 99 142       io: Io,
100 143       pds_url: []const u8,
101 144       did: []const u8,
102      - ) WalkError!WalkResult {
    145 + ) WalkError!OpenedRepo {
103 146       // 1. stream the HTTP response body straight into a temp file in /tmp,
104 147       //    then mmap it read-only. peak RSS stays bounded no matter how big
105 148       //    the repo is (kernel pages in what we touch, evicts what we don't).
···
107 150           error.OutOfMemory => return error.OutOfMemory,
108 151           else => return error.FetchFailed,
109 152       };
110      -     defer fetched.destroy(io);
    153 +     errdefer fetched.destroy(io);
111 154
112 155       const car_bytes: []const u8 = fetched.bytes;
113 156       if (car_bytes.len == 0) return error.EmptyCar;
···
153 196           else => return error.InvalidDataField,
154 197       };
155 198
156      -     // 4. walk the MST from the data root, yielding records in order
    199 +     return .{
    200 +         .fetched = fetched,
    201 +         .car_bytes = car_bytes,
    202 +         .index = index,
    203 +         .data_cid = data_cid_raw,
    204 +     };
    205 + }
    206 +
    207 + /// MST walk that yields records through `arena`. Uses `filter` to drop
    208 + /// records before value decode — skipped records cost only the MST entry
    209 + /// iteration, not a CBOR roundtrip.
    210 + pub fn walkOpened(
    211 +     arena: Allocator,
    212 +     opened: *const OpenedRepo,
    213 +     did: []const u8,
    214 +     filter: WalkFilter,
    215 + ) WalkError!WalkResult {
157 216       var records: std.ArrayList(Record) = .empty;
158      -     try walkNode(arena, car_bytes, index, data_cid_raw, did, &records);
    217 +     var ctx: WalkCtx = .{
    218 +         .arena = arena,
    219 +         .car_bytes = opened.car_bytes,
    220 +         .index = opened.index,
    221 +         .did = did,
    222 +         .filter = filter,
    223 +         .out = &records,
    224 +     };
    225 +     try walkNode(&ctx, opened.data_cid);
159 226
160 227       return .{
161 228           .records = try records.toOwnedSlice(arena),
162      -         .car_bytes = car_bytes.len,
    229 +         .car_bytes = opened.car_bytes.len,
    230 +         .skipped_by_collection = ctx.skipped_by_collection,
    231 +         .skipped_by_time = ctx.skipped_by_time,
    232 +     };
    233 + }
    234 +
    235 + /// Count-only MST walk. Applies the filter and counts kept records + skip
    236 + /// buckets, but does not decode record values — cheap enough to run before
    237 + /// the real walk as a "how many records would we embed?" probe.
    238 + pub fn countOpened(
    239 +     arena: Allocator,
    240 +     opened: *const OpenedRepo,
    241 +     filter: WalkFilter,
    242 + ) WalkError!CountResult {
    243 +     var ctx: WalkCtx = .{
    244 +         .arena = arena,
    245 +         .car_bytes = opened.car_bytes,
    246 +         .index = opened.index,
    247 +         .did = "",
    248 +         .filter = filter,
    249 +         .out = null,
    250 +     };
    251 +     try walkNode(&ctx, opened.data_cid);
    252 +     return .{
    253 +         .kept = ctx.kept,
    254 +         .skipped_by_collection = ctx.skipped_by_collection,
    255 +         .skipped_by_time = ctx.skipped_by_time,
163 256       };
164 257   }
    258 +
    259 + /// Convenience: open → walk → close in one call. Equivalent to the
    260 + /// pre-filter walkRepo API; used by the smoke test and any caller that
    261 + /// doesn't need the count-then-walk staging.
    262 + pub fn walkRepo(
    263 +     arena: Allocator,
    264 +     io: Io,
    265 +     pds_url: []const u8,
    266 +     did: []const u8,
    267 +     filter: WalkFilter,
    268 + ) WalkError!WalkResult {
    269 +     var opened = try openRepo(arena, io, pds_url, did);
    270 +     defer opened.close(io);
    271 +     return walkOpened(arena, &opened, did, filter);
    272 + }
    273 +
165 274
166 275   /// owns a temp file + its mmap. deletes the temp file on destroy.
167 276   const FetchedCar = struct {
···
264 373       return car_bytes[range.off..][0..range.len];
265 374   }
266 375
267      - fn walkNode(
    376 + /// shared state for recursive MST traversal. `out == null` means
    377 + /// count-only mode: apply filter, track stats, but skip CBOR value decode
    378 + /// and record materialization.
    379 + const WalkCtx = struct {
268 380       arena: Allocator,
269 381       car_bytes: []const u8,
270 382       index: BlockIndex,
271      -     node_cid: []const u8,
272 383       did: []const u8,
273      -     out: *std.ArrayList(Record),
274      - ) WalkError!void {
275      -     const node_data = indexLookup(car_bytes, index, node_cid) orelse return error.BlockNotFound;
276      -     const node = zat.mst.decodeMstNode(arena, node_data) catch return error.InvalidMstNode;
    384 +     filter: WalkFilter,
    385 +     out: ?*std.ArrayList(Record),
    386 +     kept: usize = 0,
    387 +     skipped_by_collection: usize = 0,
    388 +     skipped_by_time: usize = 0,
    389 + };
    390 +
    391 + const FilterDecision = enum { kept, skipped_collection, skipped_time };
    392 +
    393 + fn applyFilter(filter: WalkFilter, collection_name: []const u8, rkey: []const u8) FilterDecision {
    394 +     for (filter.skip_collections) |sc| {
    395 +         if (std.mem.eql(u8, collection_name, sc)) return .skipped_collection;
    396 +     }
    397 +     if (filter.min_rkey_tid_us) |cutoff| {
    398 +         if (decodeTidMicros(rkey)) |tid_us| {
    399 +             if (tid_us < cutoff) return .skipped_time;
    400 +         }
    401 +         // non-TID rkeys fall through as kept — profile `self`, etc.
    402 +     }
    403 +     return .kept;
    404 + }
    405 +
    406 + fn walkNode(ctx: *WalkCtx, node_cid: []const u8) WalkError!void {
    407 +     const node_data = indexLookup(ctx.car_bytes, ctx.index, node_cid) orelse return error.BlockNotFound;
    408 +     const node = zat.mst.decodeMstNode(ctx.arena, node_data) catch return error.InvalidMstNode;
277 409
278 410       // MST invariant: left subtree holds keys strictly less than every key
279 411       // in this node. walk it first so the output ends up in lexicographic
280 412       // order (same contract as pds.listRecords, alphabetical by collection).
281      -     if (node.left) |left_cid| try walkNode(arena, car_bytes, index, left_cid, did, out);
    413 +     if (node.left) |left_cid| try walkNode(ctx, left_cid);
282 414
283 415       // prefix-compressed keys: entries[i].key = entries[i-1].key[0..prefix_len] ++ suffix.
284 416       // a 512-byte reconstruction buffer is plenty — atproto MST keys are
···
290 422           const key = key_buf[0 .. entry.prefix_len + entry.key_suffix.len];
291 423
292 424           const slash = std.mem.indexOfScalar(u8, key, '/') orelse continue;
293      -         const collection = arena.dupe(u8, key[0..slash]) catch return error.OutOfMemory;
    425 +         const collection_name = key[0..slash];
294 426           const rkey = key[slash + 1 ..];
295 427
296      -         // record block — raw DAG-CBOR bytes
297      -         const value_data = indexLookup(car_bytes, index, entry.value) orelse continue;
298      -         const value_cbor = zat.cbor.decodeAll(arena, value_data) catch continue;
299      -         const value_json = try cborToJson(arena, value_cbor);
    428 +         switch (applyFilter(ctx.filter, collection_name, rkey)) {
    429 +             .skipped_collection => ctx.skipped_by_collection += 1,
    430 +             .skipped_time => ctx.skipped_by_time += 1,
    431 +             .kept => {
    432 +                 ctx.kept += 1;
    433 +                 if (ctx.out) |out| {
    434 +                     // full emit: decode value, dupe strings, append. on
    435 +                     // decode failure preserve old behavior and skip the
    436 +                     // right subtree — `continue` bypasses the tree walk
    437 +                     // at the bottom of the loop.
    438 +                     const collection = ctx.arena.dupe(u8, collection_name) catch return error.OutOfMemory;
    439 +                     const value_data = indexLookup(ctx.car_bytes, ctx.index, entry.value) orelse continue;
    440 +                     const value_cbor = zat.cbor.decodeAll(ctx.arena, value_data) catch continue;
    441 +                     const value_json = try cborToJson(ctx.arena, value_cbor);
300 442
301      -         const uri = std.fmt.allocPrint(arena, "at://{s}/{s}/{s}", .{ did, collection, rkey }) catch return error.OutOfMemory;
302      -         const cid_str = zat.multibase.encode(arena, .base32lower, entry.value) catch return error.OutOfMemory;
    443 +                     const uri = std.fmt.allocPrint(ctx.arena, "at://{s}/{s}/{s}", .{ ctx.did, collection, rkey }) catch return error.OutOfMemory;
    444 +                     const cid_str = zat.multibase.encode(ctx.arena, .base32lower, entry.value) catch return error.OutOfMemory;
303 445
304      -         try out.append(arena, .{
305      -             .uri = uri,
306      -             .cid = cid_str,
307      -             .collection = collection,
308      -             .value = value_json,
309      -         });
    446 +                     try out.append(ctx.arena, .{
    447 +                         .uri = uri,
    448 +                         .cid = cid_str,
    449 +                         .collection = collection,
    450 +                         .value = value_json,
    451 +                     });
    452 +                 }
    453 +             },
    454 +         }
310 455
311 456           // right subtree of THIS entry (keys between this one and the next)
312      -         if (entry.tree) |tree_cid| try walkNode(arena, car_bytes, index, tree_cid, did, out);
    457 +         if (entry.tree) |tree_cid| try walkNode(ctx, tree_cid);
    458 +     }
    459 + }
    460 +
    461 + /// Decode an atproto TID rkey (13 base32-sortable chars) to a unix
    462 + /// microsecond timestamp. Returns null for non-TID rkeys — profile
    463 + /// `self`, app.bsky.actor.declaration `self`, etc. Callers should treat
    464 + /// a null as "unfilterable, keep by default".
    465 + ///
    466 + /// TID layout: 64 bits, top bit always 0 (sort-safe), next 53 bits =
    467 + /// microseconds since unix epoch, bottom 10 bits = clock id.
    468 + /// Alphabet: `234567abcdefghijklmnopqrstuvwxyz`.
    469 + fn decodeTidMicros(rkey: []const u8) ?i64 {
    470 +     if (rkey.len != 13) return null;
    471 +     var val: u64 = 0;
    472 +     for (rkey) |c| {
    473 +         const digit: u64 = switch (c) {
    474 +             '2'...'7' => @intCast(c - '2'),
    475 +             'a'...'z' => @intCast(@as(u32, c - 'a') + 6),
    476 +             else => return null,
    477 +         };
    478 +         val = (val << 5) | digit;
    479 +     }
    480 +     if (val >> 63 != 0) return null; // top bit must be 0
    481 +     const us = (val >> 10) & ((@as(u64, 1) << 53) - 1);
    482 +     return @intCast(us);
    483 + }
    484 +
    485 + test "decodeTidMicros: known TID round trip" {
    486 +     // round-trip check: encode a known timestamp, then decode it back.
    487 +     // 2023-06-01T00:00:00Z = 1685577600 s = 1685577600000000 µs since
    488 +     // the unix epoch. shift left 10 bits for the clock id, encode to
    489 +     // 13 base32-sortable chars, and expect decodeTidMicros to return
    490 +     // the original microseconds.
    491 +     const us: u64 = 1685577600000000;
    492 +     const clock: u64 = 0;
    493 +     var val: u64 = (us << 10) | clock;
    494 +     // encode to 13 base32-sortable chars
    495 +     var buf: [13]u8 = undefined;
    496 +     var i: usize = 13;
    497 +     while (i > 0) {
    498 +         i -= 1;
    499 +         const d: u8 = @intCast(val & 0x1f);
    500 +         buf[i] = if (d < 6) '2' + d else 'a' + (d - 6);
    501 +         val >>= 5;
313 502       }
    503 +     const decoded = decodeTidMicros(&buf) orelse return error.DecodeFailed;
    504 +     try std.testing.expectEqual(@as(i64, @intCast(us)), decoded);
    505 + }
    506 +
    507 + test "decodeTidMicros: non-TID rkey returns null" {
    508 +     try std.testing.expectEqual(@as(?i64, null), decodeTidMicros("self"));
    509 +     try std.testing.expectEqual(@as(?i64, null), decodeTidMicros(""));
    510 +     try std.testing.expectEqual(@as(?i64, null), decodeTidMicros("too-long-for-a-tid-abc"));
314 511 }
315 512
316 513 /// Convert a zat.cbor.Value into a std.json.Value. This exists so records
+3
backend/src/server.zig
···
641 641   try buf.print(alloc, "\"records_fetched\":{d},", .{pack.records_fetched});
642 642   try buf.print(alloc, "\"records_embedded\":{d},", .{pack.records_embedded});
643 643   try buf.print(alloc, "\"records_reused\":{d},", .{pack.records_reused});
    644 + try buf.print(alloc, "\"skipped_by_collection\":{d},", .{pack.skipped_by_collection});
    645 + try buf.print(alloc, "\"skipped_by_time\":{d},", .{pack.skipped_by_time});
    646 + try buf.print(alloc, "\"applied_tid_cutoff_ms\":{d},", .{pack.applied_tid_cutoff_ms});
644 647   try buf.print(alloc, "\"count\":{d},", .{pack.count()});
645 648   try buf.print(alloc, "\"indexed_at_ms\":{d},", .{pack.indexed_at_ms});
646 649   try buf.print(alloc, "\"build_ms\":{d},", .{pack.build_ms});