The AtmosphereConf talks your skyline missed
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #21 from musicjunkieg/fix/oauth-request-lock

fix: OAuth requestLock + authenticated searchPosts

authored by

chaos gremlin and committed by
GitHub
84b2feb7 78876724

+114 -52
+39
src/lib/auth/client.ts
··· 8 8 const stateStore = new Map<string, NodeSavedState>(); 9 9 const sessionStore = new Map<string, NodeSavedSession>(); 10 10 11 + // Single-process request lock: serializes access to a given key (e.g. a 12 + // session DID) so concurrent token refreshes never race. Without this, 13 + // @atproto/oauth-client-node prints 14 + // "No lock mechanism provided. Credentials might get revoked." 15 + // and two in-flight requests for the same user can both try to rotate the 16 + // refresh token — the losing request has its credentials invalidated and 17 + // the next AppView call returns 500 through the PDS proxy. (That was the 18 + // original root cause of the network-attention regression on staging.) 19 + // 20 + // Implementation per @atproto/oauth-client-node docs: a Map of per-key 21 + // promise chains. `fn` runs only after the previous holder releases, and 22 + // `releaseLock` is always called in finally so a thrown `fn` doesn't wedge 23 + // the chain for subsequent waiters. 24 + const locks = new Map<string, Promise<void>>(); 25 + async function requestLock<T>( 26 + key: string, 27 + fn: () => T | PromiseLike<T>, 28 + ): Promise<T> { 29 + const prevLock = locks.get(key) ?? Promise.resolve(); 30 + let releaseLock!: () => void; 31 + const currentLock = new Promise<void>((resolve) => { 32 + releaseLock = resolve; 33 + }); 34 + locks.set(key, currentLock); 35 + try { 36 + await prevLock; 37 + return await fn(); 38 + } finally { 39 + releaseLock(); 40 + // If this was the last waiter, drop the map entry so it doesn't leak 41 + // across long-running processes. Comparing by reference is safe: a new 42 + // waiter would have replaced the entry before we got here. 43 + if (locks.get(key) === currentLock) { 44 + locks.delete(key); 45 + } 46 + } 47 + } 48 + 11 49 function createClient(): NodeOAuthClient { 12 50 const appUrl = process.env.APP_URL; 13 51 ··· 20 58 21 59 return new NodeOAuthClient({ 22 60 clientMetadata: buildClientMetadata(appUrl), 61 + requestLock, 23 62 stateStore: { 24 63 async get(key: string) { 25 64 return stateStore.get(key);
+1 -1
src/lib/crawl/crawler.ts
··· 115 115 console.error("Constellation fetch failed, skipping RSVPs:", err); 116 116 return new Map<string, Set<string>>(); 117 117 }), 118 - searchConferencePosts(signal), 118 + searchConferencePosts(agent, signal), 119 119 ]); 120 120 121 121 throwIfAborted(signal);
+74 -51
src/lib/crawl/search.ts
··· 1 - import type { AppBskyFeedDefs } from "@atproto/api"; 1 + import type { Agent, AppBskyFeedDefs } from "@atproto/api"; 2 2 3 3 type PostView = AppBskyFeedDefs.PostView; 4 - 5 - const APPVIEW_URL = 6 - "https://api.bsky.app/xrpc/app.bsky.feed.searchPosts"; 7 4 8 5 const SEARCH_QUERIES = [ 9 6 "atmosphereconf", ··· 21 18 const MAX_ATTEMPTS = 3; 22 19 const BASE_BACKOFF_MS = 200; 23 20 24 - interface SearchResponse { 21 + interface SearchPageResult { 25 22 posts: PostView[]; 26 - cursor?: string; 23 + cursor: string | undefined; 27 24 } 28 25 29 - function isRetryableStatus(status: number): boolean { 30 - return status === 429 || (status >= 500 && status < 600); 26 + interface SearchPageParams { 27 + q: string; 28 + sort: "latest"; 29 + since: string; 30 + until: string; 31 + limit: number; 32 + cursor?: string; 31 33 } 32 34 33 35 /** ··· 53 55 } 54 56 55 57 /** 56 - * Fetch a single search page with bounded retries on transient failures. 58 + * Inspect an arbitrary thrown value and decide whether to retry. 59 + * 60 + * Retryable: HTTP `429`, any `5xx`, and network errors (fetch / DNS / TCP 61 + * failures that surface without a status). Non-retryable: other HTTP 4xx 62 + * and anything with a non-retryable status shape. 63 + * 64 + * `@atproto/api` throws `XRPCError` with a numeric `status` on HTTP errors, 65 + * so that's what we look at first. We fall back to `error.status` on plain 66 + * objects for defensive parity. 67 + */ 68 + function isRetryableError(err: unknown): boolean { 69 + if (!err || typeof err !== "object") return true; // unknown — retry once 70 + const status = (err as { status?: number }).status; 71 + if (typeof status === "number") { 72 + return status === 429 || (status >= 500 && status < 600); 73 + } 74 + // No status → network / abort-ish error. Retry unless it's an AbortError 75 + // (abort handling is done by the caller; we just signal retryable=true). 76 + return true; 77 + } 78 + 79 + /** 80 + * Fetch a single search page via the user's authenticated agent with 81 + * bounded retries on transient failures. 82 + * 83 + * Why authenticated: the public Bluesky AppView (`public.api.bsky.app`) 84 + * returns `403 Forbidden` on paginated `searchPosts` requests to prevent 85 + * unauthenticated scraping — documented behavior per bluesky-social/atproto 86 + * issue #3583 and others. The alternate host `api.bsky.app` IP-blocks 87 + * Railway egress. Authenticated-through-PDS is the only reliable path. 57 88 * 58 - * Retries on HTTP 429, 5xx, and network errors with exponential backoff 59 - * (200ms → 400ms → 800ms) plus uniform jitter up to the same delay. 60 - * Non-retryable HTTP errors (4xx other than 429) and abort errors are 61 - * thrown immediately. The crawl has a 30s overall budget enforced upstream, 62 - * so retry counts and base delay are deliberately kept small. 89 + * The original PDS path was returning 500 because `@atproto/oauth-client-node` 90 + * was running without a `requestLock`, letting concurrent crawl operations 91 + * race on token refresh and get credentials revoked. That lock is now 92 + * installed in `src/lib/auth/client.ts`. 93 + * 94 + * Retries: 3 attempts, exponential backoff (200ms → 400ms → 800ms) with 95 + * uniform jitter, retry only on 429/5xx/network. Abort always propagates 96 + * immediately so a cancelled crawl never burns retries. Retry counts and 97 + * delays are deliberately small because the overall crawl has a 30s budget 98 + * enforced upstream in `src/app/api/crawl/route.ts`. 63 99 */ 64 100 async function fetchSearchPage( 65 - params: URLSearchParams, 101 + agent: Agent, 102 + params: SearchPageParams, 66 103 signal?: AbortSignal, 67 - ): Promise<SearchResponse> { 104 + ): Promise<SearchPageResult> { 68 105 let lastError: unknown; 69 106 70 107 for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { 71 108 if (signal?.aborted) throw signal.reason ?? new Error("Aborted"); 72 109 73 110 try { 74 - const res = await fetch(`${APPVIEW_URL}?${params.toString()}`, { 75 - signal, 76 - headers: { accept: "application/json" }, 77 - }); 78 - if (res.ok) { 79 - return (await res.json()) as SearchResponse; 80 - } 81 - if (!isRetryableStatus(res.status)) { 82 - throw new Error( 83 - `AppView searchPosts returned ${res.status} ${res.statusText}`, 84 - ); 85 - } 86 - lastError = new Error( 87 - `AppView searchPosts returned ${res.status} ${res.statusText}`, 88 - ); 111 + const res = await agent.app.bsky.feed.searchPosts(params, { signal }); 112 + return { posts: res.data.posts, cursor: res.data.cursor }; 89 113 } catch (err) { 90 114 // Abort always propagates immediately — never burn retries on a 91 115 // cancelled crawl. 92 116 if (signal?.aborted) throw err; 117 + if (!isRetryableError(err)) throw err; 93 118 lastError = err; 94 119 } 95 120 ··· 101 126 } 102 127 } 103 128 104 - throw lastError ?? new Error("AppView searchPosts failed after retries"); 129 + throw lastError ?? new Error("searchPosts failed after retries"); 105 130 } 106 131 107 132 /** 108 133 * Search Bluesky for conference-related posts during the conference period 109 - * and the post-conference aftermath. 110 - * 111 - * Calls the public AppView (`api.bsky.app`) directly via `fetch` instead of 112 - * routing through the user's PDS via `agent.app.bsky.feed.searchPosts`. The 113 - * search is a public read — there is no benefit to authenticating it, and 114 - * the OAuth/DPoP path through the PDS has been observed returning 5xx in 115 - * production while the public AppView returns 200 for the same query. 134 + * and the post-conference aftermath. Returns deduplicated posts from all 135 + * search queries. 116 136 */ 117 137 export async function searchConferencePosts( 138 + agent: Agent, 118 139 signal?: AbortSignal, 119 140 ): Promise<PostView[]> { 120 141 const seenUris = new Set<string>(); ··· 125 146 126 147 do { 127 148 if (signal?.aborted) throw signal.reason ?? new Error("Aborted"); 128 - const params = new URLSearchParams({ 129 - q: query, 130 - sort: "latest", 131 - since: SEARCH_SINCE, 132 - until: SEARCH_UNTIL, 133 - limit: "100", 134 - }); 135 - if (cursor) params.set("cursor", cursor); 136 - 137 149 try { 138 - const data = await fetchSearchPage(params, signal); 150 + const page = await fetchSearchPage( 151 + agent, 152 + { 153 + q: query, 154 + sort: "latest", 155 + since: SEARCH_SINCE, 156 + until: SEARCH_UNTIL, 157 + limit: 100, 158 + cursor, 159 + }, 160 + signal, 161 + ); 139 162 140 - for (const post of data.posts) { 163 + for (const post of page.posts) { 141 164 if (!seenUris.has(post.uri)) { 142 165 seenUris.add(post.uri); 143 166 posts.push(post); 144 167 } 145 168 } 146 169 147 - cursor = data.cursor; 170 + cursor = page.cursor; 148 171 } catch (error) { 149 172 // Propagate abort errors so the whole crawl cancels cleanly. 150 173 if (signal?.aborted) throw error;