AppView in a box as a Vite plugin thing (hatk.dev)

perf: streaming CAR parser to fix backfill OOMs

Replace buffered res.arrayBuffer() with incremental stream parsing.
Each block is .slice()d into its own Uint8Array, eliminating the single
large external ArrayBuffer that V8 can't reclaim while any block still
references it (213MB → 64MB external).

Also adds diff-based backfill via the `since` parameter, falling back to
a full import when the PDS has compacted history.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
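
The memory claim above hinges on the difference between `subarray()` (a view that pins the parent `ArrayBuffer`) and `slice()` (an independent copy). A minimal standalone sketch of that distinction, not part of the diff:

// Sketch: why copying blocks out lets V8 reclaim the download buffer.
let car: Uint8Array | null = new Uint8Array(200 * 1024 * 1024) // stand-in for a buffered CAR

// A view shares the backing store: holding `view` keeps all 200MB alive.
const view = car.subarray(1000, 2000)

// A slice owns its own small buffer: once `car` (and any views of it) are
// dropped, the 200MB backing store becomes collectable while `copy` lives on.
const copy = car.slice(1000, 2000)
car = null // `copy` survives independently; `view` would still pin the old buffer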

+163 -22
+1 -1
packages/hatk/package.json
···
 {
   "name": "@hatk/hatk",
-  "version": "0.0.1-alpha.10",
+  "version": "0.0.1-alpha.11",
   "license": "MIT",
   "bin": {
     "hatk": "dist/cli.js"
+34 -21
packages/hatk/src/backfill.ts
···
-import { parseCarFrame, type LazyBlockMap } from './car.ts'
+import { parseCarStream } from './car.ts'
 import { cborDecode } from './cbor.ts'
 import { walkMst } from './mst.ts'
 import {
   setRepoStatus,
   getRepoStatus,
+  getRepoRev,
   getRepoRetryInfo,
   listRetryEligibleRepos,
   listPendingRepos,
···
   let error: string | undefined
   let resolvedPds: string | undefined
   let resolvedHandle: string | null = null
+  let resolvedSince: string | null = null
   let retryCount: number | undefined
   let retryAfter: number | undefined
···
     resolvedPds = pdsUrl
     resolvedHandle = handle
     timeout = setTimeout(() => controller.abort(), fetchTimeout * 1000)
-    const res = await fetch(`${resolvedPds}/xrpc/com.atproto.sync.getRepo?did=${encodeURIComponent(did)}`, {
-      signal: controller.signal,
-    })
+    let lastRev = await getRepoRev(did)
+    const baseUrl = `${resolvedPds}/xrpc/com.atproto.sync.getRepo?did=${encodeURIComponent(did)}`
+    let repoUrl = lastRev ? `${baseUrl}&since=${encodeURIComponent(lastRev)}` : baseUrl
+    let res = await fetch(repoUrl, { signal: controller.signal })
+
+    // If the PDS rejected our `since` rev (compacted history), fall back to full import
+    if (res.status === 400 && lastRev) {
+      lastRev = null
+      res = await fetch(baseUrl, { signal: controller.signal })
+    }
+
     if (!res.ok) {
       const httpErr = new Error(`getRepo failed for ${did}: ${res.status}`) as Error & { httpStatus: number }
       httpErr.httpStatus = res.status
       throw httpErr
     }

-    const carBytes = new Uint8Array(await res.arrayBuffer())
-    carSizeBytes = carBytes.length
-    let { roots, blocks }: { roots: string[]; blocks: LazyBlockMap | null } = parseCarFrame(carBytes)
+    resolvedSince = lastRev
+    const { roots, blocks, byteLength } = await parseCarStream(res.body!)
+    carSizeBytes = byteLength

     // Decode commit to get MST root
     const rootData = blocks.get(roots[0])
···
     const entries = walkMst(blocks, commit.data.$link)

     // Delete existing records for this DID before re-importing so deletions are reflected
-    for (const col of collections) {
-      const schema = getSchema(col)
-      if (!schema) continue
-      await runSQL(`DELETE FROM ${schema.tableName} WHERE did = $1`, did)
-      for (const child of schema.children) {
-        await runSQL(`DELETE FROM ${child.tableName} WHERE parent_did = $1`, did)
-      }
-      for (const union of schema.unions) {
-        for (const branch of union.branches) {
-          await runSQL(`DELETE FROM ${branch.tableName} WHERE parent_did = $1`, did)
+    // Only on full imports (no since) — diff CARs only contain changes
+    if (!lastRev) {
+      for (const col of collections) {
+        const schema = getSchema(col)
+        if (!schema) continue
+        await runSQL(`DELETE FROM ${schema.tableName} WHERE did = $1`, did)
+        for (const child of schema.children) {
+          await runSQL(`DELETE FROM ${child.tableName} WHERE parent_did = $1`, did)
+        }
+        for (const union of schema.unions) {
+          for (const branch of union.branches) {
+            await runSQL(`DELETE FROM ${branch.tableName} WHERE parent_did = $1`, did)
+          }
         }
       }
     }
···
       const collection = entry.path.split('/')[0]
       if (!collections.has(collection)) continue

-      const blockData = blocks!.get(entry.cid)
+      const blockData = blocks.get(entry.cid)
       if (!blockData) continue
-      blocks!.delete(entry.cid) // free block data as we go
+      blocks.delete(entry.cid) // free block data as we go

       try {
         const { value: record } = cborDecode(blockData)
···
         })
       }
     }
-    blocks!.free()
-    blocks = null
     if (chunk.length > 0) {
       count += await bulkInsertRecords(chunk)
     }
···
       error,
       pds_url: resolvedPds,
       car_size_bytes: carSizeBytes,
+      import_mode: carSizeBytes !== undefined ? (resolvedSince ? 'diff' : 'full') : undefined,
+      since_rev: resolvedSince,
       retry_count: retryCount,
       retry_after: retryAfter,
       permanent_failure: retryCount === 999 ? true : undefined,
+123
packages/hatk/src/car.ts
···
 }

 /**
+ * Parses a CARv1 stream incrementally from a `ReadableStream`.
+ *
+ * Instead of buffering the entire CAR into a single ArrayBuffer, this reads
+ * chunks from the stream and parses blocks as they arrive. Each block's data
+ * is `.slice()`d into its own small `Uint8Array`, allowing V8 to GC individual
+ * blocks as they're consumed during the MST walk.
+ *
+ * This is critical for backfill where multiple workers download 30-90MB CARs
+ * concurrently — buffered downloads cause OOMs because `ArrayBuffer` memory
+ * is "external" to V8's heap and not controlled by `--max-old-space-size`.
+ *
+ * @param body - The response body stream (e.g. `res.body` from `fetch()`)
+ * @returns `roots` — root CID strings; `blocks` — map of CID → block data; `byteLength` — total bytes read
+ */
+export async function parseCarStream(body: ReadableStream<Uint8Array>): Promise<{
+  roots: string[]
+  blocks: Map<string, Uint8Array>
+  byteLength: number
+}> {
+  const reader = body.getReader()
+
+  // Growable buffer with position tracking. We reuse a single allocation and
+  // compact (shift data to front) when the read position passes the midpoint,
+  // avoiding per-chunk allocations and subarray references that pin old memory.
+  let buf = new Uint8Array(64 * 1024)
+  let pos = 0 // read cursor
+  let len = 0 // bytes of valid data in buf
+  let byteLength = 0
+
+  // Ensure at least `need` bytes are available at buf[pos..pos+need)
+  async function fill(need: number): Promise<boolean> {
+    while (len - pos < need) {
+      const { done, value } = await reader.read()
+      if (done) return (len - pos) >= need
+      byteLength += value.length
+
+      // Compact: shift remaining data to front when read cursor passes midpoint
+      if (pos > 0 && pos > buf.length >>> 1) {
+        buf.copyWithin(0, pos, len)
+        len -= pos
+        pos = 0
+      }
+
+      // Grow if needed
+      const required = len + value.length
+      if (required > buf.length) {
+        const newBuf = new Uint8Array(Math.max(required, buf.length * 2))
+        newBuf.set(buf.subarray(0, len))
+        buf = newBuf
+      }
+
+      buf.set(value, len)
+      len += value.length
+    }
+    return true
+  }
+
+  function consume(n: number): void {
+    pos += n
+  }
+
+  // Read a varint starting at buf[pos]
+  function readVarintFromBuf(): [number, number] {
+    let value = 0
+    let shift = 0
+    let p = pos
+    while (p < len) {
+      const byte = buf[p++]
+      value |= (byte & 0x7f) << shift
+      if ((byte & 0x80) === 0) return [value, p - pos]
+      shift += 7
+      if (shift > 35) throw new Error('Varint too long')
+    }
+    throw new Error('Unexpected end of varint')
+  }
+
+  // Parse header: varint(headerLen) + CBOR(header)
+  if (!(await fill(1))) throw new Error('Empty CAR stream')
+
+  // Prefetch up to 10 bytes for the varint; readVarintFromBuf bounds to `len`
+  await fill(10)
+  const [headerLen, headerVarintSize] = readVarintFromBuf()
+  consume(headerVarintSize)
+
+  if (!(await fill(headerLen))) throw new Error('Truncated CAR header')
+  // .slice() copies out of the reusable buffer
+  const headerSlice = buf.slice(pos, pos + headerLen)
+  const { value: header } = cborDecode(headerSlice)
+  consume(headerLen)
+
+  const roots = (header.roots || []).map((root: any) => root?.$link ?? cidToString(root))
+
+  // Parse blocks
+  const blocks = new Map<string, Uint8Array>()
+
+  while (true) {
+    if (!(await fill(1))) break
+
+    // Prefetch up to 10 bytes for the varint; readVarintFromBuf bounds to `len`
+    await fill(10)
+    const [blockLen, blockVarintSize] = readVarintFromBuf()
+    consume(blockVarintSize)
+    if (blockLen === 0) break
+
+    if (!(await fill(blockLen))) throw new Error('Truncated CAR block')
+
+    const [cidBytes, afterCid] = parseCidFromBytes(buf, pos)
+    const cid = cidToString(cidBytes)
+    const cidLen = afterCid - pos
+    // .slice() creates an independent copy — the buffer can be reused
+    const data = buf.slice(afterCid, afterCid + blockLen - cidLen)
+
+    blocks.set(cid, data)
+    consume(blockLen)
+  }
+
+  reader.releaseLock()
+  // Release the internal buffer
+  buf = null!
+  return { roots, blocks, byteLength }
+}
+
+/**
  * Parses a CARv1 binary frame into its root CIDs and a lazy block map.
  *
  * The block map stores byte offsets into `carBytes` rather than copying data,
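
For reference, a minimal usage sketch of `parseCarStream`, mirroring the call site in backfill.ts (the DID and PDS host here are placeholders):

import { parseCarStream } from './car.ts'

// Hypothetical identifiers, for illustration only.
const did = 'did:plc:example'
const res = await fetch(
  `https://pds.example.com/xrpc/com.atproto.sync.getRepo?did=${encodeURIComponent(did)}`,
)
if (!res.ok || !res.body) throw new Error(`getRepo failed: ${res.status}`)

// Blocks are copied out of the stream one at a time, so each entry can be
// deleted from the map (and GC'd) as soon as it has been consumed.
const { roots, blocks, byteLength } = await parseCarStream(res.body)
console.log(`parsed ${blocks.size} blocks (${byteLength} bytes), root CID: ${roots[0]}`)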
+5
packages/hatk/src/db.ts
···
   }
 }

+export async function getRepoRev(did: string): Promise<string | null> {
+  const rows = await all(`SELECT rev FROM _repos WHERE did = $1`, did)
+  return rows[0]?.rev ?? null
+}
+
 export async function getRepoRetryInfo(did: string): Promise<{ retryCount: number; retryAfter: number } | null> {
   const rows = await all(`SELECT retry_count, retry_after FROM _repos WHERE did = $1`, did)
   if (rows.length === 0) return null