···11import { createLogger } from '@wispplace/observability'
22import { config } from './config'
33import { runStartupBackfill } from './lib/backfill'
44-import { closeDatabase, db, loadAllWebhooks } from './lib/db'
44+import { closeDatabase, db, loadAllWebhooks, loadCursor } from './lib/db'
55import { getFirehoseHealth, initScopeDids, startFirehose, stopFirehose } from './lib/firehose'
66import { closeRedisPublisher } from './lib/redis'
77···130130 // Populate sync pre-filter sets before starting the firehose
131131 initScopeDids(webhooks)
132132133133- startFirehose()
133133+ const cursor = await loadCursor(config.jetstreamUrl)
134134+ if (cursor !== undefined) {
135135+ logger.info(`[cursor] Resuming from ${cursor}`)
136136+ }
137137+ startFirehose(cursor)
134138135139 // Backfill any place.wisp.v2.wh records that existed before this run
136140 await runStartupBackfill()
+42-16
apps/webhook-service/src/lib/db.ts
···4343`
44444545await db`
4646- CREATE TABLE IF NOT EXISTS firehose_cursor (
4747- id TEXT PRIMARY KEY DEFAULT 'singleton',
4848- seq BIGINT NOT NULL,
4949- saved_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
4646+ CREATE TABLE IF NOT EXISTS jetstream_cursor (
4747+ id TEXT PRIMARY KEY DEFAULT 'singleton',
4848+ seq BIGINT NOT NULL,
4949+ jetstream_url TEXT,
5050+ saved_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
5051 )
5152`
5253···129130 * Insert or update a webhook record in both tables.
130131 * `webhooks` holds structured columns for quick filtering; `webhook_records` holds the full JSONB record.
131132 * Key is `did/rkey`.
133133+ * Returns true if the record was inserted or its content changed, false if it was already up-to-date.
132134 */
133133-export async function upsertWebhookRecord(did: string, rkey: string, record: WhRecord): Promise<void> {
135135+export async function upsertWebhookRecord(did: string, rkey: string, record: WhRecord): Promise<boolean> {
134136 const k = `${did}/${rkey}`
135137 try {
136138 await db`
···143145 enabled = EXCLUDED.enabled,
144146 updated_at = EXTRACT(EPOCH FROM NOW())
145147 `
146146- await db`
148148+ const rows = await db<Array<{ changed: boolean }>>`
149149+ WITH existing AS (
150150+ SELECT v FROM webhook_records WHERE k = ${k}
151151+ )
147152 INSERT INTO webhook_records (k, v, updated_at)
148153 VALUES (${k}, ${record}, EXTRACT(EPOCH FROM NOW()))
149154 ON CONFLICT (k) DO UPDATE SET
150155 v = EXCLUDED.v,
151156 updated_at = EXTRACT(EPOCH FROM NOW())
157157+ RETURNING (
158158+ NOT EXISTS (SELECT 1 FROM existing)
159159+ OR (SELECT v FROM existing) IS DISTINCT FROM webhook_records.v
160160+ ) AS changed
152161 `
162162+ return rows[0]?.changed ?? true
153163 } catch (err) {
154164 logger.error(`[DB] upsertWebhookRecord error for ${k}`, err)
155165 throw err
···243253 }))
244254}
245255246246-/** Persist the current firehose sequence number so restarts can resume. */
247247-export async function saveCursor(seq: number): Promise<void> {
256256+const CURSOR_REWIND_US = 2_000_000 // 2 seconds in microseconds
257257+258258+/** Persist the current Jetstream cursor (time_us) and the URL it came from. */
259259+export async function saveCursor(seq: number, jetstreamUrl: string): Promise<void> {
248260 await db`
249249- INSERT INTO firehose_cursor (id, seq, saved_at)
250250- VALUES ('singleton', ${seq}, NOW())
251251- ON CONFLICT (id) DO UPDATE SET seq = EXCLUDED.seq, saved_at = NOW()
261261+ INSERT INTO jetstream_cursor (id, seq, jetstream_url, saved_at)
262262+ VALUES ('singleton', ${seq}, ${jetstreamUrl}, NOW())
263263+ ON CONFLICT (id) DO UPDATE SET seq = EXCLUDED.seq, jetstream_url = EXCLUDED.jetstream_url, saved_at = NOW()
252264 `
253265}
254266255255-/** Load the last saved firehose sequence number, or undefined if none. */
256256-export async function loadCursor(): Promise<number | undefined> {
257257- const rows = await db<Array<{ seq: number }>>`
258258- SELECT seq FROM firehose_cursor WHERE id = 'singleton'
267267+/**
268268+ * Load the saved Jetstream cursor, rewound by 2 seconds to ensure gapless playback.
269269+ * Returns undefined if no cursor is saved or the saved cursor is from a different Jetstream instance
270270+ * (time_us is not comparable across instances).
271271+ */
272272+export async function loadCursor(jetstreamUrl: string): Promise<number | undefined> {
273273+ const rows = await db<Array<{ seq: number; jetstream_url: string | null }>>`
274274+ SELECT seq, jetstream_url FROM jetstream_cursor WHERE id = 'singleton'
259275 `
260260- return rows[0]?.seq
276276+ const row = rows[0]
277277+ if (!row) return undefined
278278+ if (row.jetstream_url && row.jetstream_url !== jetstreamUrl) return undefined
279279+ return Math.max(0, Number(row.seq) - CURSOR_REWIND_US)
261280}
262281263282/**
···281300 } catch {
282301 // oauth_sessions schema may differ; skip gracefully
283302 }
303303+304304+ // Include any DIDs that already have webhook records — they may not be wisp.place
305305+ // users but still have place.wisp.v2.wh records we need to stay in sync with.
306306+ const webhookDids = await db<Array<{ did: string }>>`
307307+ SELECT DISTINCT did FROM webhooks WHERE did IS NOT NULL AND did <> ''
308308+ `
309309+ for (const r of webhookDids) dids.add(r.did)
284310285311 return [...dids].sort()
286312}