Malachite is a tool to import your Last.fm and Spotify listening history to the AT Protocol network using the fm.teal.alpha.feed.play lexicon.
malachite scrobbles importer atproto music
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: implement comprehensive logging system with locale-aware formatting and rate limiting utilities

+695 -37
+240
LOGGING.md
··· 1 + # Comprehensive Deep Logging Guide 2 + 3 + This document describes the deep logging system now integrated throughout Malachite for troubleshooting, monitoring, and understanding program flow. 4 + 5 + ## Logging Levels 6 + 7 + The logger supports 5 levels: 8 + 9 + - **DEBUG** (0): Very detailed information, useful for developers/debugging 10 + - **INFO** (1): General informational messages (default) 11 + - **WARN** (2): Warning messages 12 + - **ERROR** (3): Error messages 13 + - **SILENT** (4): No logging output 14 + 15 + ## Enabling Debug Logging 16 + 17 + To see comprehensive debug logs, set the environment variable before running: 18 + 19 + ```bash 20 + # Enable DEBUG level logging 21 + DEBUG=* pnpm start -i your-file.csv 22 + 23 + # Or set it explicitly 24 + NODE_DEBUG_LOGS=1 pnpm start -i your-file.csv 25 + ``` 26 + 27 + Then logs will output to: 28 + - **Console**: All log messages with colors 29 + - **File**: `~/.malachite/logs/import-TIMESTAMP.log` (platform-aware paths) 30 + 31 + ## What Gets Logged 32 + 33 + ### Platform Detection & Initialization 34 + - **File**: `src/utils/platform.ts` 35 + - **Logs**: 36 + - Platform detection (Windows/macOS/Linux) 37 + - State directory resolution with env var checking 38 + - Cache and logs directory paths 39 + - Locale number formatting 40 + 41 + **Example**: 42 + ``` 43 + [platform.ts] getPlatform() detected: macos (node os.platform() = darwin) 44 + [platform.ts] getMalachiteStateDir() on macOS: /Users/ewan/Library/Application Support/malachite 45 + [platform.ts] formatLocaleNumber(1234567) => 1,234,567 46 + ``` 47 + 48 + ### Rate Limit Headers Parsing 49 + - **File**: `src/utils/rate-limit-headers.ts` 50 + - **Logs**: 51 + - Header parsing attempts with all variants (ratelimit-*, x-ratelimit-*, RateLimit-*) 52 + - Extracted policy windows 53 + - Rate limit error detection logic 54 + 55 + **Example**: 56 + ``` 57 + [rate-limit-headers.ts] parseRateLimitHeaders() called with headers: ["ratelimit-limit","ratelimit-remaining","ratelimit-reset"] 58 + [rate-limit-headers.ts] Extracted window from policy: 3600s from "5000;w=3600" 59 + [rate-limit-headers.ts] parseRateLimitHeaders() result: limit=5000, remaining=4999, reset=1707213600, window=3600s 60 + [rate-limit-headers.ts] isRateLimitError() detected: status 429 61 + ``` 62 + 63 + ### Rate Limiting State Management 64 + - **File**: `src/utils/rate-limiter.ts` 65 + - **Logs**: 66 + - Batch size/delay calculations with safety margins 67 + - State file read/write operations 68 + - Quota pre-decrement operations 69 + - Wait-for-permit logic with reset windows 70 + - Estimated import duration breakdowns 71 + 72 + **Example**: 73 + ``` 74 + [rate-limiter.ts] calculateRateLimitedBatches(totalRecords=50000) 75 + [rate-limiter.ts] Daily limit: 2,500 (raw limit=5000, safety margin=0.5) 76 + [rate-limiter.ts] Rate limiting needed: true (50,000 records > 2,500 daily limit) 77 + [rate-limiter.ts] Estimated duration: 20 day(s), spreading 2,500 records/day 78 + [RateLimiter] constructor: stateFile=/Users/ewan/Library/Application Support/malachite/state/rate-limit.json, safety=0.5 79 + [RateLimiter] ensureStateDir() creating: /Users/ewan/Library/Application Support/malachite/state 80 + [RateLimiter] readState() loaded from ...: {"limit":5000,"remaining":2450,"windowSeconds":3600,"updatedAt":1707209823} 81 + [RateLimiter] waitForPermit(30) called, current remaining: 2450/5000 82 + [RateLimiter] waitForPermit() quota available, pre-decrementing 2450 -> 2420 83 + [RateLimiter] persistState() writing to ...: {"limit":5000,"remaining":2420} 84 + ``` 85 + 86 + ### Publisher Batch Processing 87 + - **File**: `src/lib/publisher.ts` 88 + - **Logs**: 89 + - Configuration parameters (batch size, delay, safety margin) 90 + - Resume state info 91 + - Per-batch progress with calculations 92 + - Adaptive adjustment triggers (speedups/slowdowns) 93 + - Rate limit hit detection and recovery 94 + - Error details with context 95 + - Performance statistics (speed, elapsed time, estimates) 96 + 97 + **Example**: 98 + ``` 99 + [publisher.ts] MAX_APPLY_WRITES_OPS=200, POINTS_PER_RECORD=3 100 + [publisher.ts] Safety margin: 0.5, Records per day limit: 5,000 101 + [publisher.ts] Starting batch: index=0, size=10 102 + [publisher.ts] Reserving quota: batch_size=10, points=30 (3 per record) 103 + [publisher.ts] Sending applyWrites request for 10 records to PDS... 104 + [publisher.ts] Batch success: 10/10 records published in 542ms 105 + [publisher.ts] Updating rate limiter from response headers 106 + ⚡ Speeding up! Delay: 2000ms → 1600ms 107 + [publisher.ts] Stats - Elapsed: 12s | Speed: 45.3 rec/s | Success: 453/1000 | Remaining: ~12s 108 + [publisher.ts] Rate limit error headers: ["ratelimit-limit","ratelimit-remaining"] 109 + [publisher.ts] Waiting for rate limit reset, requesting 90 points... 110 + [RateLimiter] Rate limited! Quota exhausted (150/5000 remaining). Waiting 3600s for quota reset... 111 + ``` 112 + 113 + ## Log Files 114 + 115 + By default, logs are written to platform-specific directories: 116 + 117 + - **macOS**: `~/Library/Application Support/malachite/logs/` 118 + - **Windows**: `%APPDATA%\malachite\logs\` 119 + - **Linux**: `~/.config/malachite/logs/` (or `$XDG_CONFIG_HOME/malachite/logs/`) 120 + 121 + Each import session creates a new file: `import-2025-02-06T15-23-45.log` 122 + 123 + Files contain all logs with ANSI color codes stripped for cleaner text files. 124 + 125 + ## State Persistence Files 126 + 127 + Rate limit state is persisted to: 128 + 129 + - **macOS**: `~/Library/Application Support/malachite/state/rate-limit.json` 130 + - **Windows**: `%APPDATA%\malachite\state\rate-limit.json` 131 + - **Linux**: `~/.config/malachite/state/rate-limit.json` 132 + 133 + Content example: 134 + ```json 135 + { 136 + "limit": 5000, 137 + "remaining": 2420, 138 + "windowSeconds": 3600, 139 + "updatedAt": 1707209823 140 + } 141 + ``` 142 + 143 + This state is read on startup to continue respecting rate limits across sessions. 144 + 145 + ## Debugging Common Issues 146 + 147 + ### "Rate limit hit!" appearing frequently 148 + 149 + Look for these debug logs: 150 + ``` 151 + [rate-limit-headers.ts] parseRateLimitHeaders() result: remaining=XXXX 152 + [RateLimiter] Rate limited! Quota exhausted (XXX/YYYY remaining). Waiting... 153 + ``` 154 + 155 + This indicates the server is returning 429 responses. The logs show: 156 + - How many requests remaining before hitting the limit 157 + - How long the wait window is 158 + - Whether the reset is happening 159 + 160 + ### Batch size/delay not adjusting 161 + 162 + Look for: 163 + ``` 164 + [publisher.ts] Starting batch: index=X, size=Y 165 + [rate-limiter.ts] Final batch parameters: size=X, delay=Yms 166 + ``` 167 + 168 + If these don't change over time, check: 169 + - Whether speed-up conditions are met (5 consecutive successes) 170 + - Whether failure conditions are triggered (3+ consecutive failures) 171 + 172 + ### Platform path issues 173 + 174 + Look for: 175 + ``` 176 + [platform.ts] getMalachiteStateDir() using APPDATA on Windows: ... 177 + [platform.ts] getMalachiteStateDir() XDG_CONFIG_HOME not set, using default on Linux: ... 178 + ``` 179 + 180 + This confirms which path the application is using. You can override with: 181 + - **Windows**: Set `APPDATA` environment variable 182 + - **Linux**: Set `XDG_CONFIG_HOME` environment variable 183 + 184 + ## Performance Monitoring 185 + 186 + Key metrics to watch in logs: 187 + 188 + 1. **Speed (records/sec)**: 189 + ``` 190 + [publisher.ts] Stats - Speed: 45.3 rec/s 191 + ``` 192 + 193 + 2. **Batch success rate**: 194 + ``` 195 + [publisher.ts] Batch success: 450/500 records 196 + ``` 197 + 198 + 3. **Rate limit headroom**: 199 + ``` 200 + [RateLimiter] Rate limit info: limit=5000, remaining_with_safety=2450 201 + ``` 202 + 203 + 4. **Estimated time remaining**: 204 + ``` 205 + [publisher.ts] Stats - Remaining: ~2m 30s 206 + ``` 207 + 208 + ## Adding More Logging 209 + 210 + When debugging specific issues, you can add more granular logging: 211 + 212 + ```typescript 213 + // In any module: 214 + import { log } from '../utils/logger.js'; 215 + 216 + // At start of function: 217 + log.debug(`[module-name] functionName() called with args: ${JSON.stringify(args)}`); 218 + 219 + // During processing: 220 + log.debug(`[module-name] Processing item ${i}/${total}: ${description}`); 221 + 222 + // On completion: 223 + log.debug(`[module-name] Result: ${JSON.stringify(result)}`); 224 + ``` 225 + 226 + All debug logs will appear when `DEBUG=*` is set and be written to both console and file. 227 + 228 + ## Summary 229 + 230 + The comprehensive logging system now provides visibility into: 231 + 232 + ✅ Platform detection and path resolution 233 + ✅ Rate limit header parsing and error detection 234 + ✅ Stateful rate limit tracking and quota management 235 + ✅ Batch processing progress and adaptive adjustments 236 + ✅ Performance metrics and time estimates 237 + ✅ Error details with full context 238 + ✅ State persistence and resume capabilities 239 + 240 + This enables fast troubleshooting and monitoring of the import process across all operating systems.
+4 -3
src/lib/cli.ts
··· 30 30 clearImportState, 31 31 ImportState, 32 32 } from '../utils/import-state.js'; 33 + import { formatLocaleNumber } from '../utils/platform.js'; 33 34 34 35 /** 35 36 * Show help message ··· 593 594 records = csvRecords.map(record => convertToPlayRecord(record, cfg, isDebug)); 594 595 } 595 596 596 - log.success(`Loaded ${rawRecordCount.toLocaleString()} records`); 597 + log.success(`Loaded ${formatLocaleNumber(rawRecordCount)} records`); 597 598 598 599 const dedupResult = deduplicateInputRecords(records); 599 600 records = dedupResult.unique; 600 601 if (dedupResult.duplicates > 0) { 601 - log.warn(`Removed ${dedupResult.duplicates.toLocaleString()} duplicate(s) from input data`); 602 - log.info(`Unique records: ${records.length.toLocaleString()}`); 602 + log.warn(`Removed ${formatLocaleNumber(dedupResult.duplicates)} duplicate(s) from input data`); 603 + log.info(`Unique records: ${formatLocaleNumber(records.length)}`); 603 604 } else { 604 605 log.info(`No duplicates found in input data`); 605 606 }
+57 -32
src/lib/publisher.ts
··· 7 7 displayRateLimitInfo, 8 8 calculateRateLimitedBatches, 9 9 } from '../utils/rate-limiter.js'; 10 + import { RateLimiter } from '../utils/rate-limiter.js'; 11 + import { isRateLimitError } from '../utils/rate-limit-headers.js'; 12 + import { formatLocaleNumber } from '../utils/platform.js'; 10 13 import { generateTIDFromISO } from '../utils/tid.js'; 11 14 import type { PlayRecord, Config, PublishResult } from '../types.js'; 12 15 import { log } from '../utils/logger.js'; ··· 58 61 let consecutiveSuccesses = 0; 59 62 let consecutiveFailures = 0; 60 63 const MAX_CONSECUTIVE_FAILURES = 3; 64 + const POINTS_PER_RECORD = 3; // approximate cost per create operation 65 + 66 + // Persistent rate limiter (reads/writes ~/.malachite/state/rate-limit.json) 67 + const rl = new RateLimiter({ safety: config.SAFETY_MARGIN }); 61 68 62 69 log.section('Conservative Adaptive Import'); 63 70 log.info(`Initial batch size: ${currentBatchSize} records (conservative)`); 64 71 log.info(`Initial delay: ${currentBatchDelay}ms (2 seconds - very safe)`); 72 + log.debug(`[publisher.ts] MAX_APPLY_WRITES_OPS=${MAX_APPLY_WRITES_OPS}, POINTS_PER_RECORD=${POINTS_PER_RECORD}`); 73 + log.debug(`[publisher.ts] Safety margin: ${config.SAFETY_MARGIN}, Records per day limit: ${formatLocaleNumber(config.RECORDS_PER_DAY_LIMIT)}`); 65 74 log.info(`Will automatically adjust based on server response`); 66 75 log.info(`Using conservative settings to protect your PDS`); 67 76 log.blank(); 68 - log.info(`Publishing ${totalRecords.toLocaleString()} records using adaptive batching...`); 77 + log.info(`Publishing ${formatLocaleNumber(totalRecords)} records using adaptive batching...`); 69 78 log.warn('Press Ctrl+C to stop gracefully after current batch'); 70 79 log.blank(); 71 80 ··· 77 86 let startIndex = importState ? getResumeStartIndex(importState) : 0; 78 87 if (importState && startIndex > 0) { 79 88 log.info(`Resuming from record ${startIndex + 1} (${(startIndex / totalRecords * 100).toFixed(1)}% complete)`); 89 + log.debug(`[publisher.ts] Import state loaded, resuming at index=${startIndex}`); 80 90 log.blank(); 81 91 } 82 92 ··· 94 104 log.progress( 95 105 `[${progress}%] Batch ${batchNum} (records ${i + 1}-${Math.min(i + currentBatchSize, totalRecords)}) [size: ${currentBatchSize}, delay: ${currentBatchDelay}ms]` 96 106 ); 107 + log.debug(`[publisher.ts] Starting batch: index=${i}, size=${batch.length}`); 97 108 98 109 const batchStartTime = Date.now(); 99 110 ··· 107 118 })) 108 119 ); 109 120 121 + // Reserve quota (points) for this batch before sending. This will wait until 122 + // server reset if quota is exhausted (persisted across runs). 123 + const batchPoints = batch.length * POINTS_PER_RECORD; 124 + log.debug(`[publisher.ts] Reserving quota: batch_size=${batch.length}, points=${batchPoints} (${POINTS_PER_RECORD} per record)`); 125 + const permit = await rl.waitForPermit(batchPoints); 126 + if (!permit) { 127 + const backoffMs = Math.min(currentBatchDelay * 2, 30000); 128 + log.warn(`Rate limiter cannot grant permit; backing off ${backoffMs}ms`); 129 + await new Promise((resolve) => setTimeout(resolve, backoffMs)); 130 + } 131 + 110 132 try { 111 133 // Call applyWrites with the batch 134 + log.debug(`[publisher.ts] Sending applyWrites request for ${batch.length} records to PDS...`); 112 135 const response = await agent.com.atproto.repo.applyWrites({ 113 136 repo: agent.session?.did || '', 114 137 writes: writes as any, ··· 121 144 consecutiveFailures = 0; 122 145 123 146 const batchDuration = Date.now() - batchStartTime; 124 - log.debug(`Batch complete in ${batchDuration}ms (${batchSuccessCount} successful)`); 147 + log.debug(`[publisher.ts] Batch success: ${batchSuccessCount}/${batch.length} records published in ${batchDuration}ms`); 125 148 126 149 // Save state after successful batch 127 150 if (importState) { ··· 141 164 consecutiveSuccesses = 0; 142 165 } 143 166 167 + // Update limiter from any headers the server returned 168 + try { 169 + const respHeaders = (response as any)?.headers || (response as any)?.data?.headers; 170 + if (respHeaders) { 171 + log.debug(`[publisher.ts] Updating rate limiter from response headers`); 172 + rl.updateFromHeaders(respHeaders as Record<string,string>); 173 + } 174 + } catch (e) { 175 + log.debug(`[publisher.ts] Could not extract headers from response: ${e}`); 176 + } 177 + 144 178 i += batch.length; 145 179 146 180 } catch (error) { 147 181 const err = error as any; 148 - const isRateLimitError = 149 - err.status === 429 || 150 - err.message?.includes('rate limit') || 151 - err.message?.includes('too many requests'); 152 - 153 182 consecutiveFailures++; 154 183 consecutiveSuccesses = 0; 155 184 156 - if (isRateLimitError) { 157 - log.warn('Rate limit hit! Backing off...'); 158 - 159 - // Exponential backoff 160 - const backoffMultiplier = Math.pow(2, consecutiveFailures); 161 - currentBatchDelay = Math.min( 162 - currentBatchDelay * backoffMultiplier, 163 - 60000 // Max 60 seconds 164 - ); 165 - 166 - // Also reduce batch size 167 - currentBatchSize = Math.max( 168 - Math.floor(currentBatchSize / 2), 169 - 10 // Minimum 10 records 170 - ); 185 + const rateLimitError = isRateLimitError(err); 186 + log.debug(`[publisher.ts] Batch error: rateLimitError=${rateLimitError}, consecutiveFailures=${consecutiveFailures}`); 171 187 172 - log.info(`📉 Adjusted: batch size → ${currentBatchSize}, delay → ${currentBatchDelay}ms`); 173 - log.info(`⏳ Waiting ${currentBatchDelay}ms before retry...`); 174 - 175 - await new Promise((resolve) => setTimeout(resolve, currentBatchDelay)); 176 - 177 - // Don't advance i, retry this batch 188 + if (rateLimitError) { 189 + log.warn('Rate limit hit! Inspecting server headers...'); 190 + const headers = err.response?.headers || err.headers || err.data?.headers || {}; 191 + log.debug(`[publisher.ts] Rate limit error headers: ${JSON.stringify(Object.keys(headers))}`); 192 + // Wait for permit for this batch (this will wait until reset if necessary) 193 + const batchPoints = batch.length * POINTS_PER_RECORD; 194 + log.debug(`[publisher.ts] Waiting for rate limit reset, requesting ${batchPoints} points...`); 195 + const waitOk = await rl.waitForPermit(batchPoints); 196 + if (!waitOk) { 197 + const backoffMs = Math.min(Math.pow(2, consecutiveFailures) * 1000, 60000); 198 + log.info(`Fallback backoff: waiting ${backoffMs}ms before retry...`); 199 + await new Promise((resolve) => setTimeout(resolve, backoffMs)); 200 + } 178 201 continue; 179 202 180 203 } else { 181 204 // Other error - log and continue 182 205 errorCount += batch.length; 183 206 log.error(`Batch failed: ${err.message}`); 207 + log.debug(`[publisher.ts] Error details: status=${err.response?.status}, code=${err.code}`); 184 208 185 209 // Log failed records 186 210 batch.slice(0, 3).forEach((record) => { ··· 199 223 if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) { 200 224 currentBatchDelay = Math.min(currentBatchDelay * 2, 10000); 201 225 currentBatchSize = Math.max(Math.floor(currentBatchSize / 2), 10); 202 - log.warn(`📉 Multiple failures: adjusted to ${currentBatchSize} records, ${currentBatchDelay}ms delay`); 226 + log.warn(`📉 Multiple failures (${consecutiveFailures}): adjusted to ${currentBatchSize} records, ${currentBatchDelay}ms delay`); 227 + log.debug(`[publisher.ts] Slowing down due to consecutive failures`); 203 228 } 204 229 205 230 i += batch.length; // Skip failed batch ··· 212 237 const estimatedRemaining = remainingRecords / Math.max(recordsPerSecond, 1); 213 238 214 239 log.debug( 215 - `Elapsed: ${elapsed} | Speed: ${recordsPerSecond.toFixed(1)} rec/s | Remaining: ~${formatDuration(estimatedRemaining * 1000)}` 240 + `[publisher.ts] Stats - Elapsed: ${elapsed} | Speed: ${recordsPerSecond.toFixed(1)} rec/s | Success: ${successCount}/${totalRecords} | Remaining: ~${formatDuration(estimatedRemaining * 1000)}` 216 241 ); 217 242 log.blank(); 218 243 ··· 296 321 if (syncMode) { 297 322 log.info('Sync mode: Only new records will be published'); 298 323 } 299 - log.info(`Total: ${totalRecords.toLocaleString()} records`); 324 + log.info(`Total: ${formatLocaleNumber(totalRecords)} records`); 300 325 log.info(`Batch: ${Math.min(batchSize, MAX_APPLY_WRITES_OPS)} records per call`); 301 326 302 327 if (rateLimitParams.estimatedDays > 1) { ··· 347 372 } 348 373 349 374 if (totalRecords > previewCount) { 350 - log.info(`... and ${(totalRecords - previewCount).toLocaleString()} more records`); 375 + log.info(`... and ${formatLocaleNumber(totalRecords - previewCount)} more records`); 351 376 log.blank(); 352 377 } 353 378
+45
src/tests/rate-limiter.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert'; 3 + import fs from 'node:fs'; 4 + import path from 'node:path'; 5 + import { RateLimiter } from '../utils/rate-limiter.js'; 6 + import { getMalachiteStateDir } from '../utils/platform.js'; 7 + 8 + describe('RateLimiter', () => { 9 + it('persists state after updateFromHeaders and applies safety margin', () => { 10 + const rl = new RateLimiter({ safety: 0.5 }); 11 + const now = Math.floor(Date.now() / 1000); 12 + const headers = { 13 + 'ratelimit-limit': '100', 14 + 'ratelimit-remaining': '80', 15 + 'ratelimit-reset': String(now + 3600), 16 + 'ratelimit-policy': '100;w=3600', 17 + }; 18 + rl.updateFromHeaders(headers); 19 + const statePath = path.join(getMalachiteStateDir(), 'state', 'rate-limit.json'); 20 + assert.ok(fs.existsSync(statePath), 'Persisted state file should exist'); 21 + const raw = fs.readFileSync(statePath, 'utf8'); 22 + const o = JSON.parse(raw); 23 + // safety = 0.5 so remaining should be floored(80 * 0.5) = 40 24 + assert.strictEqual(o.remaining, Math.floor(80 * 0.5)); 25 + assert.strictEqual(o.limit, 100); 26 + assert.strictEqual(o.windowSeconds, 3600); 27 + }); 28 + 29 + it('waitForPermit pre-decrements remaining when quota available', async () => { 30 + const rl = new RateLimiter({ safety: 1.0 }); 31 + const now = Math.floor(Date.now() / 1000); 32 + const headers = { 33 + 'ratelimit-limit': '10', 34 + 'ratelimit-remaining': '3', 35 + 'ratelimit-reset': String(now + 3600), 36 + 'ratelimit-policy': '10;w=3600', 37 + }; 38 + rl.updateFromHeaders(headers); 39 + const statePath = path.join(getMalachiteStateDir(), 'state', 'rate-limit.json'); 40 + const before = JSON.parse(fs.readFileSync(statePath, 'utf8')).remaining; 41 + await rl.waitForPermit(1); 42 + const after = JSON.parse(fs.readFileSync(statePath, 'utf8')).remaining; 43 + assert.strictEqual(after, Math.max(0, before - 1)); 44 + }); 45 + });
+128
src/utils/platform.ts
··· 1 + import os from 'node:os'; 2 + import path from 'node:path'; 3 + import { log } from './logger.js'; 4 + 5 + /** 6 + * Detect the current platform 7 + */ 8 + export type Platform = 'windows' | 'macos' | 'linux'; 9 + 10 + export function getPlatform(): Platform { 11 + const platform = os.platform(); 12 + let result: Platform; 13 + 14 + if (platform === 'win32') result = 'windows'; 15 + else if (platform === 'darwin') result = 'macos'; 16 + else result = 'linux'; 17 + 18 + log.debug(`[platform.ts] getPlatform() detected: ${result} (node os.platform() = ${platform})`); 19 + return result; 20 + } 21 + 22 + /** 23 + * Get the malachite state directory path, respecting platform conventions. 24 + * Windows: %APPDATA%\malachite (or fallback to ~/.malachite) 25 + * macOS: ~/Library/Application Support/malachite 26 + * Linux: ~/.config/malachite (XDG Base Directory spec) 27 + */ 28 + export function getMalachiteStateDir(): string { 29 + const platform = getPlatform(); 30 + const home = os.homedir(); 31 + let stateDir: string; 32 + 33 + switch (platform) { 34 + case 'windows': { 35 + const appdata = process.env.APPDATA; 36 + if (appdata) { 37 + stateDir = path.join(appdata, 'malachite'); 38 + log.debug(`[platform.ts] getMalachiteStateDir() using APPDATA on Windows: ${stateDir}`); 39 + } else { 40 + stateDir = path.join(home, '.malachite'); 41 + log.debug(`[platform.ts] getMalachiteStateDir() APPDATA not set, falling back to: ${stateDir}`); 42 + } 43 + return stateDir; 44 + } 45 + case 'macos': { 46 + stateDir = path.join(home, 'Library', 'Application Support', 'malachite'); 47 + log.debug(`[platform.ts] getMalachiteStateDir() on macOS: ${stateDir}`); 48 + return stateDir; 49 + } 50 + case 'linux': { 51 + const xdgConfig = process.env.XDG_CONFIG_HOME; 52 + if (xdgConfig) { 53 + stateDir = path.join(xdgConfig, 'malachite'); 54 + log.debug(`[platform.ts] getMalachiteStateDir() using XDG_CONFIG_HOME on Linux: ${stateDir}`); 55 + } else { 56 + stateDir = path.join(home, '.config', 'malachite'); 57 + log.debug(`[platform.ts] getMalachiteStateDir() XDG not set, using default on Linux: ${stateDir}`); 58 + } 59 + return stateDir; 60 + } 61 + } 62 + } 63 + 64 + /** 65 + * Get the malachite cache directory path 66 + */ 67 + export function getMalachiteCacheDir(): string { 68 + const baseDir = getMalachiteStateDir(); 69 + const cacheDir = path.join(baseDir, 'cache'); 70 + log.debug(`[platform.ts] getMalachiteCacheDir(): ${cacheDir}`); 71 + return cacheDir; 72 + } 73 + 74 + /** 75 + * Get the malachite logs directory path 76 + */ 77 + export function getMalachiteLogsDir(): string { 78 + const baseDir = getMalachiteStateDir(); 79 + const logsDir = path.join(baseDir, 'logs'); 80 + log.debug(`[platform.ts] getMalachiteLogsDir(): ${logsDir}`); 81 + return logsDir; 82 + } 83 + 84 + /** 85 + * Get a locale-aware number formatter 86 + */ 87 + export function getNumberFormatter(): Intl.NumberFormat { 88 + try { 89 + return new Intl.NumberFormat(undefined, { useGrouping: true }); 90 + } catch (e) { 91 + // Fallback for environments without Intl 92 + return { 93 + format: (n: number) => n.toLocaleString(), 94 + } as Intl.NumberFormat; 95 + } 96 + } 97 + 98 + /** 99 + * Format a number using the system locale 100 + */ 101 + export function formatLocaleNumber(n: number): string { 102 + const formatted = getNumberFormatter().format(n); 103 + log.debug(`[platform.ts] formatLocaleNumber(${n}) => ${formatted}`); 104 + return formatted; 105 + } 106 + 107 + /** 108 + * Get a locale-aware duration formatter (respects system locale for separators, abbreviations) 109 + */ 110 + export function formatLocaleDuration(ms: number): string { 111 + const days = Math.floor(ms / (24 * 60 * 60 * 1000)); 112 + const hours = Math.floor((ms % (24 * 60 * 60 * 1000)) / (60 * 60 * 1000)); 113 + const minutes = Math.floor((ms % (60 * 60 * 1000)) / (60 * 1000)); 114 + const seconds = Math.floor((ms % (60 * 1000)) / 1000); 115 + 116 + const parts: string[] = []; 117 + if (days > 0) parts.push(`${days}d`); 118 + if (hours > 0) parts.push(`${hours}h`); 119 + if (minutes > 0) parts.push(`${minutes}m`); 120 + if (seconds > 0 && ms < 60 * 1000) parts.push(`${seconds}s`); 121 + 122 + if (parts.length === 0) return '< 1s'; 123 + 124 + // Use locale-aware separator (typically space or comma depending on region) 125 + const platform = getPlatform(); 126 + const separator = platform === 'windows' ? ' ' : ' '; 127 + return parts.join(separator); 128 + }
+101
src/utils/rate-limit-headers.ts
··· 1 + /** 2 + * Rate limit header parsing utilities 3 + * Shared across rate-limiter.ts and publisher.ts 4 + */ 5 + import { log } from './logger.js'; 6 + 7 + export interface RateLimitHeaders { 8 + limit?: number; 9 + remaining?: number; 10 + reset?: number; // unix epoch seconds 11 + windowSeconds?: number; 12 + policy?: string; 13 + } 14 + 15 + /** 16 + * Parse rate-limit headers from a response, supporting multiple header name variants 17 + */ 18 + export function parseRateLimitHeaders(headers: Record<string, string>): RateLimitHeaders { 19 + log.debug(`[rate-limit-headers.ts] parseRateLimitHeaders() called with headers: ${JSON.stringify(Object.keys(headers || {}))}`); 20 + const limit = parseInt( 21 + headers['ratelimit-limit'] || 22 + headers['x-ratelimit-limit'] || 23 + headers['RateLimit-Limit'] || 24 + '0', 25 + 10 26 + ) || undefined; 27 + 28 + const remaining = parseInt( 29 + headers['ratelimit-remaining'] || 30 + headers['x-ratelimit-remaining'] || 31 + headers['RateLimit-Remaining'] || 32 + '0', 33 + 10 34 + ) || undefined; 35 + 36 + const reset = parseInt( 37 + headers['ratelimit-reset'] || 38 + headers['x-ratelimit-reset'] || 39 + headers['RateLimit-Reset'] || 40 + '0', 41 + 10 42 + ) || undefined; 43 + 44 + const policy = 45 + headers['ratelimit-policy'] || 46 + headers['x-ratelimit-policy'] || 47 + headers['RateLimit-Policy']; 48 + 49 + // Parse window seconds from policy (e.g. "5000;w=3600") 50 + let windowSeconds: number | undefined; 51 + if (policy) { 52 + const m = /;w=(\d+)/.exec(policy); 53 + if (m) { 54 + windowSeconds = parseInt(m[1], 10); 55 + log.debug(`[rate-limit-headers.ts] Extracted window from policy: ${windowSeconds}s from "${policy}"`); 56 + } 57 + } 58 + 59 + // If reset is present (unix epoch seconds), compute approximate window 60 + if (reset && !windowSeconds) { 61 + const now = Math.floor(Date.now() / 1000); 62 + windowSeconds = Math.max(0, reset - now); 63 + log.debug(`[rate-limit-headers.ts] Computed window from reset time: ${windowSeconds}s`); 64 + } 65 + 66 + const result: RateLimitHeaders = { 67 + limit: limit && !isNaN(limit) ? limit : undefined, 68 + remaining: remaining && !isNaN(remaining) ? remaining : undefined, 69 + reset: reset && !isNaN(reset) ? reset : undefined, 70 + windowSeconds, 71 + policy, 72 + }; 73 + 74 + log.debug(`[rate-limit-headers.ts] parseRateLimitHeaders() result: limit=${result.limit}, remaining=${result.remaining}, reset=${result.reset}, window=${result.windowSeconds}s`); 75 + 76 + return result; 77 + } 78 + 79 + /** 80 + * Check if headers indicate a rate limit error 81 + */ 82 + export function isRateLimitError(error: any): boolean { 83 + if (error.status === 429) { 84 + log.debug(`[rate-limit-headers.ts] isRateLimitError() detected: status 429`); 85 + return true; 86 + } 87 + if (error.message?.includes('rate limit')) { 88 + log.debug(`[rate-limit-headers.ts] isRateLimitError() detected: message contains 'rate limit'`); 89 + return true; 90 + } 91 + if (error.message?.includes('too many requests')) { 92 + log.debug(`[rate-limit-headers.ts] isRateLimitError() detected: message contains 'too many requests'`); 93 + return true; 94 + } 95 + if (error.message?.includes('Rate Limit')) { 96 + log.debug(`[rate-limit-headers.ts] isRateLimitError() detected: message contains 'Rate Limit'`); 97 + return true; 98 + } 99 + log.debug(`[rate-limit-headers.ts] isRateLimitError() false for error: ${error.message}`); 100 + return false; 101 + }
+120 -2
src/utils/rate-limiter.ts
··· 1 1 import type { Config } from '../types.js'; 2 + import { formatLocaleNumber } from './platform.js'; 3 + import { log } from './logger.js'; 2 4 3 5 /** 4 6 * Calculate rate-limited batch parameters ··· 14 16 recordsPerDay: number; 15 17 needsRateLimiting: boolean; 16 18 } { 19 + log.debug(`[rate-limiter.ts] calculateRateLimitedBatches(totalRecords=${totalRecords})`); 20 + 17 21 const dailyLimit = Math.floor(config.RECORDS_PER_DAY_LIMIT * config.SAFETY_MARGIN); 22 + log.debug(`[rate-limiter.ts] Daily limit: ${formatLocaleNumber(dailyLimit)} (raw limit=${config.RECORDS_PER_DAY_LIMIT}, safety margin=${config.SAFETY_MARGIN})`); 18 23 19 24 // Check if we need rate limiting 20 25 const needsRateLimiting = totalRecords > dailyLimit; 26 + log.info(`[rate-limiter.ts] Rate limiting needed: ${needsRateLimiting} (${formatLocaleNumber(totalRecords)} records > ${formatLocaleNumber(dailyLimit)} daily limit)`); 21 27 22 28 if (!needsRateLimiting) { 23 29 // Can import everything in one go 30 + log.debug(`[rate-limiter.ts] No rate limiting needed - can import all ${formatLocaleNumber(totalRecords)} records in one batch`); 24 31 return { 25 32 batchSize: config.DEFAULT_BATCH_SIZE, 26 33 batchDelay: config.DEFAULT_BATCH_DELAY, ··· 33 40 // Calculate how many days needed 34 41 const estimatedDays = Math.ceil(totalRecords / dailyLimit); 35 42 const recordsPerDay = Math.floor(totalRecords / estimatedDays); 43 + log.info(`[rate-limiter.ts] Estimated duration: ${estimatedDays} day(s), spreading ${formatLocaleNumber(recordsPerDay)} records/day`); 36 44 37 45 // Calculate batch parameters 38 46 // We want to spread records evenly throughout the day 39 47 const minutesPerDay = 24 * 60; 40 48 const batchesPerDay = Math.ceil(recordsPerDay / config.DEFAULT_BATCH_SIZE); 41 49 const delayBetweenBatches = Math.floor((minutesPerDay * 60 * 1000) / batchesPerDay); 50 + log.debug(`[rate-limiter.ts] Batches per day: ${batchesPerDay}, delay between batches: ${delayBetweenBatches}ms`); 42 51 43 52 // Ensure batch delay is at least minimum 44 53 const batchDelay = Math.max(delayBetweenBatches, config.MIN_BATCH_DELAY); 54 + if (batchDelay !== delayBetweenBatches) { 55 + log.debug(`[rate-limiter.ts] Batch delay adjusted from ${delayBetweenBatches}ms to minimum ${config.MIN_BATCH_DELAY}ms`); 56 + } 45 57 46 58 // Adjust batch size if needed to hit the target 47 59 const adjustedBatchSize = Math.min( 48 60 Math.ceil(recordsPerDay / Math.floor((minutesPerDay * 60 * 1000) / batchDelay)), 49 61 config.MAX_BATCH_SIZE 50 62 ); 63 + log.debug(`[rate-limiter.ts] Final batch parameters: size=${adjustedBatchSize}, delay=${batchDelay}ms (default was size=${config.DEFAULT_BATCH_SIZE}, delay=${config.DEFAULT_BATCH_DELAY}ms)`); 51 64 52 65 return { 53 66 batchSize: adjustedBatchSize, ··· 58 71 }; 59 72 } 60 73 74 + // -------------------------------------------------- 75 + // RateLimiter class (persisted state + header handling) 76 + // -------------------------------------------------- 77 + import fs from 'node:fs'; 78 + import path from 'node:path'; 79 + import { getMalachiteStateDir } from './platform.js'; 80 + import { parseRateLimitHeaders } from './rate-limit-headers.js'; 81 + 82 + export class RateLimiter { 83 + stateFile: string; 84 + safety: number; 85 + 86 + constructor(opts?: { safety?: number }) { 87 + this.safety = opts?.safety ?? 1.0; 88 + const stateDir = path.join(getMalachiteStateDir(), 'state'); 89 + this.stateFile = path.join(stateDir, 'rate-limit.json'); 90 + log.debug(`[RateLimiter] constructor: stateFile=${this.stateFile}, safety=${this.safety}`); 91 + this.ensureStateDir(); 92 + } 93 + 94 + ensureStateDir() { 95 + const dir = path.dirname(this.stateFile); 96 + if (!fs.existsSync(dir)) { 97 + log.debug(`[RateLimiter] ensureStateDir() creating: ${dir}`); 98 + fs.mkdirSync(dir, { recursive: true }); 99 + } else { 100 + log.debug(`[RateLimiter] ensureStateDir() directory already exists: ${dir}`); 101 + } 102 + } 103 + 104 + persistState(obj: Record<string, any>) { 105 + log.debug(`[RateLimiter] persistState() writing to ${this.stateFile}: ${JSON.stringify(obj)}`); 106 + fs.writeFileSync(this.stateFile, JSON.stringify(obj, null, 2), 'utf8'); 107 + } 108 + 109 + readState(): Record<string, any> { 110 + try { 111 + const raw = fs.readFileSync(this.stateFile, 'utf8'); 112 + const state = JSON.parse(raw); 113 + log.debug(`[RateLimiter] readState() loaded from ${this.stateFile}: ${JSON.stringify(state)}`); 114 + return state; 115 + } catch (e) { 116 + log.debug(`[RateLimiter] readState() no state file found (${this.stateFile}), returning empty object`); 117 + return {}; 118 + } 119 + } 120 + 121 + /** 122 + * Update internal state based on server rate-limit headers 123 + */ 124 + updateFromHeaders(headers: Record<string, string>) { 125 + log.debug(`[RateLimiter] updateFromHeaders() called`); 126 + const parsed = parseRateLimitHeaders(headers); 127 + const limit = parsed.limit || 0; 128 + const remainingRaw = parsed.remaining || 0; 129 + const windowSeconds = parsed.windowSeconds || 0; 130 + 131 + const remaining = Math.floor(remainingRaw * this.safety); 132 + log.info(`[RateLimiter] Rate limit info: limit=${limit}, remaining_raw=${remainingRaw}, remaining_with_safety=${remaining}, window=${windowSeconds}s`); 133 + 134 + const obj = { 135 + limit, 136 + remaining, 137 + windowSeconds, 138 + updatedAt: Math.floor(Date.now() / 1000), 139 + }; 140 + 141 + this.persistState(obj); 142 + return obj; 143 + } 144 + 145 + /** 146 + * Decrement remaining permits and persist. If not enough remaining, returns false. 147 + */ 148 + async waitForPermit(count = 1): Promise<boolean> { 149 + const state = this.readState(); 150 + const rem = typeof state.remaining === 'number' ? state.remaining : 0; 151 + log.debug(`[RateLimiter] waitForPermit(${count}) called, current remaining: ${rem}/${state.limit}`); 152 + 153 + if (rem >= count) { 154 + state.remaining = Math.max(0, rem - count); 155 + log.debug(`[RateLimiter] waitForPermit() quota available, decrementing ${rem} -> ${state.remaining}`); 156 + this.persistState(state); 157 + return true; 158 + } 159 + 160 + // Not enough quota; if windowSeconds is set, sleep until reset 161 + if (state.windowSeconds && state.windowSeconds > 0) { 162 + const waitMs = state.windowSeconds * 1000 + 1000; 163 + const waitSeconds = Math.floor(waitMs / 1000); 164 + log.warn(`[RateLimiter] Quota exhausted! Remaining ${rem}/${state.limit}. Waiting ${waitSeconds}s for quota reset (window=${state.windowSeconds}s)`); 165 + await new Promise((res) => setTimeout(res, waitMs)); 166 + // After waiting, set remaining to limit * safety 167 + state.remaining = Math.floor((state.limit || 0) * this.safety) - count; 168 + if (state.remaining < 0) state.remaining = 0; 169 + log.debug(`[RateLimiter] waitForPermit() quota reset complete, remaining now: ${state.remaining}`); 170 + this.persistState(state); 171 + return true; 172 + } 173 + 174 + log.warn(`[RateLimiter] waitForPermit() failed: quota exhausted and no reset window info available`); 175 + return false; 176 + } 177 + } 178 + 61 179 /** 62 180 * Calculate daily batches and pause times 63 181 */ ··· 147 265 recordsPerDay: number 148 266 ): void { 149 267 console.log('\n📊 Rate Limiting Information:'); 150 - console.log(` Total records: ${totalRecords.toLocaleString()}`); 151 - console.log(` Daily limit: ${recordsPerDay.toLocaleString()} records/day`); 268 + console.log(` Total records: ${formatLocaleNumber(totalRecords)}`); 269 + console.log(` Daily limit: ${formatLocaleNumber(recordsPerDay)} records/day`); 152 270 console.log(` Estimated duration: ${estimatedDays} day${estimatedDays > 1 ? 's' : ''}`); 153 271 console.log(` Batch size: ${batchSize} records`); 154 272 console.log(` Batch delay: ${(batchDelay / 1000).toFixed(1)}s`);