AppView in a box as a Vite plugin thing hatk.dev
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf: lazy block map, generator MST walk, reduce parallelism

- Replace eager block Map with LazyBlockMap that stores byte offsets
into the CAR buffer instead of copying block data
- Convert walkMst to a generator so entries are yielded and processed
one at a time instead of collecting 193k+ entries into an array
- Delete block offsets as they're consumed to allow mid-processing GC
- Add iterator support to LazyBlockMap for indexer compatibility
- Reduce default backfill parallelism from 5 to 3
- Bump max-old-space-size from 256 to 512 in Dockerfile template
- Add build/publish/release scripts to root package.json

Benchmarked against a 71MB CAR (244k blocks, 193k records):
peak heap dropped from OOM at 512MB to ~415MB with recovery to 109MB.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+67 -33
+1 -1
packages/hatk/package.json
··· 1 1 { 2 2 "name": "@hatk/hatk", 3 - "version": "0.0.1-alpha.8", 3 + "version": "0.0.1-alpha.9", 4 4 "license": "MIT", 5 5 "bin": { 6 6 "hatk": "dist/cli.js"
+7 -6
packages/hatk/src/backfill.ts
··· 1 - import { parseCarFrame } from './car.ts' 1 + import { parseCarFrame, type LazyBlockMap } from './car.ts' 2 2 import { cborDecode } from './cbor.ts' 3 3 import { walkMst } from './mst.ts' 4 4 import { ··· 181 181 throw httpErr 182 182 } 183 183 184 - let carBytes: Uint8Array | null = new Uint8Array(await res.arrayBuffer()) 184 + const carBytes = new Uint8Array(await res.arrayBuffer()) 185 185 carSizeBytes = carBytes.length 186 - let { roots, blocks }: { roots: string[]; blocks: Map<string, Uint8Array> | null } = parseCarFrame(carBytes) 187 - carBytes = null // free CAR bytes before bulk insert 186 + let { roots, blocks }: { roots: string[]; blocks: LazyBlockMap | null } = parseCarFrame(carBytes) 188 187 189 188 // Decode commit to get MST root 190 189 const rootData = blocks.get(roots[0]) ··· 216 215 const collection = entry.path.split('/')[0] 217 216 if (!collections.has(collection)) continue 218 217 219 - const blockData = blocks.get(entry.cid) 218 + const blockData = blocks!.get(entry.cid) 220 219 if (!blockData) continue 220 + blocks!.delete(entry.cid) // free block data as we go 221 221 222 222 try { 223 223 const { value: record } = cborDecode(blockData) ··· 240 240 }) 241 241 } 242 242 } 243 - blocks = null // free block map 243 + blocks!.free() 244 + blocks = null 244 245 if (chunk.length > 0) { 245 246 count += await bulkInsertRecords(chunk) 246 247 }
+52 -16
packages/hatk/src/car.ts
··· 50 50 } 51 51 52 52 /** 53 - * Parses a CARv1 binary frame into its root CIDs and block map. 53 + * A memory-efficient block map that stores byte offsets into the original CAR 54 + * buffer instead of copying block data. Implements the same `get`/`delete`/`size` 55 + * interface as `Map<string, Uint8Array>` so it can be used as a drop-in replacement. 56 + */ 57 + export class LazyBlockMap { 58 + private offsets: Map<string, [number, number]> 59 + private carBytes: Uint8Array | null 60 + 61 + constructor(carBytes: Uint8Array, offsets: Map<string, [number, number]>) { 62 + this.carBytes = carBytes 63 + this.offsets = offsets 64 + } 65 + 66 + get(cid: string): Uint8Array | undefined { 67 + const range = this.offsets.get(cid) 68 + if (!range || !this.carBytes) return undefined 69 + return this.carBytes.subarray(range[0], range[1]) 70 + } 71 + 72 + delete(cid: string): boolean { 73 + return this.offsets.delete(cid) 74 + } 75 + 76 + get size(): number { 77 + return this.offsets.size 78 + } 79 + 80 + *[Symbol.iterator](): IterableIterator<[string, Uint8Array]> { 81 + for (const [cid, range] of this.offsets) { 82 + if (!this.carBytes) return 83 + yield [cid, this.carBytes.subarray(range[0], range[1])] 84 + } 85 + } 86 + 87 + /** Release the underlying CAR buffer */ 88 + free(): void { 89 + this.carBytes = null 90 + this.offsets.clear() 91 + } 92 + } 93 + 94 + /** 95 + * Parses a CARv1 binary frame into its root CIDs and a lazy block map. 54 96 * 55 - * @param carBytes - Raw CAR file bytes (e.g. from `getRepo` or a firehose commit) 56 - * @returns `roots` — ordered list of root CID strings; `blocks` — map of CID string → raw block data 97 + * The block map stores byte offsets into `carBytes` rather than copying data, 98 + * reducing heap usage from O(total block bytes) to O(number of blocks * 16 bytes). 57 99 * 58 - * @example 59 - * ```ts 60 - * const car = new Uint8Array(await res.arrayBuffer()) 61 - * const { roots, blocks } = parseCarFrame(car) 62 - * const commitData = blocks.get(roots[0]) 63 - * ``` 100 + * @param carBytes - Raw CAR file bytes (e.g. from `getRepo` or a firehose commit) 101 + * @returns `roots` — ordered list of root CID strings; `blocks` — lazy block map 64 102 */ 65 103 export function parseCarFrame(carBytes: Uint8Array): { 66 104 roots: string[] 67 - blocks: Map<string, Uint8Array> 105 + blocks: LazyBlockMap 68 106 } { 69 107 let offset = 0 70 108 ··· 81 119 // so roots may already be decoded strings 82 120 const roots = (header.roots || []).map((root: any) => root?.$link ?? cidToString(root)) 83 121 84 - // Parse blocks: each is varint(len) + CID + data 85 - const blocks = new Map<string, Uint8Array>() 122 + // Build offset index: CID → [start, end] into carBytes 123 + const offsets = new Map<string, [number, number]>() 86 124 87 125 while (offset < carBytes.length) { 88 126 const [blockLen, afterBlockLen] = readVarint(carBytes, offset) ··· 93 131 const cid = cidToString(cidBytes) 94 132 95 133 const dataLen = blockLen - (afterCid - offset) 96 - const data = carBytes.slice(afterCid, afterCid + dataLen) 97 - 98 - blocks.set(cid, data) 134 + offsets.set(cid, [afterCid, afterCid + dataLen]) 99 135 offset = afterCid + dataLen 100 136 } 101 137 102 - return { roots, blocks } 138 + return { roots, blocks: new LazyBlockMap(carBytes, offsets) } 103 139 }
+1 -1
packages/hatk/src/config.ts
··· 78 78 signalCollections: backfillRaw.signalCollections || undefined, 79 79 repos: env.BACKFILL_REPOS ? env.BACKFILL_REPOS.split(',').map((s) => s.trim()) : backfillRaw.repos || undefined, 80 80 fullNetwork: env.BACKFILL_FULL_NETWORK ? env.BACKFILL_FULL_NETWORK === 'true' : backfillRaw.fullNetwork || false, 81 - parallelism: parseInt(env.BACKFILL_PARALLELISM || '') || backfillRaw.parallelism || 5, 81 + parallelism: parseInt(env.BACKFILL_PARALLELISM || '') || backfillRaw.parallelism || 3, 82 82 fetchTimeout: parseInt(env.BACKFILL_FETCH_TIMEOUT || '') || backfillRaw.fetchTimeout || 300, 83 83 maxRetries: parseInt(env.BACKFILL_MAX_RETRIES || '') || backfillRaw.maxRetries || 5, 84 84 },
+6 -9
packages/hatk/src/mst.ts
··· 5 5 cid: string // CID of the record block 6 6 } 7 7 8 - export function walkMst(blocks: Map<string, Uint8Array>, rootCid: string): MstEntry[] { 9 - const entries: MstEntry[] = [] 10 - 11 - function visit(cid: string, prefix: string): string { 8 + export function* walkMst(blocks: { get(cid: string): Uint8Array | undefined }, rootCid: string): Generator<MstEntry> { 9 + function* visit(cid: string, prefix: string): Generator<MstEntry, string> { 12 10 const data = blocks.get(cid) 13 11 if (!data) return prefix 14 12 const { value: node } = cborDecode(data) 15 13 16 14 // Visit left subtree 17 - if (node.l?.$link) visit(node.l.$link, prefix) 15 + if (node.l?.$link) yield* visit(node.l.$link, prefix) 18 16 19 17 let lastKey = prefix 20 18 for (const entry of node.e || []) { ··· 24 22 lastKey = fullKey 25 23 26 24 if (entry.v?.$link) { 27 - entries.push({ path: fullKey, cid: entry.v.$link }) 25 + yield { path: fullKey, cid: entry.v.$link } 28 26 } 29 27 30 28 // Visit right subtree 31 29 if (entry.t?.$link) { 32 - visit(entry.t.$link, lastKey) 30 + yield* visit(entry.t.$link, lastKey) 33 31 } 34 32 } 35 33 36 34 return lastKey 37 35 } 38 36 39 - visit(rootCid, '') 40 - return entries 37 + yield* visit(rootCid, '') 41 38 }