Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook

feat: retry transient failures on data source fetches

Hugo 2d3fcf77 0f51de91

+132 -12
+24 -3
docs/data-source-fetches.md
···
 A fetch that errors while carrying conditions is also treated as a skip;
 see [Interaction with fetch errors](#interaction-with-fetch-errors).

+## Retries on transient failures
+
+Every outbound request made by the fetchers (PDS `getRecord`, `listRecords`,
+appview `getRelationships`) goes through `fetchWithRetry` in
+[lib/pds/fetch-with-retry.ts](../lib/pds/fetch-with-retry.ts). A single request
+is transparently retried on:
+
+- Thrown errors from `fetch` (DNS failure, connection reset, TLS error).
+- `AbortSignal.timeout` firing (per-attempt timeout).
+- Any `5xx` response from the server.
+
+Retry schedule: `1s + 3s` (two retries). 4xx responses are stable answers and
+are never retried; in particular, 404 and the XRPC `RecordNotFound` body on
+HTTP 400 still resolve to `found: false` on the first attempt.
+
+Retries run inline in the handler, so a slow retry does delay the event's
+processing. Two retries with this schedule add at most ~4s on top of the
+per-attempt timeouts (currently 10s each). After retries are exhausted, the
+error surfaces to the caller as described below.
+
 ## Interaction with fetch errors

-A fetch **error** (bad URI, PDS unreachable, search throws) is distinct from a
-fetch **not-finding** anything. Errors are collected into the `errors` array
-on the `FetchResolution` and the entry is not added to the context.
+A fetch **error** (bad URI, PDS unreachable after retries, search throws) is
+distinct from a fetch **not-finding** anything. Errors are collected into the
+`errors` array on the `FetchResolution` and the entry is not added to the
+context.

 Whether the automation fires when a fetch errors depends on whether that
 fetch carries **conditions**:
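The helper itself, `lib/pds/fetch-with-retry.ts`, is not part of this diff. As a reading aid, the behavior documented above could be sketched roughly as follows. This is a hypothetical reconstruction: the names `RETRY_DELAYS_MS`, `RetryInit`, `isRetryableStatus`, and `sleep` are assumptions, and only `fetchWithRetry` and its `timeoutMs` option appear in the source. It assumes a runtime with global `fetch` and `AbortSignal.timeout`.

```typescript
// Hypothetical sketch only; the real lib/pds/fetch-with-retry.ts is not shown in this diff.

// Documented schedule: two retries, after 1s and then 3s.
const RETRY_DELAYS_MS: readonly number[] = [1_000, 3_000];

// Assumed option shape: call sites pass `timeoutMs` instead of a `signal`.
interface RetryInit extends Omit<RequestInit, "signal"> {
  timeoutMs: number; // per-attempt timeout
}

// 5xx is treated as transient; 4xx is a stable answer and is never retried.
function isRetryableStatus(status: number): boolean {
  return status >= 500 && status <= 599;
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

async function fetchWithRetry(url: string | URL, init: RetryInit): Promise<Response> {
  const { timeoutMs, ...rest } = init;
  let lastResponse: Response | undefined;
  let lastError: unknown;

  // A leading 0 stands for the initial attempt, followed by the retry delays.
  for (const delay of [0, ...RETRY_DELAYS_MS]) {
    if (delay > 0) await sleep(delay);
    try {
      // A fresh timeout signal per attempt, so each attempt gets the full budget.
      const res = await fetch(url, { ...rest, signal: AbortSignal.timeout(timeoutMs) });
      if (!isRetryableStatus(res.status)) return res;
      lastResponse = res;
      lastError = undefined;
    } catch (err) {
      // Thrown transport errors and per-attempt timeouts both land here.
      lastError = err;
      lastResponse = undefined;
    }
  }

  // Retries exhausted: hand the last 5xx response (or the last error) to the caller.
  if (lastResponse) return lastResponse;
  throw lastError;
}
```

Returning the last `5xx` response instead of throwing matches the tests in this commit, where the caller still raises `getRecord failed (500)` after three attempts. Under this sketch, the worst case for a single request is three 10s timeouts plus 4s of backoff, roughly 34s.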
+28
lib/actions/searcher.test.ts
···
     expect(mockFetch).toHaveBeenCalledTimes(100);
   });

+  it("retries listRecords on 5xx and succeeds if a retry recovers", async () => {
+    vi.useFakeTimers();
+    try {
+      mockResolveDid.mockResolvedValueOnce("https://pds.example.com");
+      mockFetch.mockResolvedValueOnce({ ok: false, status: 503 }).mockResolvedValueOnce({
+        ok: true,
+        json: async () => ({
+          records: [
+            {
+              uri: "at://did:plc:owner/id.sifa.graph.follow/rk1",
+              cid: "c1",
+              value: { subject: "did:plc:subj" },
+            },
+          ],
+        }),
+      });
+
+      const promise = executeSearch(makeSearchStep(), event, ownerDid, {});
+      await vi.runAllTimersAsync();
+      const result = await promise;
+
+      expect(result.found).toBe(true);
+      expect(mockFetch).toHaveBeenCalledTimes(2);
+    } finally {
+      vi.useRealTimers();
+    }
+  });
+
   it("renders templates in repo and where values", async () => {
     mockResolveDid.mockResolvedValueOnce("https://pds.example.com");
     mockFetch.mockResolvedValueOnce({ ok: true, json: async () => ({ records: [] }) });
+5 -4
lib/actions/searcher.ts
···
 import type { FetchStepSearch } from "../db/schema.js";
 import type { JetstreamEvent } from "../jetstream/matcher.js";
 import { resolveDid } from "../pds/resolver.js";
+import { fetchWithRetry } from "../pds/fetch-with-retry.js";
 import {
   resolvePlaceholder,
   type FetchContext,
···

   let res: Response;
   try {
-    res = await fetch(url, {
+    res = await fetchWithRetry(url, {
       headers: { Accept: "application/json" },
-      signal: AbortSignal.timeout(HTTP_TIMEOUT_MS),
+      timeoutMs: HTTP_TIMEOUT_MS,
     });
   } catch {
     return null; // fall through to listRecords on transport failure
···
     url.searchParams.set("limit", String(LIST_RECORDS_PAGE_SIZE));
     if (cursor) url.searchParams.set("cursor", cursor);

-    const res = await fetch(url, {
+    const res = await fetchWithRetry(url, {
       headers: { Accept: "application/json" },
-      signal: AbortSignal.timeout(HTTP_TIMEOUT_MS),
+      timeoutMs: HTTP_TIMEOUT_MS,
     });
     if (!res.ok) throw new Error(`listRecords failed (${res.status}) for ${repo}/${collection}`);
+72 -3
lib/pds/resolver.test.ts
···

   beforeEach(() => {
     mockResolve = vi.spyOn(didResolver, "resolve");
+    mockFetch.mockReset();
     vi.stubGlobal("fetch", mockFetch);
   });

···
     expect(result.record).toEqual({});
   });

-  it("throws when PDS getRecord fails with non-404 error", async () => {
+  it("throws when PDS getRecord fails with non-404 error (after retries)", async () => {
+    vi.useFakeTimers();
+    try {
+      mockResolve.mockResolvedValueOnce({
+        id: "did:plc:abc",
+        service: [
+          {
+            id: "#atproto_pds",
+            type: "AtprotoPersonalDataServer",
+            serviceEndpoint: "https://pds.example.com",
+          },
+        ],
+      });
+      // Three 5xx responses: initial attempt + two retries, all exhausted.
+      mockFetch.mockResolvedValue({ ok: false, status: 500 });
+
+      const promise = fetchRecord("at://did:plc:abc/col/rk");
+      // Drain the retry delays (1s + 3s).
+      const assertion = expect(promise).rejects.toThrow("getRecord failed (500)");
+      await vi.runAllTimersAsync();
+      await assertion;
+      expect(mockFetch).toHaveBeenCalledTimes(3);
+    } finally {
+      vi.useRealTimers();
+    }
+  });
+
+  it("retries 5xx responses and returns success when a retry succeeds", async () => {
+    vi.useFakeTimers();
+    try {
+      mockResolve.mockResolvedValueOnce({
+        id: "did:plc:abc",
+        service: [
+          {
+            id: "#atproto_pds",
+            type: "AtprotoPersonalDataServer",
+            serviceEndpoint: "https://pds.example.com",
+          },
+        ],
+      });
+      mockFetch.mockResolvedValueOnce({ ok: false, status: 502 }).mockResolvedValueOnce({
+        ok: true,
+        json: async () => ({
+          uri: "at://did:plc:abc/col/rk",
+          cid: "c",
+          value: { ok: true },
+        }),
+      });
+
+      const promise = fetchRecord("at://did:plc:abc/col/rk");
+      await vi.runAllTimersAsync();
+      const result = await promise;
+
+      expect(result.found).toBe(true);
+      expect(result.record).toEqual({ ok: true });
+      expect(mockFetch).toHaveBeenCalledTimes(2);
+    } finally {
+      vi.useRealTimers();
+    }
+  });
+
+  it("does not retry 4xx responses (besides the RecordNotFound normalization)", async () => {
     mockResolve.mockResolvedValueOnce({
       id: "did:plc:abc",
       service: [
···
         },
       ],
     });
-    mockFetch.mockResolvedValueOnce({ ok: false, status: 500 });
+    mockFetch.mockResolvedValueOnce({
+      ok: false,
+      status: 400,
+      json: async () => ({ error: "InvalidRequest" }),
+    });

-    await expect(fetchRecord("at://did:plc:abc/col/rk")).rejects.toThrow("getRecord failed (500)");
+    await expect(fetchRecord("at://did:plc:abc/col/rk")).rejects.toThrow(
+      "getRecord failed (400 InvalidRequest)",
+    );
+    expect(mockFetch).toHaveBeenCalledTimes(1);
   });

   it("returns found:false when PDS returns 400 with error=RecordNotFound", async () => {
+3 -2
lib/pds/resolver.ts
···
 import { createRequire } from "node:module";
+import { fetchWithRetry } from "./fetch-with-retry.js";

 // Load via require() — Vite's SSR transform breaks CJS packages ("exports is not defined").
 // Same workaround used in auth/client.ts for @atproto/oauth-client-node.
···

   let res: Response;
   try {
-    res = await fetch(url, {
+    res = await fetchWithRetry(url, {
       headers: { Accept: "application/json" },
-      signal: AbortSignal.timeout(10_000),
+      timeoutMs: 10_000,
     });
   } catch (err) {
     console.error(`PDS getRecord fetch error for ${atUri} (${url}):`, err);