Import your Last.fm and Spotify listening history to the AT Protocol network using the fm.teal.alpha.feed.play lexicon.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

better window-based rate-limiting

karitham a2afa6f7 379e3624

+178 -128
+141 -96
src/lib/atproto/rate-limited-agent.ts
··· 10 10 RateLimitStatus, 11 11 } from './types.js'; 12 12 13 + export interface RateLimitContext { 14 + operation: string; // e.g., 'listRecords', 'applyWrites' 15 + pointsCost: number; // The calculated cost of this specific operation 16 + } 17 + 13 18 export interface RateLimitHooks { 14 - onWaitForReset?: ( 19 + /** 20 + * Triggered when the agent pauses execution to respect limits. 21 + * - 'token_bucket_drain': Normal behavior. We are smoothing out traffic. 22 + * - 'daily_quota': Critical. We hit the hard daily limit and must wait significantly. 23 + */ 24 + onThrottle?: ( 25 + context: RateLimitContext, 26 + throttle: { waitTimeMs: number; reason: 'daily_quota' | 'token_bucket_drain' }, 15 27 status: RateLimitStatus, 16 - waitTimeMs: number, 17 - reason: 'daily' | 'hourly', 28 + ) => void; 29 + 30 + /** 31 + * Triggered when a request fails with a 429/RateLimit error and enters backoff. 32 + * Provides the attempt number and the calculated backoff duration. 33 + */ 34 + onRetry?: ( 35 + context: RateLimitContext, 36 + retry: { attempt: number; backoffMs: number; error: unknown }, 37 + status: RateLimitStatus, 18 38 ) => void; 19 - onRateLimitBackoff?: (status: RateLimitStatus, backoffMs: number) => void; 20 - onRateLimitError?: (status: RateLimitStatus, error: unknown) => void; 39 + 40 + /** 41 + * Triggered after a successful operation. 42 + * Useful for monitoring throughput, latency, and total consumption. 43 + */ 44 + onSuccess?: ( 45 + context: RateLimitContext, 46 + metrics: { durationMs: number; totalPointsUsed: number }, 47 + status: RateLimitStatus, 48 + ) => void; 21 49 } 22 50 23 51 export class RateLimitedAgent implements AtprotoAgentInterface { ··· 26 54 private signal?: AbortSignal; 27 55 private hooks?: RateLimitHooks; 28 56 57 + // Daily Hard Limit (Fixed Window) 29 58 private dailyPointsUsed: number = 0; 30 - private hourlyPointsUsed: number = 0; 31 59 private dailyResetTime: Date; 32 - private hourlyResetTime: Date; 60 + private readonly maxPointsPerDay: number = 30000; 61 + 62 + // Hourly/Short-term Limit (Token Bucket) 63 + private tokenBalance: number; 64 + private lastRefillTime: number; 65 + private refillRatePerMs: number; 66 + private maxTokens: number; 33 67 34 68 private consecutiveFailures: number = 0; 35 69 private readonly baseDelayMs: number = 1000; ··· 41 75 this.hooks = options; 42 76 43 77 this.config = { 44 - pointsPerDay: options?.pointsPerDay ?? 30000, 45 78 safetyMargin: options?.safetyMargin ?? 0.75, 46 - maxPointsPerMinute: options?.maxPointsPerMinute ?? 20, 47 79 pointsPerRead: options?.pointsPerRead ?? 1, 48 80 pointsPerWrite: options?.pointsPerWrite ?? 3, 49 81 pointsPerDelete: options?.pointsPerDelete ?? 1, ··· 51 83 52 84 const now = new Date(); 53 85 this.dailyResetTime = this.getNextDailyReset(now); 54 - this.hourlyResetTime = this.getNextHourlyReset(now); 86 + 87 + // Initialize Token Bucket 88 + this.maxTokens = Math.floor((this.maxPointsPerDay * this.config.safetyMargin) / 24); 89 + this.refillRatePerMs = this.maxTokens / 3600000; 90 + this.tokenBalance = this.maxTokens; 91 + this.lastRefillTime = now.getTime(); 55 92 56 93 if (this.signal) { 57 94 this.signal.addEventListener('abort', () => { ··· 69 106 return reset; 70 107 } 71 108 72 - private getNextHourlyReset(date: Date): Date { 73 - const reset = new Date(date); 74 - reset.setMinutes(0, 0, 0); 75 - if (reset <= date) { 76 - reset.setHours(reset.getHours() + 1); 77 - } 78 - return reset; 79 - } 80 - 81 109 private get usableDailyPoints(): number { 82 - return Math.floor(this.config.pointsPerDay * this.config.safetyMargin); 83 - } 84 - 85 - private get usableHourlyPoints(): number { 86 - return Math.floor((this.config.pointsPerDay * this.config.safetyMargin) / 24); 110 + return Math.floor(this.maxPointsPerDay * this.config.safetyMargin); 87 111 } 88 112 89 113 private get dailyPointsRemaining(): number { 90 - return this.usableDailyPoints - this.dailyPointsUsed; 114 + return Math.max(0, this.usableDailyPoints - this.dailyPointsUsed); 91 115 } 92 116 93 - private get hourlyPointsRemaining(): number { 94 - return this.usableHourlyPoints - this.hourlyPointsUsed; 117 + private refillTokens(): void { 118 + const now = Date.now(); 119 + const elapsedMs = now - this.lastRefillTime; 120 + 121 + if (elapsedMs > 0) { 122 + const newTokens = elapsedMs * this.refillRatePerMs; 123 + this.tokenBalance = Math.min(this.maxTokens, this.tokenBalance + newTokens); 124 + this.lastRefillTime = now; 125 + } 95 126 } 96 127 97 - private checkAndResetTimers(): void { 128 + private checkAndResetDailyTimer(): void { 98 129 const now = new Date(); 99 - 100 130 if (now >= this.dailyResetTime) { 101 131 this.dailyPointsUsed = 0; 102 132 this.dailyResetTime = this.getNextDailyReset(now); 103 133 } 104 - 105 - if (now >= this.hourlyResetTime) { 106 - this.hourlyPointsUsed = 0; 107 - this.hourlyResetTime = this.getNextHourlyReset(now); 108 - } 109 134 } 110 135 111 136 private checkCancelled(): void { ··· 115 140 } 116 141 117 142 private getCurrentStatus(): RateLimitStatus { 118 - this.checkAndResetTimers(); 143 + this.checkAndResetDailyTimer(); 144 + this.refillTokens(); 145 + 146 + const msToFull = (this.maxTokens - this.tokenBalance) / this.refillRatePerMs; 147 + const estimatedHourlyReset = new Date(Date.now() + msToFull); 148 + 119 149 return { 120 - dailyPointsRemaining: Math.max(0, this.dailyPointsRemaining), 121 - hourlyPointsRemaining: Math.max(0, this.hourlyPointsRemaining), 150 + dailyPointsRemaining: this.dailyPointsRemaining, 151 + hourlyPointsRemaining: Math.floor(this.tokenBalance), 122 152 dailyResetAt: this.dailyResetTime, 123 - hourlyResetAt: this.hourlyResetTime, 153 + hourlyResetAt: estimatedHourlyReset, 124 154 }; 125 155 } 126 156 127 - private async waitForReset(targetPoints: number): Promise<void> { 157 + private async waitForPoints(context: RateLimitContext): Promise<void> { 128 158 this.checkCancelled(); 159 + this.checkAndResetDailyTimer(); 160 + this.refillTokens(); 129 161 130 - const status = this.getCurrentStatus(); 131 - const now = new Date(); 162 + const now = Date.now(); 163 + const required = context.pointsCost; 164 + let waitTimeMs = 0; 165 + let reason: 'daily_quota' | 'token_bucket_drain' | null = null; 166 + 167 + if (this.dailyPointsRemaining < required) { 168 + waitTimeMs = this.dailyResetTime.getTime() - now; 169 + reason = 'daily_quota'; 170 + } else if (this.tokenBalance < required) { 171 + const deficit = required - this.tokenBalance; 172 + waitTimeMs = deficit / this.refillRatePerMs; 173 + reason = 'token_bucket_drain'; 174 + } 132 175 133 - let resetTime: Date; 134 - let reason: 'daily' | 'hourly'; 176 + if (waitTimeMs > 0 && reason) { 177 + // Buffer for rounding errors 178 + waitTimeMs = Math.ceil(waitTimeMs) + 50; 135 179 136 - if (status.dailyPointsRemaining < targetPoints && status.hourlyPointsRemaining < targetPoints) { 137 - resetTime = 138 - status.dailyResetAt < status.hourlyResetAt ? status.dailyResetAt : status.hourlyResetAt; 139 - reason = status.dailyResetAt < status.hourlyResetAt ? 'daily' : 'hourly'; 140 - } else if (status.dailyPointsRemaining < targetPoints) { 141 - resetTime = status.dailyResetAt; 142 - reason = 'daily'; 143 - } else { 144 - resetTime = status.hourlyResetAt; 145 - reason = 'hourly'; 146 - } 180 + this.hooks?.onThrottle?.(context, { waitTimeMs, reason }, this.getCurrentStatus()); 147 181 148 - const waitTime = Math.max(0, resetTime.getTime() - now.getTime()); 149 - if (waitTime > 0) { 150 - this.hooks?.onWaitForReset?.(status, waitTime, reason); 151 182 await new Promise((resolve, reject) => { 152 - const timeout = setTimeout(resolve, waitTime); 183 + const timeout = setTimeout(resolve, waitTimeMs); 153 184 this.signal?.addEventListener('abort', () => { 154 185 clearTimeout(timeout); 155 186 reject(new CancellationError('Operation cancelled')); 156 187 }); 157 188 }); 189 + 190 + this.refillTokens(); 158 191 } 159 - 160 - this.checkAndResetTimers(); 161 192 } 162 193 163 194 private calculateBackoff(): number { ··· 172 203 private extractPointsFromError( 173 204 error: unknown, 174 205 ): { dailyConsumed: number; hourlyConsumed: number } | null { 175 - const err = error as { 176 - headers?: Record<string, string>; 177 - status?: number; 178 - message?: string; 179 - }; 206 + const err = error as { headers?: Record<string, string> }; 180 207 const headers = err.headers; 181 208 182 - if (!headers) { 183 - return null; 184 - } 209 + if (!headers) return null; 185 210 186 211 const remainingHeader = headers['ratelimit-remaining'] ?? headers['RateLimit-Remaining']; 187 212 const limitHeader = headers['ratelimit-limit'] ?? headers['RateLimit-Limit']; 188 213 214 + if (remainingHeader) { 215 + const remaining = parseInt(remainingHeader, 10); 216 + // Sync local tokens to server remaining if server is lower 217 + if (!Number.isNaN(remaining) && remaining < this.tokenBalance) { 218 + this.tokenBalance = remaining; 219 + } 220 + } 221 + 189 222 if (remainingHeader && limitHeader) { 190 223 const remaining = parseInt(remainingHeader, 10); 191 224 const limit = parseInt(limitHeader, 10); 192 225 const consumed = limit - remaining; 193 226 return { dailyConsumed: consumed, hourlyConsumed: consumed }; 194 227 } 195 - 196 228 return null; 197 229 } 198 230 ··· 210 242 ); 211 243 } 212 244 213 - private async executeWithRateLimit<T>(operation: () => Promise<T>, points: number): Promise<T> { 245 + private async executeWithRateLimit<T>( 246 + operationName: string, 247 + operation: () => Promise<T>, 248 + points: number, 249 + ): Promise<T> { 250 + const context: RateLimitContext = { 251 + operation: operationName, 252 + pointsCost: points, 253 + }; 254 + const startTime = Date.now(); 255 + 214 256 while (true) { 215 257 this.checkCancelled(); 216 - const status = this.getCurrentStatus(); 217 - 218 - if (status.dailyPointsRemaining < points) { 219 - await this.waitForReset(points); 220 - continue; 221 - } 222 - 223 - if (status.hourlyPointsRemaining < points) { 224 - await this.waitForReset(points); 225 - continue; 226 - } 258 + await this.waitForPoints(context); 227 259 228 260 try { 229 261 const result = await operation(); 262 + 230 263 this.dailyPointsUsed += points; 231 - this.hourlyPointsUsed += points; 264 + this.tokenBalance -= points; 232 265 this.resetBackoff(); 266 + 267 + this.hooks?.onSuccess?.( 268 + context, 269 + { durationMs: Date.now() - startTime, totalPointsUsed: this.dailyPointsUsed }, 270 + this.getCurrentStatus(), 271 + ); 272 + 233 273 return result; 234 274 } catch (error) { 235 - if (error instanceof CancellationError) { 236 - throw error; 237 - } 275 + if (error instanceof CancellationError) throw error; 238 276 239 277 if (this.isRateLimitError(error)) { 240 - const statusBeforeError = this.getCurrentStatus(); 241 - this.hooks?.onRateLimitError?.(statusBeforeError, error); 242 - 243 - const consumedPoints = this.extractPointsFromError(error); 244 - if (consumedPoints !== null) { 245 - this.dailyPointsUsed = Math.max(this.dailyPointsUsed, consumedPoints.dailyConsumed); 246 - this.hourlyPointsUsed = Math.max(this.hourlyPointsUsed, consumedPoints.hourlyConsumed); 278 + // Sync state if headers are present 279 + const syncData = this.extractPointsFromError(error); 280 + if (syncData) { 281 + this.dailyPointsUsed = Math.max(this.dailyPointsUsed, syncData.dailyConsumed); 282 + this.tokenBalance = 0; // Assume drained if 429 received 247 283 } 248 284 249 285 this.consecutiveFailures++; 250 286 const backoff = this.calculateBackoff(); 251 - const statusAfterUpdate = this.getCurrentStatus(); 252 - this.hooks?.onRateLimitBackoff?.(statusAfterUpdate, backoff); 287 + 288 + this.hooks?.onRetry?.( 289 + context, 290 + { attempt: this.consecutiveFailures, backoffMs: backoff, error }, 291 + this.getCurrentStatus(), 292 + ); 253 293 254 294 await new Promise((resolve, reject) => { 255 295 const timeout = setTimeout(resolve, backoff); ··· 258 298 reject(new CancellationError('Operation cancelled')); 259 299 }); 260 300 }); 261 - this.checkAndResetTimers(); 262 301 continue; 263 302 } 264 303 ··· 269 308 270 309 async listRecords(params: ListRecordsParams): Promise<ListRecordsResponse> { 271 310 return this.executeWithRateLimit( 311 + 'listRecords', 272 312 () => this.delegate.listRecords(params), 273 313 this.config.pointsPerRead, 274 314 ); ··· 278 318 const writeCount = params.writes.length; 279 319 const points = writeCount * this.config.pointsPerWrite; 280 320 281 - return this.executeWithRateLimit(() => this.delegate.applyWrites(params), points); 321 + return this.executeWithRateLimit( 322 + 'applyWrites', 323 + () => this.delegate.applyWrites(params), 324 + points, 325 + ); 282 326 } 283 327 284 328 async deleteRecord(params: DeleteRecordParams): Promise<DeleteRecordResponse> { 285 329 return this.executeWithRateLimit( 330 + 'deleteRecord', 286 331 () => this.delegate.deleteRecord(params), 287 332 this.config.pointsPerDelete, 288 333 );
-2
src/lib/atproto/types.ts
··· 63 63 } 64 64 65 65 export interface RateLimitOptions { 66 - pointsPerDay?: number; 67 66 safetyMargin?: number; 68 - maxPointsPerMinute?: number; 69 67 pointsPerRead?: number; 70 68 pointsPerWrite?: number; 71 69 pointsPerDelete?: number;
+18 -16
src/lib/cli.ts
··· 202 202 return normalized as 'lastfm' | 'spotify' | 'combined' | 'sync' | 'deduplicate'; 203 203 } 204 204 205 + const rateLimitHooks: RateLimitHooks = { 206 + onThrottle: (_context, throttle, _status) => { 207 + log.info( 208 + `Rate limit (${throttle.reason}), waiting ${formatDuration(throttle.waitTimeMs)} for reset`, 209 + ); 210 + }, 211 + onRetry: (_context, retry, _status) => { 212 + log.debug( 213 + `Rate limited by server, attempt ${retry.attempt}, backing off ${formatDuration(retry.backoffMs)}`, 214 + ); 215 + }, 216 + onSuccess: (_context, metrics, _status) => { 217 + log.debug( 218 + `Operation completed in ${metrics.durationMs}ms, total points: ${metrics.totalPointsUsed}`, 219 + ); 220 + }, 221 + }; 222 + 205 223 /** 206 224 * The full, real implementation of the CLI 207 225 */ ··· 270 288 throw new Error('Deduplicate mode requires --handle and --password'); 271 289 } 272 290 log.section('Remove Duplicate Records'); 273 - const rateLimitHooks: RateLimitHooks = { 274 - onWaitForReset: (_status, waitTimeMs, reason) => { 275 - log.info(`Rate limit (${reason}), waiting ${formatDuration(waitTimeMs)} for reset`); 276 - }, 277 - onRateLimitBackoff: (_status, backoffMs) => { 278 - log.debug(`Rate limited by server, backing off ${formatDuration(backoffMs)}`); 279 - }, 280 - }; 281 291 agent = new RateLimitedAgent( 282 292 await login({ identifier: args.handle, password: args.password }), 283 293 { ··· 314 324 throw new Error('Missing required arguments: --handle and --password'); 315 325 } 316 326 log.debug('Authenticating...'); 317 - const rateLimitHooks: RateLimitHooks = { 318 - onWaitForReset: (_status, waitTimeMs, reason) => { 319 - log.info(`Rate limit (${reason}), waiting ${formatDuration(waitTimeMs)} for reset`); 320 - }, 321 - onRateLimitBackoff: (_status, backoffMs) => { 322 - log.debug(`Rate limited by server, backing off ${formatDuration(backoffMs)}`); 323 - }, 324 - }; 325 327 agent = new RateLimitedAgent( 326 328 await login({ identifier: args.handle, password: args.password }), 327 329 {
+19 -14
src/tests/atproto/rate-limited-agent.test.ts
··· 8 8 it('should track daily and hourly points separately', () => { 9 9 const mock = new MockAgent({ did: 'did:test:123' }); 10 10 const agent = new RateLimitedAgent(mock, { 11 - pointsPerDay: 100, 12 11 safetyMargin: 1.0, 13 12 pointsPerWrite: 3, 14 13 pointsPerRead: 1, ··· 16 15 }); 17 16 18 17 const status = agent.getRateLimitStatus(); 19 - assert.strictEqual(status.dailyPointsRemaining, 100); 20 - assert.strictEqual(status.hourlyPointsRemaining, 4); 18 + assert.strictEqual(status.dailyPointsRemaining, 30000); 19 + assert.strictEqual(status.hourlyPointsRemaining, 1250); 21 20 }); 22 21 23 22 it('should consume points on writes', async () => { 24 23 const mock = new MockAgent({ did: 'did:test:123' }); 25 24 const agent = new RateLimitedAgent(mock, { 26 - pointsPerDay: 10000, 27 25 safetyMargin: 1.0, 28 26 pointsPerWrite: 3, 29 27 pointsPerRead: 1, ··· 39 37 }); 40 38 41 39 const status = agent.getRateLimitStatus(); 42 - assert.strictEqual(status.dailyPointsRemaining, 9991); 40 + assert.strictEqual(status.dailyPointsRemaining, 29991); 43 41 }); 44 42 45 43 it('should consume 1 point per read', async () => { 46 44 const mock = new MockAgent({ did: 'did:test:123' }); 47 45 const agent = new RateLimitedAgent(mock, { 48 - pointsPerDay: 10000, 49 46 safetyMargin: 1.0, 50 47 pointsPerWrite: 3, 51 48 pointsPerRead: 1, ··· 55 52 await agent.listRecords({ collection: 'test' }); 56 53 57 54 const status = agent.getRateLimitStatus(); 58 - assert.strictEqual(status.dailyPointsRemaining, 9999); 55 + assert.strictEqual(status.dailyPointsRemaining, 29999); 59 56 }); 60 57 61 58 it('should consume 1 point per delete', async () => { 62 59 const mock = new MockAgent({ did: 'did:test:123' }); 63 60 const agent = new RateLimitedAgent(mock, { 64 - pointsPerDay: 10000, 65 61 safetyMargin: 1.0, 66 62 pointsPerWrite: 3, 67 63 pointsPerRead: 1, ··· 71 67 await agent.deleteRecord({ collection: 'test', rkey: 'abc123' }); 72 68 73 69 const status = agent.getRateLimitStatus(); 74 - assert.strictEqual(status.dailyPointsRemaining, 9999); 70 + assert.strictEqual(status.dailyPointsRemaining, 29999); 75 71 }); 76 72 }); 77 73 ··· 79 75 it('should reset daily points at midnight', () => { 80 76 const mock = new MockAgent({ did: 'did:test:123' }); 81 77 const agent = new RateLimitedAgent(mock, { 82 - pointsPerDay: 100, 83 78 safetyMargin: 1.0, 84 79 pointsPerWrite: 3, 85 80 }); ··· 88 83 assert.ok(status.dailyResetAt > new Date()); 89 84 }); 90 85 91 - it('should reset hourly points at the next hour', () => { 86 + it('should reset hourly points at the next hour', async () => { 92 87 const mock = new MockAgent({ did: 'did:test:123' }); 93 88 const agent = new RateLimitedAgent(mock, { 94 - pointsPerDay: 100, 95 89 safetyMargin: 1.0, 96 90 pointsPerWrite: 3, 97 91 }); 92 + 93 + await agent.listRecords({ collection: 'test' }); 98 94 99 95 const status = agent.getRateLimitStatus(); 100 96 assert.ok(status.hourlyResetAt > new Date()); ··· 169 165 const mock = new MockAgent({ did: 'did:test:123', delay: 100 }); 170 166 const abortController = new AbortController(); 171 167 const agent = new RateLimitedAgent(mock, { 172 - pointsPerDay: 1, 173 168 safetyMargin: 1.0, 174 169 signal: abortController.signal, 175 170 }); ··· 177 172 setTimeout(() => abortController.abort(), 10); 178 173 179 174 await assert.rejects( 180 - async () => agent.listRecords({ collection: 'test' }), 175 + async () => 176 + agent.applyWrites({ 177 + writes: Array(500) 178 + .fill(null) 179 + .map((_, i) => ({ 180 + $type: 'create', 181 + collection: 'test', 182 + rkey: String(i), 183 + value: {}, 184 + })), 185 + }), 181 186 CancellationError, 182 187 ); 183 188 });