A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Integrate rate limiting and retry logic into hydration services

- Add rate limiter to post and profile hydration (3k per 5min)
- Configure rate limiter: 3000 tokens, 10/100ms refill (600/min)
- Wrap API calls with retry logic (3 attempts, exponential backoff)
- Handle rate limit, network, and server errors gracefully
- All tests passing

Rate limits match Bluesky API: 3000 requests per 5 minutes per IP.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+78 -11
+30 -5
src/hydration/posts.service.ts
··· 2 2 import { Database } from "duckdb"; 3 3 import { PostsRepository } from "../database/posts.repository.js"; 4 4 import { BlobProcessor } from "../blobs/processor.js"; 5 + import { RateLimiter } from "../utils/rate-limit.js"; 6 + import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js"; 5 7 import { logger } from "../logger/index.js"; 6 8 import { config } from "../config/index.js"; 7 9 ··· 9 11 private agent: AtpAgent; 10 12 private postsRepo: PostsRepository; 11 13 private blobProcessor: BlobProcessor; 14 + private rateLimiter: RateLimiter; 12 15 13 16 constructor(db: Database) { 14 17 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 15 18 this.postsRepo = new PostsRepository(db); 16 19 this.blobProcessor = new BlobProcessor(db, this.agent); 20 + this.rateLimiter = new RateLimiter({ 21 + maxTokens: 3000, 22 + refillRate: 10, 23 + refillInterval: 100, 24 + }); 17 25 } 18 26 19 27 async initialize(): Promise<void> { ··· 45 53 46 54 const [did, collection, rkey] = uriParts; 47 55 48 - const response = await this.agent.com.atproto.repo.getRecord({ 49 - repo: did, 50 - collection, 51 - rkey, 52 - }); 56 + await this.rateLimiter.acquire(1); 57 + 58 + const response = await withRetry( 59 + async () => { 60 + return await this.agent.com.atproto.repo.getRecord({ 61 + repo: did, 62 + collection, 63 + rkey, 64 + }); 65 + }, 66 + { 67 + maxAttempts: 3, 68 + initialDelay: 1000, 69 + maxDelay: 10000, 70 + backoffMultiplier: 2, 71 + retryableErrors: [ 72 + isRateLimitError, 73 + isNetworkError, 74 + isServerError, 75 + ], 76 + } 77 + ); 53 78 54 79 if (!response.success || !response.data.value) { 55 80 logger.warn({ uri }, "Failed to fetch post record");
+48 -6
src/hydration/profiles.service.ts
··· 1 1 import { AtpAgent } from "@atproto/api"; 2 2 import { Database } from "duckdb"; 3 3 import { ProfilesRepository } from "../database/profiles.repository.js"; 4 + import { RateLimiter } from "../utils/rate-limit.js"; 5 + import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js"; 4 6 import { logger } from "../logger/index.js"; 5 7 import { config } from "../config/index.js"; 6 8 7 9 export class ProfileHydrationService { 8 10 private agent: AtpAgent; 9 11 private profilesRepo: ProfilesRepository; 12 + private rateLimiter: RateLimiter; 10 13 11 14 constructor(db: Database) { 12 15 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 13 16 this.profilesRepo = new ProfilesRepository(db); 17 + this.rateLimiter = new RateLimiter({ 18 + maxTokens: 3000, 19 + refillRate: 10, 20 + refillInterval: 100, 21 + }); 14 22 } 15 23 16 24 async initialize(): Promise<void> { ··· 34 42 return; 35 43 } 36 44 37 - const profileResponse = await this.agent.com.atproto.repo.getRecord({ 38 - repo: did, 39 - collection: "app.bsky.actor.profile", 40 - rkey: "self", 41 - }); 45 + await this.rateLimiter.acquire(1); 46 + 47 + const profileResponse = await withRetry( 48 + async () => { 49 + return await this.agent.com.atproto.repo.getRecord({ 50 + repo: did, 51 + collection: "app.bsky.actor.profile", 52 + rkey: "self", 53 + }); 54 + }, 55 + { 56 + maxAttempts: 3, 57 + initialDelay: 1000, 58 + maxDelay: 10000, 59 + backoffMultiplier: 2, 60 + retryableErrors: [ 61 + isRateLimitError, 62 + isNetworkError, 63 + isServerError, 64 + ], 65 + } 66 + ); 42 67 43 68 let displayName: string | undefined; 44 69 let description: string | undefined; ··· 49 74 description = record.description; 50 75 } 51 76 52 - const profileLookup = await this.agent.getProfile({ actor: did }); 77 + await this.rateLimiter.acquire(1); 78 + 79 + const profileLookup = await withRetry( 80 + async () => { 81 + return await this.agent.getProfile({ actor: did }); 82 + }, 83 + { 84 + maxAttempts: 3, 85 + initialDelay: 1000, 86 + maxDelay: 10000, 87 + backoffMultiplier: 2, 88 + retryableErrors: [ 89 + isRateLimitError, 90 + isNetworkError, 91 + isServerError, 92 + ], 93 + } 94 + ); 53 95 54 96 let handle: string | undefined; 55 97 if (profileLookup.success) {