Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add a sliding window to firehose backfilling

+22 -9
+1
Dockerfile
··· 12 12 COPY apps/main-app/package.json ./apps/main-app/package.json 13 13 COPY apps/hosting-service/package.json ./apps/hosting-service/package.json 14 14 COPY apps/firehose-service/package.json ./apps/firehose-service/package.json 15 + COPY apps/webhook-service/package.json ./apps/webhook-service/package.json 15 16 16 17 # Install all dependencies (including workspaces) 17 18 RUN bun install --frozen-lockfile
+1
apps/firehose-service/Dockerfile
··· 13 13 COPY apps/firehose-service ./apps/firehose-service 14 14 COPY apps/main-app/package.json ./apps/main-app/package.json 15 15 COPY apps/hosting-service/package.json ./apps/hosting-service/package.json 16 + COPY apps/webhook-service/package.json ./apps/webhook-service/package.json 16 17 COPY cli/package.json ./cli/package.json 17 18 18 19 # Install dependencies
+1
apps/firehose-service/src/config.ts
··· 34 34 process.argv.includes('--db-fill-only') || 35 35 process.env.BACKFILL === 'true' || 36 36 process.env.DB_FILL_ONLY === 'true', 37 + backfillConcurrency: parseInt(process.env.BACKFILL_CONCURRENCY || '5', 10), 37 38 } as const
+18 -9
apps/firehose-service/src/index.ts
··· 157 157 logger.info('Sites table empty; falling back to site_cache entries') 158 158 } 159 159 160 - logger.info(`Found ${sites.length} sites in database`) 160 + const concurrency = config.backfillConcurrency 161 + logger.info(`Found ${sites.length} sites in database (concurrency: ${concurrency})`) 161 162 162 163 let processed = 0 163 164 let skipped = 0 164 165 let failed = 0 165 166 166 - for (const site of sites) { 167 + const processSite = async (site: { did: string; rkey: string }) => { 167 168 try { 168 - // Fetch current record from PDS 169 169 const result = await fetchSiteRecord(site.did, site.rkey) 170 170 171 171 if (!result) { 172 172 logger.info(`Site not found on PDS: ${site.did}/${site.rkey}`) 173 173 skipped++ 174 - continue 174 + return 175 175 } 176 176 177 177 const existingCache = await getSiteCache(site.did, site.rkey) 178 - // Check if CID matches (already up to date) 179 178 if (!forceRewriteHtml && !forceDownload && existingCache && result.cid === existingCache.record_cid) { 180 179 logger.info(`Site already up to date: ${site.did}/${site.rkey}`) 181 180 skipped++ 182 - continue 181 + return 183 182 } 184 183 185 - // Process the site 186 184 await handleSiteCreateOrUpdate(site.did, site.rkey, result.record, result.cid, { 187 185 forceRewriteHtml, 188 186 forceDownload, 189 187 }) 190 188 processed++ 191 - 192 - logger.info(`Progress: ${processed + skipped + failed}/${sites.length}`) 193 189 } catch (err) { 194 190 logger.error(`Failed to process ${site.did}/${site.rkey}`, err) 195 191 failed++ 196 192 } 193 + 194 + logger.info(`Progress: ${processed + skipped + failed}/${sites.length} (${processed} processed, ${skipped} skipped, ${failed} failed)`) 197 195 } 196 + 197 + // Sliding window: keep `concurrency` tasks in flight at all times 198 + const inFlight = new Set<Promise<void>>() 199 + for (const site of sites) { 200 + const task = processSite(site).then(() => { inFlight.delete(task) }) 201 + inFlight.add(task) 202 + if 
(inFlight.size >= concurrency) { 203 + await Promise.race(inFlight) 204 + } 205 + } 206 + await Promise.all(inFlight) 198 207 199 208 const elapsedMs = Date.now() - startTime 200 209 const elapsedSec = Math.round(elapsedMs / 1000)
+1
apps/hosting-service/Dockerfile
··· 13 13 COPY apps/hosting-service ./apps/hosting-service 14 14 COPY apps/main-app/package.json ./apps/main-app/package.json 15 15 COPY apps/firehose-service/package.json ./apps/firehose-service/package.json 16 + COPY apps/webhook-service/package.json ./apps/webhook-service/package.json 16 17 COPY cli/package.json ./cli/package.json 17 18 18 19 # Install dependencies