Malachite is a tool to import your Last.fm and Spotify listening history to the AT Protocol network using the fm.teal.alpha.feed.play lexicon.
malachite scrobbles importer atproto music
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: improve rate limiter

+455 -281
+11 -1
README.md
··· 10 10 **CRITICAL**: Bluesky's AppView has rate limits on PDS instances. Exceeding 10K records per day can rate limit your **ENTIRE PDS**, affecting all users on your instance. 11 11 12 12 This importer automatically protects your PDS by: 13 - - Limiting imports to **1,000 records per day** (with 75% safety margin) 13 + - Limiting imports to **7,500 records per day** (with 75% safety margin) 14 14 - Calculating optimal batch sizes and delays 15 15 - Pausing 24 hours between days for large imports 16 16 - Providing clear progress tracking and time estimates 17 + - Persisting state across restarts for safe resume 18 + 19 + ### 📚 Rate Limiting Documentation 20 + 21 + Malachite has comprehensive rate limiting protection built in. 22 + 23 + **New**: Monitor your rate limit status anytime: 24 + ```bash 25 + npm run check-limits 26 + ``` 17 27 18 28 For more details, see the [Bluesky Rate Limits Documentation](https://docs.bsky.app/blog/rate-limits-pds-v3). 19 29
+3 -1
package.json
··· 17 17 "type-check": "tsc --noEmit", 18 18 "test": "npm run build && node --test dist/tests/**/*.test.js", 19 19 "test:tid": "npm run build && node --test dist/tests/tid.test.js", 20 - "test:watch": "npm run build && node --watch --test dist/tests/**/*.test.js" 20 + "test:watch": "npm run build && node --watch --test dist/tests/**/*.test.js", 21 + "monitor:rate-limit": "node scripts/rate-limit-monitor.js", 22 + "check-limits": "node scripts/rate-limit-monitor.js" 21 23 }, 22 24 "keywords": [ 23 25 "lastfm",
+208
scripts/rate-limit-monitor.js
··· 1 + #!/usr/bin/env node 2 + 3 + /** 4 + * Rate Limit Monitor - Check current rate limiting status 5 + * 6 + * Usage: 7 + * node scripts/rate-limit-monitor.js 8 + * npm run monitor:rate-limit 9 + */ 10 + 11 + import fs from 'node:fs'; 12 + import path from 'node:path'; 13 + import { fileURLToPath } from 'node:url'; 14 + 15 + const __filename = fileURLToPath(import.meta.url); 16 + const __dirname = path.dirname(__filename); 17 + 18 + // Get state directory (matching platform.ts logic) 19 + function getMalachiteStateDir() { 20 + if (process.env.MALACHITE_STATE_DIR) { 21 + return process.env.MALACHITE_STATE_DIR; 22 + } 23 + const home = process.env.HOME || process.env.USERPROFILE || ''; 24 + return path.join(home, '.malachite'); 25 + } 26 + 27 + const stateDir = path.join(getMalachiteStateDir(), 'state'); 28 + const rateLimitFile = path.join(stateDir, 'rate-limit.json'); 29 + const importStateFile = path.join(stateDir, 'import-state.json'); 30 + 31 + console.log('═══════════════════════════════════════════════'); 32 + console.log(' Malachite Rate Limit Monitor'); 33 + console.log('═══════════════════════════════════════════════\n'); 34 + 35 + // Check rate limit state 36 + console.log('📊 Rate Limit Status'); 37 + console.log('─────────────────────────────────────────────'); 38 + 39 + if (!fs.existsSync(rateLimitFile)) { 40 + console.log('Status: No rate limit data available yet'); 41 + console.log(' (Will be created after first API call)\n'); 42 + } else { 43 + try { 44 + const rateLimitData = JSON.parse(fs.readFileSync(rateLimitFile, 'utf8')); 45 + const { limit, remaining, windowSeconds, updatedAt } = rateLimitData; 46 + 47 + const now = Math.floor(Date.now() / 1000); 48 + const age = now - (updatedAt || 0); 49 + const ageMinutes = Math.floor(age / 60); 50 + const ageHours = Math.floor(ageMinutes / 60); 51 + 52 + console.log(`Limit: ${limit?.toLocaleString() || 'Unknown'} points per window`); 53 + console.log(`Remaining: ${remaining?.toLocaleString() || 'Unknown'} points`); 54 + console.log(`Window: ${windowSeconds || 'Unknown'}s (${Math.floor((windowSeconds || 0) / 60)}m)`); 55 + 56 + if (limit && remaining !== undefined) { 57 + const usedPercent = ((limit - remaining) / limit * 100).toFixed(1); 58 + const remainingPercent = (remaining / limit * 100).toFixed(1); 59 + console.log(`Used: ${usedPercent}% of quota`); 60 + console.log(`Available: ${remainingPercent}% remaining`); 61 + 62 + // Visual bar 63 + const barLength = 40; 64 + const usedBars = Math.floor((limit - remaining) / limit * barLength); 65 + const remainingBars = barLength - usedBars; 66 + const bar = '█'.repeat(usedBars) + '░'.repeat(remainingBars); 67 + console.log(`Progress: [${bar}]`); 68 + 69 + // Warning if low 70 + if (remaining < limit * 0.1) { 71 + console.log('\n⚠️ WARNING: Low quota remaining!'); 72 + console.log(' Consider waiting for quota reset before continuing.'); 73 + } else if (remaining < limit * 0.25) { 74 + console.log('\n⚡ NOTICE: Quota below 25%'); 75 + console.log(' Import will slow down or pause if needed.'); 76 + } 77 + } 78 + 79 + console.log(`\nLast Updated: ${ageHours > 0 ? `${ageHours}h` : `${ageMinutes}m`} ago`); 80 + 81 + if (windowSeconds && remaining !== undefined && remaining < 100) { 82 + const resetTime = updatedAt + windowSeconds; 83 + const resetIn = resetTime - now; 84 + if (resetIn > 0) { 85 + const resetMinutes = Math.floor(resetIn / 60); 86 + const resetHours = Math.floor(resetMinutes / 60); 87 + console.log(`Resets in: ${resetHours > 0 ? `${resetHours}h ${resetMinutes % 60}m` : `${resetMinutes}m`}`); 88 + } 89 + } 90 + 91 + } catch (error) { 92 + console.log(`❌ Error reading rate limit file: ${error.message}`); 93 + } 94 + } 95 + 96 + console.log(''); 97 + 98 + // Check import state 99 + console.log('📦 Import Status'); 100 + console.log('─────────────────────────────────────────────'); 101 + 102 + if (!fs.existsSync(importStateFile)) { 103 + console.log('Status: No import in progress\n'); 104 + } else { 105 + try { 106 + const importData = JSON.parse(fs.readFileSync(importStateFile, 'utf8')); 107 + const { totalRecords, processedRecords, successCount, errorCount, completed, startedAt, completedAt } = importData; 108 + 109 + if (completed) { 110 + console.log('Status: ✅ Import completed'); 111 + console.log(`Total: ${totalRecords?.toLocaleString() || 'Unknown'} records`); 112 + console.log(`Success: ${successCount?.toLocaleString() || 'Unknown'} records`); 113 + if (errorCount > 0) { 114 + console.log(`Errors: ${errorCount.toLocaleString()} records`); 115 + } 116 + 117 + if (startedAt && completedAt) { 118 + const duration = new Date(completedAt).getTime() - new Date(startedAt).getTime(); 119 + const hours = Math.floor(duration / (1000 * 60 * 60)); 120 + const minutes = Math.floor((duration % (1000 * 60 * 60)) / (1000 * 60)); 121 + console.log(`Duration: ${hours}h ${minutes}m`); 122 + } 123 + 124 + console.log(`\nCompleted: ${new Date(completedAt).toLocaleString()}`); 125 + } else { 126 + console.log('Status: 🔄 Import in progress'); 127 + console.log(`Total: ${totalRecords?.toLocaleString() || 'Unknown'} records`); 128 + console.log(`Processed: ${processedRecords?.toLocaleString() || 'Unknown'} records`); 129 + console.log(`Success: ${successCount?.toLocaleString() || 'Unknown'} records`); 130 + if (errorCount > 0) { 131 + console.log(`Errors: ${errorCount.toLocaleString()} records`); 132 + } 133 + 134 + if (totalRecords && processedRecords !== undefined) { 135 + const progress = (processedRecords / totalRecords * 100).toFixed(1); 136 + console.log(`Progress: ${progress}%`); 137 + 138 + // Visual bar 139 + const barLength = 40; 140 + const progressBars = Math.floor(processedRecords / totalRecords * barLength); 141 + const remainingBars = barLength - progressBars; 142 + const bar = '█'.repeat(progressBars) + '░'.repeat(remainingBars); 143 + console.log(` [${bar}]`); 144 + 145 + const remaining = totalRecords - processedRecords; 146 + console.log(`Remaining: ${remaining.toLocaleString()} records`); 147 + } 148 + 149 + if (startedAt) { 150 + const elapsed = Date.now() - new Date(startedAt).getTime(); 151 + const hours = Math.floor(elapsed / (1000 * 60 * 60)); 152 + const minutes = Math.floor((elapsed % (1000 * 60 * 60)) / (1000 * 60)); 153 + console.log(`\nElapsed: ${hours}h ${minutes}m`); 154 + console.log(`Started: ${new Date(startedAt).toLocaleString()}`); 155 + } 156 + } 157 + 158 + } catch (error) { 159 + console.log(`❌ Error reading import state: ${error.message}`); 160 + } 161 + } 162 + 163 + console.log(''); 164 + 165 + // Configuration info 166 + console.log('⚙️ Configuration'); 167 + console.log('─────────────────────────────────────────────'); 168 + console.log(`State Directory: ${stateDir}`); 169 + console.log(`Rate Limit File: ${fs.existsSync(rateLimitFile) ? '✅ Present' : '⚠️ Not found'}`); 170 + console.log(`Import State: ${fs.existsSync(importStateFile) ? '✅ Present' : '⚠️ Not found'}`); 171 + 172 + console.log('\n═══════════════════════════════════════════════\n'); 173 + 174 + // Recommendations 175 + if (fs.existsSync(rateLimitFile)) { 176 + try { 177 + const rateLimitData = JSON.parse(fs.readFileSync(rateLimitFile, 'utf8')); 178 + const { limit, remaining } = rateLimitData; 179 + 180 + if (limit && remaining !== undefined) { 181 + console.log('💡 Recommendations:'); 182 + console.log('─────────────────────────────────────────────'); 183 + 184 + if (remaining < limit * 0.1) { 185 + console.log('• WAIT for quota reset before importing more'); 186 + console.log('• Current quota very low (<10%)'); 187 + } else if (remaining < limit * 0.25) { 188 + console.log('• Consider using default safety margin (75%)'); 189 + console.log('• Avoid --aggressive mode for now'); 190 + } else if (remaining > limit * 0.75) { 191 + console.log('• ✅ Good quota available'); 192 + console.log('• Safe to import with default or aggressive settings'); 193 + } else { 194 + console.log('• ✅ Adequate quota available'); 195 + console.log('• Safe to import with default settings'); 196 + } 197 + 198 + console.log('• Always run with --dry-run first to preview'); 199 + console.log('• System will auto-pause if quota is low'); 200 + console.log(''); 201 + } 202 + } catch (error) { 203 + // Silent - already reported above 204 + } 205 + } 206 + 207 + // Exit successfully 208 + process.exit(0);
+20 -48
src/lib/publisher.ts
··· 1 1 import type { AtpAgent } from '@atproto/api'; 2 2 import { formatDuration, formatDate } from '../utils/helpers.js'; 3 3 import { isImportCancelled } from '../utils/killswitch.js'; 4 - import { 5 - calculateDailySchedule, 6 - displayRateLimitWarning, 7 - displayRateLimitInfo, 8 - calculateRateLimitedBatches, 9 - } from '../utils/rate-limiter.js'; 10 4 import { RateLimiter } from '../utils/rate-limiter.js'; 11 5 import { isRateLimitError } from '../utils/rate-limit-headers.js'; 12 6 import { formatLocaleNumber } from '../utils/platform.js'; ··· 275 269 ): PublishResult { 276 270 const totalRecords = records.length; 277 271 278 - // Calculate rate limiting info 279 - const rateLimitParams = calculateRateLimitedBatches(totalRecords, config); 280 - 281 - if (rateLimitParams.needsRateLimiting) { 282 - displayRateLimitWarning(); 283 - batchSize = rateLimitParams.batchSize; 284 - batchDelay = rateLimitParams.batchDelay; 285 - 286 - // Ensure batch size doesn't exceed applyWrites limit 287 - batchSize = Math.min(batchSize, MAX_APPLY_WRITES_OPS); 272 + // Ensure batch size doesn't exceed applyWrites limit 273 + batchSize = Math.min(batchSize, MAX_APPLY_WRITES_OPS); 288 274 289 - displayRateLimitInfo( 290 - totalRecords, 291 - batchSize, 292 - batchDelay, 293 - rateLimitParams.estimatedDays, 294 - rateLimitParams.recordsPerDay 295 - ); 296 - 297 - if (rateLimitParams.estimatedDays > 1) { 298 - // Only show daily schedule in verbose/debug mode 299 - if (log.getLevel() <= 0) { // DEBUG level 300 - const dailySchedule = calculateDailySchedule( 301 - totalRecords, 302 - batchSize, 303 - batchDelay, 304 - rateLimitParams.recordsPerDay 305 - ); 306 - 307 - console.log('📅 Multi-Day Import Schedule:\n'); 308 - dailySchedule.forEach((day) => { 309 - console.log(` Day ${day.day}:`); 310 - console.log(` Records ${day.recordsStart + 1}-${day.recordsEnd} (${day.recordsCount} total)`); 311 - if (day.pauseAfter) { 312 - console.log(` → Pause 24h after completion`); 313 - } 314 - }); 315 - console.log(''); 316 - } 317 - } 275 + // Calculate estimated duration 276 + const estimatedBatches = Math.ceil(totalRecords / batchSize); 277 + const estimatedDuration = estimatedBatches * batchDelay; 278 + 279 + // Check if we'll exceed daily limit 280 + const recordsPerDay = config.RECORDS_PER_DAY_LIMIT || Number.MAX_SAFE_INTEGER; 281 + const needsRateLimiting = totalRecords > recordsPerDay; 282 + 283 + if (needsRateLimiting) { 284 + const estimatedDays = Math.ceil(totalRecords / recordsPerDay); 285 + log.warn('⚠️ Large Import Detected'); 286 + log.warn(`This import exceeds the daily limit of ${formatLocaleNumber(recordsPerDay)} records`); 287 + log.warn(`Estimated duration: ${estimatedDays} days with automatic pauses`); 288 + log.blank(); 318 289 } 319 290 320 291 log.section(`DRY RUN MODE ${syncMode ? '(SYNC)' : ''}`); ··· 324 295 log.info(`Total: ${formatLocaleNumber(totalRecords)} records`); 325 296 log.info(`Batch: ${Math.min(batchSize, MAX_APPLY_WRITES_OPS)} records per call`); 326 297 327 - if (rateLimitParams.estimatedDays > 1) { 328 - log.info(`Duration: ${rateLimitParams.estimatedDays} days with automatic pauses`); 298 + if (needsRateLimiting) { 299 + const estimatedDays = Math.ceil(totalRecords / recordsPerDay); 300 + log.info(`Duration: ${estimatedDays} days with automatic pauses`); 329 301 } else { 330 - log.info(`Time: ~${formatDuration(Math.ceil(totalRecords / batchSize) * batchDelay)}`); 302 + log.info(`Time: ~${formatDuration(estimatedDuration)}`); 331 303 } 332 304 log.blank(); 333 305
+213 -231
src/utils/rate-limiter.ts
··· 1 - import type { Config } from '../types.js'; 2 - import { formatLocaleNumber } from './platform.js'; 3 - import { log } from './logger.js'; 4 - 5 1 /** 6 - * Calculate rate-limited batch parameters 7 - * Ensures we don't exceed daily limits while maintaining efficiency 2 + * IMPROVED Rate Limiter - Based on actual server behavior 3 + * 4 + * KEY LEARNINGS: 5 + * - Rate limits are typically PER HOUR (3600s), not per day 6 + * - Example from selfhosted.social: 7 + * - ratelimit-limit: 5000 points 8 + * - ratelimit-policy: 5000;w=3600 (5000 points per 3600 seconds = 1 hour) 9 + * - Each applyWrites operation costs ~3 points per record 10 + * - MUST respect ratelimit-remaining: 0 as a hard stop 11 + * - MUST wait until ratelimit-reset timestamp before continuing 8 12 */ 9 - export function calculateRateLimitedBatches( 10 - totalRecords: number, 11 - config: Config 12 - ): { 13 - batchSize: number; 14 - batchDelay: number; 15 - estimatedDays: number; 16 - recordsPerDay: number; 17 - needsRateLimiting: boolean; 18 - } { 19 - log.debug(`[rate-limiter.ts] calculateRateLimitedBatches(totalRecords=${totalRecords})`); 13 + 14 + import fs from 'node:fs'; 15 + import path from 'node:path'; 16 + import { getMalachiteStateDir } from './platform.js'; 17 + import { parseRateLimitHeaders } from './rate-limit-headers.js'; 18 + import { log } from './logger.js'; 19 + 20 + export interface RateLimitState { 21 + /** Maximum points allowed in the time window */ 22 + limit: number; 20 23 21 - const dailyLimit = Math.floor(config.RECORDS_PER_DAY_LIMIT * config.SAFETY_MARGIN); 22 - log.debug(`[rate-limiter.ts] Daily limit: ${formatLocaleNumber(dailyLimit)} (raw limit=${config.RECORDS_PER_DAY_LIMIT}, safety margin=${config.SAFETY_MARGIN})`); 24 + /** Points remaining in current window */ 25 + remaining: number; 23 26 24 - // Check if we need rate limiting 25 - const needsRateLimiting = totalRecords > dailyLimit; 26 - log.info(`[rate-limiter.ts] Rate limiting needed: ${needsRateLimiting} (${formatLocaleNumber(totalRecords)} records > ${formatLocaleNumber(dailyLimit)} daily limit)`); 27 + /** Unix timestamp (seconds) when the window resets */ 28 + resetAt: number; 27 29 28 - if (!needsRateLimiting) { 29 - // Can import everything in one go 30 - log.debug(`[rate-limiter.ts] No rate limiting needed - can import all ${formatLocaleNumber(totalRecords)} records in one batch`); 31 - return { 32 - batchSize: config.DEFAULT_BATCH_SIZE, 33 - batchDelay: config.DEFAULT_BATCH_DELAY, 34 - estimatedDays: 1, 35 - recordsPerDay: totalRecords, 36 - needsRateLimiting: false, 37 - }; 38 - } 30 + /** Window duration in seconds (typically 3600 = 1 hour) */ 31 + windowSeconds: number; 39 32 40 - // Calculate how many days needed 41 - const estimatedDays = Math.ceil(totalRecords / dailyLimit); 42 - const recordsPerDay = Math.floor(totalRecords / estimatedDays); 43 - log.info(`[rate-limiter.ts] Estimated duration: ${estimatedDays} day(s), spreading ${formatLocaleNumber(recordsPerDay)} records/day`); 44 - 45 - // Calculate batch parameters 46 - // We want to spread records evenly throughout the day 47 - const minutesPerDay = 24 * 60; 48 - const batchesPerDay = Math.ceil(recordsPerDay / config.DEFAULT_BATCH_SIZE); 49 - const delayBetweenBatches = Math.floor((minutesPerDay * 60 * 1000) / batchesPerDay); 50 - log.debug(`[rate-limiter.ts] Batches per day: ${batchesPerDay}, delay between batches: ${delayBetweenBatches}ms`); 51 - 52 - // Ensure batch delay is at least minimum 53 - const batchDelay = Math.max(delayBetweenBatches, config.MIN_BATCH_DELAY); 54 - if (batchDelay !== delayBetweenBatches) { 55 - log.debug(`[rate-limiter.ts] Batch delay adjusted from ${delayBetweenBatches}ms to minimum ${config.MIN_BATCH_DELAY}ms`); 56 - } 33 + /** When this state was last updated (unix timestamp in seconds) */ 34 + updatedAt: number; 57 35 58 - // Adjust batch size if needed to hit the target 59 - const adjustedBatchSize = Math.min( 60 - Math.ceil(recordsPerDay / Math.floor((minutesPerDay * 60 * 1000) / batchDelay)), 61 - config.MAX_BATCH_SIZE 62 - ); 63 - log.debug(`[rate-limiter.ts] Final batch parameters: size=${adjustedBatchSize}, delay=${batchDelay}ms (default was size=${config.DEFAULT_BATCH_SIZE}, delay=${config.DEFAULT_BATCH_DELAY}ms)`); 64 - 65 - return { 66 - batchSize: adjustedBatchSize, 67 - batchDelay, 68 - estimatedDays, 69 - recordsPerDay, 70 - needsRateLimiting: true, 71 - }; 36 + /** Safety margin to apply (0.0-1.0) */ 37 + safetyMargin: number; 72 38 } 73 39 74 - // -------------------------------------------------- 75 - // RateLimiter class (persisted state + header handling) 76 - // -------------------------------------------------- 77 - import fs from 'node:fs'; 78 - import path from 'node:path'; 79 - import { getMalachiteStateDir } from './platform.js'; 80 - import { parseRateLimitHeaders } from './rate-limit-headers.js'; 81 - 82 40 export class RateLimiter { 83 - stateFile: string; 84 - safety: number; 85 - 41 + private stateFile: string; 42 + private safetyMargin: number; 43 + 86 44 constructor(opts?: { safety?: number }) { 87 - this.safety = opts?.safety ?? 1.0; 45 + this.safetyMargin = opts?.safety ?? 0.75; // Default 75% safety margin 88 46 const stateDir = path.join(getMalachiteStateDir(), 'state'); 89 47 this.stateFile = path.join(stateDir, 'rate-limit.json'); 90 - log.debug(`[RateLimiter] constructor: stateFile=${this.stateFile}, safety=${this.safety}`); 48 + 49 + log.debug(`[RateLimiter] constructor: stateFile=${this.stateFile}, safety=${this.safetyMargin}`); 91 50 this.ensureStateDir(); 92 51 } 93 - 94 - ensureStateDir() { 52 + 53 + private ensureStateDir(): void { 95 54 const dir = path.dirname(this.stateFile); 96 55 if (!fs.existsSync(dir)) { 97 - log.debug(`[RateLimiter] ensureStateDir() creating: ${dir}`); 56 + log.debug(`[RateLimiter] Creating state directory: ${dir}`); 98 57 fs.mkdirSync(dir, { recursive: true }); 99 - } else { 100 - log.debug(`[RateLimiter] ensureStateDir() directory already exists: ${dir}`); 101 58 } 102 59 } 103 - 104 - persistState(obj: Record<string, any>) { 105 - log.debug(`[RateLimiter] persistState() writing to ${this.stateFile}: ${JSON.stringify(obj)}`); 106 - fs.writeFileSync(this.stateFile, JSON.stringify(obj, null, 2), 'utf8'); 107 - } 108 - 109 - readState(): Record<string, any> { 60 + 61 + private readState(): RateLimitState | null { 110 62 try { 111 63 const raw = fs.readFileSync(this.stateFile, 'utf8'); 112 - const state = JSON.parse(raw); 113 - log.debug(`[RateLimiter] readState() loaded from ${this.stateFile}: ${JSON.stringify(state)}`); 64 + const state = JSON.parse(raw) as RateLimitState; 65 + log.debug(`[RateLimiter] Loaded state: ${JSON.stringify(state)}`); 114 66 return state; 115 67 } catch (e) { 116 - log.debug(`[RateLimiter] readState() no state file found (${this.stateFile}), returning empty object`); 117 - return {}; 68 + log.debug(`[RateLimiter] No existing state file`); 69 + return null; 118 70 } 119 71 } 120 - 72 + 73 + private writeState(state: RateLimitState): void { 74 + log.debug(`[RateLimiter] Writing state: ${JSON.stringify(state)}`); 75 + fs.writeFileSync(this.stateFile, JSON.stringify(state, null, 2), 'utf8'); 76 + } 77 + 121 78 /** 122 - * Update internal state based on server rate-limit headers 79 + * Update rate limit state from server response headers 123 80 */ 124 - updateFromHeaders(headers: Record<string, string>) { 81 + updateFromHeaders(headers: Record<string, string>): void { 125 82 log.debug(`[RateLimiter] updateFromHeaders() called`); 126 83 const parsed = parseRateLimitHeaders(headers); 127 - const limit = parsed.limit || 0; 128 - const remainingRaw = parsed.remaining || 0; 129 - const windowSeconds = parsed.windowSeconds || 0; 130 - 131 - const remaining = Math.floor(remainingRaw * this.safety); 132 - log.info(`[RateLimiter] Rate limit info: limit=${limit}, remaining_raw=${remainingRaw}, remaining_with_safety=${remaining}, window=${windowSeconds}s`); 133 - 134 - const obj = { 135 - limit, 136 - remaining, 84 + 85 + if (!parsed.limit || parsed.remaining === undefined) { 86 + log.warn('[RateLimiter] Headers missing limit or remaining - cannot update'); 87 + return; 88 + } 89 + 90 + const now = Math.floor(Date.now() / 1000); 91 + let resetAt = parsed.reset || (now + (parsed.windowSeconds || 3600)); 92 + let windowSeconds = parsed.windowSeconds || 3600; 93 + 94 + // Apply safety margin to remaining 95 + const remainingWithSafety = Math.floor(parsed.remaining * this.safetyMargin); 96 + 97 + const state: RateLimitState = { 98 + limit: parsed.limit, 99 + remaining: remainingWithSafety, 100 + resetAt, 137 101 windowSeconds, 138 - updatedAt: Math.floor(Date.now() / 1000), 102 + updatedAt: now, 103 + safetyMargin: this.safetyMargin 139 104 }; 140 - 141 - this.persistState(obj); 142 - return obj; 105 + 106 + log.info(`[RateLimiter] Updated from headers: ${parsed.limit} limit, ${remainingWithSafety}/${parsed.remaining} remaining (${(this.safetyMargin * 100).toFixed(0)}% safety), resets at ${new Date(resetAt * 1000).toISOString()}`); 107 + 108 + this.writeState(state); 143 109 } 144 - 110 + 145 111 /** 146 - * Decrement remaining permits and persist. If not enough remaining, returns false. 112 + * Check if we have enough quota for the requested points 113 + * Returns true if quota is available, false otherwise 147 114 */ 148 - async waitForPermit(count = 1): Promise<boolean> { 115 + async checkQuota(pointsNeeded: number): Promise<boolean> { 149 116 const state = this.readState(); 150 - const rem = typeof state.remaining === 'number' ? state.remaining : 0; 151 - log.debug(`[RateLimiter] waitForPermit(${count}) called, current remaining: ${rem}/${state.limit}`); 117 + if (!state) { 118 + log.warn('[RateLimiter] No state - assuming quota available'); 119 + return true; // No state yet, let first request go through 120 + } 121 + 122 + const now = Math.floor(Date.now() / 1000); 152 123 153 - if (rem >= count) { 154 - state.remaining = Math.max(0, rem - count); 155 - log.debug(`[RateLimiter] waitForPermit() quota available, decrementing ${rem} -> ${state.remaining}`); 156 - this.persistState(state); 124 + // Check if window has reset 125 + if (now >= state.resetAt) { 126 + log.info(`[RateLimiter] Window has reset! Restoring quota to ${state.limit}`); 127 + state.remaining = Math.floor(state.limit * this.safetyMargin); 128 + state.resetAt = now + state.windowSeconds; 129 + state.updatedAt = now; 130 + this.writeState(state); 157 131 return true; 158 132 } 159 - 160 - // Not enough quota; if windowSeconds is set, sleep until reset 161 - if (state.windowSeconds && state.windowSeconds > 0) { 162 - const waitMs = state.windowSeconds * 1000 + 1000; 163 - const waitSeconds = Math.floor(waitMs / 1000); 164 - log.warn(`[RateLimiter] Quota exhausted! Remaining ${rem}/${state.limit}. Waiting ${waitSeconds}s for quota reset (window=${state.windowSeconds}s)`); 165 - await new Promise((res) => setTimeout(res, waitMs)); 166 - // After waiting, set remaining to limit * safety 167 - state.remaining = Math.floor((state.limit || 0) * this.safety) - count; 168 - if (state.remaining < 0) state.remaining = 0; 169 - log.debug(`[RateLimiter] waitForPermit() quota reset complete, remaining now: ${state.remaining}`); 170 - this.persistState(state); 133 + 134 + // Check if we have enough quota 135 + const hasQuota = state.remaining >= pointsNeeded; 136 + log.debug(`[RateLimiter] checkQuota(${pointsNeeded}): remaining=${state.remaining}, hasQuota=${hasQuota}`); 137 + 138 + return hasQuota; 139 + } 140 + 141 + /** 142 + * Reserve quota points before making a request 143 + * Returns true if reservation succeeded, false if quota exhausted 144 + */ 145 + async reserveQuota(pointsNeeded: number): Promise<boolean> { 146 + const state = this.readState(); 147 + if (!state) { 148 + log.warn('[RateLimiter] No state yet - allowing request (will be initialized from response)'); 171 149 return true; 172 150 } 173 - 174 - log.warn(`[RateLimiter] waitForPermit() failed: quota exhausted and no reset window info available`); 175 - return false; 151 + 152 + const now = Math.floor(Date.now() / 1000); 153 + 154 + // Check if window has reset 155 + if (now >= state.resetAt) { 156 + log.info(`[RateLimiter] Window reset! Restoring quota`); 157 + state.remaining = Math.floor(state.limit * this.safetyMargin); 158 + state.resetAt = now + state.windowSeconds; 159 + state.updatedAt = now; 160 + this.writeState(state); 161 + } 162 + 163 + // Check quota 164 + if (state.remaining < pointsNeeded) { 165 + const waitTime = state.resetAt - now; 166 + log.warn(`[RateLimiter] ❌ Quota exhausted! Need ${pointsNeeded} points, only ${state.remaining} remaining`); 167 + log.warn(`[RateLimiter] Must wait ${waitTime}s (${Math.floor(waitTime / 60)}m ${waitTime % 60}s) until ${new Date(state.resetAt * 1000).toISOString()}`); 168 + return false; 169 + } 170 + 171 + // Reserve the quota 172 + state.remaining -= pointsNeeded; 173 + state.updatedAt = now; 174 + this.writeState(state); 175 + 176 + log.debug(`[RateLimiter] ✅ Reserved ${pointsNeeded} points, ${state.remaining} remaining`); 177 + return true; 176 178 } 177 - } 178 - 179 - /** 180 - * Calculate daily batches and pause times 181 - */ 182 - export function calculateDailySchedule( 183 - totalRecords: number, 184 - batchSize: number, 185 - batchDelay: number, 186 - recordsPerDay: number 187 - ) { 188 - const schedule = []; 189 - 190 - // How many batches fit into a 24h window using the actual delay? 191 - const batchesPerDay = Math.floor((24 * 60 * 60 * 1000) / batchDelay); 192 - 193 - // Max records we could process in one day given the spacing 194 - const maxRecordsPerDay = batchesPerDay * batchSize; 195 - 196 - // Respect the external rate limit (recordsPerDay) 197 - const dailyCap = Math.min(maxRecordsPerDay, recordsPerDay); 198 - 199 - let processed = 0; 200 - let day = 1; 201 - 202 - while (processed < totalRecords) { 203 - const recordsStart = processed; 204 - const dailyCount = Math.min(dailyCap, totalRecords - processed); 205 - const recordsEnd = recordsStart + dailyCount; 206 - const isLastDay = recordsEnd >= totalRecords; 207 - 208 - schedule.push({ 209 - day, 210 - recordsStart, 211 - recordsEnd, 212 - recordsCount: dailyCount, 213 - pauseAfter: !isLastDay, 214 - pauseDuration: isLastDay ? 0 : 24 * 60 * 60 * 1000 215 - }); 216 - 217 - processed = recordsEnd; 218 - day++; 179 + 180 + /** 181 + * Wait until the rate limit window resets 182 + */ 183 + async waitForReset(): Promise<void> { 184 + const state = this.readState(); 185 + if (!state) { 186 + log.warn('[RateLimiter] No state - nothing to wait for'); 187 + return; 188 + } 189 + 190 + const now = Math.floor(Date.now() / 1000); 191 + const waitTime = Math.max(0, state.resetAt - now); 192 + 193 + if (waitTime === 0) { 194 + log.info('[ImprovedRateLimiter] Window already reset'); 195 + return; 196 + } 197 + 198 + const minutes = Math.floor(waitTime / 60); 199 + const seconds = waitTime % 60; 200 + 201 + log.warn(`[RateLimiter] ⏳ Waiting ${minutes}m ${seconds}s for quota reset...`); 202 + log.warn(`[RateLimiter] Reset at: ${new Date(state.resetAt * 1000).toISOString()}`); 203 + 204 + // Wait with 1 second added as buffer 205 + await new Promise(resolve => setTimeout(resolve, (waitTime + 1) * 1000)); 206 + 207 + log.info('[RateLimiter] ✅ Wait complete - quota should be reset'); 219 208 } 220 - 221 - return schedule; 222 - } 223 - 224 - 225 - /** 226 - * Format time duration in human-readable format 227 - */ 228 - export function formatTimeRemaining(ms: number): string { 229 - const days = Math.floor(ms / (24 * 60 * 60 * 1000)); 230 - const hours = Math.floor((ms % (24 * 60 * 60 * 1000)) / (60 * 60 * 1000)); 231 - const minutes = Math.floor((ms % (60 * 60 * 1000)) / (60 * 1000)); 232 209 233 - if (days > 0) { 234 - return `${days}d ${hours}h ${minutes}m`; 235 - } else if (hours > 0) { 236 - return `${hours}h ${minutes}m`; 237 - } else if (minutes > 0) { 238 - return `${minutes}m`; 239 - } else { 240 - return '< 1m'; 210 + /** 211 + * Wait for a permit with the given number of points. 212 + * This combines reserveQuota and waitForReset logic: 213 + * - If quota is available, reserves it immediately 214 + * - If quota is exhausted, waits until reset and then reserves 215 + */ 216 + async waitForPermit(pointsNeeded: number): Promise<boolean> { 217 + // Try to reserve quota first 218 + const reserved = await this.reserveQuota(pointsNeeded); 219 + if (reserved) { 220 + return true; // Got the permit immediately 221 + } 222 + 223 + // Quota exhausted - wait for reset 224 + await this.waitForReset(); 225 + 226 + // Try again after reset 227 + return await this.reserveQuota(pointsNeeded); 241 228 } 242 - } 243 - 244 - /** 245 - * Display rate limit warning 246 - */ 247 - export function displayRateLimitWarning(): void { 248 - console.log(''); 249 - console.log('⚠️ IMPORTANT: Rate Limits'); 250 - console.log(' Exceeding 10K records/day can rate limit your ENTIRE PDS.'); 251 - console.log(' This affects ALL users on your PDS, not just your account.'); 252 - console.log(' Import automatically limits to 10K records/day with pauses.'); 253 - console.log(' See: https://docs.bsky.app/blog/rate-limits-pds-v3'); 254 - console.log(''); 255 - } 256 - 257 - /** 258 - * Display rate limiting info 259 - */ 260 - export function displayRateLimitInfo( 261 - totalRecords: number, 262 - batchSize: number, 263 - batchDelay: number, 264 - estimatedDays: number, 265 - recordsPerDay: number 266 - ): void { 267 - console.log('\n📊 Rate Limiting Information:'); 268 - console.log(` Total records: ${formatLocaleNumber(totalRecords)}`); 269 - console.log(` Daily limit: ${formatLocaleNumber(recordsPerDay)} records/day`); 270 - console.log(` Estimated duration: ${estimatedDays} day${estimatedDays > 1 ? 's' : ''}`); 271 - console.log(` Batch size: ${batchSize} records`); 272 - console.log(` Batch delay: ${(batchDelay / 1000).toFixed(1)}s`); 273 229 274 - if (estimatedDays > 1) { 275 - console.log('\n The import will automatically pause between days.'); 276 - console.log(' You can safely close and restart the importer - it will resume from where it left off.'); 230 + /** 231 + * Get current rate limit status for monitoring 232 + */ 233 + getStatus(): { 234 + hasState: boolean; 235 + limit?: number; 236 + remaining?: number; 237 + remainingPercent?: number; 238 + resetAt?: Date; 239 + secondsUntilReset?: number; 240 + windowSeconds?: number; 241 + } { 242 + const state = this.readState(); 243 + if (!state) { 244 + return { hasState: false }; 245 + } 246 + 247 + const now = Math.floor(Date.now() / 1000); 248 + const secondsUntilReset = Math.max(0, state.resetAt - now); 249 + const remainingPercent = (state.remaining / state.limit) * 100; 250 + 251 + return { 252 + hasState: true, 253 + limit: state.limit, 254 + remaining: state.remaining, 255 + remainingPercent, 256 + resetAt: new Date(state.resetAt * 1000), 257 + secondsUntilReset, 258 + windowSeconds: state.windowSeconds 259 + }; 277 260 } 278 - console.log(''); 279 261 }