See the best posts from any Bluesky account
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Time out and retry stalled getAuthorFeed calls during backfill

Cold BunnyCDN paths on public.api.bsky.app can take around two minutes or
longer to return a 200 response — observed in production, where a single
getAuthorFeed page hung for 117s while the backfill loop sat idle. Add
a per-attempt AbortSignal timeout (15s) with 2 retries (3 attempts, ≈45s
worst case); on exhaustion, throw a new BlueskyStalledError. The backfill
loop catches it, sleeps 5s, and retries the same cursor indefinitely so a
stall can't leave a user's posts permanently half-loaded.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+304 -36
+32 -2
app/jobs/backfill_job.ts
··· 2 2 import { inject } from '@adonisjs/core' 3 3 import { Job } from '@adonisjs/queue' 4 4 import { record, setAttributes } from '@adonisjs/otel/helpers' 5 - import { AtprotoClient, parseGetAuthorFeedResponse } from '#lib/atproto/index' 5 + import { AtprotoClient, BlueskyStalledError, parseGetAuthorFeedResponse } from '#lib/atproto/index' 6 6 import { ClickHouseStore } from '#lib/clickhouse/index' 7 7 import TrackedProfile from '#models/tracked_profile' 8 8 import BackfillJobRow from '#models/backfill_job' ··· 14 14 export interface BackfillJobPayload { 15 15 did: string 16 16 } 17 + 18 + /** 19 + * Sleep between stall retries at the outer backfill loop. We've already 20 + * waited PER_ATTEMPT_TIMEOUT_MS × (MAX_STALL_RETRIES + 1) inside the 21 + * AtprotoClient before getting here, so a brief 5s pause keeps us from 22 + * hot-spinning on a persistently cold endpoint without adding significant 23 + * user-facing delay when the retry succeeds. 24 + */ 25 + const STALL_RETRY_SLEEP_MS = 5_000 26 + 27 + const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms)) 17 28 18 29 // --------------------------------------------------------------------------- 19 30 // BackfillJob ··· 110 121 111 122 // ------------------------------------------------------------------------- 112 123 // Main backfill loop 124 + // 125 + // getAuthorFeed stalls on cold BunnyCDN paths are routine — see the 126 + // incident where a single page hung for 117s before returning 200. The 127 + // client aborts at PER_ATTEMPT_TIMEOUT_MS and retries up to MAX_STALL_RETRIES 128 + // times; if even that budget is exhausted, it throws BlueskyStalledError. 129 + // We catch it here and retry the same cursor with a small delay — the 130 + // cursor is in-memory so no page is lost, and a fresh retry typically 131 + // hits a warmer cache node. This is the "retry indefinitely until 132 + // fully loaded" contract the feature needs. 
113 133 // ------------------------------------------------------------------------- 114 134 115 135 let cursor: string | undefined 116 136 let fetchedCount = 0 117 137 118 138 while (true) { 119 - const page = await this.atprotoClient.getAuthorFeed(did, cursor, 100) 139 + let page: Awaited<ReturnType<AtprotoClient['getAuthorFeed']>> 140 + try { 141 + page = await this.atprotoClient.getAuthorFeed(did, cursor, 100) 142 + } catch (err) { 143 + if (err instanceof BlueskyStalledError) { 144 + setAttributes({ 'backfill.last_stall_at': new Date().toISOString() }) 145 + await sleep(STALL_RETRY_SLEEP_MS) 146 + continue 147 + } 148 + throw err 149 + } 120 150 121 151 // Parse the whole page, filtering reposts of other users client-side. 122 152 const snapshots = parseGetAuthorFeedResponse({ feed: page.posts }, did)
+134 -33
app/lib/atproto/client.ts
··· 89 89 } 90 90 } 91 91 92 + /** 93 + * Thrown when Bluesky's AppView repeatedly fails to respond within 94 + * PER_ATTEMPT_TIMEOUT_MS. Distinct from BlueskyRateLimitedError because a 95 + * stall has no 429 signal — the socket just sits there until we abort it. 96 + * 97 + * Callers that care about forward progress (e.g. the backfill loop) should 98 + * catch this and retry at the same cursor rather than failing the whole job: 99 + * a stall is almost always a cold BunnyCDN path, and retrying usually hits 100 + * a different edge node. 101 + */ 102 + export class BlueskyStalledError extends Error { 103 + constructor( 104 + message: string, 105 + readonly attempts: number, 106 + readonly cause?: unknown 107 + ) { 108 + super(message, { cause }) 109 + this.name = 'BlueskyStalledError' 110 + } 111 + } 112 + 92 113 /** XRPCError-shaped errors (from @atproto/api) carry `error` as the lexicon error name. */ 93 114 function isAccountDeactivatedXrpcError(err: unknown): boolean { 94 115 return err instanceof Error && (err as Error & { error?: string }).error === 'AccountDeactivated' ··· 107 128 export const RETRY_DELAYS_MS = [1000, 2000, 4000, 8000, 16000] 108 129 const MAX_RETRIES = RETRY_DELAYS_MS.length 109 130 131 + /** 132 + * Per-attempt timeout for a single HTTP call to the AppView. Baseline p99 is 133 + * ~2s and real stalls start above ~10s; 15s is a compromise between leaving 134 + * legitimately slow responses alone and recovering from multi-minute cold- 135 + * BunnyCDN hangs. 136 + */ 137 + export const DEFAULT_PER_ATTEMPT_TIMEOUT_MS = 15_000 138 + 139 + /** 140 + * Max retries when a request times out (AbortError). 2 retries = 3 total 141 + * attempts, capping worst-case wall-clock around 142 + * 3 × PER_ATTEMPT_TIMEOUT_MS (≈45s) before we give up and throw 143 + * BlueskyStalledError. 
Stalls don't get the exponential sleep that 429s do 144 + * — the whole point is that we've already been waiting 15s, so a retry 145 + * should fire immediately and hopefully hit a warmer cache node. 146 + */ 147 + const MAX_STALL_RETRIES = 2 148 + 110 149 /** Default sleep implementation using setTimeout */ 111 150 const defaultSleep = (ms: number): Promise<void> => 112 151 new Promise((resolve) => setTimeout(resolve, ms)) 113 152 153 + /** 154 + * Detect an AbortError produced by our per-attempt timer (or any upstream 155 + * code that uses AbortController). Checks the error itself and its `cause` 156 + * chain because some XRPC libraries wrap the underlying fetch abort. 157 + */ 158 + function isAbortError(err: unknown): boolean { 159 + if (!(err instanceof Error)) return false 160 + if (err.name === 'AbortError' || err.name === 'TimeoutError') return true 161 + const cause = (err as Error & { cause?: unknown }).cause 162 + if (cause instanceof Error && (cause.name === 'AbortError' || cause.name === 'TimeoutError')) { 163 + return true 164 + } 165 + return false 166 + } 167 + 114 168 // --------------------------------------------------------------------------- 115 169 // Request observation hook 116 170 // --------------------------------------------------------------------------- ··· 195 249 identity: { 196 250 resolveHandle: ( 197 251 params: { handle: string }, 198 - opts?: unknown 252 + opts?: { signal?: AbortSignal } 199 253 ) => Promise<{ data: { did: string }; headers: Record<string, string | undefined> }> 200 254 } 201 255 } ··· 205 259 actor: { 206 260 getProfile: ( 207 261 params: { actor: string }, 208 - opts?: unknown 262 + opts?: { signal?: AbortSignal } 209 263 ) => Promise<{ 210 264 data: { 211 265 postsCount?: number ··· 217 271 }> 218 272 searchActorsTypeahead: ( 219 273 params: { q: string; limit?: number }, 220 - opts?: unknown 274 + opts?: { signal?: AbortSignal } 221 275 ) => Promise<{ 222 276 data: { actors: TypeaheadActor[] } 223 277 
headers: Record<string, string | undefined> ··· 226 280 feed: { 227 281 getAuthorFeed: ( 228 282 params: { actor: string; cursor?: string; limit?: number; filter?: AuthorFeedFilter }, 229 - opts?: unknown 283 + opts?: { signal?: AbortSignal } 230 284 ) => Promise<{ 231 285 data: { feed: FeedViewPost[]; cursor?: string } 232 286 headers: Record<string, string | undefined> 233 287 }> 234 288 getPosts: ( 235 289 params: { uris: string[] }, 236 - opts?: unknown 290 + opts?: { signal?: AbortSignal } 237 291 ) => Promise<{ 238 292 data: { posts: PostView[] } 239 293 headers: Record<string, string | undefined> ··· 274 328 export class AtprotoClient { 275 329 private readonly sleepFn: (ms: number) => Promise<void> 276 330 private readonly onRequest?: AtprotoRequestHook 331 + private readonly perAttemptTimeoutMs: number 277 332 278 333 constructor( 279 334 private readonly agent: AgentLike, 280 335 sleepFn: (ms: number) => Promise<void> = defaultSleep, 281 - onRequest?: AtprotoRequestHook 336 + onRequest?: AtprotoRequestHook, 337 + perAttemptTimeoutMs: number = DEFAULT_PER_ATTEMPT_TIMEOUT_MS 282 338 ) { 283 339 this.sleepFn = sleepFn 284 340 this.onRequest = onRequest 341 + this.perAttemptTimeoutMs = perAttemptTimeoutMs 285 342 } 286 343 287 344 private emit(event: AtprotoRequestEvent): void { ··· 295 352 } 296 353 297 354 /** 298 - * Wrap a call in exponential-backoff retry logic for 429s. 299 - * Throws BlueskyRateLimitedError when retries are exhausted. 355 + * Wrap a call in retry logic handling two failure modes: 356 + * 357 + * - 429 rate limiting: exponential backoff (1s → 2s → 4s → 8s → 16s), up 358 + * to MAX_RETRIES. Exhaustion throws BlueskyRateLimitedError. 359 + * - Per-attempt timeout: if the call doesn't complete in 360 + * perAttemptTimeoutMs, abort it via AbortSignal and retry with no 361 + * sleep (we've already waited long enough). Up to MAX_STALL_RETRIES 362 + * retries. Exhaustion throws BlueskyStalledError. 
363 + * 364 + * These two counters are independent — a mix of 429s and stalls each get 365 + * their own budget, which is the behavior you want when the AppView is 366 + * simultaneously rate-limiting and occasionally stalling. 300 367 * 301 368 * Emits an AtprotoRequestEvent for every attempt (success or failure) via 302 369 * the optional onRequest hook. On success, the caller is responsible for ··· 306 373 */ 307 374 private async withRetry<T>( 308 375 endpoint: AtprotoRequestEvent['endpoint'], 309 - fn: () => Promise<T> 376 + fn: (signal: AbortSignal) => Promise<T> 310 377 ): Promise<{ value: T; attempts: number; totalLatencyMs: number }> { 311 378 let lastError: unknown 312 379 let totalLatencyMs = 0 313 - for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { 380 + let totalAttempts = 0 381 + let rateLimitRetries = 0 382 + let stallRetries = 0 383 + 384 + while (true) { 385 + totalAttempts += 1 386 + const controller = new AbortController() 387 + const timer = setTimeout(() => controller.abort(), this.perAttemptTimeoutMs) 314 388 const started = Date.now() 315 389 try { 316 - const value = await fn() 390 + const value = await fn(controller.signal) 391 + clearTimeout(timer) 317 392 totalLatencyMs += Date.now() - started 318 - return { value, attempts: attempt + 1, totalLatencyMs } 393 + return { value, attempts: totalAttempts, totalLatencyMs } 319 394 } catch (err) { 395 + clearTimeout(timer) 320 396 const latencyMs = Date.now() - started 321 397 totalLatencyMs += latencyMs 322 398 lastError = err 323 399 const status = (err as Error & { status?: number }).status 400 + const aborted = isAbortError(err) 324 401 this.emit({ 325 402 endpoint, 326 - attempt: attempt + 1, 403 + attempt: totalAttempts, 327 404 latencyMs, 328 405 ok: false, 329 406 status, 330 407 error: err instanceof Error ? 
err.message : String(err), 331 408 }) 332 - if (!is429(err)) throw err 333 409 334 - if (attempt === MAX_RETRIES) break 410 + if (aborted) { 411 + if (stallRetries >= MAX_STALL_RETRIES) { 412 + throw new BlueskyStalledError( 413 + `Bluesky AppView stalled: ${MAX_STALL_RETRIES + 1} consecutive attempts exceeded ${this.perAttemptTimeoutMs}ms timeout`, 414 + stallRetries + 1, 415 + lastError 416 + ) 417 + } 418 + stallRetries += 1 419 + continue 420 + } 335 421 336 - const delay = RETRY_DELAYS_MS[attempt] 337 - await this.sleepFn(delay) 422 + if (!is429(err)) throw err 423 + 424 + if (rateLimitRetries >= MAX_RETRIES) break 425 + await this.sleepFn(RETRY_DELAYS_MS[rateLimitRetries]) 426 + rateLimitRetries += 1 338 427 } 339 428 } 429 + 340 430 throw new BlueskyRateLimitedError( 341 431 `Bluesky AppView rate limit exceeded after ${MAX_RETRIES + 1} attempts: ${ 342 432 lastError instanceof Error ? lastError.message : String(lastError) ··· 359 449 value: response, 360 450 attempts, 361 451 totalLatencyMs, 362 - } = await this.withRetry('resolveHandle', () => 363 - this.agent.com.atproto.identity.resolveHandle({ handle }) 452 + } = await this.withRetry('resolveHandle', (signal) => 453 + this.agent.com.atproto.identity.resolveHandle({ handle }, { signal }) 364 454 ) 365 455 const headers = response.headers as Record<string, string | undefined> 366 456 this.emit({ ··· 397 487 totalLatencyMs: number 398 488 } 399 489 try { 400 - result = await this.withRetry('getProfile', () => 401 - this.agent.app.bsky.actor.getProfile({ actor: did }) 490 + result = await this.withRetry('getProfile', (signal) => 491 + this.agent.app.bsky.actor.getProfile({ actor: did }, { signal }) 402 492 ) 403 493 } catch (err) { 404 494 if (isAccountDeactivatedXrpcError(err)) { ··· 437 527 value: response, 438 528 attempts, 439 529 totalLatencyMs, 440 - } = await this.withRetry('searchActorsTypeahead', () => 441 - this.agent.app.bsky.actor.searchActorsTypeahead({ q, limit }) 530 + } = await 
this.withRetry('searchActorsTypeahead', (signal) => 531 + this.agent.app.bsky.actor.searchActorsTypeahead({ q, limit }, { signal }) 442 532 ) 443 533 const headers = response.headers as Record<string, string | undefined> 444 534 this.emit({ ··· 477 567 value: response, 478 568 attempts, 479 569 totalLatencyMs, 480 - } = await this.withRetry('getAuthorFeed', () => 481 - this.agent.app.bsky.feed.getAuthorFeed({ 482 - actor: did, 483 - cursor, 484 - limit, 485 - filter, 486 - }) 570 + } = await this.withRetry('getAuthorFeed', (signal) => 571 + this.agent.app.bsky.feed.getAuthorFeed( 572 + { 573 + actor: did, 574 + cursor, 575 + limit, 576 + filter, 577 + }, 578 + { signal } 579 + ) 487 580 ) 488 581 const headers = response.headers as Record<string, string | undefined> 489 582 this.emit({ ··· 526 619 value: response, 527 620 attempts, 528 621 totalLatencyMs, 529 - } = await this.withRetry('getPosts', () => this.agent.app.bsky.feed.getPosts({ uris: batch })) 622 + } = await this.withRetry('getPosts', (signal) => 623 + this.agent.app.bsky.feed.getPosts({ uris: batch }, { signal }) 624 + ) 530 625 const headers = response.headers as Record<string, string | undefined> 531 626 this.emit({ 532 627 endpoint: 'getPosts', ··· 561 656 */ 562 657 export function createAtprotoClient( 563 658 onRequest?: AtprotoRequestHook, 564 - fetchImpl?: typeof globalThis.fetch 659 + fetchImpl?: typeof globalThis.fetch, 660 + perAttemptTimeoutMs: number = DEFAULT_PER_ATTEMPT_TIMEOUT_MS 565 661 ): AtprotoClient { 566 662 const agent = new Agent({ 567 663 service: 'https://public.api.bsky.app', 568 664 headers: { 'User-Agent': ATPROTO_USER_AGENT }, 569 665 ...(fetchImpl ? { fetch: fetchImpl } : {}), 570 666 } as never) 571 - return new AtprotoClient(agent as unknown as AgentLike, defaultSleep, onRequest) 667 + return new AtprotoClient( 668 + agent as unknown as AgentLike, 669 + defaultSleep, 670 + onRequest, 671 + perAttemptTimeoutMs 672 + ) 572 673 }
+1
app/lib/atproto/index.ts
··· 30 30 export { 31 31 AtprotoClient, 32 32 BlueskyRateLimitedError, 33 + BlueskyStalledError, 33 34 AccountDeactivatedError, 34 35 createAtprotoClient, 35 36 } from './client.js'
+137 -1
tests/unit/atproto/client.spec.ts
··· 1 1 import { test } from '@japa/runner' 2 - import { AtprotoClient, BlueskyRateLimitedError, AccountDeactivatedError } from '#lib/atproto/index' 2 + import { 3 + AtprotoClient, 4 + BlueskyRateLimitedError, 5 + BlueskyStalledError, 6 + AccountDeactivatedError, 7 + } from '#lib/atproto/index' 3 8 import { ATPROTO_USER_AGENT, createAtprotoClient } from '#lib/atproto/client' 4 9 5 10 // --------------------------------------------------------------------------- ··· 27 32 headers: Record<string, string> 28 33 } 29 34 getAuthorFeedError?: Error 35 + /** 36 + * When set, getAuthorFeed returns a promise that only rejects on 37 + * signal abort. Use an array to configure per-call behavior: `true` hangs, 38 + * `false` resolves with getAuthorFeedResponse. A single `true` hangs every 39 + * call. 40 + */ 41 + getAuthorFeedHang?: boolean | boolean[] 30 42 getPostsResponse?: { 31 43 data: { posts: unknown[] } 32 44 headers: Record<string, string> ··· 71 83 }, 72 84 feed: { 73 85 getAuthorFeed: async (...args: unknown[]) => { 86 + const callIndex = calls.filter((c) => c.method === 'getAuthorFeed').length 74 87 calls.push({ method: 'getAuthorFeed', args }) 75 88 if (config.getAuthorFeedError) throw config.getAuthorFeedError 89 + 90 + const hangThisCall = Array.isArray(config.getAuthorFeedHang) 91 + ? config.getAuthorFeedHang[callIndex] === true 92 + : config.getAuthorFeedHang === true 93 + 94 + if (hangThisCall) { 95 + const opts = args[1] as { signal?: AbortSignal } | undefined 96 + const signal = opts?.signal 97 + return new Promise((_resolve, reject) => { 98 + if (!signal) return // will hang forever if test forgot to pass one 99 + if (signal.aborted) { 100 + const err = new Error('aborted') 101 + err.name = 'AbortError' 102 + reject(err) 103 + return 104 + } 105 + signal.addEventListener('abort', () => { 106 + const err = new Error('aborted') 107 + err.name = 'AbortError' 108 + reject(err) 109 + }) 110 + }) 111 + } 76 112 return config.getAuthorFeedResponse! 
77 113 }, 78 114 getPosts: async (...args: unknown[]) => { ··· 513 549 assert.equal(rlErr.attempts, 6) 514 550 assert.strictEqual(rlErr.cause, rateLimitError) 515 551 }).timeout(10000) 552 + }) 553 + 554 + test.group('AtprotoClient — stall handling', () => { 555 + // Keep per-attempt timeout small so these tests run in tens of ms. 556 + const TINY_TIMEOUT_MS = 25 557 + 558 + test('aborts a hanging request after perAttemptTimeoutMs', async ({ assert }) => { 559 + const agent = makeMockAgent({ getAuthorFeedHang: true }) 560 + const client = new AtprotoClient(agent as never, noopSleep, undefined, TINY_TIMEOUT_MS) 561 + 562 + await assert.rejects(async () => { 563 + await client.getAuthorFeed('did:plc:abc') 564 + }, BlueskyStalledError) 565 + 566 + // 1 original + MAX_STALL_RETRIES (=2) retries = 3 calls 567 + assert.equal(agent._calls.length, 3) 568 + }).timeout(2000) 569 + 570 + test('recovers when a stall is followed by success', async ({ assert }) => { 571 + const agent = makeMockAgent({ 572 + getAuthorFeedHang: [true, false], 573 + getAuthorFeedResponse: { 574 + data: { feed: [], cursor: undefined }, 575 + headers: EMPTY_HEADERS, 576 + }, 577 + }) 578 + const client = new AtprotoClient(agent as never, noopSleep, undefined, TINY_TIMEOUT_MS) 579 + 580 + const result = await client.getAuthorFeed('did:plc:abc') 581 + assert.deepEqual(result.posts, []) 582 + assert.equal(agent._calls.length, 2) 583 + }).timeout(2000) 584 + 585 + test('passes an AbortSignal to the underlying agent call', async ({ assert }) => { 586 + const agent = makeMockAgent({ 587 + getAuthorFeedResponse: { 588 + data: { feed: [], cursor: undefined }, 589 + headers: EMPTY_HEADERS, 590 + }, 591 + }) 592 + const client = new AtprotoClient(agent as never, noopSleep, undefined, TINY_TIMEOUT_MS) 593 + await client.getAuthorFeed('did:plc:abc') 594 + 595 + const opts = agent._calls[0].args[1] as { signal?: unknown } | undefined 596 + assert.instanceOf(opts?.signal, AbortSignal) 597 + }) 598 + 599 + 
test('BlueskyStalledError carries attempts count and cause', async ({ assert }) => { 600 + const agent = makeMockAgent({ getAuthorFeedHang: true }) 601 + const client = new AtprotoClient(agent as never, noopSleep, undefined, TINY_TIMEOUT_MS) 602 + 603 + let caughtErr: unknown 604 + try { 605 + await client.getAuthorFeed('did:plc:abc') 606 + } catch (err) { 607 + caughtErr = err 608 + } 609 + 610 + assert.instanceOf(caughtErr, BlueskyStalledError) 611 + const stallErr = caughtErr as BlueskyStalledError 612 + assert.equal(stallErr.attempts, 3) 613 + assert.instanceOf(stallErr.cause, Error) 614 + assert.equal((stallErr.cause as Error).name, 'AbortError') 615 + }).timeout(2000) 616 + 617 + test('does not abort a fast synchronous 429 — 429 path keeps its own retry budget', async ({ 618 + assert, 619 + }) => { 620 + // Regression guard: the per-attempt timer must not fire for quickly- 621 + // rejected calls, and stall retries must not be consumed by 429s. 622 + const rateLimitError = new Error('Rate limit exceeded') 623 + ;(rateLimitError as Error & { status: number }).status = 429 624 + 625 + const agent = makeMockAgent({ getAuthorFeedError: rateLimitError }) 626 + const client = new AtprotoClient(agent as never, noopSleep, undefined, TINY_TIMEOUT_MS) 627 + 628 + await assert.rejects(async () => { 629 + await client.getAuthorFeed('did:plc:abc') 630 + }, BlueskyRateLimitedError) 631 + 632 + // 1 original + 5 retries = 6 calls, same as before this change. 
633 + assert.equal(agent._calls.length, 6) 634 + }).timeout(5000) 635 + }) 636 + 637 + test.group('BlueskyStalledError', () => { 638 + test('is an Error subclass', ({ assert }) => { 639 + const err = new BlueskyStalledError('stalled', 3) 640 + assert.instanceOf(err, Error) 641 + assert.instanceOf(err, BlueskyStalledError) 642 + assert.include(err.message, 'stalled') 643 + assert.equal(err.name, 'BlueskyStalledError') 644 + }) 645 + 646 + test('carries structured attempts count and cause', ({ assert }) => { 647 + const cause = new Error('aborted') 648 + const err = new BlueskyStalledError('stalled', 3, cause) 649 + assert.equal(err.attempts, 3) 650 + assert.strictEqual(err.cause, cause) 651 + }) 516 652 }) 517 653 518 654 test.group('createAtprotoClient — User-Agent', () => {