···33 *
44 * Modes:
55 * - Normal: Watch firehose for place.wisp.fs events
66- * - Backfill: Process existing sites from database
77- * - DB Fill Only: Collect DIDs and backfill sites table (skip S3 writes)
66+ * - Backfill: Process existing sites discovered from known DIDs
77+ * - DB Fill Only: Legacy mode; no-op now that site_cache is the canonical projection
88 */
991010import { serve } from '@hono/node-server'
···1414import { config } from './config'
1515import { closeCacheInvalidationPublisher } from './lib/cache-invalidation'
1616import { fetchSiteRecord, handleSiteCreateOrUpdate, listSiteRecordsForDid } from './lib/cache-writer'
1717-import { closeDatabase, getSiteCache, listAllKnownDids, listAllSiteCaches, listAllSites, upsertSite } from './lib/db'
1717+import { closeDatabase, getSiteCache, listAllKnownDids } from './lib/db'
1818import { getActiveService, getCurrentSeq, getFirehoseHealth, startFirehose, stopFirehose } from './lib/firehose'
1919import { closeLeaderRedis, getLeaderInfo, releaseLeadership, runLeaderElection, saveCursor } from './lib/leader'
2020import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'
···8080/**
8181 * Backfill phase 1+2:
8282 * - Collect all known DIDs from DB
8383- * - Backfill each DID's place.wisp.fs records into the sites table
8383+ * - Discover each DID's place.wisp.fs records directly from the PDS
8484 */
8585-async function backfillSitesTableFromKnownDids(): Promise<void> {
8585+async function collectSitesFromKnownDids(): Promise<Array<{ did: string; rkey: string }>> {
8686 logger.info('Phase 1/3: Collecting known DIDs')
8787 const dids = await listAllKnownDids()
8888 logger.info(`Collected ${dids.length} known DIDs`)
89899090 if (dids.length === 0) {
9191- logger.warn('No known DIDs found; skipping sites table backfill')
9292- return
9191+ logger.warn('No known DIDs found; skipping site discovery')
9292+ return []
9393 }
94949595- logger.info('Phase 2/3: Backfilling place.wisp.fs records into sites table')
9595+ logger.info('Phase 2/3: Discovering place.wisp.fs records from known DIDs')
96969797 let didsProcessed = 0
9898 let didsFailed = 0
9999- let sitesSynced = 0
9999+ let sitesDiscovered = 0
100100 let sitesFailed = 0
101101+ const discoveredSites = new Map<string, { did: string; rkey: string }>()
101102102103 const concurrency = config.backfillConcurrency
103104···106107 const records = await listSiteRecordsForDid(did)
107108 for (const row of records) {
108109 try {
109109- const siteName =
110110- typeof row.record.site === 'string' && row.record.site.length > 0 ? row.record.site : row.rkey
111111- await upsertSite(did, row.rkey, siteName)
112112- sitesSynced++
110110+ discoveredSites.set(`${did}:${row.rkey}`, { did, rkey: row.rkey })
111111+ sitesDiscovered++
113112 } catch (err) {
114114- logger.error(`[Backfill:sites] Failed to upsert site ${did}/${row.rkey}`, err)
113113+ logger.error(`[Backfill:sites] Failed to register site ${did}/${row.rkey}`, err)
115114 sitesFailed++
116115 }
117116 }
···121120 didsFailed++
122121 }
123122 logger.info(
124124- `[Backfill:sites] Progress ${didsProcessed + didsFailed}/${dids.length} DIDs (${sitesSynced} sites synced, ${sitesFailed} sites failed)`,
123123+ `[Backfill:sites] Progress ${didsProcessed + didsFailed}/${dids.length} DIDs (${sitesDiscovered} sites discovered, ${sitesFailed} sites failed)`,
125124 )
126125 }
127126···138137 await Promise.all(inFlight)
139138140139 logger.info(
141141- `Phase 2/3 complete: ${didsProcessed} DIDs processed, ${didsFailed} DIDs failed, ${sitesSynced} sites synced, ${sitesFailed} sites failed`,
140140+ `Phase 2/3 complete: ${didsProcessed} DIDs processed, ${didsFailed} DIDs failed, ${discoveredSites.size} unique sites discovered, ${sitesFailed} sites failed`,
142141 )
142142+ return [...discoveredSites.values()]
143143}
144144145145/**
146146 * Backfill phase 3:
147147- * - process sites from database and backfill blobs into S3
147147+ * - process discovered sites and backfill blobs into S3
148148 */
149149async function runBackfill(): Promise<void> {
150150 logger.info('Starting backfill mode')
···159159 logger.info('Forcing full file download/write for all backfilled sites')
160160 }
161161162162- await backfillSitesTableFromKnownDids()
162162+ const sites = await collectSitesFromKnownDids()
163163164164 if (config.isDbFillOnly) {
165165- logger.info('DB fill only mode complete; skipping phase 3/3 S3 backfill')
165165+ logger.info('DB fill only mode enabled; skipping phase 3/3 cache backfill')
166166 return
167167 }
168168169169 logger.info('Phase 3/3: Backfilling site blobs into S3')
170170171171- let sites = await listAllSites()
172172- if (sites.length === 0) {
173173- const cachedSites = await listAllSiteCaches()
174174- sites = cachedSites.map((site) => ({ did: site.did, rkey: site.rkey }))
175175- logger.info('Sites table empty; falling back to site_cache entries')
176176- }
177177-178171 const concurrency = config.backfillConcurrency
179179- logger.info(`Found ${sites.length} sites in database (concurrency: ${concurrency})`)
172172+ logger.info(`Found ${sites.length} sites to process (concurrency: ${concurrency})`)
180173181174 let processed = 0
182175 let skipped = 0
-4
apps/firehose-service/src/lib/cache-writer.ts
···1616import { safeFetchBlob, safeFetchJson } from '@wispplace/safe-fetch'
1717import { publishCacheInvalidation } from './cache-invalidation'
1818import {
1919- deleteSite,
2019 deleteSiteCache,
2120 deleteSiteSettingsCache,
2221 getSiteCache,
2322 isSupporter,
2424- upsertSite,
2523 upsertSiteCache,
2624 upsertSiteSettingsCache,
2725} from './db'
···826824 // Update DB with new CIDs
827825 logger.debug(`About to upsert site cache for ${did}/${rkey}`)
828826 await upsertSiteCache(did, rkey, recordCid, newFileCids)
829829- await upsertSite(did, rkey, record.site)
830827 logger.debug(`Updated site cache for ${did}/${rkey} with record CID ${recordCid}`)
831828832829 // Backfill settings if a record exists for this rkey
···864861865862 // Delete from DB
866863 await deleteSiteCache(did, rkey)
867867- await deleteSite(did, rkey)
868864869865 // Notify hosting-service to invalidate its local caches
870866 await publishCacheInvalidation(did, rkey, 'delete')
+1-32
apps/firehose-service/src/lib/db.ts
···11-import type { SiteCache, SiteRecord, SiteSettingsCache } from '@wispplace/database'
11+import type { SiteCache, SiteSettingsCache } from '@wispplace/database'
22import { createLogger } from '@wispplace/observability'
33import postgres from 'postgres'
44import { config } from '../config'
···4141 `
4242}
43434444-export async function listAllSites(): Promise<SiteRecord[]> {
4545- return await sql<SiteRecord[]>`
4646- SELECT did, rkey, display_name, created_at, updated_at
4747- FROM sites
4848- ORDER BY updated_at DESC
4949- `
5050-}
5151-5244/**
5345 * List all known DIDs from all DID-bearing tables.
5446 * Missing tables are skipped to keep bootstrapping resilient.
5547 */
5648export async function listAllKnownDids(): Promise<string[]> {
5749 const sources: Array<{ name: string; fetch: () => Promise<Array<{ did: string }>> }> = [
5858- {
5959- name: 'sites',
6060- fetch: () => sql<Array<{ did: string }>>`
6161- SELECT DISTINCT did
6262- FROM sites
6363- WHERE did IS NOT NULL AND did <> ''
6464- `,
6565- },
6650 {
6751 name: 'site_cache',
6852 fetch: () => sql<Array<{ did: string }>>`
···218202219203export async function deleteSiteSettingsCache(did: string, rkey: string): Promise<void> {
220204 await sql`DELETE FROM site_settings_cache WHERE did = ${did} AND rkey = ${rkey}`
221221-}
222222-223223-export async function upsertSite(did: string, rkey: string, displayName: string): Promise<void> {
224224- await sql`
225225- INSERT INTO sites (did, rkey, display_name, created_at, updated_at)
226226- VALUES (${did}, ${rkey}, ${displayName}, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW()))
227227- ON CONFLICT (did, rkey)
228228- DO UPDATE SET
229229- display_name = EXCLUDED.display_name,
230230- updated_at = EXTRACT(EPOCH FROM NOW())
231231- `
232232-}
233233-234234-export async function deleteSite(did: string, rkey: string): Promise<void> {
235235- await sql`DELETE FROM sites WHERE did = ${did} AND rkey = ${rkey}`
236205}
237206238207export async function isSupporter(did: string): Promise<boolean> {
-23
apps/hosting-service/src/lib/db.ts
···7070 )
7171}
72727373-export async function upsertSite(did: string, rkey: string, displayName?: string) {
7474- console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName })
7575-}
7676-7773/**
7874 * Upsert site cache entry (used by on-demand caching when a site is completely missing)
7975 */
···10298 const error = err instanceof Error ? err : new Error(String(err))
10399 console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message })
104100 throw error
105105- }
106106-}
107107-108108-export interface SiteRecord {
109109- did: string
110110- rkey: string
111111- display_name?: string
112112-}
113113-114114-export async function getAllSites(): Promise<SiteRecord[]> {
115115- try {
116116- const result = await sql<SiteRecord[]>`
117117- SELECT did, rkey, display_name FROM sites
118118- ORDER BY created_at DESC
119119- `
120120- return result
121121- } catch (err) {
122122- console.error('Failed to get all sites', err)
123123- return []
124101 }
125102}
126103
···8181 )
8282`
83838484-// Legacy sites table. Main-app now uses site_cache as the authoritative runtime projection.
8585-await db`
8686- CREATE TABLE IF NOT EXISTS sites (
8787- did TEXT NOT NULL,
8888- rkey TEXT NOT NULL,
8989- display_name TEXT,
9090- created_at BIGINT DEFAULT EXTRACT(EPOCH FROM NOW()),
9191- updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM NOW()),
9292- PRIMARY KEY (did, rkey)
9393- )
9494-`
9595-9684// Site cache table - stores CIDs for cached sites (used by firehose/hosting services)
9785await db`
9886 CREATE TABLE IF NOT EXISTS site_cache (
-5
apps/main-app/src/lib/migrations.ts
···228228 console.error('Failed to create idx_custom_domains_verified:', err)
229229 }
230230 }),
231231- db`CREATE INDEX IF NOT EXISTS idx_sites_did ON sites(did)`.catch((err) => {
232232- if (!hasAlreadyExists(err)) {
233233- console.error('Failed to create idx_sites_did:', err)
234234- }
235235- }),
236231 db`CREATE INDEX IF NOT EXISTS idx_site_cache_did ON site_cache(did)`.catch((err) => {
237232 if (!hasAlreadyExists(err)) {
238233 console.error('Failed to create idx_site_cache_did:', err)
+1-1
apps/webhook-service/src/lib/db.ts
···288288 const dids = new Set<string>()
289289290290 const sites = await db<Array<{ did: string }>>`
291291- SELECT DISTINCT did FROM sites WHERE did IS NOT NULL AND did <> ''
291291+ SELECT DISTINCT did FROM site_cache WHERE did IS NOT NULL AND did <> ''
292292 `
293293 for (const r of sites) dids.add(r.did)
294294