AppView in a box as a Vite plugin thing hatk.dev
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: cap auto-backfill concurrency to prevent OOM

Retry timers in triggerAutoBackfill bypassed the concurrency check in
processMessage, allowing unbounded concurrent CAR downloads (23 observed
in production, 2GB RSS). Move the concurrency gate into
triggerAutoBackfill itself and use config.backfill.parallelism instead of
a hardcoded constant. Add dedup set to prevent re-schedule timer
accumulation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+17 -3
+1 -1
packages/hatk/package.json
··· 1 1 { 2 2 "name": "@hatk/hatk", 3 - "version": "0.0.1-alpha.9", 3 + "version": "0.0.1-alpha.10", 4 4 "license": "MIT", 5 5 "bin": { 6 6 "hatk": "dist/cli.js"
+15 -2
packages/hatk/src/indexer.ts
··· 30 30 31 31 // Track in-flight backfills to avoid duplicates 32 32 const backfillInFlight = new Set<string>() 33 - const MAX_CONCURRENT_BACKFILLS = 5 33 + const pendingReschedule = new Set<string>() 34 34 35 35 // In-memory cache of repo status to avoid flooding the DB read queue 36 36 const repoStatusCache = new Map<string, string>() ··· 41 41 let indexerPinnedRepos: Set<string> | null = null 42 42 let indexerFetchTimeout: number 43 43 let indexerMaxRetries: number 44 + let maxConcurrentBackfills = 3 44 45 45 46 async function flushBuffer(): Promise<void> { 46 47 if (buffer.length === 0) return ··· 130 131 131 132 export async function triggerAutoBackfill(did: string, attempt = 0): Promise<void> { 132 133 if (backfillInFlight.has(did)) return 134 + if (backfillInFlight.size >= maxConcurrentBackfills) { 135 + if (!pendingReschedule.has(did)) { 136 + pendingReschedule.add(did) 137 + setTimeout(() => { 138 + pendingReschedule.delete(did) 139 + triggerAutoBackfill(did, attempt) 140 + }, 10_000) 141 + } 142 + return 143 + } 133 144 backfillInFlight.add(did) 134 145 pendingBuffers.set(did, []) 135 146 if (attempt === 0) await setRepoStatus(did, 'pending') ··· 192 203 cursor?: string | null 193 204 fetchTimeout: number 194 205 maxRetries: number 206 + parallelism?: number 195 207 ftsRebuildInterval?: number 196 208 } 197 209 ··· 226 238 indexerPinnedRepos = opts.pinnedRepos || null 227 239 indexerFetchTimeout = fetchTimeout 228 240 indexerMaxRetries = opts.maxRetries 241 + maxConcurrentBackfills = opts.parallelism ?? 3 229 242 230 243 // Pre-populate repo status cache from DB so non-signal updates 231 244 // (e.g. profile changes) are processed for already-tracked DIDs ··· 306 319 } 307 320 308 321 if (hasSignalOp && (!indexerPinnedRepos || indexerPinnedRepos.has(did))) { 309 - if (repoStatus === null && backfillInFlight.size < MAX_CONCURRENT_BACKFILLS) { 322 + if (repoStatus === null && backfillInFlight.size < maxConcurrentBackfills) { 310 323 repoStatusCache.set(did, 'pending') 311 324 triggerAutoBackfill(did) 312 325 } else if (repoStatus === null) {
+1
packages/hatk/src/main.ts
··· 164 164 cursor, 165 165 fetchTimeout: config.backfill.fetchTimeout, 166 166 maxRetries: config.backfill.maxRetries, 167 + parallelism: config.backfill.parallelism, 167 168 ftsRebuildInterval: config.ftsRebuildInterval, 168 169 }) 169 170