See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Harden firehose threshold-scan + Discord webhook

- Discord webhook: add 5s per-attempt fetch timeout via AbortSignal so a
hung socket can't stall the worker. Treat AbortError as retryable.
- Discord webhook: stop retrying 4xx responses (except 429) — they're not
recoverable. On 429, honour Retry-After (capped at 10s) before falling
back to the static backoff schedule.
- threshold_scan_job: wrap dedup-row insertion so a UNIQUE-constraint
error from a concurrent replica is swallowed and logged via PostHog
instead of aborting the loop.
- Note at the AppView call site that getPosts already chunks to 25 URIs
internally.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+326 -11
+36 -7
app/jobs/threshold_scan_job.ts
··· 146 146 const pendingUris = pending.map((p) => p.subjectUri) 147 147 let posts: PostView[] 148 148 try { 149 + // AtprotoClient.getPosts handles the AppView's 25-URI-per-call limit 150 + // internally by chunking and merging results — we pass the full list. 149 151 posts = await this.atprotoClient.getPosts(pendingUris) 150 152 } catch (err) { 151 153 for (const p of pending) { ··· 207 209 } 208 210 209 211 private async insertDedupRow(subjectUri: string, threshold: number): Promise<void> { 210 - // Use create; the caller is expected to have already filtered out existing 211 - // (subject, threshold) pairs via the alreadyFired map. 212 - await NotifiedThreshold.create({ 213 - subjectUri, 214 - threshold, 215 - firedAt: Date.now(), 216 - }) 212 + // The caller already filtered out (subject, threshold) pairs known to exist 213 + // via the alreadyFired map, but two queue-worker replicas can still race to 214 + // insert the same row. SQLite raises a UNIQUE-constraint error on the 215 + // composite PK; swallow exactly that case (the row is present either way) 216 + // and re-throw anything else. 217 + try { 218 + await NotifiedThreshold.create({ 219 + subjectUri, 220 + threshold, 221 + firedAt: Date.now(), 222 + }) 223 + } catch (err) { 224 + if (isUniqueConstraintError(err)) { 225 + this.capturePostHog('firehose_dedup_race', { 226 + subject_uri: subjectUri, 227 + threshold, 228 + }) 229 + return 230 + } 231 + throw err 232 + } 217 233 } 234 + } 235 + 236 + /** 237 + * Detect SQLite's UNIQUE-constraint error across the Lucid / better-sqlite3 / 238 + * sqlite3 drivers. All surface a message containing "UNIQUE constraint" and 239 + * some also expose an error code beginning with `SQLITE_CONSTRAINT`. 240 + */ 241 + function isUniqueConstraintError(err: unknown): boolean { 242 + if (!(err instanceof Error)) return false 243 + if (typeof err.message === 'string' && err.message.includes('UNIQUE constraint')) return true 244 + const code = (err as Error & { code?: string }).code 245 + if (typeof code === 'string' && code.startsWith('SQLITE_CONSTRAINT')) return true 246 + return false 218 247 } 219 248 220 249 /**
+40 -2
app/services/discord_webhook.ts
··· 45 45 46 46 const BACKOFF_DELAYS_MS = [500, 1000, 2000] 47 47 const DESCRIPTION_MAX_CHARS = 500 48 + /** Per-attempt fetch timeout. A hung Discord socket must not stall the job. */ 49 + const FETCH_TIMEOUT_MS = 5000 50 + /** Cap on honouring Discord's `Retry-After` — don't let Discord block the worker for long. */ 51 + const MAX_RETRY_AFTER_MS = 10_000 48 52 49 53 /** 50 54 * Convert an AT-URI of shape `at://{did}/app.bsky.feed.post/{rkey}` to the ··· 102 106 103 107 /** 104 108 * POST the Discord embed payload. Retries up to 3 total attempts on network 105 - * error or HTTP >= 400. Throws `DiscordWebhookError` if all attempts fail. 109 + * errors, timeouts, 429, and 5xx. 4xx other than 429 are treated as 110 + * non-recoverable and throw immediately. Each attempt has a 5s timeout. 111 + * Throws `DiscordWebhookError` if all retryable attempts fail. 106 112 */ 107 113 async send(params: DiscordWebhookSendParams): Promise<void> { 108 114 const payload = buildPayload(params) ··· 110 116 111 117 let lastError: unknown 112 118 for (let attempt = 0; attempt < BACKOFF_DELAYS_MS.length; attempt++) { 119 + let retryAfterMs: number | undefined 113 120 try { 114 121 const response = await this.#fetch(params.webhookUrl, { 115 122 method: 'POST', 116 123 headers: { 'Content-Type': 'application/json' }, 117 124 body, 125 + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), 118 126 }) 119 127 if (response.ok) return 128 + // 4xx other than 429 is non-recoverable (bad payload, bad URL, etc.) 129 + if (response.status >= 400 && response.status < 500 && response.status !== 429) { 130 + throw new DiscordWebhookError( 131 + `Discord webhook returned HTTP ${response.status} (non-retryable)`, 132 + attempt + 1, 133 + new Error(`HTTP ${response.status}`) 134 + ) 135 + } 120 136 lastError = new Error(`Discord webhook returned HTTP ${response.status}`) 137 + if (response.status === 429) { 138 + retryAfterMs = parseRetryAfterHeader(response.headers) 139 + } 121 140 } catch (err) { 141 + // Non-retryable errors bubble straight out. 142 + if (err instanceof DiscordWebhookError) throw err 122 143 lastError = err 123 144 } 124 145 125 146 if (attempt < BACKOFF_DELAYS_MS.length - 1) { 126 - await this.#sleep(BACKOFF_DELAYS_MS[attempt]) 147 + const delay = retryAfterMs ?? BACKOFF_DELAYS_MS[attempt] 148 + await this.#sleep(delay) 127 149 } 128 150 } 129 151 ··· 135 157 ) 136 158 } 137 159 } 160 + 161 + /** 162 + * Parse a `Retry-After` header value (integer seconds per RFC 7231 §7.1.3) 163 + * into milliseconds, capped at {@link MAX_RETRY_AFTER_MS}. Returns undefined 164 + * when the header is missing or unparseable so the caller falls back to the 165 + * static backoff schedule. HTTP-date form is not handled — Discord always 166 + * sends integer seconds (sometimes fractional). 167 + */ 168 + function parseRetryAfterHeader(headers: Headers | undefined): number | undefined { 169 + if (!headers || typeof headers.get !== 'function') return undefined 170 + const raw = headers.get('retry-after') 171 + if (!raw) return undefined 172 + const seconds = Number.parseFloat(raw) 173 + if (!Number.isFinite(seconds) || seconds < 0) return undefined 174 + return Math.min(seconds * 1000, MAX_RETRY_AFTER_MS) 175 + }
+57
tests/unit/jobs/threshold_scan_job.spec.ts
··· 350 350 }) 351 351 }) 352 352 353 + test.group('ThresholdScanJob — concurrent replica race', (group) => { 354 + group.each.setup(() => testUtils.db().withGlobalTransaction()) 355 + 356 + test('swallows PK collision when a concurrent replica already inserted the dedup row', async ({ 357 + assert, 358 + }) => { 359 + const uri = 'at://did:plc:alice/app.bsky.feed.post/race' 360 + const post = makePost({ 361 + uri, 362 + did: 'did:plc:alice', 363 + handle: 'alice.bsky.social', 364 + text: 'race', 365 + }) 366 + const atproto = makeAtproto({ posts: { [uri]: post } }) 367 + const clickhouse = makeClickHouse({ candidates: [{ subjectUri: uri, count: 1500 }] }) 368 + 369 + // Webhook "send" simulates the other replica winning the race by inserting 370 + // the (uri, 1000) row mid-flight. When our job later calls create(), SQLite 371 + // must raise UNIQUE and the job must swallow it without crashing. 372 + const webhook: DiscordWebhookService & { 373 + _calls: Array<{ webhookUrl: string; subjectUri: string; threshold: number }> 374 + } = { 375 + _calls: [], 376 + async send(params: { 377 + webhookUrl: string 378 + subjectUri: string 379 + threshold: number 380 + [key: string]: unknown 381 + }) { 382 + webhook._calls.push({ 383 + webhookUrl: params.webhookUrl, 384 + subjectUri: params.subjectUri, 385 + threshold: params.threshold, 386 + }) 387 + await NotifiedThreshold.create({ 388 + subjectUri: params.subjectUri, 389 + threshold: params.threshold, 390 + firedAt: Date.now() - 1, 391 + }) 392 + }, 393 + } as unknown as DiscordWebhookService & { _calls: typeof webhook._calls } 394 + 395 + const posthog = makePostHogCollector() 396 + const job = makeJob(atproto, clickhouse, webhook, { posthogCapture: posthog.capture }) 397 + 398 + // Must not throw — the UNIQUE-constraint error is expected and swallowed. 399 + await job.execute() 400 + 401 + const rows = await NotifiedThreshold.query().where('subject_uri', uri) 402 + assert.equal(rows.length, 1) 403 + const races = posthog.captures.filter((c) => c.event === 'firehose_dedup_race') 404 + assert.equal(races.length, 1) 405 + assert.equal(races[0].properties?.subject_uri, uri) 406 + assert.equal(races[0].properties?.threshold, 1000) 407 + }) 408 + }) 409 + 353 410 test.group('ThresholdScanJob — missing webhook URL', (group) => { 354 411 group.each.setup(() => testUtils.db().withGlobalTransaction()) 355 412
+193 -2
tests/unit/services/discord_webhook.spec.ts
··· 14 14 // Helpers 15 15 // --------------------------------------------------------------------------- 16 16 17 - type FetchCall = { url: string; body: unknown } 17 + type FetchCall = { url: string; body: unknown; signal?: AbortSignal | null } 18 + 19 + type FakeResponse = { ok: boolean; status: number; headers?: Record<string, string> } 18 20 19 - function makeFakeFetch(responses: Array<{ ok: boolean; status: number } | Error>): { 21 + function makeFakeFetch(responses: Array<FakeResponse | Error>): { 20 22 fn: typeof fetch 21 23 calls: FetchCall[] 22 24 } { ··· 26 28 calls.push({ 27 29 url, 28 30 body: init?.body ? JSON.parse(init.body as string) : undefined, 31 + signal: init?.signal ?? null, 29 32 }) 30 33 const response = responses[idx++] 31 34 if (!response) throw new Error('Unexpected fetch call (no response queued)') 32 35 if (response instanceof Error) throw response 36 + const headers = new Headers(response.headers ?? {}) 33 37 return { 34 38 ok: response.ok, 35 39 status: response.status, 40 + headers, 36 41 text: async () => '', 37 42 } 38 43 }) as unknown as typeof fetch ··· 207 212 }) 208 213 209 214 assert.equal(calls.length, 1) 215 + }) 216 + 217 + test('throws immediately on 400 without retrying', async ({ assert }) => { 218 + const { fn, calls } = makeFakeFetch([ 219 + { ok: false, status: 400 }, 220 + // These must not be consumed — the 400 must bail out. 221 + { ok: true, status: 204 }, 222 + ]) 223 + const sleepCalls: number[] = [] 224 + const svc = new DiscordWebhookService({ 225 + fetchImpl: fn, 226 + sleep: (ms) => { 227 + sleepCalls.push(ms) 228 + return Promise.resolve() 229 + }, 230 + }) 231 + 232 + await assert.rejects( 233 + () => 234 + svc.send({ 235 + webhookUrl: 'https://discord.example/hook', 236 + subjectUri: 'at://did:plc:abc/app.bsky.feed.post/rkey1', 237 + threshold: 1000, 238 + estimatedCount: 1000, 239 + authorHandle: 'a.bsky.social', 240 + postText: 'x', 241 + }), 242 + DiscordWebhookError 243 + ) 244 + assert.equal(calls.length, 1) 245 + assert.equal(sleepCalls.length, 0) 246 + }) 247 + 248 + test('honours Retry-After on 429 (seconds) in preference to static backoff', async ({ 249 + assert, 250 + }) => { 251 + const { fn } = makeFakeFetch([ 252 + { ok: false, status: 429, headers: { 'retry-after': '2' } }, 253 + { ok: true, status: 204 }, 254 + ]) 255 + const sleepCalls: number[] = [] 256 + const svc = new DiscordWebhookService({ 257 + fetchImpl: fn, 258 + sleep: (ms) => { 259 + sleepCalls.push(ms) 260 + return Promise.resolve() 261 + }, 262 + }) 263 + 264 + await svc.send({ 265 + webhookUrl: 'https://discord.example/hook', 266 + subjectUri: 'at://did:plc:abc/app.bsky.feed.post/rkey1', 267 + threshold: 1000, 268 + estimatedCount: 1000, 269 + authorHandle: 'a.bsky.social', 270 + postText: 'x', 271 + }) 272 + 273 + assert.equal(sleepCalls.length, 1) 274 + assert.equal(sleepCalls[0], 2000) 275 + }) 276 + 277 + test('429 without Retry-After falls back to static backoff', async ({ assert }) => { 278 + const { fn } = makeFakeFetch([ 279 + { ok: false, status: 429 }, 280 + { ok: true, status: 204 }, 281 + ]) 282 + const sleepCalls: number[] = [] 283 + const svc = new DiscordWebhookService({ 284 + fetchImpl: fn, 285 + sleep: (ms) => { 286 + sleepCalls.push(ms) 287 + return Promise.resolve() 288 + }, 289 + }) 290 + 291 + await svc.send({ 292 + webhookUrl: 'https://discord.example/hook', 293 + subjectUri: 'at://did:plc:abc/app.bsky.feed.post/rkey1', 294 + threshold: 1000, 295 + estimatedCount: 1000, 296 + authorHandle: 'a.bsky.social', 297 + postText: 'x', 298 + }) 299 + 300 + assert.equal(sleepCalls.length, 1) 301 + assert.equal(sleepCalls[0], 500) 302 + }) 303 + 304 + test('caps Retry-After at 10s to avoid blocking the worker', async ({ assert }) => { 305 + const { fn } = makeFakeFetch([ 306 + { ok: false, status: 429, headers: { 'retry-after': '9999' } }, 307 + { ok: true, status: 204 }, 308 + ]) 309 + const sleepCalls: number[] = [] 310 + const svc = new DiscordWebhookService({ 311 + fetchImpl: fn, 312 + sleep: (ms) => { 313 + sleepCalls.push(ms) 314 + return Promise.resolve() 315 + }, 316 + }) 317 + 318 + await svc.send({ 319 + webhookUrl: 'https://discord.example/hook', 320 + subjectUri: 'at://did:plc:abc/app.bsky.feed.post/rkey1', 321 + threshold: 1000, 322 + estimatedCount: 1000, 323 + authorHandle: 'a.bsky.social', 324 + postText: 'x', 325 + }) 326 + 327 + assert.equal(sleepCalls[0], 10_000) 328 + }) 329 + 330 + test('5xx still uses static backoff', async ({ assert }) => { 331 + const { fn } = makeFakeFetch([ 332 + { ok: false, status: 503 }, 333 + { ok: true, status: 204 }, 334 + ]) 335 + const sleepCalls: number[] = [] 336 + const svc = new DiscordWebhookService({ 337 + fetchImpl: fn, 338 + sleep: (ms) => { 339 + sleepCalls.push(ms) 340 + return Promise.resolve() 341 + }, 342 + }) 343 + 344 + await svc.send({ 345 + webhookUrl: 'https://discord.example/hook', 346 + subjectUri: 'at://did:plc:abc/app.bsky.feed.post/rkey1', 347 + threshold: 1000, 348 + estimatedCount: 1000, 349 + authorHandle: 'a.bsky.social', 350 + postText: 'x', 351 + }) 352 + 353 + assert.equal(sleepCalls[0], 500) 354 + }) 355 + 356 + test('treats AbortError (timeout) as retryable network error', async ({ assert }) => { 357 + // Simulate a hung socket: first attempt aborts, retry succeeds. 358 + const abortErr = new DOMException('The operation was aborted.', 'AbortError') 359 + const { fn, calls } = makeFakeFetch([abortErr, { ok: true, status: 204 }]) 360 + const sleepCalls: number[] = [] 361 + const svc = new DiscordWebhookService({ 362 + fetchImpl: fn, 363 + sleep: (ms) => { 364 + sleepCalls.push(ms) 365 + return Promise.resolve() 366 + }, 367 + }) 368 + 369 + await svc.send({ 370 + webhookUrl: 'https://discord.example/hook', 371 + subjectUri: 'at://did:plc:abc/app.bsky.feed.post/rkey1', 372 + threshold: 1000, 373 + estimatedCount: 1000, 374 + authorHandle: 'a.bsky.social', 375 + postText: 'x', 376 + }) 377 + 378 + assert.equal(calls.length, 2) 379 + assert.equal(sleepCalls.length, 1) 380 + assert.equal(sleepCalls[0], 500) 381 + // Each attempt should have been given an AbortSignal. 382 + assert.isOk(calls[0].signal) 383 + assert.isOk(calls[1].signal) 384 + }) 385 + 386 + test('passes an AbortSignal on each attempt so hung sockets cannot stall', async ({ assert }) => { 387 + const { fn, calls } = makeFakeFetch([{ ok: true, status: 204 }]) 388 + const svc = new DiscordWebhookService({ fetchImpl: fn, sleep: () => Promise.resolve() }) 389 + 390 + await svc.send({ 391 + webhookUrl: 'https://discord.example/hook', 392 + subjectUri: 'at://did:plc:abc/app.bsky.feed.post/rkey1', 393 + threshold: 1000, 394 + estimatedCount: 1000, 395 + authorHandle: 'a.bsky.social', 396 + postText: 'x', 397 + }) 398 + 399 + assert.isOk(calls[0].signal) 400 + assert.instanceOf(calls[0].signal, AbortSignal) 210 401 }) 211 402 })