···11import { createLogger } from '@wispplace/observability'
22import { config } from './config'
33-import { closeDatabase, db } from './lib/db'
44-import { getFirehoseHealth, startFirehose, stopFirehose } from './lib/firehose'
33+import { closeDatabase, db, loadAllWebhooks } from './lib/db'
44+import { runStartupBackfill } from './lib/backfill'
55+import { getFirehoseHealth, initScopeDids, startFirehose, stopFirehose } from './lib/firehose'
56import { closeRedisPublisher } from './lib/redis'
6778const logger = createLogger('webhook-service')
/**
 * Service entry point.
 *
 * Logs the startup configuration, loads the webhook registry from the DB,
 * seeds the scope-DID pre-filter sets, starts the Jetstream consumers and then
 * runs the startup backfill, re-seeding the sets afterwards.
 */
async function main() {
	logger.info('Starting webhook-service')
	logger.info(`Jetstream: ${config.jetstreamUrl}`)
	logger.info(`Health endpoint: http://localhost:${config.healthPort}/health`)

	const webhooks = await loadAllWebhooks()
	if (webhooks.length === 0) {
		logger.info('[registry] No webhook records in DB')
	} else {
		// The scope DID is the first path segment of the at:// URI.
		const scopeDids = new Set(webhooks.map((w) => w.record.scope.aturi.replace(/^at:\/\//, '').split('/')[0]))
		logger.info(`[registry] Tracking ${webhooks.length} webhook(s) across ${scopeDids.size} DID(s)`)
		for (const w of webhooks) {
			logger.info(
				`[registry] ${w.did}/${w.rkey}` +
					` scope=${w.record.scope.aturi}` +
					` events=${w.record.events?.join(',') ?? 'all'}` +
					` backlinks=${w.record.scope.backlinks ?? false}` +
					` enabled=${w.record.enabled ?? true}` +
					` url=${w.record.url}`,
			)
		}
	}

	// Populate sync pre-filter sets before starting the firehose
	initScopeDids(webhooks)

	startFirehose()

	// Backfill any place.wisp.v2.wh records that existed before this run
	await runStartupBackfill()
	// Re-init after backfill in case it added new webhooks
	initScopeDids(await loadAllWebhooks())
}
112138113139main().catch((err) => {
+92
apps/webhook-service/src/lib/backfill.ts
···11+import { getPdsForDid } from '@wispplace/atproto-utils'
22+import type { Main as WhRecord } from '@wispplace/lexicons/types/place/wisp/v2/wh'
33+import { createLogger } from '@wispplace/observability'
44+import { listAllKnownDids, upsertWebhookRecord } from './db'
55+66+const logger = createLogger('webhook-service:backfill')

/** One page of a `com.atproto.repo.listRecords` response, as read by the backfill. */
interface ListRecordsResponse {
	records: Array<{
		uri: string
		cid: string
		value: WhRecord
	}>
	/** Present when another page can be requested. */
	cursor?: string
}

/**
 * Fetch all place.wisp.v2.wh records for a DID from their PDS.
 * Pages through all results using the cursor.
 *
 * Returns an empty list when no PDS can be resolved for the DID. Returns
 * whatever was collected so far when the PDS answers with a non-OK status,
 * sends a page without a `records` array, or repeats a cursor. Records that
 * lack `scope.aturi` or `url` are skipped.
 */
async function fetchWhRecordsForDid(did: string): Promise<Array<{ rkey: string; record: WhRecord }>> {
	const pdsUrl = await getPdsForDid(did)
	if (!pdsUrl) return []

	const results: Array<{ rkey: string; record: WhRecord }> = []
	const seenCursors = new Set<string>()
	let cursor: string | undefined

	do {
		const params = new URLSearchParams({
			repo: did,
			collection: 'place.wisp.v2.wh',
			limit: '100',
		})
		if (cursor) params.set('cursor', cursor)

		const res = await fetch(`${pdsUrl}/xrpc/com.atproto.repo.listRecords?${params}`, {
			signal: AbortSignal.timeout(10_000),
		})

		if (!res.ok) {
			if (res.status === 404) return results // DID has no records of this type
			logger.warn(`[backfill] PDS returned ${res.status} for ${did}`)
			return results
		}

		// The body comes from a remote server, so do not trust its shape.
		const data = (await res.json()) as Partial<ListRecordsResponse> | null
		if (!data || !Array.isArray(data.records)) {
			logger.warn(`[backfill] PDS returned a malformed listRecords page for ${did}`)
			return results
		}

		for (const r of data.records) {
			// The rkey is the last path segment of the record's at:// URI.
			const rkey = typeof r?.uri === 'string' ? r.uri.split('/').at(-1) : undefined
			if (!rkey) continue
			if (!r.value?.scope?.aturi || !r.value.url) continue
			results.push({ rkey, record: r.value })
		}

		cursor = typeof data.cursor === 'string' && data.cursor !== '' ? data.cursor : undefined
		if (cursor !== undefined) {
			// A repeated cursor would otherwise keep this loop running forever.
			if (seenCursors.has(cursor)) {
				logger.warn(`[backfill] PDS repeated a listRecords cursor for ${did}, stopping pagination`)
				break
			}
			seenCursors.add(cursor)
		}
	} while (cursor)

	return results
}

/**
 * On startup, scan all known DIDs for existing place.wisp.v2.wh records
 * and populate the local DB. This recovers webhook registrations that were
 * created while the service was offline.
 *
 * DIDs are scanned one after another. A failure while scanning or storing one
 * DID is logged and counted, and does not stop the remaining DIDs.
 */
export async function runStartupBackfill(): Promise<void> {
	const dids = await listAllKnownDids()
	if (dids.length === 0) {
		logger.info('[backfill] No known DIDs to scan')
		return
	}

	logger.info(`[backfill] Scanning ${dids.length} known DIDs for place.wisp.v2.wh records`)

	let found = 0
	let failed = 0

	for (const did of dids) {
		try {
			const records = await fetchWhRecordsForDid(did)
			for (const { rkey, record } of records) {
				await upsertWebhookRecord(did, rkey, record)
				found++
				logger.info(`[backfill] Imported ${did}/${rkey}`)
			}
		} catch (err) {
			failed++
			logger.warn(`[backfill] Failed to scan ${did}`, { err: String(err) })
		}
	}

	logger.info(`[backfill] Done — ${found} webhook record(s) imported, ${failed} DID(s) failed`)
}
+50
apps/webhook-service/src/lib/db.ts
···4343`

// Stores the last persisted firehose sequence number; saveCursor and
// loadCursor only ever touch the row whose id is 'singleton'.
await db`
	CREATE TABLE IF NOT EXISTS firehose_cursor (
		id TEXT PRIMARY KEY DEFAULT 'singleton',
		seq BIGINT NOT NULL,
		saved_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
	)
`
5252+5353+await db`
4654 CREATE TABLE IF NOT EXISTS webhook_event_logs (
4755 id BIGSERIAL PRIMARY KEY,
4856 owner_did TEXT NOT NULL,
···233241 status: r.status as 'ok' | 'failed',
234242 deliveredAt: r.delivered_at,
235243 }))
244244+}

/**
 * Persist the current firehose sequence number so restarts can resume.
 *
 * Upserts the single 'singleton' row and refreshes its `saved_at` timestamp.
 *
 * @param seq Sequence number to store.
 */
export async function saveCursor(seq: number): Promise<void> {
	await db`
		INSERT INTO firehose_cursor (id, seq, saved_at)
		VALUES ('singleton', ${seq}, NOW())
		ON CONFLICT (id) DO UPDATE SET seq = EXCLUDED.seq, saved_at = NOW()
	`
}

/**
 * Load the last saved firehose sequence number, or undefined if none.
 *
 * `seq` is a BIGINT column. Postgres clients commonly return 64-bit integers
 * as strings (or bigint) rather than numbers, so the raw value is coerced
 * before it is returned. A value that does not convert to a finite number is
 * treated as "no cursor".
 */
export async function loadCursor(): Promise<number | undefined> {
	const rows = await db<Array<{ seq: number | string | bigint | null }>>`
		SELECT seq FROM firehose_cursor WHERE id = 'singleton'
	`
	const raw = rows[0]?.seq
	if (raw === undefined || raw === null) return undefined

	const seq = Number(raw)
	return Number.isFinite(seq) ? seq : undefined
}

/**
 * Collect all DIDs we know about from the local DB.
 * Used during startup backfill to find repos that may already have
 * place.wisp.v2.wh records that arrived before the service started.
 *
 * Reads `sites.did` and, on a best-effort basis, `oauth_sessions.sub`.
 *
 * @returns The distinct, non-empty DIDs in sorted order.
 */
export async function listAllKnownDids(): Promise<string[]> {
	const dids = new Set<string>()

	const sites = await db<Array<{ did: string }>>`
		SELECT DISTINCT did FROM sites WHERE did IS NOT NULL AND did <> ''
	`
	for (const r of sites) dids.add(r.did)

	try {
		const sessions = await db<Array<{ sub: string }>>`
			SELECT DISTINCT sub FROM oauth_sessions WHERE sub IS NOT NULL AND sub <> ''
		`
		for (const r of sessions) dids.add(r.sub)
	} catch {
		// oauth_sessions schema may differ; skip gracefully
	}

	return [...dids].sort()
}
237287238288/** Close all database connections gracefully. */
+187-88
apps/webhook-service/src/lib/firehose.ts
···11-import { IdResolver } from '@atproto/identity'
22-import { Firehose } from '@atproto/sync'
33-import { BunFirehose, type CommitEvt, type Event, isBun } from '@wispplace/bun-firehose'
41import type { Main as WhRecord } from '@wispplace/lexicons/types/place/wisp/v2/wh'
52import { createLogger } from '@wispplace/observability'
63import { config } from '../config'
77-import { deleteWebhookRecord, findBacklinkWebhooks, findWebhooksForDid, upsertWebhookRecord } from './db'
44+import {
55+ deleteWebhookRecord,
66+ findBacklinkWebhooks,
77+ findWebhooksForDid,
88+ loadAllWebhooks,
99+ upsertWebhookRecord,
1010+ type WebhookEntry,
1111+} from './db'
812import { deliverWebhook } from './delivery'
1313+import { JetstreamClient, type JetstreamEvent } from './jetstream'
914import { matchWebhooks } from './matcher'
1015import { getCached, invalidate, setCached } from './registry'
11161217const logger = createLogger('webhook-service:firehose')
1313-const idResolver = new IdResolver()

// Time (ms since epoch) of the most recent commit event seen by an event handler.
let lastEventTime = Date.now()
// Connection flag maintained by the direct Jetstream callbacks and stopFirehose().
let isConnected = false
// Running counters reported by getEventStats().
let totalEvents = 0
let totalMatched = 0

/**
 * Snapshot of the consumer's health for the health endpoint.
 *
 * `healthy` requires both a live connection and an event within the last
 * 60 seconds. The clock is read once so `timeSinceLastEvent` and `healthy`
 * are computed from the same instant.
 */
export function getFirehoseHealth() {
	const timeSinceLastEvent = Date.now() - lastEventTime
	return {
		connected: isConnected,
		lastEventTime,
		timeSinceLastEvent,
		healthy: isConnected && timeSinceLastEvent < 60_000,
	}
}

/** Running totals: commit events counted so far and webhook matches delivered. */
export function getEventStats() {
	return { events: totalEvents, matched: totalMatched }
}

// DIDs named as the scope of at least one webhook.
let directScopeDids = new Set<string>()
// Subset of scope DIDs whose webhook has `scope.backlinks` enabled.
let backlinkScopeDids = new Set<string>()

/**
 * Rebuild the scope-DID sets from the given webhooks and bring the Jetstream
 * consumers in line with them.
 *
 * The direct stream is restarted only when its DID set actually changed (or
 * when no direct client exists yet), so re-initialising with an unchanged
 * registry does not drop and reopen the connection. The backlink stream is
 * started when the first backlink DID appears and stopped when none remain.
 *
 * @param webhooks Entries whose `record.scope.aturi` names the watched repo.
 */
export function initScopeDids(webhooks: Array<{ record: { scope: { aturi: string; backlinks?: boolean } } }>): void {
	const nextDirect = new Set<string>()
	const nextBacklink = new Set<string>()
	for (const w of webhooks) {
		// The DID is the first path segment of the at:// URI.
		const did = w.record.scope.aturi.replace(/^at:\/\//, '').split('/')[0]
		if (!did) continue
		nextDirect.add(did)
		if (w.record.scope.backlinks) nextBacklink.add(did)
	}

	const directChanged =
		nextDirect.size !== directScopeDids.size || [...nextDirect].some((did) => !directScopeDids.has(did))

	directScopeDids = nextDirect
	backlinkScopeDids = nextBacklink
	logger.info(`[registry] tracking ${directScopeDids.size} scope DID(s), ${backlinkScopeDids.size} with backlinks`)

	if (directChanged || !directJetstream) restartDirectJetstream()

	if (backlinkScopeDids.size > 0 && !backlinkJetstream) {
		startBacklinkJetstream()
	} else if (backlinkScopeDids.size === 0 && backlinkJetstream) {
		stopBacklinkJetstream()
	}
}

/**
 * Collect the authority part (normally a DID) of every `at://` URI string
 * found anywhere inside `obj`, adding each one to `found`.
 *
 * Walks arrays and plain objects with an explicit stack instead of recursion,
 * so a deeply nested record cannot overflow the call stack, and remembers the
 * objects already visited so a cyclic value cannot loop forever.
 *
 * @param obj Arbitrary record value to scan.
 * @param found Output set; mutated in place.
 */
function extractAtUriDids(obj: unknown, found: Set<string>): void {
	const pending: unknown[] = [obj]
	const visited = new WeakSet<object>()

	while (pending.length > 0) {
		const value = pending.pop()

		if (typeof value === 'string') {
			if (!value.startsWith('at://')) continue
			const rest = value.slice(5)
			const slash = rest.indexOf('/')
			const did = slash === -1 ? rest : rest.slice(0, slash)
			if (did) found.add(did)
			continue
		}

		if (value === null || typeof value !== 'object') continue
		if (visited.has(value)) continue
		visited.add(value)

		const children: unknown[] = Array.isArray(value) ? value : Object.values(value)
		for (const child of children) pending.push(child)
	}
}

/**
 * Report whether `record` contains an `at://` URI whose DID is in `dids`.
 *
 * Returns false for a null/undefined record or an empty DID set.
 */
function recordReferencesAnyOf(record: unknown, dids: Set<string>): boolean {
	if (record == null || dids.size === 0) return false
	const found = new Set<string>()
	extractAtUriDids(record, found)
	for (const did of found) {
		if (dids.has(did)) return true
	}
	return false
}
8686+8787+async function getWebhooksForEvent(eventDid: string, eventRecord: unknown) {
2988 let direct = getCached(eventDid)
3089 if (!direct) {
3190 direct = await findWebhooksForDid(eventDid)
3291 setCached(eventDid, direct)
3392 }
34933535- // Backlink matches: cached under a fixed key
3694 let backlink = getCached('__backlinks__')
3795 if (!backlink) {
3896 backlink = await findBacklinkWebhooks()
3997 setCached('__backlinks__', backlink)
4098 }
41994242- // Combine, deduplicate by ownerDid/rkey
100100+ const includeBacklinks = backlink.length > 0 && recordReferencesAnyOf(eventRecord, backlinkScopeDids)
101101+ if (!includeBacklinks) return direct
102102+43103 const seen = new Set(direct.map((e) => `${e.ownerDid}/${e.rkey}`))
44104 const combined = [...direct]
45105 for (const entry of backlink) {
···52112 return combined
53113}

/**
 * Look up the webhooks that may care about a commit, filter them with
 * `matchWebhooks`, and deliver the event to every match concurrently.
 *
 * Deliveries are awaited with `Promise.allSettled`, so one failing webhook
 * does not affect the others; each rejection is logged.
 * When the FILTER_DEBUG environment variable is set, the match result of
 * every candidate is logged at debug level.
 */
async function deliver(did: string, collection: string, rkey: string, op: string, cid: string | undefined, record: unknown): Promise<void> {
	const candidates = await getWebhooksForEvent(did, record)
	if (candidates.length === 0) return

	const matched = matchWebhooks(candidates, did, collection, rkey, op as any, record)

	if (process.env.FILTER_DEBUG) {
		for (const c of candidates) {
			logger.debug(
				matched.includes(c)
					? `[filter] ✓ ${c.ownerDid}/${c.rkey} scope=${c.record.scope.aturi}`
					: `[filter] ✗ ${c.ownerDid}/${c.rkey} scope=${c.record.scope.aturi}`,
			)
		}
	}

	if (matched.length === 0) return
	totalMatched += matched.length
	logger.info(`[deliver] ${op} ${did}/${collection}/${rkey} → ${matched.length} webhook(s)`)

	const outcomes = await Promise.allSettled(
		matched.map((entry) => deliverWebhook(entry, did, collection, rkey, op as any, cid, record)),
	)
	// allSettled never rejects, so surface individual failures explicitly.
	outcomes.forEach((outcome, i) => {
		if (outcome.status === 'rejected') {
			const entry = matched[i]
			logger.error(`[deliver] ${entry?.ownerDid}/${entry?.rkey} failed`, outcome.reason)
		}
	})
}

/**
 * Apply a create/update/delete of a place.wisp.v2.wh record to the local DB,
 * then invalidate the affected caches and rebuild the scope-DID sets.
 *
 * The DB write is awaited before the registry is reloaded, so the reload sees
 * the change that triggered it. DB and reload failures are logged and never
 * thrown. A record without `scope.aturi` or `url` is rejected and logged.
 */
async function handleWhRecord(op: string, did: string, rkey: string, record: unknown): Promise<void> {
	logger.info(`[wh] ${op} ${did}/${rkey}`)
	if (op === 'delete') {
		try {
			await deleteWebhookRecord(did, rkey)
		} catch (err) {
			logger.error(`[DB] delete ${did}/${rkey}`, err)
		}
	} else if (record) {
		const wh = record as WhRecord
		if (!wh.scope?.aturi || !wh.url) {
			logger.error(`[wh] Skipping ${did}/${rkey} — invalid record`, { record })
		} else {
			logger.info(`[wh] scope=${wh.scope.aturi} url=${wh.url} enabled=${wh.enabled ?? true}`)
			try {
				await upsertWebhookRecord(did, rkey, wh)
			} catch (err) {
				logger.error(`[DB] upsert ${did}/${rkey}`, err)
			}
		}
	} else {
		logger.warn(`[wh] ${op} ${did}/${rkey} — record missing`)
	}

	invalidate(did)
	invalidate('__backlinks__')

	try {
		initScopeDids(await loadAllWebhooks())
	} catch (err) {
		logger.error(`[wh] Failed to refresh scope DIDs after ${op} ${did}/${rkey}`, err)
	}
}

// Client for the stream filtered to directScopeDids; null while no DIDs are tracked.
let directJetstream: JetstreamClient | null = null

/**
 * Handle one event from the direct (DID-filtered) Jetstream.
 *
 * Only commit events with a create/update/delete operation are processed.
 * Changes to place.wisp.v2.wh records update the webhook registry; every
 * other collection is passed to `deliver`. Errors are logged, never thrown.
 */
async function handleDirectEvent(event: JetstreamEvent): Promise<void> {
	try {
		if (event.kind !== 'commit' || !event.commit) return
		lastEventTime = Date.now()
		const { did } = event
		const { operation: op, collection, rkey, record, cid } = event.commit
		if (op !== 'create' && op !== 'update' && op !== 'delete') return
		totalEvents++

		if (collection === 'place.wisp.v2.wh') {
			await handleWhRecord(op, did, rkey, record)
			return
		}

		await deliver(did, collection, rkey, op, cid, record)
	} catch (err) {
		logger.error('Direct Jetstream event error', err)
	}
}

/**
 * Tear down the current direct Jetstream client (if any) and, when there are
 * scope DIDs to watch, start a fresh one filtered to them.
 *
 * The previous client's cursor is handed to the new client. The connection
 * flag is cleared on teardown and only set again by the new client's connect
 * callback. Callbacks from a client that has since been replaced are ignored
 * so they cannot overwrite the state of the current one.
 */
function restartDirectJetstream(): void {
	const previous = directJetstream
	const cursor = previous?.cursor
	directJetstream = null
	previous?.destroy()
	isConnected = false

	if (directScopeDids.size === 0) return

	const client: JetstreamClient = new JetstreamClient({
		url: config.jetstreamUrl,
		wantedDids: [...directScopeDids],
		cursor,
		onEvent: handleDirectEvent,
		onError: (err) => logger.error('Direct Jetstream error', err),
		onConnect: () => {
			if (directJetstream !== client) return
			isConnected = true
			logger.info('Direct Jetstream connected')
		},
		onDisconnect: () => {
			if (directJetstream === client) isConnected = false
		},
	})
	directJetstream = client
	client.start()
}

// Client for the unfiltered stream used for backlink matching; null when unused.
let backlinkJetstream: JetstreamClient | null = null

/**
 * Handle one event from the unfiltered backlink Jetstream.
 *
 * Commits from DIDs in `directScopeDids` are skipped: the direct stream
 * receives the same commits, and its `deliver` call already adds backlink
 * webhooks when the record references a backlink DID, so handling them here
 * would deliver each such event twice. For all other DIDs,
 * place.wisp.v2.wh changes update the registry, and any other record is
 * delivered only if it references a DID in `backlinkScopeDids`.
 * Errors are logged, never thrown.
 */
async function handleBacklinkEvent(event: JetstreamEvent): Promise<void> {
	try {
		if (event.kind !== 'commit' || !event.commit) return
		lastEventTime = Date.now()
		const { did } = event
		const { operation: op, collection, rkey, record, cid } = event.commit
		if (op !== 'create' && op !== 'update' && op !== 'delete') return

		// Already covered by handleDirectEvent; see the note above.
		if (directScopeDids.has(did)) return

		if (collection === 'place.wisp.v2.wh') {
			await handleWhRecord(op, did, rkey, record)
			return
		}

		if (!recordReferencesAnyOf(record, backlinkScopeDids)) return

		await deliver(did, collection, rkey, op, cid, record)
	} catch (err) {
		logger.error('Backlink Jetstream event error', err)
	}
}

/**
 * Start the unfiltered Jetstream client used for backlink matching.
 *
 * Any client that is still registered is destroyed first, so calling this
 * twice cannot leave an orphaned connection delivering duplicate events.
 */
function startBacklinkJetstream(): void {
	backlinkJetstream?.destroy()
	backlinkJetstream = new JetstreamClient({
		url: config.jetstreamUrl,
		onEvent: handleBacklinkEvent,
		onError: (err) => logger.error('Backlink Jetstream error', err),
		onConnect: () => logger.info('Backlink Jetstream connected'),
		onDisconnect: () => logger.warn('Backlink Jetstream disconnected, reconnecting'),
	})
	backlinkJetstream.start()
}

/** Destroy the backlink Jetstream client, if one exists, and forget it. */
function stopBacklinkJetstream(): void {
	backlinkJetstream?.destroy()
	backlinkJetstream = null
}

// Handle of the periodic "no events" check, kept so stopFirehose() can clear it.
let staleEventTimer: ReturnType<typeof setInterval> | null = null

/**
 * Start the Jetstream consumers and the periodic stale-stream warning.
 *
 * The backlink stream is only started when none is running yet, because
 * `initScopeDids` may already have started one. Calling this again replaces
 * the previous timer instead of stacking a second one.
 */
export function startFirehose(): void {
	logger.info(`Jetstream: ${config.jetstreamUrl}`)
	restartDirectJetstream()
	if (backlinkScopeDids.size > 0 && !backlinkJetstream) startBacklinkJetstream()

	if (staleEventTimer) clearInterval(staleEventTimer)
	staleEventTimer = setInterval(() => {
		const idleMs = Date.now() - lastEventTime
		if (idleMs > 30_000) {
			logger.warn(`No events for ${Math.round(idleMs / 1000)}s`)
		}
	}, 30_000)
}

/**
 * Stop both Jetstream consumers and the stale-stream timer.
 * Safe to call when nothing is running.
 */
export function stopFirehose(): void {
	logger.info('Stopping Jetstream consumers')
	isConnected = false
	if (staleEventTimer) {
		clearInterval(staleEventTimer)
		staleEventTimer = null
	}
	directJetstream?.destroy()
	directJetstream = null
	stopBacklinkJetstream()
}