Malachite is a tool to import your Last.fm and Spotify listening history to the AT Protocol network using the fm.teal.alpha.feed.play lexicon.
malachite scrobbles importer atproto music
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: correct 401 and 429 shapes

+158 -28
+35 -2
src/core/car-fetch.ts
··· 90 90 91 91 // ─── public API ────────────────────────────────────────────────────────────── 92 92 93 + /** 94 + * Thrown when the PDS returns 401 on com.atproto.sync.getRepo. 95 + * Callers can catch this specifically to surface a re-auth prompt rather than 96 + * treating it as a generic network error. 97 + */ 98 + export class CARFetchUnauthorizedError extends Error { 99 + constructor(pdsUrl: string, did: string) { 100 + super( 101 + `CAR fetch returned 401 Unauthorized for ${did} at ${pdsUrl}. ` + 102 + `The PDS requires authentication but a valid token could not be obtained. ` + 103 + `Try signing out and back in to refresh your session.` 104 + ); 105 + this.name = 'CARFetchUnauthorizedError'; 106 + } 107 + } 108 + 93 109 export interface CARRecord { 94 110 rkey: string; 95 111 uri: string; ··· 120 136 const response = await fetch(url, { headers, signal }); 121 137 122 138 if (!response.ok) { 139 + if (response.status === 401) { 140 + throw new CARFetchUnauthorizedError(pdsUrl, did); 141 + } 123 142 throw new Error(`CAR fetch failed: ${response.status} ${response.statusText}`); 124 143 } 125 144 ··· 180 199 export async function getAgentToken(agent: unknown): Promise<string | undefined> { 181 200 const a = agent as Record<string, unknown>; 182 201 183 - // Password-auth AtpAgent: session carries a plain JWT. 202 + // Password-auth CredentialSession (AtpAgent): 203 + // session.accessJwt holds the current JWT. It may be expired — callers 204 + // should handle CARFetchUnauthorizedError and retry after refreshing. 184 205 const jwt = (a['session'] as any)?.accessJwt; 185 206 if (jwt) return jwt as string; 186 207 ··· 191 212 const tokens = await sm.getTokens() as { accessToken?: string } | null; 192 213 if (tokens?.accessToken) return tokens.accessToken; 193 214 } catch { 194 - // If the OAuth token is expired and can't be refreshed silently, fall through. 215 + // Token read failed — try a silent refresh before giving up. 216 + } 217 + 218 + // If getTokens() returned nothing (expired session), attempt a silent 219 + // refresh via the session manager and retry once. 220 + if (typeof sm?.refresh === 'function') { 221 + try { 222 + await sm.refresh(); 223 + const refreshed = await sm.getTokens() as { accessToken?: string } | null; 224 + if (refreshed?.accessToken) return refreshed.accessToken; 225 + } catch { 226 + // Refresh failed — fall through and return undefined. 227 + } 195 228 } 196 229 } 197 230
+66 -7
src/core/sync.ts
··· 7 7 import type { Agent } from '@atproto/api'; 8 8 import type { PlayRecord } from './types.js'; 9 9 import { RECORD_TYPE } from './config.js'; 10 - import { fetchRepoViaCAR, getPdsUrlFromAgent, getAgentToken } from './car-fetch.js'; 10 + import { fetchRepoViaCAR, getPdsUrlFromAgent, getAgentToken, CARFetchUnauthorizedError } from './car-fetch.js'; 11 11 12 12 export interface ExistingRecord { 13 13 uri: string; ··· 44 44 signal?.throwIfAborted(); 45 45 46 46 const pdsUrl = getPdsUrlFromAgent(agent); 47 - const token = await getAgentToken(agent); 48 - const carRecords = await fetchRepoViaCAR(pdsUrl, did, RECORD_TYPE, signal, token); 47 + let token = await getAgentToken(agent); 48 + let carRecords; 49 + try { 50 + carRecords = await fetchRepoViaCAR(pdsUrl, did, RECORD_TYPE, signal, token); 51 + } catch (err) { 52 + if (err instanceof CARFetchUnauthorizedError) { 53 + // The token we sent was invalid or expired. Try to silently refresh the 54 + // session (works for both CredentialSession / AtpAgent and OAuth agents 55 + // that expose a refreshSession method on their session manager) then 56 + // retry the CAR fetch exactly once before giving up. 57 + const sm = (agent as any)?.sessionManager; 58 + let retried = false; 59 + if (typeof sm?.refreshSession === 'function') { 60 + try { 61 + await sm.refreshSession(); 62 + const freshToken = await getAgentToken(agent); 63 + if (freshToken && freshToken !== token) { 64 + carRecords = await fetchRepoViaCAR(pdsUrl, did, RECORD_TYPE, signal, freshToken); 65 + token = freshToken; 66 + retried = true; 67 + } 68 + } catch { 69 + // Refresh or second fetch failed — fall through and throw below. 70 + } 71 + } 72 + if (!retried) { 73 + // Clear the stale session cache so the next call starts clean. 74 + sessionCache.delete(did); 75 + throw err; 76 + } 77 + } else { 78 + throw err; 79 + } 80 + } 49 81 50 82 const map = new Map<string, ExistingRecord>(); 51 - for (const rec of carRecords) { 83 + for (const rec of carRecords!) { 52 84 const value = rec.value as unknown as PlayRecord; 53 85 map.set(recordKey(value), { uri: rec.uri, cid: rec.cid, value }); 54 86 } ··· 76 108 signal?.throwIfAborted(); 77 109 78 110 const pdsUrl = getPdsUrlFromAgent(agent); 79 - const token = await getAgentToken(agent); 80 - const carRecords = await fetchRepoViaCAR(pdsUrl, did, RECORD_TYPE, signal, token); 111 + let token = await getAgentToken(agent); 112 + let carRecords; 113 + try { 114 + carRecords = await fetchRepoViaCAR(pdsUrl, did, RECORD_TYPE, signal, token); 115 + } catch (err) { 116 + if (err instanceof CARFetchUnauthorizedError) { 117 + const sm = (agent as any)?.sessionManager; 118 + let retried = false; 119 + if (typeof sm?.refreshSession === 'function') { 120 + try { 121 + await sm.refreshSession(); 122 + const freshToken = await getAgentToken(agent); 123 + if (freshToken && freshToken !== token) { 124 + carRecords = await fetchRepoViaCAR(pdsUrl, did, RECORD_TYPE, signal, freshToken); 125 + token = freshToken; 126 + retried = true; 127 + } 128 + } catch { 129 + // fall through 130 + } 131 + } 132 + if (!retried) { 133 + sessionCache.delete(did); 134 + throw err; 135 + } 136 + } else { 137 + throw err; 138 + } 139 + } 81 140 82 - const all: ExistingRecord[] = carRecords.map((rec) => ({ 141 + const all: ExistingRecord[] = carRecords!.map((rec) => ({ 83 142 uri: rec.uri, 84 143 cid: rec.cid, 85 144 value: rec.value as unknown as PlayRecord,
+18 -19
src/lib/publisher.ts
··· 320 320 const rateLimitError = isRateLimitError(err); 321 321 322 322 if (rateLimitError) { 323 - log.warn('⚠️ Rate limit hit (unexpected with proactive pacing) - updating from error headers...'); 324 - 325 - // Extract and update from error headers 326 - let headers: Record<string, string> | undefined; 327 - if (err?.response?.headers) { 328 - headers = err.response.headers; 329 - } else if (err?.headers) { 330 - headers = err.headers; 331 - } 332 - 333 - if (headers && Object.keys(headers).length > 0) { 334 - const normalized = normalizeHeaders(headers); 335 - const hasRateLimitHeaders = Object.keys(normalized).some(k => k.includes('ratelimit')); 336 - if (hasRateLimitHeaders) { 337 - rl.updateFromHeaders(normalized); 338 - } 339 - } 340 - 341 - // Wait for permit and retry 323 + log.warn('⚠️ Rate limit hit — pausing until quota resets…'); 324 + 325 + // XRPCError (from @atproto/xrpc) carries response headers directly on 326 + // err.headers as a plain Record<string, string> built via 327 + // Object.fromEntries(response.headers.entries()), so keys are already 328 + // lowercase. There is no err.response property on this error class. 329 + const errHeaders: Record<string, string> | undefined = 330 + err?.headers && typeof err.headers === 'object' 331 + ? (err.headers as Record<string, string>) 332 + : undefined; 333 + 334 + // handleRateLimitHit zeroes remaining unconditionally — this is the 335 + // critical fix. Previously we only called updateFromHeaders when 336 + // headers were present, meaning a headerless 429 left state untouched 337 + // and waitForPermit returned immediately, sending another request. 338 + rl.handleRateLimitHit(errHeaders ? normalizeHeaders(errHeaders) : undefined); 339 + 340 + // Now waitForPermit will block until the window resets. 342 341 await rl.waitForPermit(batchPoints); 343 342 continue; 344 343
+39
src/utils/rate-limiter.ts
··· 686 686 } 687 687 688 688 /** 689 + * Called when the server returns a 429. 690 + * 691 + * Zeroes `remaining` unconditionally so the next `waitForPermit` call 692 + * actually blocks until the window resets, regardless of whether the 429 693 + * response included rate-limit headers. If headers ARE present they are 694 + * applied first (so we get an accurate `resetAt`), then remaining is 695 + * forced to 0. 696 + * 697 + * @param errHeaders Optional normalised headers from the 429 error response. 698 + */ 699 + handleRateLimitHit(errHeaders?: Record<string, string>): void { 700 + // Apply header info first so resetAt is as accurate as possible. 701 + if (errHeaders && Object.keys(errHeaders).length > 0) { 702 + this.updateFromHeaders(errHeaders); 703 + } 704 + 705 + const now = Math.floor(Date.now() / 1000); 706 + const state = this.readState(); 707 + if (state) { 708 + state.remaining = 0; 709 + state.updatedAt = now; 710 + this.writeState(state); 711 + log.warn(`[RateLimiter] 🛑 429 received — zeroed remaining, will wait until ${new Date(state.resetAt * 1000).toISOString()}`); 712 + } else { 713 + // No state yet — create a blocking stub with a 60-second reset. 714 + this.writeState({ 715 + limit: 5000, 716 + remaining: 0, 717 + resetAt: now + 60, 718 + windowSeconds: 3600, 719 + updatedAt: now, 720 + headroomThreshold: this.headroomThreshold, 721 + }); 722 + this.hasLearnedFromServer = true; 723 + log.warn('[RateLimiter] 🛑 429 received (no prior state) — blocking for 60s'); 724 + } 725 + } 726 + 727 + /** 689 728 * Wait for a permit with the given number of points. 690 729 * Combines reserveQuota and waitForReset - loops until permit granted. 691 730 *