atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add rate limiting and abuse prevention across HTTP, gossipsub, and libp2p

Defense-in-depth rate limiting with zero-config defaults:
- Sliding window rate limiter (src/rate-limiter.ts) with per-pool isolation
- HTTP middleware: per-route rate limits (meta/rasl/sync/session/read/write/challenge/mstProof/admin)
- Body size limits: 1MB JSON, 64KB challenge, 60MB blob, 100MB CAR
- Gossipsub: 8KB message size cap, per-topic rate limiting (60/min commits, 10/min identity)
- libp2p: stream size caps (64KB inbound challenges, 1MB responses)
- libp2p: connection manager limits (100 max, 10 pending, 5/s inbound threshold)
- WebSocket firehose: per-IP connection limits (default 3)
- Challenge validation: targetDid check, path/CID count caps, expiration rejection
- All configurable via env vars, disabled by default in tests

+886 -10
+18
src/config.ts
··· 30 30 NODE_MANAGERS: string[]; 31 31 /** Optional human-readable node name. */ 32 32 NODE_NAME?: string; 33 + /** Whether rate limiting is enabled (default true). */ 34 + RATE_LIMIT_ENABLED: boolean; 35 + /** Per-pool rate limit overrides (requests per minute). */ 36 + RATE_LIMIT_READ_PER_MIN: number; 37 + RATE_LIMIT_SYNC_PER_MIN: number; 38 + RATE_LIMIT_SESSION_PER_MIN: number; 39 + RATE_LIMIT_WRITE_PER_MIN: number; 40 + RATE_LIMIT_CHALLENGE_PER_MIN: number; 41 + RATE_LIMIT_MAX_CONNECTIONS: number; 42 + RATE_LIMIT_FIREHOSE_PER_IP: number; 33 43 } 34 44 35 45 const REQUIRED_KEYS = [ ··· 119 129 NODE_DID: `did:web:${pdsHostname.replace(/:/g, "%3A")}`, 120 130 NODE_MANAGERS: (process.env.NODE_MANAGERS ?? "").split(",").map(s => s.trim()).filter(Boolean), 121 131 NODE_NAME: process.env.NODE_NAME || undefined, 132 + RATE_LIMIT_ENABLED: process.env.RATE_LIMIT_ENABLED !== "false", 133 + RATE_LIMIT_READ_PER_MIN: parseInt(process.env.RATE_LIMIT_READ_PER_MIN ?? "300", 10), 134 + RATE_LIMIT_SYNC_PER_MIN: parseInt(process.env.RATE_LIMIT_SYNC_PER_MIN ?? "30", 10), 135 + RATE_LIMIT_SESSION_PER_MIN: parseInt(process.env.RATE_LIMIT_SESSION_PER_MIN ?? "10", 10), 136 + RATE_LIMIT_WRITE_PER_MIN: parseInt(process.env.RATE_LIMIT_WRITE_PER_MIN ?? "200", 10), 137 + RATE_LIMIT_CHALLENGE_PER_MIN: parseInt(process.env.RATE_LIMIT_CHALLENGE_PER_MIN ?? "20", 10), 138 + RATE_LIMIT_MAX_CONNECTIONS: parseInt(process.env.RATE_LIMIT_MAX_CONNECTIONS ?? "100", 10), 139 + RATE_LIMIT_FIREHOSE_PER_IP: parseInt(process.env.RATE_LIMIT_FIREHOSE_PER_IP ?? "3", 10), 122 140 }; 123 141 } 124 142
+134
src/index.ts
··· 1 1 import { Hono } from "hono"; 2 2 import { cors } from "hono/cors"; 3 3 import { requireAuth } from "./middleware/auth.js"; 4 + import { rateLimitMiddleware } from "./middleware/rate-limit.js"; 5 + import { 6 + jsonBodyLimit, 7 + challengeBodyLimit, 8 + blobBodyLimit, 9 + carBodyLimit, 10 + } from "./middleware/body-limit.js"; 11 + import type { RateLimiter } from "./rate-limiter.js"; 4 12 import type { RepoManager } from "./repo-manager.js"; 5 13 import type { Firehose } from "./firehose.js"; 6 14 import type { Config } from "./config.js"; ··· 15 23 import { respondToChallenge } from "./replication/challenge-response/challenge-responder.js"; 16 24 import { serializeResponse } from "./replication/challenge-response/http-transport.js"; 17 25 import type { StorageChallenge } from "./replication/challenge-response/types.js"; 26 + import { MAX_RECORD_PATHS, MAX_BLOCK_CIDS } from "./replication/challenge-response/types.js"; 18 27 import { generateMstProof } from "./replication/mst-proof.js"; 19 28 import { generateNodeDidDocument } from "./node-identity.js"; 20 29 ··· 42 51 replicationManager?: ReplicationManager, 43 52 replicatedRepoReader?: ReplicatedRepoReader, 44 53 nodeOpts?: NodeIdentityOpts, 54 + rateLimiter?: RateLimiter, 45 55 ) { 46 56 const nodeDid = nodeOpts?.nodeDid ?? config.NODE_DID; 47 57 const nodePublicKeyMultibase = nodeOpts?.nodePublicKeyMultibase ?? 
""; ··· 71 81 maxAge: 86400, 72 82 }), 73 83 ); 84 + 85 + // ============================================ 86 + // Rate limit + body size middleware (per route group) 87 + // ============================================ 88 + if (rateLimiter && config.RATE_LIMIT_ENABLED) { 89 + const w = 60_000; // 1 minute window 90 + 91 + // Meta: .well-known, health, describeServer, resolveHandle 92 + const metaRL = rateLimitMiddleware(rateLimiter, { 93 + pool: "meta", 94 + rule: { maxRequests: 600, windowMs: w }, 95 + }); 96 + app.use("/.well-known/*", metaRL); 97 + app.use("/xrpc/_health", metaRL); 98 + app.use("/xrpc/com.atproto.server.describeServer", metaRL); 99 + app.use("/xrpc/com.atproto.identity.resolveHandle", metaRL); 100 + 101 + // RASL 102 + app.use( 103 + "/.well-known/rasl/*", 104 + rateLimitMiddleware(rateLimiter, { 105 + pool: "rasl", 106 + rule: { maxRequests: 600, windowMs: w }, 107 + }), 108 + ); 109 + 110 + // Sync endpoints 111 + app.use( 112 + "/xrpc/com.atproto.sync.*", 113 + rateLimitMiddleware(rateLimiter, { 114 + pool: "sync", 115 + rule: { maxRequests: config.RATE_LIMIT_SYNC_PER_MIN, windowMs: w }, 116 + }), 117 + ); 118 + 119 + // Session endpoints (login/refresh) 120 + const sessionRL = rateLimitMiddleware(rateLimiter, { 121 + pool: "session", 122 + rule: { maxRequests: config.RATE_LIMIT_SESSION_PER_MIN, windowMs: w }, 123 + }); 124 + app.use("/xrpc/com.atproto.server.createSession", sessionRL); 125 + app.use("/xrpc/com.atproto.server.refreshSession", sessionRL); 126 + 127 + // Read endpoints (repo reads) 128 + const readRL = rateLimitMiddleware(rateLimiter, { 129 + pool: "read", 130 + rule: { maxRequests: config.RATE_LIMIT_READ_PER_MIN, windowMs: w }, 131 + authRule: { maxRequests: 1000, windowMs: w }, 132 + }); 133 + app.use("/xrpc/com.atproto.repo.getRecord", readRL); 134 + app.use("/xrpc/com.atproto.repo.listRecords", readRL); 135 + app.use("/xrpc/com.atproto.repo.describeRepo", readRL); 136 + 137 + // Write endpoints (authed only) 138 + const 
writeRL = rateLimitMiddleware(rateLimiter, { 139 + pool: "write", 140 + rule: { maxRequests: config.RATE_LIMIT_WRITE_PER_MIN, windowMs: w }, 141 + }); 142 + app.use("/xrpc/com.atproto.repo.createRecord", writeRL); 143 + app.use("/xrpc/com.atproto.repo.deleteRecord", writeRL); 144 + app.use("/xrpc/com.atproto.repo.putRecord", writeRL); 145 + app.use("/xrpc/com.atproto.repo.applyWrites", writeRL); 146 + 147 + // Challenge endpoint 148 + app.use( 149 + "/xrpc/org.p2pds.verification.challenge", 150 + rateLimitMiddleware(rateLimiter, { 151 + pool: "challenge", 152 + rule: { maxRequests: config.RATE_LIMIT_CHALLENGE_PER_MIN, windowMs: w }, 153 + }), 154 + ); 155 + 156 + // MST proof 157 + app.use( 158 + "/xrpc/org.p2pds.verification.getMstProof", 159 + rateLimitMiddleware(rateLimiter, { 160 + pool: "mstProof", 161 + rule: { maxRequests: config.RATE_LIMIT_SYNC_PER_MIN, windowMs: w }, 162 + }), 163 + ); 164 + 165 + // Admin endpoints 166 + const adminRL = rateLimitMiddleware(rateLimiter, { 167 + pool: "admin", 168 + rule: { maxRequests: 300, windowMs: w }, 169 + }); 170 + app.use("/xrpc/org.p2pds.admin.*", adminRL); 171 + } 172 + 173 + // Body size limits (always active, independent of rate limiting) 174 + app.use("/xrpc/org.p2pds.verification.challenge", challengeBodyLimit()); 175 + app.use("/xrpc/com.atproto.repo.uploadBlob", blobBodyLimit()); 176 + app.use("/xrpc/com.atproto.repo.importRepo", carBodyLimit()); 177 + // Default 1MB JSON body limit for all other POST endpoints 178 + app.post("/xrpc/*", jsonBodyLimit()); 74 179 75 180 // DID document for did:web resolution — serves the node's DID document 76 181 app.get("/.well-known/did.json", (c) => { ··· 463 568 } 464 569 465 570 const challenge = (await c.req.json()) as StorageChallenge; 571 + 572 + // Validate challenge is targeted at this node 573 + if (challenge.targetDid !== nodeDid) { 574 + return c.json( 575 + { error: "InvalidChallenge", message: "Challenge is not targeted at this node" }, 576 + 400, 577 + ); 578 
+ } 579 + // Limit work: cap record paths and block CIDs 580 + if (challenge.recordPaths && challenge.recordPaths.length > MAX_RECORD_PATHS) { 581 + return c.json( 582 + { error: "InvalidChallenge", message: `Too many recordPaths (max ${MAX_RECORD_PATHS})` }, 583 + 400, 584 + ); 585 + } 586 + if (challenge.blockCids && challenge.blockCids.length > MAX_BLOCK_CIDS) { 587 + return c.json( 588 + { error: "InvalidChallenge", message: `Too many blockCids (max ${MAX_BLOCK_CIDS})` }, 589 + 400, 590 + ); 591 + } 592 + // Reject expired challenges 593 + if (challenge.expiresAt && new Date(challenge.expiresAt).getTime() < Date.now()) { 594 + return c.json( 595 + { error: "InvalidChallenge", message: "Challenge has expired" }, 596 + 400, 597 + ); 598 + } 599 + 466 600 const response = await respondToChallenge( 467 601 challenge, 468 602 blockStore,
+8
src/ipfs.test.ts
··· 45 45 FIREHOSE_ENABLED: false, 46 46 NODE_DID: "did:web:test.example.com", 47 47 NODE_MANAGERS: [], 48 + RATE_LIMIT_ENABLED: false, 49 + RATE_LIMIT_READ_PER_MIN: 300, 50 + RATE_LIMIT_SYNC_PER_MIN: 30, 51 + RATE_LIMIT_SESSION_PER_MIN: 10, 52 + RATE_LIMIT_WRITE_PER_MIN: 200, 53 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 54 + RATE_LIMIT_MAX_CONNECTIONS: 100, 55 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 48 56 }; 49 57 } 50 58
+40
src/ipfs.ts
··· 4 4 import type { Helia } from "@helia/interface"; 5 5 import type { BlockMap } from "@atproto/repo"; 6 6 import { encode as cborEncode, decode as cborDecode } from "./cbor-compat.js"; 7 + import type { RateLimiter } from "./rate-limiter.js"; 8 + import { DEFAULT_RATE_LIMIT_CONFIG } from "./rate-limiter.js"; 9 + 10 + /** Maximum gossipsub message size (8 KB). Messages larger than this are dropped before decoding. */ 11 + const MAX_GOSSIPSUB_MESSAGE_SIZE = 8192; 7 12 8 13 /** 9 14 * Pure storage: put, get, has blocks by CID string. ··· 104 109 private commitHandlers: CommitNotificationHandler[] = []; 105 110 private identityHandlers: IdentityNotificationHandler[] = []; 106 111 private subscribedTopics: Set<string> = new Set(); 112 + private rateLimiter: RateLimiter | null = null; 107 113 108 114 constructor(config: IpfsConfig) { 109 115 this.config = config; 116 + } 117 + 118 + /** 119 + * Set a rate limiter for gossipsub message rate limiting. 120 + * Called from server.ts after construction. 
121 + */ 122 + setRateLimiter(limiter: RateLimiter): void { 123 + this.rateLimiter = limiter; 110 124 } 111 125 112 126 async start(): Promise<void> { ··· 121 135 libp2pConfig.services.pubsub = gossipsub({ 122 136 emitSelf: false, 123 137 allowPublishToZeroTopicPeers: true, 138 + maxInboundDataLength: MAX_GOSSIPSUB_MESSAGE_SIZE, 124 139 }); 140 + // Connection manager limits 141 + libp2pConfig.connectionManager = { 142 + ...libp2pConfig.connectionManager, 143 + maxConnections: 100, 144 + maxIncomingPendingConnections: 10, 145 + inboundConnectionThreshold: 5, 146 + }; 125 147 126 148 this.helia = await createHelia({ 127 149 libp2p: libp2pConfig, ··· 393 415 pubsub.addEventListener("message", (evt: unknown) => { 394 416 try { 395 417 const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail; 418 + 419 + // Drop oversized messages before decoding 420 + if (detail.data.length > MAX_GOSSIPSUB_MESSAGE_SIZE) { 421 + return; 422 + } 423 + 424 + // Per-topic rate limiting via the shared RateLimiter 425 + if (this.rateLimiter) { 426 + const isCommit = detail.topic.startsWith(COMMIT_TOPIC_PREFIX); 427 + const isIdentity = detail.topic.startsWith(IDENTITY_TOPIC_PREFIX); 428 + if (isCommit || isIdentity) { 429 + const rule = isCommit 430 + ? DEFAULT_RATE_LIMIT_CONFIG.gossipsubCommit 431 + : DEFAULT_RATE_LIMIT_CONFIG.gossipsubIdentity; 432 + const result = this.rateLimiter.check("gossipsub", detail.topic, rule); 433 + if (!result.allowed) return; // silently drop 434 + } 435 + } 396 436 397 437 if (detail.topic.startsWith(COMMIT_TOPIC_PREFIX)) { 398 438 const notification = cborDecode(detail.data) as CommitNotification;
+41
src/middleware/body-limit.ts
··· 1 + /** 2 + * Body size limit middleware using Hono's built-in bodyLimit. 3 + * 4 + * Returns 413 Payload Too Large with a JSON error body when exceeded. 5 + */ 6 + 7 + import { bodyLimit } from "hono/body-limit"; 8 + 9 + function limitHandler(maxSize: string) { 10 + return () => 11 + new Response( 12 + JSON.stringify({ 13 + error: "PayloadTooLarge", 14 + message: `Request body exceeds ${maxSize} limit`, 15 + }), 16 + { 17 + status: 413, 18 + headers: { "Content-Type": "application/json" }, 19 + }, 20 + ); 21 + } 22 + 23 + /** 1 MB — default for JSON endpoints. */ 24 + export function jsonBodyLimit() { 25 + return bodyLimit({ maxSize: 1024 * 1024, onError: limitHandler("1 MB") }); 26 + } 27 + 28 + /** 64 KB — challenge POST payload. */ 29 + export function challengeBodyLimit() { 30 + return bodyLimit({ maxSize: 64 * 1024, onError: limitHandler("64 KB") }); 31 + } 32 + 33 + /** 60 MB — blob uploads (matches existing uploadBlob check). */ 34 + export function blobBodyLimit() { 35 + return bodyLimit({ maxSize: 60 * 1024 * 1024, onError: limitHandler("60 MB") }); 36 + } 37 + 38 + /** 100 MB — CAR file imports (matches existing importRepo check). */ 39 + export function carBodyLimit() { 40 + return bodyLimit({ maxSize: 100 * 1024 * 1024, onError: limitHandler("100 MB") }); 41 + }
+125
src/middleware/rate-limit.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 2 + import { Hono } from "hono"; 3 + import { RateLimiter } from "../rate-limiter.js"; 4 + import { rateLimitMiddleware, getClientIp } from "./rate-limit.js"; 5 + 6 + describe("rateLimitMiddleware", () => { 7 + let limiter: RateLimiter; 8 + 9 + beforeEach(() => { 10 + limiter = new RateLimiter(); 11 + }); 12 + 13 + afterEach(() => { 14 + limiter.stop(); 15 + }); 16 + 17 + function createTestApp() { 18 + const app = new Hono(); 19 + app.use( 20 + "/api/*", 21 + rateLimitMiddleware(limiter, { 22 + pool: "test", 23 + rule: { maxRequests: 3, windowMs: 60_000 }, 24 + authRule: { maxRequests: 10, windowMs: 60_000 }, 25 + }), 26 + ); 27 + app.get("/api/hello", (c) => c.json({ ok: true })); 28 + return app; 29 + } 30 + 31 + it("allows requests under the limit", async () => { 32 + const app = createTestApp(); 33 + const res = await app.request("/api/hello", { 34 + headers: { "X-Forwarded-For": "1.2.3.4" }, 35 + }); 36 + expect(res.status).toBe(200); 37 + expect(res.headers.get("X-RateLimit-Limit")).toBe("3"); 38 + expect(res.headers.get("X-RateLimit-Remaining")).toBe("2"); 39 + }); 40 + 41 + it("returns 429 when limit exceeded", async () => { 42 + const app = createTestApp(); 43 + for (let i = 0; i < 3; i++) { 44 + await app.request("/api/hello", { 45 + headers: { "X-Forwarded-For": "1.2.3.4" }, 46 + }); 47 + } 48 + const res = await app.request("/api/hello", { 49 + headers: { "X-Forwarded-For": "1.2.3.4" }, 50 + }); 51 + expect(res.status).toBe(429); 52 + const body = (await res.json()) as { error: string }; 53 + expect(body.error).toBe("RateLimitExceeded"); 54 + expect(res.headers.get("Retry-After")).toBeTruthy(); 55 + }); 56 + 57 + it("uses auth rule for authenticated requests", async () => { 58 + const app = createTestApp(); 59 + // Exhaust unauth limit 60 + for (let i = 0; i < 3; i++) { 61 + await app.request("/api/hello", { 62 + headers: { "X-Forwarded-For": "1.2.3.4" }, 63 + }); 64 + } 65 + 
// Unauth should be blocked 66 + const unauthRes = await app.request("/api/hello", { 67 + headers: { "X-Forwarded-For": "1.2.3.4" }, 68 + }); 69 + expect(unauthRes.status).toBe(429); 70 + 71 + // Auth requests use a separate pool with higher limit 72 + const authRes = await app.request("/api/hello", { 73 + headers: { 74 + "X-Forwarded-For": "1.2.3.4", 75 + Authorization: "Bearer test-token", 76 + }, 77 + }); 78 + expect(authRes.status).toBe(200); 79 + expect(authRes.headers.get("X-RateLimit-Limit")).toBe("10"); 80 + }); 81 + 82 + it("isolates different IPs", async () => { 83 + const app = createTestApp(); 84 + for (let i = 0; i < 3; i++) { 85 + await app.request("/api/hello", { 86 + headers: { "X-Forwarded-For": "1.2.3.4" }, 87 + }); 88 + } 89 + const res = await app.request("/api/hello", { 90 + headers: { "X-Forwarded-For": "5.6.7.8" }, 91 + }); 92 + expect(res.status).toBe(200); 93 + }); 94 + }); 95 + 96 + describe("getClientIp", () => { 97 + function mockContext(headers: Record<string, string>) { 98 + return { 99 + req: { 100 + header: (name: string) => headers[name], 101 + }, 102 + } as unknown as Parameters<typeof getClientIp>[0]; 103 + } 104 + 105 + it("prefers X-Forwarded-For", () => { 106 + expect( 107 + getClientIp( 108 + mockContext({ 109 + "X-Forwarded-For": "1.2.3.4, 5.6.7.8", 110 + "X-Real-IP": "9.0.0.1", 111 + }), 112 + ), 113 + ).toBe("1.2.3.4"); 114 + }); 115 + 116 + it("falls back to X-Real-IP", () => { 117 + expect(getClientIp(mockContext({ "X-Real-IP": "9.0.0.1" }))).toBe( 118 + "9.0.0.1", 119 + ); 120 + }); 121 + 122 + it("returns unknown when no headers", () => { 123 + expect(getClientIp(mockContext({}))).toBe("unknown"); 124 + }); 125 + });
+62
src/middleware/rate-limit.ts
··· 1 + /** 2 + * HTTP rate limit middleware for Hono. 3 + * 4 + * Keys by client IP (X-Forwarded-For → X-Real-IP → "unknown"). 5 + * Supports separate rules for authenticated vs unauthenticated requests. 6 + * Sets standard rate limit response headers. 7 + */ 8 + 9 + import type { Context, Next, MiddlewareHandler } from "hono"; 10 + import type { RateLimiter, RateLimitRule } from "../rate-limiter.js"; 11 + 12 + export interface RateLimitOptions { 13 + pool: string; 14 + rule: RateLimitRule; 15 + /** Higher limits for authenticated requests (optional). */ 16 + authRule?: RateLimitRule; 17 + } 18 + 19 + /** 20 + * Extract client IP from request headers. 21 + * Trusts X-Forwarded-For (first hop) → X-Real-IP → "unknown". 22 + */ 23 + export function getClientIp(c: Context): string { 24 + const xff = c.req.header("X-Forwarded-For"); 25 + if (xff) { 26 + const first = xff.split(",")[0]?.trim(); 27 + if (first) return first; 28 + } 29 + const xri = c.req.header("X-Real-IP"); 30 + if (xri) return xri.trim(); 31 + return "unknown"; 32 + } 33 + 34 + export function rateLimitMiddleware( 35 + rateLimiter: RateLimiter, 36 + options: RateLimitOptions, 37 + ): MiddlewareHandler { 38 + return async (c: Context, next: Next) => { 39 + const ip = getClientIp(c); 40 + const isAuthed = c.req.header("Authorization")?.startsWith("Bearer ") ?? false; 41 + const rule = isAuthed && options.authRule ? options.authRule : options.rule; 42 + const poolName = isAuthed && options.authRule ? `${options.pool}:auth` : options.pool; 43 + 44 + const result = rateLimiter.check(poolName, ip, rule); 45 + 46 + c.header("X-RateLimit-Limit", String(rule.maxRequests)); 47 + c.header("X-RateLimit-Remaining", String(result.remaining)); 48 + 49 + if (!result.allowed) { 50 + c.header( 51 + "Retry-After", 52 + String(Math.ceil((result.retryAfterMs ?? 
1000) / 1000)), 53 + ); 54 + return c.json( 55 + { error: "RateLimitExceeded", message: "Too many requests" }, 56 + 429, 57 + ); 58 + } 59 + 60 + await next(); 61 + }; 62 + }
+136
src/rate-limiter.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 2 + import { RateLimiter, DEFAULT_RATE_LIMIT_CONFIG } from "./rate-limiter.js"; 3 + import type { RateLimitRule } from "./rate-limiter.js"; 4 + 5 + describe("RateLimiter", () => { 6 + let limiter: RateLimiter; 7 + 8 + beforeEach(() => { 9 + limiter = new RateLimiter(); 10 + }); 11 + 12 + afterEach(() => { 13 + limiter.stop(); 14 + }); 15 + 16 + describe("check()", () => { 17 + const rule: RateLimitRule = { maxRequests: 5, windowMs: 1000 }; 18 + 19 + it("allows requests under the limit", () => { 20 + for (let i = 0; i < 5; i++) { 21 + const result = limiter.check("test", "ip1", rule); 22 + expect(result.allowed).toBe(true); 23 + expect(result.remaining).toBe(4 - i); 24 + } 25 + }); 26 + 27 + it("rejects requests at the limit", () => { 28 + for (let i = 0; i < 5; i++) { 29 + limiter.check("test", "ip1", rule); 30 + } 31 + const result = limiter.check("test", "ip1", rule); 32 + expect(result.allowed).toBe(false); 33 + expect(result.remaining).toBe(0); 34 + expect(result.retryAfterMs).toBeGreaterThan(0); 35 + }); 36 + 37 + it("isolates different keys", () => { 38 + for (let i = 0; i < 5; i++) { 39 + limiter.check("test", "ip1", rule); 40 + } 41 + const result = limiter.check("test", "ip2", rule); 42 + expect(result.allowed).toBe(true); 43 + }); 44 + 45 + it("isolates different pools", () => { 46 + for (let i = 0; i < 5; i++) { 47 + limiter.check("pool1", "ip1", rule); 48 + } 49 + const result = limiter.check("pool2", "ip1", rule); 50 + expect(result.allowed).toBe(true); 51 + }); 52 + 53 + it("resets after window expires", () => { 54 + vi.useFakeTimers(); 55 + try { 56 + for (let i = 0; i < 5; i++) { 57 + limiter.check("test", "ip1", rule); 58 + } 59 + expect(limiter.check("test", "ip1", rule).allowed).toBe(false); 60 + 61 + // Advance past 2 full windows so previous count is also cleared 62 + vi.advanceTimersByTime(2001); 63 + 64 + const result = limiter.check("test", "ip1", rule); 65 + 
expect(result.allowed).toBe(true); 66 + expect(result.remaining).toBe(4); 67 + } finally { 68 + vi.useRealTimers(); 69 + } 70 + }); 71 + 72 + it("uses sliding window weight for gradual recovery", () => { 73 + vi.useFakeTimers(); 74 + try { 75 + // Fill up the window 76 + for (let i = 0; i < 5; i++) { 77 + limiter.check("test", "ip1", rule); 78 + } 79 + expect(limiter.check("test", "ip1", rule).allowed).toBe(false); 80 + 81 + // Advance to 80% through the next window 82 + // Previous window count (5) gets weighted by 0.2 = 1.0 effective 83 + // So we should have ~4 remaining 84 + vi.advanceTimersByTime(1800); 85 + 86 + const result = limiter.check("test", "ip1", rule); 87 + expect(result.allowed).toBe(true); 88 + } finally { 89 + vi.useRealTimers(); 90 + } 91 + }); 92 + }); 93 + 94 + describe("cleanup", () => { 95 + it("removes stale entries", () => { 96 + vi.useFakeTimers(); 97 + try { 98 + const rule: RateLimitRule = { maxRequests: 10, windowMs: 1000 }; 99 + limiter.check("test", "ip1", rule); 100 + limiter.startCleanup(100); 101 + 102 + // Advance past 2 minutes (cleanup threshold) 103 + vi.advanceTimersByTime(130_000); 104 + 105 + // Entry should be cleaned up, next check starts fresh 106 + const result = limiter.check("test", "ip1", rule); 107 + expect(result.allowed).toBe(true); 108 + expect(result.remaining).toBe(9); 109 + } finally { 110 + vi.useRealTimers(); 111 + } 112 + }); 113 + }); 114 + 115 + describe("stop()", () => { 116 + it("clears all state", () => { 117 + const rule: RateLimitRule = { maxRequests: 1, windowMs: 60_000 }; 118 + limiter.check("test", "ip1", rule); 119 + limiter.stop(); 120 + 121 + // After stop, state is cleared — new check should succeed 122 + const result = limiter.check("test", "ip1", rule); 123 + expect(result.allowed).toBe(true); 124 + }); 125 + }); 126 + 127 + describe("DEFAULT_RATE_LIMIT_CONFIG", () => { 128 + it("has expected pools", () => { 129 + 
expect(DEFAULT_RATE_LIMIT_CONFIG.httpUnauthenticated.read.maxRequests).toBe(300); 130 + expect(DEFAULT_RATE_LIMIT_CONFIG.httpUnauthenticated.sync.maxRequests).toBe(30); 131 + expect(DEFAULT_RATE_LIMIT_CONFIG.httpUnauthenticated.session.maxRequests).toBe(10); 132 + expect(DEFAULT_RATE_LIMIT_CONFIG.httpAuthenticated.write.maxRequests).toBe(200); 133 + expect(DEFAULT_RATE_LIMIT_CONFIG.firehosePerIp.maxConnections).toBe(3); 134 + }); 135 + }); 136 + });
+141
src/rate-limiter.ts
··· 1 + /** 2 + * Sliding window rate limiter. 3 + * 4 + * Uses a weighted sliding window counter: tracks the previous and current 5 + * window counts, then computes an effective count by weighting the previous 6 + * window's count by how far into the current window we are. 7 + * 8 + * Each (pool, key) pair has its own counter. Pools separate different 9 + * endpoint groups (e.g. "sync", "read", "session") so their limits 10 + * are independent. Keys are typically client IPs. 11 + */ 12 + 13 + export interface RateLimitRule { 14 + maxRequests: number; 15 + windowMs: number; 16 + } 17 + 18 + export interface RateLimitResult { 19 + allowed: boolean; 20 + remaining: number; 21 + retryAfterMs?: number; 22 + } 23 + 24 + interface WindowEntry { 25 + prevCount: number; 26 + currCount: number; 27 + windowStart: number; 28 + } 29 + 30 + export interface RateLimitConfig { 31 + httpUnauthenticated: Record<string, RateLimitRule>; 32 + httpAuthenticated: Record<string, RateLimitRule>; 33 + challengeIncoming: RateLimitRule; 34 + gossipsubCommit: RateLimitRule; 35 + gossipsubIdentity: RateLimitRule; 36 + firehosePerIp: { maxConnections: number }; 37 + cleanupIntervalMs: number; 38 + } 39 + 40 + export const DEFAULT_RATE_LIMIT_CONFIG: RateLimitConfig = { 41 + httpUnauthenticated: { 42 + read: { maxRequests: 300, windowMs: 60_000 }, 43 + sync: { maxRequests: 30, windowMs: 60_000 }, 44 + session: { maxRequests: 10, windowMs: 60_000 }, 45 + challenge: { maxRequests: 20, windowMs: 60_000 }, 46 + rasl: { maxRequests: 600, windowMs: 60_000 }, 47 + mstProof: { maxRequests: 30, windowMs: 60_000 }, 48 + meta: { maxRequests: 600, windowMs: 60_000 }, 49 + }, 50 + httpAuthenticated: { 51 + read: { maxRequests: 1000, windowMs: 60_000 }, 52 + write: { maxRequests: 200, windowMs: 60_000 }, 53 + admin: { maxRequests: 300, windowMs: 60_000 }, 54 + }, 55 + challengeIncoming: { maxRequests: 10, windowMs: 300_000 }, 56 + gossipsubCommit: { maxRequests: 60, windowMs: 60_000 }, 57 + gossipsubIdentity: 
{ maxRequests: 10, windowMs: 60_000 }, 58 + firehosePerIp: { maxConnections: 3 }, 59 + cleanupIntervalMs: 60_000, 60 + }; 61 + 62 + export class RateLimiter { 63 + private pools: Map<string, Map<string, WindowEntry>> = new Map(); 64 + private cleanupTimer: ReturnType<typeof setInterval> | null = null; 65 + 66 + check(pool: string, key: string, rule: RateLimitRule): RateLimitResult { 67 + const now = Date.now(); 68 + let poolMap = this.pools.get(pool); 69 + if (!poolMap) { 70 + poolMap = new Map(); 71 + this.pools.set(pool, poolMap); 72 + } 73 + 74 + let entry = poolMap.get(key); 75 + if (!entry) { 76 + entry = { prevCount: 0, currCount: 0, windowStart: now }; 77 + poolMap.set(key, entry); 78 + } 79 + 80 + const elapsed = now - entry.windowStart; 81 + 82 + // If we've moved past the current window, slide forward 83 + if (elapsed >= rule.windowMs) { 84 + // How many full windows have passed? 85 + if (elapsed >= rule.windowMs * 2) { 86 + // Two or more windows: previous window is also gone 87 + entry.prevCount = 0; 88 + } else { 89 + // One window: current becomes previous 90 + entry.prevCount = entry.currCount; 91 + } 92 + entry.currCount = 0; 93 + entry.windowStart = now - (elapsed % rule.windowMs); 94 + } 95 + 96 + // Weighted sliding window: weight previous window by how far into current window 97 + const currentElapsed = now - entry.windowStart; 98 + const weight = 1 - currentElapsed / rule.windowMs; 99 + const effectiveCount = entry.prevCount * weight + entry.currCount; 100 + 101 + if (effectiveCount >= rule.maxRequests) { 102 + const retryAfterMs = rule.windowMs - currentElapsed; 103 + return { 104 + allowed: false, 105 + remaining: 0, 106 + retryAfterMs: Math.max(retryAfterMs, 1), 107 + }; 108 + } 109 + 110 + entry.currCount++; 111 + const remaining = Math.max(0, Math.floor(rule.maxRequests - effectiveCount - 1)); 112 + return { allowed: true, remaining }; 113 + } 114 + 115 + startCleanup(intervalMs: number = 60_000): void { 116 + if (this.cleanupTimer) 
return; 117 + this.cleanupTimer = setInterval(() => { 118 + const now = Date.now(); 119 + for (const [poolName, poolMap] of this.pools) { 120 + for (const [key, entry] of poolMap) { 121 + // Remove entries that are older than 2 windows (conservative) 122 + // Use the largest common window (60s) as baseline 123 + if (now - entry.windowStart > 120_000) { 124 + poolMap.delete(key); 125 + } 126 + } 127 + if (poolMap.size === 0) { 128 + this.pools.delete(poolName); 129 + } 130 + } 131 + }, intervalMs); 132 + } 133 + 134 + stop(): void { 135 + if (this.cleanupTimer) { 136 + clearInterval(this.cleanupTimer); 137 + this.cleanupTimer = null; 138 + } 139 + this.pools.clear(); 140 + } 141 + }
+8
src/replication/challenge-response/challenge-response.test.ts
··· 37 37 FIREHOSE_ENABLED: false, 38 38 NODE_DID: "did:web:test.example.com", 39 39 NODE_MANAGERS: [], 40 + RATE_LIMIT_ENABLED: false, 41 + RATE_LIMIT_READ_PER_MIN: 300, 42 + RATE_LIMIT_SYNC_PER_MIN: 30, 43 + RATE_LIMIT_SESSION_PER_MIN: 10, 44 + RATE_LIMIT_WRITE_PER_MIN: 200, 45 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 46 + RATE_LIMIT_MAX_CONNECTIONS: 100, 47 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 40 48 }; 41 49 } 42 50
+8
src/replication/challenge-response/e2e-challenge.test.ts
··· 45 45 FIREHOSE_ENABLED: false, 46 46 NODE_DID: "did:web:test.example.com", 47 47 NODE_MANAGERS: [], 48 + RATE_LIMIT_ENABLED: false, 49 + RATE_LIMIT_READ_PER_MIN: 300, 50 + RATE_LIMIT_SYNC_PER_MIN: 30, 51 + RATE_LIMIT_SESSION_PER_MIN: 10, 52 + RATE_LIMIT_WRITE_PER_MIN: 200, 53 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 54 + RATE_LIMIT_MAX_CONNECTIONS: 100, 55 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 48 56 }; 49 57 } 50 58
+18 -9
src/replication/challenge-response/libp2p-transport.ts
··· 14 14 import type { Libp2p, Stream } from "@libp2p/interface"; 15 15 import { multiaddr } from "@multiformats/multiaddr"; 16 16 import type { StorageChallenge, StorageChallengeResponse } from "./types.js"; 17 + import { MAX_CHALLENGE_SIZE } from "./types.js"; 17 18 import type { ChallengeTransport } from "./transport.js"; 18 19 import { serializeResponse, deserializeResponse } from "./http-transport.js"; 20 + 21 + /** Maximum response size (1 MB) — responses can include proof data. */ 22 + const MAX_RESPONSE_SIZE = 1024 * 1024; 19 23 20 24 export const CHALLENGE_PROTOCOL = "/p2pds/challenge/1.0.0"; 21 25 22 26 /** 23 27 * Collect all chunks from a libp2p stream into a single Uint8Array. 24 28 * Stream chunks may be Uint8Array or Uint8ArrayList; normalize via subarray(). 29 + * Throws if accumulated bytes exceed maxSize (abuse prevention). 25 30 */ 26 - async function collectStream(stream: AsyncIterable<Uint8Array | { subarray(): Uint8Array }>): Promise<Uint8Array> { 31 + async function collectStream( 32 + stream: AsyncIterable<Uint8Array | { subarray(): Uint8Array }>, 33 + maxSize: number = MAX_CHALLENGE_SIZE, 34 + ): Promise<Uint8Array> { 27 35 const chunks: Uint8Array[] = []; 36 + let totalSize = 0; 28 37 for await (const chunk of stream) { 29 - if (chunk instanceof Uint8Array) { 30 - chunks.push(chunk); 31 - } else { 32 - // Uint8ArrayList — convert to Uint8Array 33 - chunks.push(chunk.subarray()); 38 + const bytes = chunk instanceof Uint8Array ? 
chunk : chunk.subarray(); 39 + totalSize += bytes.length; 40 + if (totalSize > maxSize) { 41 + throw new Error(`Stream exceeded maximum size of ${maxSize} bytes`); 34 42 } 43 + chunks.push(bytes); 35 44 } 36 45 if (chunks.length === 0) return new Uint8Array(0); 37 46 if (chunks.length === 1) return chunks[0]!; 38 - const total = chunks.reduce((acc, c) => acc + c.length, 0); 39 - const result = new Uint8Array(total); 47 + const result = new Uint8Array(totalSize); 40 48 let offset = 0; 41 49 for (const c of chunks) { 42 50 result.set(c, offset); ··· 67 75 stream.send(challengeBytes); 68 76 await stream.close(); // flush + close write; stream remains readable 69 77 70 - // Read response 78 + // Read response (allow up to 1 MB for proof data) 71 79 const responseBytes = await collectStream( 72 80 stream as unknown as AsyncIterable<Uint8Array | { subarray(): Uint8Array }>, 81 + MAX_RESPONSE_SIZE, 73 82 ); 74 83 const raw = JSON.parse(new TextDecoder().decode(responseBytes)); 75 84 return deserializeResponse(raw);
+11
src/replication/challenge-response/types.ts
··· 18 18 export const CHALLENGE_PROTOCOL_VERSION = 1; 19 19 20 20 // ============================================ 21 + // Size / count limits for abuse prevention 22 + // ============================================ 23 + 24 + /** Maximum number of record paths in a single challenge. */ 25 + export const MAX_RECORD_PATHS = 10; 26 + /** Maximum number of block CIDs in a single challenge. */ 27 + export const MAX_BLOCK_CIDS = 20; 28 + /** Maximum serialized challenge size in bytes (64 KB). */ 29 + export const MAX_CHALLENGE_SIZE = 65536; 30 + 31 + // ============================================ 21 32 // Challenge types 22 33 // ============================================ 23 34
+8
src/replication/e2e-multi-node.test.ts
··· 50 50 FIREHOSE_ENABLED: false, 51 51 NODE_DID: "did:web:test.example.com", 52 52 NODE_MANAGERS: [], 53 + RATE_LIMIT_ENABLED: false, 54 + RATE_LIMIT_READ_PER_MIN: 300, 55 + RATE_LIMIT_SYNC_PER_MIN: 30, 56 + RATE_LIMIT_SESSION_PER_MIN: 10, 57 + RATE_LIMIT_WRITE_PER_MIN: 200, 58 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 59 + RATE_LIMIT_MAX_CONNECTIONS: 100, 60 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 53 61 }; 54 62 } 55 63
+8
src/replication/firehose-incremental.test.ts
··· 64 64 FIREHOSE_ENABLED: false, 65 65 NODE_DID: "did:web:local.example.com", 66 66 NODE_MANAGERS: [], 67 + RATE_LIMIT_ENABLED: false, 68 + RATE_LIMIT_READ_PER_MIN: 300, 69 + RATE_LIMIT_SYNC_PER_MIN: 30, 70 + RATE_LIMIT_SESSION_PER_MIN: 10, 71 + RATE_LIMIT_WRITE_PER_MIN: 200, 72 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 73 + RATE_LIMIT_MAX_CONNECTIONS: 100, 74 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 67 75 }; 68 76 } 69 77
+24
src/replication/gossipsub-notifications.test.ts
··· 299 299 FIREHOSE_ENABLED: false, 300 300 NODE_DID: "did:web:test.example.com", 301 301 NODE_MANAGERS: [], 302 + RATE_LIMIT_ENABLED: false, 303 + RATE_LIMIT_READ_PER_MIN: 300, 304 + RATE_LIMIT_SYNC_PER_MIN: 30, 305 + RATE_LIMIT_SESSION_PER_MIN: 10, 306 + RATE_LIMIT_WRITE_PER_MIN: 200, 307 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 308 + RATE_LIMIT_MAX_CONNECTIONS: 100, 309 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 302 310 }; 303 311 304 312 const { RepoManager } = await import("../repo-manager.js"); ··· 359 367 FIREHOSE_ENABLED: false, 360 368 NODE_DID: "did:web:test.example.com", 361 369 NODE_MANAGERS: [], 370 + RATE_LIMIT_ENABLED: false, 371 + RATE_LIMIT_READ_PER_MIN: 300, 372 + RATE_LIMIT_SYNC_PER_MIN: 30, 373 + RATE_LIMIT_SESSION_PER_MIN: 10, 374 + RATE_LIMIT_WRITE_PER_MIN: 200, 375 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 376 + RATE_LIMIT_MAX_CONNECTIONS: 100, 377 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 362 378 }; 363 379 364 380 const { RepoManager } = await import("../repo-manager.js"); ··· 441 457 FIREHOSE_ENABLED: false, 442 458 NODE_DID: "did:web:test.example.com", 443 459 NODE_MANAGERS: [], 460 + RATE_LIMIT_ENABLED: false, 461 + RATE_LIMIT_READ_PER_MIN: 300, 462 + RATE_LIMIT_SYNC_PER_MIN: 30, 463 + RATE_LIMIT_SESSION_PER_MIN: 10, 464 + RATE_LIMIT_WRITE_PER_MIN: 200, 465 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 466 + RATE_LIMIT_MAX_CONNECTIONS: 100, 467 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 444 468 }; 445 469 446 470 const { RepoManager } = await import("../repo-manager.js");
+8
src/replication/mst-proof.test.ts
··· 30 30 FIREHOSE_ENABLED: false, 31 31 NODE_DID: "did:web:test.example.com", 32 32 NODE_MANAGERS: [], 33 + RATE_LIMIT_ENABLED: false, 34 + RATE_LIMIT_READ_PER_MIN: 300, 35 + RATE_LIMIT_SYNC_PER_MIN: 30, 36 + RATE_LIMIT_SESSION_PER_MIN: 10, 37 + RATE_LIMIT_WRITE_PER_MIN: 200, 38 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 39 + RATE_LIMIT_MAX_CONNECTIONS: 100, 40 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 33 41 }; 34 42 } 35 43
+8
src/replication/offer-manager.test.ts
··· 32 32 FIREHOSE_ENABLED: false, 33 33 NODE_DID: "did:web:test.example.com", 34 34 NODE_MANAGERS: [], 35 + RATE_LIMIT_ENABLED: false, 36 + RATE_LIMIT_READ_PER_MIN: 300, 37 + RATE_LIMIT_SYNC_PER_MIN: 30, 38 + RATE_LIMIT_SESSION_PER_MIN: 10, 39 + RATE_LIMIT_WRITE_PER_MIN: 200, 40 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 41 + RATE_LIMIT_MAX_CONNECTIONS: 100, 42 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 35 43 }; 36 44 } 37 45
+8
src/replication/peer-freshness.test.ts
··· 46 46 FIREHOSE_ENABLED: false, 47 47 NODE_DID: "did:web:test.example.com", 48 48 NODE_MANAGERS: [], 49 + RATE_LIMIT_ENABLED: false, 50 + RATE_LIMIT_READ_PER_MIN: 300, 51 + RATE_LIMIT_SYNC_PER_MIN: 30, 52 + RATE_LIMIT_SESSION_PER_MIN: 10, 53 + RATE_LIMIT_WRITE_PER_MIN: 200, 54 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 55 + RATE_LIMIT_MAX_CONNECTIONS: 100, 56 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 49 57 }; 50 58 } 51 59
+8
src/replication/policy-integration.test.ts
··· 54 54 FIREHOSE_ENABLED: false, 55 55 NODE_DID: "did:web:test.example.com", 56 56 NODE_MANAGERS: [], 57 + RATE_LIMIT_ENABLED: false, 58 + RATE_LIMIT_READ_PER_MIN: 300, 59 + RATE_LIMIT_SYNC_PER_MIN: 30, 60 + RATE_LIMIT_SESSION_PER_MIN: 10, 61 + RATE_LIMIT_WRITE_PER_MIN: 200, 62 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 63 + RATE_LIMIT_MAX_CONNECTIONS: 100, 64 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 57 65 }; 58 66 } 59 67
+8
src/replication/replication.test.ts
··· 57 57 FIREHOSE_ENABLED: false, 58 58 NODE_DID: "did:web:test.example.com", 59 59 NODE_MANAGERS: [], 60 + RATE_LIMIT_ENABLED: false, 61 + RATE_LIMIT_READ_PER_MIN: 300, 62 + RATE_LIMIT_SYNC_PER_MIN: 30, 63 + RATE_LIMIT_SESSION_PER_MIN: 10, 64 + RATE_LIMIT_WRITE_PER_MIN: 200, 65 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 66 + RATE_LIMIT_MAX_CONNECTIONS: 100, 67 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 60 68 }; 61 69 } 62 70
+40 -1
src/server.ts
··· 23 23 import type { ChallengeTransport } from "./replication/challenge-response/transport.js"; 24 24 import type { Libp2p } from "@libp2p/interface"; 25 25 import { loadOrCreateNodeIdentity, getPublicKeyMultibase } from "./node-identity.js"; 26 + import { RateLimiter } from "./rate-limiter.js"; 26 27 27 28 // Load configuration 28 29 const config = loadConfig(); 30 + 31 + // Initialize rate limiter 32 + let rateLimiter: RateLimiter | undefined; 33 + if (config.RATE_LIMIT_ENABLED) { 34 + rateLimiter = new RateLimiter(); 35 + rateLimiter.startCleanup(60_000); 36 + } 29 37 30 38 // Ensure data directory exists 31 39 const dataDir = resolve(config.DATA_DIR); ··· 128 136 replicationManager.setReplicatedRepoReader(replicatedRepoReader); 129 137 } 130 138 139 + // Pass rate limiter to IPFS service for gossipsub rate limiting 140 + if (rateLimiter && ipfsService) { 141 + ipfsService.setRateLimiter(rateLimiter); 142 + } 143 + 131 144 // Create Hono app 132 145 const app = createApp( 133 146 config, ··· 143 156 nodeRepoManager, 144 157 repoManager, 145 158 }, 159 + rateLimiter, 146 160 ); 147 161 148 162 // Create HTTP server using @hono/node-server's request listener 149 163 const requestListener = getRequestListener(app.fetch); 150 164 const httpServer = createServer(requestListener); 151 165 152 - // Set up WebSocket server for firehose 166 + // Set up WebSocket server for firehose with per-IP connection limits 153 167 const wss = new WebSocketServer({ noServer: true }); 168 + const firehoseConnections = new Map<string, number>(); 169 + const maxFirehosePerIp = config.RATE_LIMIT_FIREHOSE_PER_IP; 154 170 155 171 httpServer.on("upgrade", (request, socket, head) => { 156 172 const url = new URL(request.url ?? "/", `http://localhost:${config.PORT}`); 157 173 158 174 if (url.pathname === "/xrpc/com.atproto.sync.subscribeRepos") { 175 + const ip = 176 + (request.headers["x-forwarded-for"] as string)?.split(",")[0]?.trim() ?? 177 + request.headers["x-real-ip"] as string ?? 
178 + "unknown"; 179 + 180 + const current = firehoseConnections.get(ip) ?? 0; 181 + if (config.RATE_LIMIT_ENABLED && current >= maxFirehosePerIp) { 182 + socket.destroy(); 183 + return; 184 + } 185 + 159 186 wss.handleUpgrade(request, socket, head, (ws) => { 187 + firehoseConnections.set(ip, (firehoseConnections.get(ip) ?? 0) + 1); 188 + ws.on("close", () => { 189 + const count = (firehoseConnections.get(ip) ?? 1) - 1; 190 + if (count <= 0) { 191 + firehoseConnections.delete(ip); 192 + } else { 193 + firehoseConnections.set(ip, count); 194 + } 195 + }); 160 196 firehose.handleConnection(ws, request); 161 197 }); 162 198 } else { ··· 267 303 function shutdown() { 268 304 console.log(pc.dim("\nShutting down...")); 269 305 const cleanup = async () => { 306 + if (rateLimiter) { 307 + rateLimiter.stop(); 308 + } 270 309 if (replicationManager) { 271 310 replicationManager.stop(); 272 311 }
+8
src/xrpc/admin-e2e.test.ts
··· 47 47 FIREHOSE_ENABLED: false, 48 48 NODE_DID: "did:web:test.example.com", 49 49 NODE_MANAGERS: [], 50 + RATE_LIMIT_ENABLED: false, 51 + RATE_LIMIT_READ_PER_MIN: 300, 52 + RATE_LIMIT_SYNC_PER_MIN: 30, 53 + RATE_LIMIT_SESSION_PER_MIN: 10, 54 + RATE_LIMIT_WRITE_PER_MIN: 200, 55 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 56 + RATE_LIMIT_MAX_CONNECTIONS: 100, 57 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 50 58 }; 51 59 } 52 60
+8
src/xrpc/admin.test.ts
··· 33 33 FIREHOSE_ENABLED: false, 34 34 NODE_DID: "did:web:test.example.com", 35 35 NODE_MANAGERS: [], 36 + RATE_LIMIT_ENABLED: false, 37 + RATE_LIMIT_READ_PER_MIN: 300, 38 + RATE_LIMIT_SYNC_PER_MIN: 30, 39 + RATE_LIMIT_SESSION_PER_MIN: 10, 40 + RATE_LIMIT_WRITE_PER_MIN: 200, 41 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 42 + RATE_LIMIT_MAX_CONNECTIONS: 100, 43 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 36 44 }; 37 45 } 38 46