atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor server.ts into testable startServer() + integration tests

Extract startup logic from server.ts into src/start.ts with an exported
startServer(config, opts?) function that returns a ServerHandle for
programmatic control. This enables testing the full startup sequence
(DB, IPFS, replication, HTTP) in vitest and supports the Tauri sidecar
use case. Add server-startup.test.ts with 5 integration tests, a
scripts/smoke-test.sh for manual/CI smoke testing, and an npm
smoke-test script.

+662 -305
+2 -1
package.json
··· 8 8 "dev": "tsx watch src/server.ts", 9 9 "build": "tsc", 10 10 "start": "node dist/server.js", 11 - "test": "vitest run" 11 + "test": "vitest run", 12 + "smoke-test": "bash scripts/smoke-test.sh" 12 13 }, 13 14 "dependencies": { 14 15 "@atcute/atproto": "^3.1.10",
+139
scripts/smoke-test.sh
··· 1 + #!/usr/bin/env bash 2 + # 3 + # Smoke test: build, start server, verify core HTTP endpoints, shut down. 4 + # Usage: bash scripts/smoke-test.sh 5 + # 6 + 7 + set -euo pipefail 8 + 9 + TIMEOUT=30 10 + PASS=0 11 + FAIL=0 12 + SERVER_PID="" 13 + TMPDIR_PATH="" 14 + 15 + cleanup() { 16 + if [ -n "$SERVER_PID" ] && kill -0 "$SERVER_PID" 2>/dev/null; then 17 + kill "$SERVER_PID" 2>/dev/null || true 18 + wait "$SERVER_PID" 2>/dev/null || true 19 + fi 20 + if [ -n "$TMPDIR_PATH" ] && [ -d "$TMPDIR_PATH" ]; then 21 + rm -rf "$TMPDIR_PATH" 22 + fi 23 + } 24 + trap cleanup EXIT 25 + 26 + # 1. Build 27 + echo "==> Building..." 28 + npm run build 29 + 30 + # 2. Create temp data dir and env 31 + TMPDIR_PATH=$(mktemp -d) 32 + PORT=0 # will be replaced — find a free port 33 + # Find a free port 34 + PORT=$(python3 -c 'import socket; s=socket.socket(); s.bind(("",0)); print(s.getsockname()[1]); s.close()') 35 + 36 + # Export env vars so loadConfig() picks them up (overrides any .env in cwd) 37 + export PDS_HOSTNAME=smoke.test 38 + export AUTH_TOKEN=smoke-test-token 39 + export JWT_SECRET=smoke-jwt-secret 40 + export PASSWORD_HASH='$2a$10$test' 41 + export DID=did:plc:smoketest 42 + export SIGNING_KEY=0000000000000000000000000000000000000000000000000000000000000001 43 + export DATA_DIR="$TMPDIR_PATH/data" 44 + export PORT=$PORT 45 + export IPFS_ENABLED=true 46 + export IPFS_NETWORKING=false 47 + export FIREHOSE_ENABLED=false 48 + export RATE_LIMIT_ENABLED=false 49 + export OAUTH_ENABLED=false 50 + 51 + echo "==> Starting server on port $PORT..." 52 + node dist/server.js & 53 + SERVER_PID=$! 54 + 55 + BASE_URL="http://localhost:$PORT" 56 + 57 + # 3. Poll health endpoint until ready 58 + echo "==> Waiting for server (timeout ${TIMEOUT}s)..." 
59 + ELAPSED=0 60 + while [ $ELAPSED -lt $TIMEOUT ]; do 61 + if curl -sf "$BASE_URL/xrpc/_health" > /dev/null 2>&1; then 62 + echo "    Server is up after ${ELAPSED}s" 63 + break 64 + fi 65 + sleep 1 66 + ELAPSED=$((ELAPSED + 1)) 67 + done 68 + 69 + if [ $ELAPSED -ge $TIMEOUT ]; then 70 + echo "FAIL: Server did not become healthy within ${TIMEOUT}s" 71 + exit 1 72 + fi 73 + 74 + # Helper: check an endpoint 75 + check() { 76 + local label="$1" 77 + local url="$2" 78 + local expected_status="$3" 79 + local expected_content="${4:-}" 80 + local auth="${5:-}" 81 + 82 + local curl_args=(-s -o "$TMPDIR_PATH/smoke-body" -w "%{http_code}") 83 + if [ -n "$auth" ]; then 84 + curl_args+=(-H "Authorization: Bearer $auth") 85 + fi 86 + 87 + local status 88 + status=$(curl "${curl_args[@]}" "$url") 89 + 90 + if [ "$status" != "$expected_status" ]; then 91 + echo "  FAIL: $label — expected $expected_status, got $status" 92 + FAIL=$((FAIL + 1)) 93 + return 94 + fi 95 + 96 + if [ -n "$expected_content" ]; then 97 + if ! grep -q "$expected_content" "$TMPDIR_PATH/smoke-body"; then 98 + echo "  FAIL: $label — response missing '$expected_content'" 99 + FAIL=$((FAIL + 1)) 100 + return 101 + fi 102 + fi 103 + 104 + echo "  PASS: $label" 105 + PASS=$((PASS + 1)) 106 + } 107 + 108 + # 4. Run checks 109 + echo "==> Running endpoint checks..." 110 + check "_health" \ 111 + "$BASE_URL/xrpc/_health" \ 112 + "200" \ 113 + '"status":"ok"' 114 + 115 + check "admin.dashboard" \ 116 + "$BASE_URL/xrpc/org.p2pds.admin.dashboard" \ 117 + "200" \ 118 + "P2PDS" 119 + 120 + check "admin.getOverview" \ 121 + "$BASE_URL/xrpc/org.p2pds.admin.getOverview" \ 122 + "200" \ 123 + '"version"' \ 124 + "smoke-test-token" 125 + 126 + check "admin.getNetworkStatus" \ 127 + "$BASE_URL/xrpc/org.p2pds.admin.getNetworkStatus" \ 128 + "200" \ 129 + "" \ 130 + "smoke-test-token" 131 + 132 + # 5. 
Summary 133 + echo 134 + echo "==> Results: $PASS passed, $FAIL failed" 135 + if [ $FAIL -gt 0 ]; then 136 + echo "FAIL" 137 + exit 1 138 + fi 139 + echo "PASS"
+171
src/server-startup.test.ts
··· 1 + /** 2 + * Integration tests for the full server startup sequence. 3 + * 4 + * Uses startServer() with a test config (tmpDir, port 0, networking off, 5 + * firehose off, OAuth off) + mock PDS to validate the complete HTTP surface. 6 + */ 7 + 8 + import { describe, it, expect, afterEach } from "vitest"; 9 + import { mkdtempSync, rmSync } from "node:fs"; 10 + import { tmpdir } from "node:os"; 11 + import { join } from "node:path"; 12 + 13 + import type { Config } from "./config.js"; 14 + import { startServer, type ServerHandle } from "./start.js"; 15 + import { 16 + createTestRepo, 17 + startMockPds, 18 + createMockDidResolver, 19 + type MockPds, 20 + } from "./replication/test-helpers.js"; 21 + 22 + const TEST_DID = "did:plc:testuser1"; 23 + 24 + function testConfig(dataDir: string, replicateDids: string[] = []): Config { 25 + return { 26 + DID: "did:plc:localnode", 27 + HANDLE: "local.test", 28 + PDS_HOSTNAME: "local.test", 29 + AUTH_TOKEN: "test-auth-token", 30 + SIGNING_KEY: 31 + "0000000000000000000000000000000000000000000000000000000000000001", 32 + SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 33 + JWT_SECRET: "test-jwt-secret", 34 + PASSWORD_HASH: "$2a$10$test", 35 + DATA_DIR: dataDir, 36 + PORT: 0, // OS-assigned random port 37 + IPFS_ENABLED: true, 38 + IPFS_NETWORKING: false, 39 + REPLICATE_DIDS: replicateDids, 40 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 41 + FIREHOSE_ENABLED: false, 42 + RATE_LIMIT_ENABLED: false, 43 + RATE_LIMIT_READ_PER_MIN: 300, 44 + RATE_LIMIT_SYNC_PER_MIN: 30, 45 + RATE_LIMIT_SESSION_PER_MIN: 10, 46 + RATE_LIMIT_WRITE_PER_MIN: 200, 47 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 48 + RATE_LIMIT_MAX_CONNECTIONS: 100, 49 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 50 + OAUTH_ENABLED: false, 51 + }; 52 + } 53 + 54 + describe("server startup integration", () => { 55 + let tmpDir: string; 56 + let handle: ServerHandle | undefined; 57 + let mockPds: MockPds | undefined; 58 + 59 + afterEach(async 
() => { 60 + if (handle) { 61 + await handle.close(); 62 + handle = undefined; 63 + } 64 + if (mockPds) { 65 + await mockPds.close(); 66 + mockPds = undefined; 67 + } 68 + if (tmpDir) { 69 + rmSync(tmpDir, { recursive: true, force: true }); 70 + } 71 + }); 72 + 73 + it("health check returns 200", async () => { 74 + tmpDir = mkdtempSync(join(tmpdir(), "server-startup-")); 75 + const config = testConfig(tmpDir); 76 + handle = await startServer(config); 77 + 78 + const res = await fetch(`${handle.url}/xrpc/_health`); 79 + expect(res.status).toBe(200); 80 + const body = (await res.json()) as { status: string; version: string }; 81 + expect(body.status).toBe("ok"); 82 + expect(body.version).toBeTruthy(); 83 + }); 84 + 85 + it("dashboard HTML returns 200 with expected content", async () => { 86 + tmpDir = mkdtempSync(join(tmpdir(), "server-startup-")); 87 + const config = testConfig(tmpDir); 88 + handle = await startServer(config); 89 + 90 + const res = await fetch(`${handle.url}/xrpc/org.p2pds.admin.dashboard`); 91 + expect(res.status).toBe(200); 92 + const html = await res.text(); 93 + expect(html).toContain("P2PDS"); 94 + }); 95 + 96 + it("admin getOverview returns 200 with version and replication state", async () => { 97 + tmpDir = mkdtempSync(join(tmpdir(), "server-startup-")); 98 + const config = testConfig(tmpDir); 99 + handle = await startServer(config); 100 + 101 + const res = await fetch(`${handle.url}/xrpc/org.p2pds.admin.getOverview`, { 102 + headers: { Authorization: `Bearer ${config.AUTH_TOKEN}` }, 103 + }); 104 + expect(res.status).toBe(200); 105 + const body = (await res.json()) as { version: string; replication: unknown }; 106 + expect(body.version).toBeTruthy(); 107 + expect(body.replication).toBeDefined(); 108 + }); 109 + 110 + it("add DID, sync, and verify in overview", async () => { 111 + tmpDir = mkdtempSync(join(tmpdir(), "server-startup-")); 112 + 113 + // Create a mock PDS with a test repo 114 + const carBytes = await createTestRepo(TEST_DID, [ 
115 + { collection: "app.bsky.feed.post", rkey: "abc123", record: { text: "hello", createdAt: new Date().toISOString() } }, 116 + ]); 117 + mockPds = await startMockPds([{ did: TEST_DID, carBytes }]); 118 + const mockResolver = createMockDidResolver({ [TEST_DID]: mockPds.url }); 119 + 120 + const config = testConfig(tmpDir); 121 + handle = await startServer(config, { didResolver: mockResolver }); 122 + 123 + // Add the DID via admin API 124 + const addRes = await fetch(`${handle.url}/xrpc/org.p2pds.admin.addDid`, { 125 + method: "POST", 126 + headers: { 127 + Authorization: `Bearer ${config.AUTH_TOKEN}`, 128 + "Content-Type": "application/json", 129 + }, 130 + body: JSON.stringify({ did: TEST_DID }), 131 + }); 132 + expect(addRes.status).toBe(200); 133 + 134 + // Trigger sync 135 + const syncRes = await fetch(`${handle.url}/xrpc/org.p2pds.replication.syncNow`, { 136 + method: "POST", 137 + headers: { Authorization: `Bearer ${config.AUTH_TOKEN}` }, 138 + }); 139 + expect(syncRes.status).toBe(200); 140 + 141 + // Wait a bit for async sync to complete 142 + await new Promise((r) => setTimeout(r, 2000)); 143 + 144 + // Check overview — the DID should appear in replication state 145 + const overviewRes = await fetch(`${handle.url}/xrpc/org.p2pds.admin.getOverview`, { 146 + headers: { Authorization: `Bearer ${config.AUTH_TOKEN}` }, 147 + }); 148 + expect(overviewRes.status).toBe(200); 149 + const overview = (await overviewRes.json()) as { replication: { trackedDids: string[] } }; 150 + expect(overview.replication.trackedDids.length).toBeGreaterThanOrEqual(1); 151 + }, 15_000); 152 + 153 + it("graceful shutdown completes cleanly", async () => { 154 + tmpDir = mkdtempSync(join(tmpdir(), "server-startup-")); 155 + const config = testConfig(tmpDir); 156 + handle = await startServer(config); 157 + 158 + // Verify server is running 159 + const res = await fetch(`${handle.url}/xrpc/_health`); 160 + expect(res.status).toBe(200); 161 + 162 + // Close and verify it doesn't throw 
163 + const closedUrl = handle.url; await handle.close(); 164 + handle = undefined; // Prevent double-close in afterEach 165 + 166 + // Verify server is no longer responding (use the real URL, not port 0) 167 + await expect( 168 + fetch(`${closedUrl}/xrpc/_health`).then((r) => r.status), 169 + ).rejects.toThrow(); 170 + }); 171 + });
+7 -304
src/server.ts
··· 1 - import { getRequestListener } from "@hono/node-server"; 2 - import { createServer } from "node:http"; 3 - import { mkdirSync } from "node:fs"; 4 - import { resolve } from "node:path"; 5 - import Database from "better-sqlite3"; 6 - import { WebSocketServer } from "ws"; 7 - import pc from "picocolors"; 1 + import { loadConfig } from "./config.js"; 2 + import { startServer } from "./start.js"; 8 3 9 - import { loadConfig, loadPolicies } from "./config.js"; 10 - import { RepoManager } from "./repo-manager.js"; 11 - import { BlobStore } from "./blobs.js"; 12 - import { Firehose } from "./firehose.js"; 13 - import { IpfsService } from "./ipfs.js"; 14 - import { createApp } from "./index.js"; 15 - import { DidResolver } from "./did-resolver.js"; 16 - import { InMemoryDidCache } from "./did-cache.js"; 17 - import { ReplicationManager } from "./replication/replication-manager.js"; 18 - import { ReplicatedRepoReader } from "./replication/replicated-repo-reader.js"; 19 - import { PolicyEngine } from "./policy/engine.js"; 20 - import { HttpChallengeTransport } from "./replication/challenge-response/http-transport.js"; 21 - import { Libp2pChallengeTransport } from "./replication/challenge-response/libp2p-transport.js"; 22 - import { FailoverChallengeTransport } from "./replication/challenge-response/failover-transport.js"; 23 - import type { ChallengeTransport } from "./replication/challenge-response/transport.js"; 24 - import type { Libp2p } from "@libp2p/interface"; 25 - import { RateLimiter } from "./rate-limiter.js"; 26 - import { createOAuthClient, type OAuthClientManager } from "./oauth/client.js"; 27 - import { PdsClient } from "./oauth/pds-client.js"; 28 - 29 - // Load configuration 30 4 const config = loadConfig(); 31 - 32 - // Initialize rate limiter 33 - let rateLimiter: RateLimiter | undefined; 34 - if (config.RATE_LIMIT_ENABLED) { 35 - rateLimiter = new RateLimiter(); 36 - rateLimiter.startCleanup(60_000); 37 - } 38 - 39 - // Ensure data directory exists 40 
- const dataDir = resolve(config.DATA_DIR); 41 - mkdirSync(dataDir, { recursive: true }); 42 - 43 - // Initialize SQLite database 44 - const dbPath = resolve(dataDir, "pds.db"); 45 - const db = new Database(dbPath); 46 - db.pragma("journal_mode = WAL"); 47 - db.pragma("synchronous = NORMAL"); 48 - 49 - // Initialize IPFS service (if enabled) 50 - let ipfsService: IpfsService | undefined; 51 - if (config.IPFS_ENABLED) { 52 - const ipfsBlocksPath = resolve(dataDir, "ipfs", "blocks"); 53 - const ipfsDatastorePath = resolve(dataDir, "ipfs", "datastore"); 54 - mkdirSync(ipfsBlocksPath, { recursive: true }); 55 - mkdirSync(ipfsDatastorePath, { recursive: true }); 5 + const handle = await startServer(config); 56 6 57 - ipfsService = new IpfsService({ 58 - blocksPath: ipfsBlocksPath, 59 - datastorePath: ipfsDatastorePath, 60 - networking: config.IPFS_NETWORKING, 61 - }); 62 - } 63 - 64 - // Initialize repo manager (requires DID + signing key) 65 - let repoManager: RepoManager | undefined; 66 - let blobStore: BlobStore | undefined; 67 - if (config.DID && config.SIGNING_KEY) { 68 - repoManager = new RepoManager(db, config as typeof config & { DID: string; SIGNING_KEY: string }); 69 - blobStore = new BlobStore(dataDir, config.DID); 70 - repoManager.init(blobStore, ipfsService, ipfsService); 71 - } 72 - 73 - // Initialize firehose 74 - const firehose = new Firehose(repoManager!); 75 - 76 - // Initialize DID resolver 77 - const didResolver = new DidResolver({ 78 - didCache: new InMemoryDidCache(), 7 + process.on("SIGINT", () => { 8 + handle.close().finally(() => process.exit(0)); 79 9 }); 80 - 81 - // Load policy engine if configured 82 - let policyEngine: PolicyEngine | undefined; 83 - if (config.POLICY_FILE) { 84 - const policySet = loadPolicies(config); 85 - if (policySet) { 86 - policyEngine = new PolicyEngine(policySet); 87 - console.log(pc.dim(` Policies: loaded ${policySet.policies.length} from ${config.POLICY_FILE}`)); 88 - } 89 - } 90 - 91 - // Initialize OAuth client 
(if enabled) 92 - let oauthClientManager: OAuthClientManager | undefined; 93 - let pdsClient: PdsClient | undefined; 94 - if (config.OAUTH_ENABLED) { 95 - oauthClientManager = await createOAuthClient(db, config); 96 - if (config.DID) { 97 - pdsClient = new PdsClient(oauthClientManager.client, config.DID); 98 - } 99 - } 100 - 101 - // Initialize replication manager and replicated repo reader (if IPFS enabled) 102 - let replicationManager: ReplicationManager | undefined; 103 - let replicatedRepoReader: ReplicatedRepoReader | undefined; 104 - if (ipfsService && repoManager) { 105 - replicationManager = new ReplicationManager( 106 - db, 107 - config, 108 - repoManager, 109 - ipfsService, 110 - ipfsService, 111 - didResolver, 112 - undefined, 113 - undefined, 114 - policyEngine, 115 - pdsClient, 116 - ); 117 - replicatedRepoReader = new ReplicatedRepoReader( 118 - ipfsService, 119 - replicationManager.getSyncStorage(), 120 - ); 121 - replicationManager.setReplicatedRepoReader(replicatedRepoReader); 122 - } 123 - 124 - // Pass rate limiter to IPFS service for gossipsub rate limiting 125 - if (rateLimiter && ipfsService) { 126 - ipfsService.setRateLimiter(rateLimiter); 127 - } 128 - 129 - // Create Hono app 130 - const app = createApp( 131 - config, 132 - firehose, 133 - ipfsService, 134 - ipfsService, 135 - blobStore, 136 - replicationManager, 137 - replicatedRepoReader, 138 - repoManager, 139 - rateLimiter, 140 - oauthClientManager, 141 - pdsClient, 142 - ); 143 - 144 - // Create HTTP server using @hono/node-server's request listener 145 - const requestListener = getRequestListener(app.fetch); 146 - const httpServer = createServer(requestListener); 147 - 148 - // Set up WebSocket server for firehose with per-IP connection limits 149 - const wss = new WebSocketServer({ noServer: true }); 150 - const firehoseConnections = new Map<string, number>(); 151 - const maxFirehosePerIp = config.RATE_LIMIT_FIREHOSE_PER_IP; 152 - 153 - httpServer.on("upgrade", (request, socket, 
head) => { 154 - const url = new URL(request.url ?? "/", `http://localhost:${config.PORT}`); 155 - 156 - if (url.pathname === "/xrpc/com.atproto.sync.subscribeRepos") { 157 - const ip = 158 - (request.headers["x-forwarded-for"] as string)?.split(",")[0]?.trim() ?? 159 - request.headers["x-real-ip"] as string ?? 160 - "unknown"; 161 - 162 - const current = firehoseConnections.get(ip) ?? 0; 163 - if (config.RATE_LIMIT_ENABLED && current >= maxFirehosePerIp) { 164 - socket.destroy(); 165 - return; 166 - } 167 - 168 - wss.handleUpgrade(request, socket, head, (ws) => { 169 - firehoseConnections.set(ip, (firehoseConnections.get(ip) ?? 0) + 1); 170 - ws.on("close", () => { 171 - const count = (firehoseConnections.get(ip) ?? 1) - 1; 172 - if (count <= 0) { 173 - firehoseConnections.delete(ip); 174 - } else { 175 - firehoseConnections.set(ip, count); 176 - } 177 - }); 178 - firehose.handleConnection(ws, request); 179 - }); 180 - } else { 181 - socket.destroy(); 182 - } 10 + process.on("SIGTERM", () => { 11 + handle.close().finally(() => process.exit(0)); 183 12 }); 184 - 185 - // Backfill existing blocks to IPFS 186 - async function backfillIpfs(): Promise<void> { 187 - if (!ipfsService) return; 188 - 189 - const rows = db 190 - .prepare("SELECT cid, bytes FROM blocks") 191 - .all() as Array<{ cid: string; bytes: Buffer }>; 192 - 193 - if (rows.length === 0) return; 194 - 195 - let count = 0; 196 - for (const row of rows) { 197 - const hasIt = await ipfsService.hasBlock(row.cid); 198 - if (!hasIt) { 199 - await ipfsService.putBlock(row.cid, new Uint8Array(row.bytes)); 200 - count++; 201 - } 202 - } 203 - 204 - if (count > 0) { 205 - console.log(pc.dim(` IPFS: backfilled ${count} blocks`)); 206 - } 207 - } 208 - 209 - // Start server 210 - httpServer.listen(config.PORT, async () => { 211 - console.log( 212 - pc.bold(`\nP2PDS running at `) + 213 - pc.cyan(`http://localhost:${config.PORT}`), 214 - ); 215 - if (config.DID) { 216 - console.log(pc.dim(` DID: ${config.DID}`)); 217 
- } 218 - if (config.HANDLE) { 219 - console.log(pc.dim(` Handle: @${config.HANDLE}`)); 220 - } 221 - console.log(pc.dim(` Data: ${dataDir}`)); 222 - if (oauthClientManager) { 223 - if (pdsClient && await pdsClient.hasSession().catch(() => false)) { 224 - console.log(pc.dim(` OAuth: session active for ${config.DID}`)); 225 - } else { 226 - console.log(pc.dim(` OAuth: enabled (no active session)`)); 227 - } 228 - } 229 - 230 - // Start IPFS after HTTP server is listening (IPFS startup can be slow) 231 - if (ipfsService) { 232 - try { 233 - console.log(pc.dim(` IPFS: starting...`)); 234 - await ipfsService.start(); 235 - const peerId = ipfsService.getPeerId(); 236 - if (peerId) { 237 - console.log(pc.dim(` PeerID: ${peerId}`)); 238 - } else { 239 - console.log(pc.dim(` IPFS: local blockstore only (networking disabled)`)); 240 - } 241 - await backfillIpfs(); 242 - // Start replication after IPFS is ready 243 - if (replicationManager) { 244 - try { 245 - await replicationManager.init(); 246 - replicationManager.startPeriodicSync(); 247 - const trackedDids = replicationManager.getReplicateDids(); 248 - console.log(pc.dim(` Replication: tracking ${trackedDids.length} DIDs`)); 249 - // Start firehose subscription for real-time updates 250 - if (config.FIREHOSE_ENABLED) { 251 - replicationManager.startFirehose(); 252 - } 253 - // Start challenge scheduler if policy engine is available 254 - if (policyEngine) { 255 - const libp2pNode = ipfsService?.getLibp2p(); 256 - let challengeTransport: ChallengeTransport; 257 - if (libp2pNode) { 258 - const libp2pTransport = new Libp2pChallengeTransport(libp2pNode as Libp2p); 259 - const httpTransport = new HttpChallengeTransport(); 260 - const syncStorage = replicationManager.getSyncStorage(); 261 - challengeTransport = new FailoverChallengeTransport( 262 - libp2pTransport, 263 - httpTransport, 264 - { 265 - resolveEndpoint: (httpEndpoint) => syncStorage.getMultiaddrForPdsEndpoint(httpEndpoint), 266 - onFallback: (endpoint, error) => 
{ 267 - console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`)); 268 - replicationManager!.refreshPeerInfoForEndpoint(endpoint); 269 - }, 270 - }, 271 - ); 272 - } else { 273 - challengeTransport = new HttpChallengeTransport(); 274 - } 275 - replicationManager.startChallengeScheduler(challengeTransport); 276 - console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? "libp2p+HTTP failover" : "HTTP"} transport)`)); 277 - } 278 - } catch (err) { 279 - console.error(pc.red(` Replication startup failed:`), err); 280 - } 281 - } 282 - } catch (err) { 283 - console.error(pc.red(` IPFS startup failed:`), err); 284 - } 285 - } 286 - 287 - console.log(); 288 - }); 289 - 290 - // Graceful shutdown 291 - function shutdown() { 292 - console.log(pc.dim("\nShutting down...")); 293 - const cleanup = async () => { 294 - if (rateLimiter) { 295 - rateLimiter.stop(); 296 - } 297 - if (replicationManager) { 298 - replicationManager.stop(); 299 - } 300 - if (ipfsService) { 301 - await ipfsService.stop(); 302 - } 303 - db.close(); 304 - }; 305 - cleanup().finally(() => process.exit(0)); 306 - } 307 - 308 - process.on("SIGINT", shutdown); 309 - process.on("SIGTERM", shutdown);
+343
src/start.ts
··· 1 + /** 2 + * Extracted server startup logic — importable and testable. 3 + * 4 + * startServer(config, opts?) does everything server.ts used to do at module scope: 5 + * creates DB, IPFS, replication, HTTP server, etc. and returns a ServerHandle 6 + * for programmatic control (tests, Tauri sidecar, etc.). 7 + */ 8 + 9 + import { getRequestListener } from "@hono/node-server"; 10 + import { createServer, type Server } from "node:http"; 11 + import { mkdirSync } from "node:fs"; 12 + import { resolve } from "node:path"; 13 + import Database from "better-sqlite3"; 14 + import { WebSocketServer } from "ws"; 15 + import pc from "picocolors"; 16 + 17 + import { loadPolicies } from "./config.js"; 18 + import type { Config } from "./config.js"; 19 + import { RepoManager } from "./repo-manager.js"; 20 + import { BlobStore } from "./blobs.js"; 21 + import { Firehose } from "./firehose.js"; 22 + import { IpfsService } from "./ipfs.js"; 23 + import { createApp } from "./index.js"; 24 + import { DidResolver } from "./did-resolver.js"; 25 + import { InMemoryDidCache } from "./did-cache.js"; 26 + import { ReplicationManager } from "./replication/replication-manager.js"; 27 + import { ReplicatedRepoReader } from "./replication/replicated-repo-reader.js"; 28 + import { PolicyEngine } from "./policy/engine.js"; 29 + import { HttpChallengeTransport } from "./replication/challenge-response/http-transport.js"; 30 + import { Libp2pChallengeTransport } from "./replication/challenge-response/libp2p-transport.js"; 31 + import { FailoverChallengeTransport } from "./replication/challenge-response/failover-transport.js"; 32 + import type { ChallengeTransport } from "./replication/challenge-response/transport.js"; 33 + import type { Libp2p } from "@libp2p/interface"; 34 + import { RateLimiter } from "./rate-limiter.js"; 35 + import { createOAuthClient, type OAuthClientManager } from "./oauth/client.js"; 36 + import { PdsClient } from "./oauth/pds-client.js"; 37 + 38 + export interface 
StartServerOpts { 39 + /** Override DID resolver (e.g. mock resolver for tests). */ 40 + didResolver?: DidResolver; 41 + } 42 + 43 + export interface ServerHandle { 44 + url: string; 45 + port: number; 46 + close: () => Promise<void>; 47 + replicationManager?: ReplicationManager; 48 + ipfsService?: IpfsService; 49 + } 50 + 51 + export async function startServer( 52 + config: Config, 53 + opts?: StartServerOpts, 54 + ): Promise<ServerHandle> { 55 + // Initialize rate limiter 56 + let rateLimiter: RateLimiter | undefined; 57 + if (config.RATE_LIMIT_ENABLED) { 58 + rateLimiter = new RateLimiter(); 59 + rateLimiter.startCleanup(60_000); 60 + } 61 + 62 + // Ensure data directory exists 63 + const dataDir = resolve(config.DATA_DIR); 64 + mkdirSync(dataDir, { recursive: true }); 65 + 66 + // Initialize SQLite database 67 + const dbPath = resolve(dataDir, "pds.db"); 68 + const db = new Database(dbPath); 69 + db.pragma("journal_mode = WAL"); 70 + db.pragma("synchronous = NORMAL"); 71 + 72 + // Initialize IPFS service (if enabled) 73 + let ipfsService: IpfsService | undefined; 74 + if (config.IPFS_ENABLED) { 75 + const ipfsBlocksPath = resolve(dataDir, "ipfs", "blocks"); 76 + const ipfsDatastorePath = resolve(dataDir, "ipfs", "datastore"); 77 + mkdirSync(ipfsBlocksPath, { recursive: true }); 78 + mkdirSync(ipfsDatastorePath, { recursive: true }); 79 + 80 + ipfsService = new IpfsService({ 81 + blocksPath: ipfsBlocksPath, 82 + datastorePath: ipfsDatastorePath, 83 + networking: config.IPFS_NETWORKING, 84 + }); 85 + } 86 + 87 + // Initialize repo manager (requires DID + signing key) 88 + let repoManager: RepoManager | undefined; 89 + let blobStore: BlobStore | undefined; 90 + if (config.DID && config.SIGNING_KEY) { 91 + repoManager = new RepoManager(db, config as typeof config & { DID: string; SIGNING_KEY: string }); 92 + blobStore = new BlobStore(dataDir, config.DID); 93 + repoManager.init(blobStore, ipfsService, ipfsService); 94 + } 95 + 96 + // Initialize firehose 97 + const 
firehose = new Firehose(repoManager!); 98 + 99 + // Initialize DID resolver (allow override for tests) 100 + const didResolver = opts?.didResolver ?? new DidResolver({ 101 + didCache: new InMemoryDidCache(), 102 + }); 103 + 104 + // Load policy engine if configured 105 + let policyEngine: PolicyEngine | undefined; 106 + if (config.POLICY_FILE) { 107 + const policySet = loadPolicies(config); 108 + if (policySet) { 109 + policyEngine = new PolicyEngine(policySet); 110 + console.log(pc.dim(` Policies: loaded ${policySet.policies.length} from ${config.POLICY_FILE}`)); 111 + } 112 + } 113 + 114 + // Initialize OAuth client (if enabled) 115 + let oauthClientManager: OAuthClientManager | undefined; 116 + let pdsClient: PdsClient | undefined; 117 + if (config.OAUTH_ENABLED) { 118 + oauthClientManager = await createOAuthClient(db, config); 119 + if (config.DID) { 120 + pdsClient = new PdsClient(oauthClientManager.client, config.DID); 121 + } 122 + } 123 + 124 + // Initialize replication manager and replicated repo reader (if IPFS enabled) 125 + let replicationManager: ReplicationManager | undefined; 126 + let replicatedRepoReader: ReplicatedRepoReader | undefined; 127 + if (ipfsService && repoManager) { 128 + replicationManager = new ReplicationManager( 129 + db, 130 + config, 131 + repoManager, 132 + ipfsService, 133 + ipfsService, 134 + didResolver, 135 + undefined, 136 + undefined, 137 + policyEngine, 138 + pdsClient, 139 + ); 140 + replicatedRepoReader = new ReplicatedRepoReader( 141 + ipfsService, 142 + replicationManager.getSyncStorage(), 143 + ); 144 + replicationManager.setReplicatedRepoReader(replicatedRepoReader); 145 + } 146 + 147 + // Pass rate limiter to IPFS service for gossipsub rate limiting 148 + if (rateLimiter && ipfsService) { 149 + ipfsService.setRateLimiter(rateLimiter); 150 + } 151 + 152 + // Create Hono app 153 + const app = createApp( 154 + config, 155 + firehose, 156 + ipfsService, 157 + ipfsService, 158 + blobStore, 159 + replicationManager, 160 + 
replicatedRepoReader, 161 + repoManager, 162 + rateLimiter, 163 + oauthClientManager, 164 + pdsClient, 165 + ); 166 + 167 + // Create HTTP server using @hono/node-server's request listener 168 + const requestListener = getRequestListener(app.fetch); 169 + const httpServer = createServer(requestListener); 170 + 171 + // Set up WebSocket server for firehose with per-IP connection limits 172 + const wss = new WebSocketServer({ noServer: true }); 173 + const firehoseConnections = new Map<string, number>(); 174 + const maxFirehosePerIp = config.RATE_LIMIT_FIREHOSE_PER_IP; 175 + 176 + httpServer.on("upgrade", (request, socket, head) => { 177 + const url = new URL(request.url ?? "/", `http://localhost:${config.PORT}`); 178 + 179 + if (url.pathname === "/xrpc/com.atproto.sync.subscribeRepos") { 180 + const ip = 181 + (request.headers["x-forwarded-for"] as string)?.split(",")[0]?.trim() ?? 182 + request.headers["x-real-ip"] as string ?? 183 + "unknown"; 184 + 185 + const current = firehoseConnections.get(ip) ?? 0; 186 + if (config.RATE_LIMIT_ENABLED && current >= maxFirehosePerIp) { 187 + socket.destroy(); 188 + return; 189 + } 190 + 191 + wss.handleUpgrade(request, socket, head, (ws) => { 192 + firehoseConnections.set(ip, (firehoseConnections.get(ip) ?? 0) + 1); 193 + ws.on("close", () => { 194 + const count = (firehoseConnections.get(ip) ?? 
1) - 1; 195 + if (count <= 0) { 196 + firehoseConnections.delete(ip); 197 + } else { 198 + firehoseConnections.set(ip, count); 199 + } 200 + }); 201 + firehose.handleConnection(ws, request); 202 + }); 203 + } else { 204 + socket.destroy(); 205 + } 206 + }); 207 + 208 + // Backfill existing blocks to IPFS 209 + async function backfillIpfs(): Promise<void> { 210 + if (!ipfsService) return; 211 + 212 + const rows = db 213 + .prepare("SELECT cid, bytes FROM blocks") 214 + .all() as Array<{ cid: string; bytes: Buffer }>; 215 + 216 + if (rows.length === 0) return; 217 + 218 + let count = 0; 219 + for (const row of rows) { 220 + const hasIt = await ipfsService.hasBlock(row.cid); 221 + if (!hasIt) { 222 + await ipfsService.putBlock(row.cid, new Uint8Array(row.bytes)); 223 + count++; 224 + } 225 + } 226 + 227 + if (count > 0) { 228 + console.log(pc.dim(` IPFS: backfilled ${count} blocks`)); 229 + } 230 + } 231 + 232 + // Start the HTTP server and async initialization 233 + const { url, port } = await new Promise<{ url: string; port: number }>((resolveStart) => { 234 + httpServer.listen(config.PORT, async () => { 235 + const addr = httpServer.address() as { port: number }; 236 + const actualPort = addr.port; 237 + const url = `http://localhost:${actualPort}`; 238 + 239 + console.log( 240 + pc.bold(`\nP2PDS running at `) + 241 + pc.cyan(url), 242 + ); 243 + if (config.DID) { 244 + console.log(pc.dim(` DID: ${config.DID}`)); 245 + } 246 + if (config.HANDLE) { 247 + console.log(pc.dim(` Handle: @${config.HANDLE}`)); 248 + } 249 + console.log(pc.dim(` Data: ${dataDir}`)); 250 + if (oauthClientManager) { 251 + if (pdsClient && await pdsClient.hasSession().catch(() => false)) { 252 + console.log(pc.dim(` OAuth: session active for ${config.DID}`)); 253 + } else { 254 + console.log(pc.dim(` OAuth: enabled (no active session)`)); 255 + } 256 + } 257 + 258 + // Start IPFS after HTTP server is listening (IPFS startup can be slow) 259 + if (ipfsService) { 260 + try { 261 + 
console.log(pc.dim(` IPFS: starting...`)); 262 + await ipfsService.start(); 263 + const peerId = ipfsService.getPeerId(); 264 + if (peerId) { 265 + console.log(pc.dim(` PeerID: ${peerId}`)); 266 + } else { 267 + console.log(pc.dim(` IPFS: local blockstore only (networking disabled)`)); 268 + } 269 + await backfillIpfs(); 270 + // Start replication after IPFS is ready 271 + if (replicationManager) { 272 + try { 273 + await replicationManager.init(); 274 + replicationManager.startPeriodicSync(); 275 + const trackedDids = replicationManager.getReplicateDids(); 276 + console.log(pc.dim(` Replication: tracking ${trackedDids.length} DIDs`)); 277 + // Start firehose subscription for real-time updates 278 + if (config.FIREHOSE_ENABLED) { 279 + replicationManager.startFirehose(); 280 + } 281 + // Start challenge scheduler if policy engine is available 282 + if (policyEngine) { 283 + const libp2pNode = ipfsService?.getLibp2p(); 284 + let challengeTransport: ChallengeTransport; 285 + if (libp2pNode) { 286 + const libp2pTransport = new Libp2pChallengeTransport(libp2pNode as Libp2p); 287 + const httpTransport = new HttpChallengeTransport(); 288 + const syncStorage = replicationManager.getSyncStorage(); 289 + challengeTransport = new FailoverChallengeTransport( 290 + libp2pTransport, 291 + httpTransport, 292 + { 293 + resolveEndpoint: (httpEndpoint) => syncStorage.getMultiaddrForPdsEndpoint(httpEndpoint), 294 + onFallback: (endpoint, error) => { 295 + console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`)); 296 + replicationManager!.refreshPeerInfoForEndpoint(endpoint); 297 + }, 298 + }, 299 + ); 300 + } else { 301 + challengeTransport = new HttpChallengeTransport(); 302 + } 303 + replicationManager.startChallengeScheduler(challengeTransport); 304 + console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? 
"libp2p+HTTP failover" : "HTTP"} transport)`)); 305 + } 306 + } catch (err) { 307 + console.error(pc.red(`  Replication startup failed:`), err); 308 + } 309 + } 310 + } catch (err) { 311 + console.error(pc.red(`  IPFS startup failed:`), err); 312 + } 313 + } 314 + 315 + console.log(); 316 + resolveStart({ url, port: actualPort }); 317 + }); 318 + }); 319 + 320 + // Build close function 321 + const close = async (): Promise<void> => { 322 + if (rateLimiter) { 323 + rateLimiter.stop(); 324 + } 325 + if (replicationManager) { 326 + replicationManager.stop(); 327 + } 328 + for (const client of wss.clients) client.terminate(); wss.close(); 329 + await new Promise<void>((res) => { httpServer.close(() => res()); httpServer.closeAllConnections?.(); }); 330 + if (ipfsService) { 331 + await ipfsService.stop(); 332 + } 333 + db.close(); 334 + }; 335 + 336 + return { 337 + url, 338 + port, 339 + close, 340 + replicationManager, 341 + ipfsService, 342 + }; 343 + }